Source code for hbp_nrp_commons.simulation_lifecycle

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END
"""
This package defines the simulation lifecycle used in the NRP Backend and Simulation Server components.
"""

import os
import json
import logging
import time
from typing import Optional, List

from hbp_nrp_commons.workspace.settings import Settings
from transitions import MachineError
from transitions.extensions import LockedMachine as Machine
import paho.mqtt.client as mqtt

__author__ = 'NRP software team, Georg Hinkel, Ugo Albanese'

# Module-wide logger.
logger = logging.getLogger(__name__)

# NOTE: the transitions package logs very verbosely, so its logger is made one
# level (i.e. +10) less verbose than this module's effective level at import time.
_TRANSITIONS_LOG_LEVEL = logger.getEffectiveLevel() + 10
logging.getLogger("transitions").setLevel(_TRANSITIONS_LOG_LEVEL)

class SimulationLifecycle:
    """
    Defines the lifecycle of a simulation in terms of state machine defined using the
    `pytransitions <https://github.com/pytransitions/transitions>`_ package.

    A Simulation is created in the :code:`created` initial state;
    the :code:`initialized` trigger makes it transition to :code:`paused`.
    The :code:`started` trigger makes it move to :code:`started` state (and :code:`paused`
    moves it back to :code:`paused`), then :code:`completed` to :code:`completed` and finally,
    :code:`stopped` to the :code:`stopped` final state.
    From any running state, the :code:`failed` trigger will result in the final
    :code:`failed` state.

    After a transition, state changes may be propagated on the :code:`synchronization_topic`
    to other instances of :class:`.SimulationLifecycle`.

    See :ref:`"NRP Backend's life-cycle state machine" <life-cycles>` in
    :code:`hbp_nrp_backend` docs for more info.
    """

    STATES: List[str] = [
        'created',
        'paused',  # Simulation resources have been initialized and waiting to start
        'started',  # Simulation is advancing
        'completed',  # Simulation has completed, waiting to be stopped
        'stopped',  # Simulation has been stopped, resources have been released.
        'failed']  # Simulation has failed, resources have been released.

    INITIAL_STATE: str = 'created'
    FINAL_STATES: List[str] = ['stopped', 'failed']
    RUNNING_STATES: List[str] = ['created', 'paused', 'started', 'completed']
    ERROR_STATES: List[str] = ['failed']

    TRIGGERS: List[str] = ['initialized', 'paused', 'started', 'completed', 'stopped', 'failed']

    # Keys of the JSON object published by __propagate_state_change; a received
    # synchronization message lacking any of them is treated as malformed.
    _SYNCHRONIZATION_MESSAGE_KEYS = ('source_node', 'source_state', 'event', 'target_state')

    @staticmethod
    def is_state(state: str) -> bool:
        """
        :return: True if state is one of SimulationLifecycle.STATES
        """
        return state in SimulationLifecycle.STATES

    @staticmethod
    def is_final_state(state: str) -> bool:
        """
        :return: True if state is one of SimulationLifecycle.FINAL_STATES
        """
        return state in SimulationLifecycle.FINAL_STATES

    @staticmethod
    def is_error_state(state: str) -> bool:
        """
        :return: True if state is one of SimulationLifecycle.ERROR_STATES
        """
        return state in SimulationLifecycle.ERROR_STATES

    @staticmethod
    def is_initial_state(state: str) -> bool:
        """
        :return: True if state is SimulationLifecycle.INITIAL_STATE
        """
        return state == SimulationLifecycle.INITIAL_STATE

    def __after_state_change_callback(self, state_change):
        """
        Callback to be executed after any state change.
        According to transitions' docs, it will be executed after 'transition.after' callbacks
        https://github.com/pytransitions/transitions#callback-execution-order

        It first propagates the change to the other lifecycles, then shuts this
        instance down if a final state has been reached.
        """
        self.__propagate_state_change(state_change)
        self.__shut_down_on_final_state(state_change)

    def __shut_down_on_final_state(self, state_change):
        """
        Shuts self down if the destination state is one of FINAL_STATES.

        Self transitions (source == destination) do not trigger a shutdown.
        """
        source_state = state_change.transition.source
        dest_state = state_change.transition.dest

        if not SimulationLifecycle.is_final_state(dest_state):
            logger.debug("Not a final state. Don't shutdown.")
            return

        if source_state != dest_state:  # not a self transition
            # NOTE(review): presumably gives the MQTT network thread time to deliver
            # the state change just published by __propagate_state_change before
            # disconnecting — TODO confirm
            time.sleep(1)
            logger.debug("Final state '%s' reached. Shutdown.", dest_state)
            self.shutdown(state_change)

    def __propagate_state_change(self, state_change):
        """
        Propagates the state change to other simulation lifecycle implementations.

        Changes flagged as silent (state_change.kwargs['silent'] truthy, see set_silent)
        are not propagated.

        :param state_change: The event that caused the state change
            From this, the trigger that caused the event is available as state_change.event.name
            The source state of the transition is state_change.transition.source
            The target state of the transition is state_change.transition.dest
        """
        source_state = state_change.transition.source
        dest_state = state_change.transition.dest

        if state_change.kwargs.get('silent'):
            return

        if self.__mqtt_client is None:
            # shutdown() has already released the MQTT client
            logger.warning("[%s] Lifecycle already shut down: state change from '%s' to '%s' "
                           "has not been propagated", self.mqtt_client_id, source_state, dest_state)
            return

        # Publish the state change message.
        logger.debug("Propagating simulation lifecycle state change from '%s' to '%s'",
                     source_state, dest_state)

        # For the transition out of the initial state (i.e. initialized), setting
        # the retain flag is crucial since it allows to create the two lifecycles asynchronously.
        # In fact, the Backend lifecycle transitions to initialized before the creation of
        # the simulation server lifecycle.
        # It's not a problem, since, upon connection to the broker,
        # the retained initialization message will be delivered to simulation server lifecycle
        # allowing it to transition correctly
        should_retain = self.is_initial_state(source_state)

        self.__mqtt_client.publish(topic=self.synchronization_topic,
                                   payload=json.dumps({"source_node": self.mqtt_client_id,
                                                       "source_state": source_state,
                                                       "event": state_change.event.name,
                                                       "target_state": dest_state}),
                                   retain=should_retain)

    def __synchronized_lifecycle_changed(self, _client, _userdata, message):
        """
        Gets called when the lifecycle of the simulation changed in another MQTT node:
        i.e. it's the state change MQTT message received callback

        Empty, non-JSON and non-conforming messages are ignored.

        :param message: received message. message.payload is a JSON with format:
            string source_node  # The mqtt node from which the simulation lifecycle change was initiated
            string source_state # The source state of the lifecycle
            string event        # The event that caused the state change
            string target_state # The target state name
        """
        if not (decoded_msg := message.payload.decode("utf-8", "ignore")):
            # ignore empty messages (e.g. the ones clearing retained messages)
            return

        try:
            state_change = json.loads(decoded_msg)
        except json.JSONDecodeError:
            logger.debug("[%s] Received malformed lifecycle synchronization message. Ignoring",
                         self.mqtt_client_id)
            return

        # Valid JSON may still not be a synchronization message (e.g. a list, a number
        # or an object lacking some keys); indexing it below would raise inside the
        # MQTT message callback, so treat it as malformed too.
        if not isinstance(state_change, dict) or \
                any(key not in state_change for key in self._SYNCHRONIZATION_MESSAGE_KEYS):
            logger.debug("[%s] Received malformed lifecycle synchronization message. Ignoring",
                         self.mqtt_client_id)
            return

        source_state = state_change["source_state"]

        # pylint: disable=broad-except
        try:
            # don't receive messages from myself
            if self.mqtt_client_id == state_change["source_node"]:  # receiver same as sender
                return

            logger.debug("[%s] Received lifecycle synchronization message: %s",
                         self.mqtt_client_id, state_change)

            if self.state != source_state:
                logger.warning("The local simulation lifecycle and the remote version "
                               "have diverged.")
                logger.warning("Moving to selected source state now")
                self.__machine.set_state(source_state)

            try:
                # silent: the change came from a peer, don't echo it back
                self.__machine.events[state_change["event"]].trigger(silent=True)
            except Exception as e:
                self.__machine.set_state(state_change["target_state"])
                logger.exception(
                    "Error while synchronizing the lifecycle: %s", str(e))
                self.failed()
        except Exception as e2:
            logger.exception(
                "Error failing the simulation (this should never happen): %s", str(e2))

    def __on_connect(self, client, _userdata: dict, _flags, _rc):
        """
        MQTT on_connect callback: optionally clears retained messages from the
        synchronization topic, then subscribes to it.
        """
        logger.debug("Connected to MQTT broker with id '%s'", self.mqtt_client_id)

        # clear the topic from stale retained msgs if required
        self._clear_synchronization_topic()

        client.subscribe(self.synchronization_topic)
        logger.debug("Subscribed to %s MQTT topic", self.synchronization_topic)

    def _clear_synchronization_topic(self):
        """
        Clear the synchronization_topic from retained messages
        if clear_synchronization_topic is True
        """
        if self.clear_synchronization_topic:
            # publishing an empty retained payload clears the retained message
            self.__mqtt_client.publish(topic=self.synchronization_topic, payload="", retain=True)

    def __init__(self,
                 synchronization_topic: str,
                 initial_state: str = INITIAL_STATE,
                 propagated_destinations: Optional[List[str]] = STATES,
                 mqtt_client_id: Optional[str] = None,
                 mqtt_broker_host: str = Settings.mqtt_broker_host,
                 mqtt_broker_port: int = Settings.mqtt_broker_port,
                 mqtt_topics_prefix: str = Settings.mqtt_topics_prefix,
                 clear_synchronization_topic=False):
        """
        Creates a new synchronization lifecycle for the given topic,
        connects to the MQTT broker and starts the MQTT message processing thread.

        :param synchronization_topic: The topic name used to synchronize the simulation lifecycles
        :param initial_state: The initial state of the lifecycle
        :param propagated_destinations: States for which change events should be propagated
            to other lifecycles (None means all STATES)
        :param mqtt_client_id: the MQTT Client ID of this lifecycle
        :param mqtt_broker_host: the host where to find the MQTT broker
        :param mqtt_broker_port: the port, on mqtt_broker_host, at which the MQTT broker is available
        :param mqtt_topics_prefix: The prefix used to scope MQTT topics (and the MQTT client ID)
        :param clear_synchronization_topic: Whether to clean on connect the synchronization topic
            of retained messages
        """
        propagated_destinations = propagated_destinations \
            if propagated_destinations is not None else SimulationLifecycle.STATES

        # mqtt
        self.mqtt_client_id = mqtt_client_id
        self.mqtt_broker_host = mqtt_broker_host
        self.mqtt_broker_port = mqtt_broker_port
        self.mqtt_topics_prefix = mqtt_topics_prefix

        # states for which change events should NOT be propagated to other lifecycles
        self._silent_destinations = frozenset(self.STATES) - frozenset(propagated_destinations)

        self.synchronization_topic = synchronization_topic
        self.clear_synchronization_topic = clear_synchronization_topic

        if self.mqtt_topics_prefix:
            self.synchronization_topic = f"{self.mqtt_topics_prefix}/{synchronization_topic}"
            # prefix mqtt_client_id with mqtt_topics_prefix
            # TODO it should be prefixed by a globally unique sim_id (NRRPLT-8917)
            self.mqtt_client_id = f"{self.mqtt_topics_prefix}_{mqtt_client_id}"

        # Transitions adds some members based on the STATES and transitions
        # We assign them dummy values here to avoid pylint warnings
        self.state = initial_state
        self.failed = lambda: None

        # create StateMachine and setup transitions
        self.__machine = Machine(model=self,
                                 states=SimulationLifecycle.STATES,
                                 initial=initial_state,
                                 after_state_change=self.__after_state_change_callback,
                                 send_event=True)

        self._add_transition(trigger='initialized',
                             source='created', dest='paused', before='initialize')
        self._add_transition(trigger='started',
                             source='paused', dest='started', before='start')
        self._add_transition(trigger='paused',
                             source='started', dest='paused', before='pause')
        self._add_transition(trigger='completed',
                             source='started', dest='completed')
        self._add_transition(trigger='stopped',
                             source=SimulationLifecycle.RUNNING_STATES, dest='stopped', before='stop')
        self._add_transition(trigger='failed',
                             source=['paused', 'started', 'completed'], dest='failed', after='fail')
        self._add_transition(trigger='failed',
                             source='created', dest='failed', before='stop')
        # TODO reset support

        # NOTE MQTTv5 requires clean_start=True parameter to connect
        # instead of clean_session=True here
        self.__mqtt_client: Optional[mqtt.Client] = mqtt.Client(self.mqtt_client_id,
                                                                clean_session=True)
        self.__mqtt_client.on_connect = self.__on_connect
        self.__mqtt_client.message_callback_add(self.synchronization_topic,
                                                self.__synchronized_lifecycle_changed)

        logger.debug("Connecting to the MQTT broker at %s:%s",
                     str(self.mqtt_broker_host), str(self.mqtt_broker_port))
        self.__mqtt_client.connect(host=self.mqtt_broker_host, port=self.mqtt_broker_port)
        self.__mqtt_client.loop_start()  # start message processing thread

    def _add_transition(self, trigger, source, dest,
                        before: Optional[str] = None, after: Optional[str] = None):
        """
        Registers a new transition in the simulation lifecycle

        :param trigger: The trigger that should be used to activate the transition
        :param source: The source state, either as state name or list of STATES
        :param dest: The destination state name
        :param before: The method that should be run before the transition is applied
            and propagated
        :param after: The method that should be run after the transition has been applied
            successfully, yet still before state propagation
        """
        # Normalize so that membership is tested on state names
        # (a plain `in` on a string source would be a substring test).
        sources = [source] if isinstance(source, str) else list(source)

        if dest not in sources:
            # add idempotent self transitions to avoid raising MachineError
            # in case of duplicated request of transitions
            self.__machine.add_transition(trigger=trigger, source=dest, dest=dest,
                                          before='set_silent')
        elif trigger in self.__machine.events:
            # an explicit self transition replaces a previously added idempotent one
            event = self.__machine.events[trigger]
            if dest in event.transitions:
                del event.transitions[dest]

        before_list = [before] if before is not None else []
        if dest in self._silent_destinations:
            before_list += ['set_silent']

        self.__machine.add_transition(trigger=trigger, source=source, dest=dest,
                                      before=before_list, after=after)

    def accept_command(self, command):
        """
        Accepts the given command for the simulation lifecycle.
        Any error during the execution of the command, results in a state
        transition to failed so to perform a cleanup.

        :param command: the command (i.e. trigger name) that should be activated
        :raise: Propagate any exception coming from the execution of the command
        :raise: ValueError: command is unknown or not valid for the current state
        """
        # An unknown command is a caller error, not an execution error:
        # reject it without failing the simulation.
        if command not in self.__machine.events:
            raise ValueError(f"Unknown simulation lifecycle command '{command}'")

        # pylint: disable=broad-except
        try:
            self.__machine.events[command].trigger()
        except MachineError as m_e:
            raise ValueError from m_e
        except Exception as ex:
            logger.error("Error trying to execute command '%s'", command)
            logger.exception(ex)
            try:
                self.failed()
            except Exception as ex2:
                logger.error(
                    "Error trying to perform cleanup operation for command '%s'", command)
                logger.exception(ex2)
                raise ex2
            raise ex

    @staticmethod
    def set_silent(state_change):
        """
        Specifies that the given state change should not be propagated to other
        synchronized lifecycles

        :param state_change: The state change that should not be propagated
        """
        state_change.kwargs['silent'] = True

    def shutdown(self, _shutdown_event):
        """
        Shuts down this simulation lifecycle instance: clears the retained messages
        (if required), unsubscribes, stops the MQTT processing thread and disconnects.
        Further calls are no-ops.

        :param _shutdown_event: The event that caused the shutdown
        """
        if self.__mqtt_client is None:
            logger.debug("Double shutdown of %s lifecycle", self.mqtt_client_id)
            return

        # clear retained msg if required
        self._clear_synchronization_topic()

        self.__mqtt_client.unsubscribe(self.synchronization_topic)
        self.__mqtt_client.loop_stop()
        self.__mqtt_client.disconnect()
        self.__mqtt_client = None

    # These methods will be overridden in the derived classes, thus we need to exclude them
    # from pylint

    def initialize(self, state_change):
        """
        Gets called when the simulation should be initialized.

        :param state_change: The state change that caused the simulation to initialize.
        """
        raise NotImplementedError(
            "This state transition needs to be implemented in a concrete lifecycle")

    def start(self, state_change):
        """
        Gets called when the simulation needs to be started.

        :param state_change: The state change that caused the simulation to start.
        """
        raise NotImplementedError(
            "This state transition needs to be implemented in a concrete lifecycle")

    def pause(self, state_change):
        """
        Gets called when the simulation needs to be paused.

        :param state_change: The state change that caused the simulation to pause.
        """
        raise NotImplementedError(
            "This state transition needs to be implemented in a concrete lifecycle")

    def stop(self, state_change):
        """
        Gets called when the simulation needs to be stopped; it releases any simulation resource.

        :param state_change: The state change that caused the simulation to stop
        """
        raise NotImplementedError(
            "This state transition needs to be implemented in a concrete lifecycle")

    def fail(self, state_change):
        """
        Gets called when the simulation fails.

        :param state_change: The state change that caused the simulation to fail.
        """
        raise NotImplementedError(
            "This state transition needs to be implemented in a concrete lifecycle")

    # TODO reset support
    # def reset(self, state_change):
    #     """
    #     Gets called when the simulation is reset.
    #     :param state_change: The state change that caused the simulation to reset.
    #     """
    #     raise NotImplementedError(
    #         "This state transition needs to be implemented in a concrete lifecycle")