Source code for hbp_nrp_simserver.server.simulation_server

# pylint: disable=too-many-lines
# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END
"""
This module implements the Simulation Server application.
It is managed by a :class:`.SimulationServerInstance`.
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import signal
import sys
import threading
import time
from typing import List, Optional

import hbp_nrp_commons.timer as timer
import hbp_nrp_simserver.server as simserver
import hbp_nrp_simserver.server.experiment_configuration as exp_conf_utils
import hbp_nrp_simserver.server.simulation_server_lifecycle as simserver_lifecycle
from hbp_nrp_commons.workspace.settings import Settings
from hbp_nrp_commons.simulation_lifecycle import SimulationLifecycle
from hbp_nrp_simserver.server.mqtt_notifier import MQTTNotifier
from hbp_nrp_simserver.server.nrp_script_runner import NRPScriptRunner

from hbp_nrp_commons import set_up_logger

# Warning: We do not use __name__  here, since it translates to "__main__"
logger = logging.getLogger('hbp_nrp_simserver')


def __except_hook(ex_type, value, ex_traceback):
    """
    Logs the unhandled exception

    :param ex_type: The exception type
    :param value: The exception value
    :param ex_traceback: The traceback
    """
    logger.critical("Unhandled exception of type %s: %s", ex_type, value)
    logger.exception(ex_traceback)


[docs]class SimulationServer: STATUS_UPDATE_INTERVAL = 1.0 def __init__(self, sim_settings: simserver.SimulationSettings): """ Create the simulation server :param sim_settings: The simulation settings """ self.simulation_settings = sim_settings # set during initialization self.exp_config: Optional[exp_conf_utils.type_class] = None self._notifier: Optional[MQTTNotifier] = None self.__lifecycle: Optional[simserver_lifecycle.SimulationServerLifecycle] = None self.exit_state: Optional[str] = None self.__nrp_script_runner: Optional[NRPScriptRunner] = None self.__status_update_timer = timer.Timer(SimulationServer.STATUS_UPDATE_INTERVAL, self.publish_state_update, name="SimServerStatusUpdateTimer") @property def nrp_script_runner(self) -> Optional[NRPScriptRunner]: """ Gets the nrp_script_runner used by the server """ return self.__nrp_script_runner @property def lifecycle(self): """ Gets the lifecycle instance representing the current SimulationServer """ return self.__lifecycle @property def simulation_id(self) -> str: """ Gets the simulation ID, as configured in sim_settings, that is serviced. """ return self.simulation_settings.sim_id @property def simulation_time(self) -> float: """ :return: the simulation time if initialized, 0 otherwise """ return self.__nrp_script_runner.simulation_time if self.is_initialized else 0. @property def real_time(self) -> float: """ :return: the wall-clock time of the simulation if initialized, 0 otherwise """ return self.__nrp_script_runner.real_time if self.is_initialized else 0. @property def simulation_time_remaining(self) -> float: """ :return: the simulation time left until the timeout if initialized, 0 otherwise """ return self.__nrp_script_runner.simulation_time_remaining \ if self.is_initialized else 0. @property def is_initialized(self) -> bool: return (self._notifier is not None) and \ (self.__nrp_script_runner is not None) and \ (self.__lifecycle is not None)
[docs] def initialize(self, except_hook=None): """ Initialize the simulation server: - parse and validate the experiment configuration file - create the MQTT notifier - create NRPScriptRunner - create SimulationServerLifecycle - start the status update timer If anything goes wrong, the relative exception will be re-raised :param except_hook: A handler method for critical exceptions """ # the exception will be caught and logged by the caller self.exp_config = exp_conf_utils.validate( exp_conf_utils.parse(self.simulation_settings.exp_config_file)) # find MQTT broker address # from workspace.Settings (i.e. env var), otherwise from config if not Settings.is_mqtt_broker_default: broker_host, broker_port = Settings.mqtt_broker_host, Settings.mqtt_broker_port else: broker_host, broker_port = exp_conf_utils.mqtt_broker_host_port(self.exp_config) self.mqtt_topics_prefix = Settings.mqtt_topics_prefix logger.debug("Setting up simulation Notifier") self._notifier = MQTTNotifier(int(self.simulation_id), broker_hostname=broker_host, broker_port=int(broker_port), topics_prefix=Settings.mqtt_topics_prefix) try: logger.debug("Setting up a NRPScriptRunner") self.__nrp_script_runner = NRPScriptRunner(self.simulation_settings, self.exp_config, self.publish_error) logger.debug("Creating the simulation server lifecycle") self.__lifecycle = simserver_lifecycle.SimulationServerLifecycle( self, except_hook) except Exception: self._notifier.shutdown() raise self.__status_update_timer.start()
def _create_state_message(self): """ Creates a status message :return: A dictionary with status information """ return {'realTime': self.real_time, 'simulationTime': self.simulation_time, 'state': self.__lifecycle.state if self.__lifecycle else "", 'simulationTimeLeft': self.simulation_time_remaining }
[docs] def publish_state_update(self): """ Publish the simulation state and stats """ try: if not self.is_initialized: logger.debug( "Trying to publish state even though no simulation is active." " Simulation ID '%s'", self.simulation_id) return json_message = json.dumps(self._create_state_message()) # logger.debug("Sending status message: %s." # " Simulation ID '%s'", json_message, self.simulation_id) self._notifier.publish_status(json_message) # pylint: disable=broad-except except Exception as e: logger.exception(e)
[docs] def shutdown(self): if not self.is_initialized: logger.debug("Server un initialized. Can't shutdown. " "Simulation ID '%s'", self.simulation_id) return try: with self._notifier.task_notifier("Shutting down Simulation"): try: if self.__lifecycle.is_failed() or self.__lifecycle.is_stopped(): logger.debug( "Lifecycle state is already in a final state." " Simulation ID '%s'", self.simulation_id) else: # request and wait simulation stop # __lifecycle will initiate NRPScriptRunner shutdown logger.debug("Requesting lifecycle to stop. " "Simulation ID '%s'", self.simulation_id) self.__lifecycle.stopped() except Exception as e: logger.error("Exception while requesting lifecycle to stop. " "Simulation ID '%s'", self.simulation_id) logger.exception(e) else: logger.debug("Waiting for lifecycle to stop. " "Simulation ID '%s'", self.simulation_id) # TODO timeout and handle it self.__lifecycle.done_event.wait(10.) # NOTE Waiting point logger.debug("Lifecycle has stopped. " "Simulation ID '%s'", self.simulation_id) finally: self.exit_state = self.__lifecycle.state self.__lifecycle = None self.__nrp_script_runner = None # shutdown MQTTNotifier try: self._notifier.shutdown() except Exception as e: logger.error("The MQTT notifier could not be shut down. Simulation ID '%s'", self.simulation_id) logger.exception(e) finally: self._notifier = None finally: self.__status_update_timer.cancel_all()
[docs] def run(self): """ This method blocks the caller until the simulation is finished """ self.__lifecycle.done_event.wait() # NOTE Waiting point self.publish_state_update() # broadcast last status update before exiting time.sleep(1.0) logger.info( "Simulation Server main loop completed. Simulation ID '%s'", self.simulation_id)
[docs] def publish_error(self, msg: str, error_type: str, line_number: int = -1, offset: int = -1, line_text: str = ""): """ Sends an error message to clients (e.g. frontend) :param msg: The error message :param error_type: The error type, e.g. "Runtime" :param line_number: The line number where the error occurred :param offset: The offset :param line_text: The text of the line causing the error """ json_str = json.dumps({"sim_id": self.simulation_id, "msg": msg, "error_type": error_type, "fileName": self.simulation_settings.main_script_file, "line_number": line_number, "offset": offset, "line_text": line_text}) if self._notifier: self._notifier.publish_error(json_str) else: logger.warning("Publishing an Error but a Notifier is unavailable." " Simulation ID:' %s': '%s'", self.simulation_id, json_str)
[docs]def main(): # pragma: no cover sys.excepthook = __except_hook parser = argparse.ArgumentParser() parser.add_argument("-d", "--dir", dest="sim_dir", help="The path to simulation directory. Required", required=True) parser.add_argument("-s", "--script", dest="sim_script", help="The filename of script to run. Required", required=True) parser.add_argument("-c", "--config", dest="exp_config", required=True, help="The filename of the experiment configuration file. Required") parser.add_argument('-i', '--id', dest='sim_id', required=True, help="The simulation ID. Required") parser.add_argument('--logfile', dest='logfile', help='specify the state machine logfile') parser.add_argument("--verbose", dest="verbose_logs", help="Increase output verbosity", default=False, action="store_true") args = parser.parse_args() # Initialize root logger, any logger in this process will inherit the settings set_up_logger(name=None, logfile_name=args.logfile, level=logging.DEBUG if args.verbose_logs else logging.INFO) # Change working directory to experiment directory logger.info("Path is %s. Simulation ID '%s'", args.sim_dir, args.sim_id) os.chdir(args.sim_dir) sim_settings = simserver.SimulationSettings(sim_id=args.sim_id, sim_dir=args.sim_dir, exp_config_file=args.exp_config, main_script_file=args.sim_script) sim_server = SimulationServer(sim_settings) # pylint: disable=broad-except try: sim_server.initialize(except_hook=None) # TODO except_hook ?? except Exception as e: logger.error( "Simulation initialization failed. Simulation ID '%s'", sim_settings.sim_id) logger.exception(e) return simserver.ServerProcessExitCodes.INITIALIZATION_ERROR.value # Simulation server is now initialized # Event that signals when to terminate, gets set when: # - SIGTERM received # - after simserver.run has completed do_terminate_event = threading.Event() def termination_sig_handler(sig, _frame) -> None: print(f"Received '{sig}'. set do_terminate_event!. Simulation ID '{sim_settings.sim_id}'") do_terminate_event.set() # SimulationServerInstance uses SIGTERM to send shutdown requests # keep signal handler as simple as possible. # delegate actual work to terminator_thread # Shut down gracefully with SIGINT too. termination_signals = [signal.SIGINT, signal.SIGTERM] for signum in termination_signals: signal.signal(signum, termination_sig_handler) # unblock termination_signals signal.pthread_sigmask(signal.SIG_UNBLOCK, termination_signals) # use a separate thread to handle requests since simserver.run() is blocking thread_return_value: List[simserver.ServerProcessExitCodes] = [] terminator_thread = threading.Thread(target=_handle_shutdown, name="SimServerShutdownHandler", args=(sim_server, do_terminate_event, thread_return_value)) terminator_thread.start() try: sim_server.run() # Blocking call except Exception as e: logger.error( "Exception during simulation. Simulation ID '%s'", sim_settings.sim_id) logger.exception(e) finally: # sim_server.run() completed, let's shutdown do_terminate_event.set() terminator_thread.join() # TODO timeout? how long? # NOTE Waiting point return (thread_return_value[0] if thread_return_value else simserver.ServerProcessExitCodes.NO_ERROR ).value
def _handle_shutdown(sim_server: SimulationServer, terminate_event: threading.Event, return_value: List[simserver.ServerProcessExitCodes]) -> None: """ A thread that takes care of shutting down the simulation server The process starts when terminate_event gets set by the :code:`SIGTERM` handler or after :code:`SimulationServer.run()` termination. IF an error occurred while the simulation was running appends :code:``simserver.ServerProcessExitCodes.RUNNING_ERROR` to return_value. If an error occurs during shutdown :code:``SHUTDOWN_ERROR` is appended, otherwise :code:``NO_ERROR`. """ exit_codes = simserver.ServerProcessExitCodes sim_id = sim_server.simulation_settings.sim_id if not sim_server.is_initialized: logger.warning( "Shutdown. Simulation server not initialized. Simulation ID '%s'", sim_id) else: # wait on closed signal handler or SimulationServer.run() termination terminate_event.wait() # NOTE Waiting point logger.info("Shutdown. Initiating. Simulation ID '%s'", sim_id) try: # request the simulation server to shut down sim_server.shutdown() except Exception as e: logger.error( "Shutdown. An error occurred. Simulation ID '%s'", sim_id) logger.exception(e) return_value.append(exit_codes.SHUTDOWN_ERROR) return logger.info("Shutdown. Completed. Simulation ID '%s'", sim_id) if SimulationLifecycle.is_error_state(sim_server.exit_state): return_value.append(exit_codes.RUNNING_ERROR) return # happy path return_value.append(exit_codes.NO_ERROR) return if __name__ == "__main__": # pragma: no cover sys.exit(main())