# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# ---LICENSE-END
"""
This module contains a manager class for state machines
"""
# pylint: disable=redefined-builtin
from builtins import object
__author__ = 'Georg Hinkel'
import rospy
from cle_ros_msgs.msg import CLEError
from hbp_nrp_excontrol.StateMachineInstance import StateMachineInstance
from hbp_nrp_excontrol.__internals import MAX_STOP_TIMEOUT, ERROR_PUB
class StateMachineManager(object):
    """
    This class represents a managing class for state machines

    It keeps a list of StateMachineInstance objects and offers bulk operations
    (initialize, start, restart, terminate) on them. All instances created through
    this manager are handed the same CLEError publisher.
    """

    def __init__(self):
        """
        Creates a new, empty state machine manager and its error publisher
        """
        self.__machines = []
        self.__error_publisher = rospy.Publisher(
            ERROR_PUB, CLEError, queue_size=10
        )

    @property
    def state_machines(self):
        """
        Gets the state machines contained in this manager

        :return: The internal list of state machines (not a copy)
        """
        return self.__machines

    def create_state_machine(self, name, sim_id, sim_dir):
        """
        Create new StateMachineInstance with the given name and simulation id

        :param name: The state machine name
        :param sim_id: The simulation id under which the state machines should be run
        :param sim_dir: Absolute path to the folder where the simulation is running
        :return: A new state machine, already registered with this manager
        """
        machine = StateMachineInstance(name, sim_id, sim_dir,
                                       cle_error_pub=self.__error_publisher)
        self.__machines.append(machine)
        return machine

    def get_state_machine(self, name):
        """
        Gets the state machine with the given name or none

        :param name: The state machine name
        :return: The first state machine whose sm_id equals name, or None if there is none
        """
        for machine in self.__machines:
            if machine.sm_id == name:
                return machine
        return None

    def add_all(self, paths, sim_id, sim_dir):
        """
        Adds all given paths to the state machines managed by this instance

        :param paths: A dictionary of unique state machine identifiers and paths to the
                      state machine sources
        :param sim_id: The simulation id under which the state machines should be run
        :param sim_dir: Absolute path to the folder where the simulation is running
        :raises Exception: If an identifier in paths is already used by a managed state machine
        """
        for machine in self.__machines:
            if machine.sm_id in paths:
                raise Exception(
                    "The name {0} is already used by a state machine.".format(machine.sm_id)
                )

        # Build every instance before touching the managed list, so that a failure while
        # constructing one of them does not leave the manager partially populated.
        new_machines = [
            StateMachineInstance(sm_id, sim_id, sim_dir,
                                 cle_error_pub=self.__error_publisher,
                                 source_path=paths[sm_id])
            for sm_id in paths
        ]
        self.__machines.extend(new_machines)

    def terminate_all(self, timeout=MAX_STOP_TIMEOUT):
        """
        Terminates all state machines

        Termination is requested for every running state machine, followed by a wait for
        that machine. A failure on one machine does not prevent the remaining machines
        from being terminated; the first error encountered is re-raised at the end.

        :param timeout: The timeout passed to wait_termination of each state machine
        """
        first_error = None
        for machine in self.__machines:
            if not machine.is_running:
                continue
            try:
                try:
                    machine.request_termination()
                finally:
                    # Wait even if the request itself failed
                    machine.wait_termination(timeout)
            except Exception as error:  # pylint: disable=broad-except
                # Remember the failure but keep going so later machines are still stopped
                if first_error is None:
                    first_error = error
        if first_error is not None:
            raise first_error

    def shutdown(self):
        """
        Shutdown the manager, terminating all state machines

        The error publisher is unregistered only after the state machines were terminated,
        and also when termination raises.
        """
        try:
            if self.__machines:
                self.terminate_all()
        finally:
            # The machines were constructed with this publisher, so keep it registered
            # until they have been asked to stop.
            # NOTE(review): presumably they may publish errors while terminating - confirm
            self.__error_publisher.unregister()

    def restart_all(self):
        """
        Restarts all state machines
        """
        for machine in self.__machines:
            machine.restart()

    def start_all(self, fail_if_started=True):
        """
        Starts all state machines

        State machines that are not running are initialized first; start_execution is then
        called on every state machine.

        :param fail_if_started: If True, the object raises an exception when any state machine
                                was already started
        """
        for machine in self.__machines:
            if not machine.is_running:
                machine.initialize_sm()
            machine.start_execution(fail_if_started)

    def initialize_all(self):
        """
        Initializes all state machines
        """
        for machine in self.__machines:
            machine.initialize_sm()