Source code for hbp_nrp_excontrol.StateMachineManager

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END
"""
This module contains a manager class for state machines
"""
# pylint: disable=redefined-builtin
from builtins import object

__author__ = 'Georg Hinkel'

import rospy
from cle_ros_msgs.msg import CLEError

from hbp_nrp_excontrol.StateMachineInstance import StateMachineInstance
from hbp_nrp_excontrol.__internals import MAX_STOP_TIMEOUT, ERROR_PUB


class StateMachineManager(object):
    """
    This class represents a managing class for state machines

    The manager keeps a list of state machine instances and owns a single publisher
    for CLE errors that is passed to every instance it creates.
    """

    def __init__(self):
        """
        Creates a new state machine manager without any state machines
        """
        self.__machines = []
        self.__error_publisher = rospy.Publisher(
            ERROR_PUB, CLEError, queue_size=10
        )

    @property
    def state_machines(self):
        """
        Gets the state machines contained in this manager
        """
        return self.__machines

    def create_state_machine(self, name, sim_id, sim_dir):
        """
        Create new StateMachineInstance with the given name and simulation id
        and register it with this manager

        :param name: The state machine name
        :param sim_id: The simulation id under which the state machines should be run
        :param sim_dir: Absolute path to the folder where the simulation is running
        :return: A new state machine
        """
        sm = StateMachineInstance(name, sim_id, sim_dir,
                                  cle_error_pub=self.__error_publisher)
        self.__machines.append(sm)
        return sm

    def get_state_machine(self, name):
        """
        Gets the state machine with the given name or None

        :param name: The state machine name
        :return: The first managed state machine whose sm_id equals the given name,
            or None if there is no such state machine
        """
        for sm in self.state_machines:
            if sm.sm_id == name:
                return sm
        return None

    def add_all(self, paths, sim_id, sim_dir):
        """
        Adds all given paths to the state machines managed by this instance

        :param paths: A dictionary of unique state machine identifiers and paths to the
            state machine sources
        :param sim_id: The simulation id under which the state machines should be run
        :param sim_dir: Absolute path to the folder where the simulation is running
        :raises Exception: If one of the given identifiers is already used by a
            state machine of this manager
        """
        # Check every identifier before creating anything, so that a name clash
        # does not leave the manager with only part of the requested machines.
        for sm in self.__machines:
            if sm.sm_id in paths:
                raise Exception("The name {0} is already used by a state machine.".format(sm.sm_id))
        for sm_id, source_path in paths.items():
            self.__machines.append(StateMachineInstance(sm_id, sim_id, sim_dir,
                                                        cle_error_pub=self.__error_publisher,
                                                        source_path=source_path))

    def terminate_all(self, timeout=MAX_STOP_TIMEOUT):
        """
        Terminates all state machines that are currently running

        :param timeout: The timeout passed to wait_termination for each state machine
        """
        for sm in self.__machines:
            if sm.is_running:
                try:
                    sm.request_termination()
                finally:
                    # Wait even if the termination request raised an error
                    sm.wait_termination(timeout)

    def shutdown(self):
        """
        Shutdown the manager, terminating all state machines
        """
        # The state machines were handed the error publisher on creation, so keep
        # it registered until they have been terminated (presumably they may still
        # report errors through it while stopping). The publisher is unregistered
        # in any case, even if the termination fails.
        try:
            if self.__machines:
                self.terminate_all()
        finally:
            self.__error_publisher.unregister()

    def restart_all(self):
        """
        Restarts all state machines
        """
        for sm in self.__machines:
            sm.restart()

    def start_all(self, fail_if_started=True):
        """
        Starts all state machines

        State machines that are not running are initialized before they are started.

        :param fail_if_started: If True, the object raises an exception when any state
            machine was already started
        """
        for sm in self.__machines:
            if not sm.is_running:
                sm.initialize_sm()
            # Called for running machines as well; fail_if_started is forwarded
            # so that start_execution decides how to treat them.
            sm.start_execution(fail_if_started)

    def initialize_all(self):
        """
        Initializes all state machines
        """
        for sm in self.__machines:
            sm.initialize_sm()