Source code for hbp_nrp_excontrol.StateMachineInstance

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END
"""
This module contains the class which encapsulates the dynamically loaded smach state machines
 and provides the routines for their execution in parallel.
"""
# pylint: disable=redefined-builtin
from builtins import object

import sys
import os
import signal
import threading
import logging
import subprocess
import ast
import rospy

from cle_ros_msgs.srv import Trigger
from cle_ros_msgs.msg import CLEError

from hbp_nrp_excontrol.__internals import MAX_START_TIMEOUT, MAX_STOP_TIMEOUT, \
    SERVICE_START, SERVICE_RESTART, SERVICE_STATE, SERVICE_STOP, SERVICE_RESULT

__author__ = 'Sebastian Krach, Georg Hinkel'

logger = logging.getLogger(__name__)

python_interpreter = sys.executable
if python_interpreter.endswith("uwsgi") \
        or python_interpreter.endswith("uwsgi-core"):  # pragma: no cover
    # When started through uWSGI, sys.executable returns the uwsgi executable and not the python
    # interpreter (see https://github.com/unbit/uwsgi/issues/670 for details)
    home = os.environ.get('NRP_MODULE_HOME')
    if home is not None:
        # If NRP_MODULE_HOME is set, we take the python interpreter from there
        python_interpreter = os.path.join(home, "bin", "python")
    else:
        # Otherwise, we assume that the interpreter is in the PATH
        python_interpreter = "python"


class StateMachineInstance(object):
    """
    The class that encapsulates experiment controlling state machines and manages their execution
    in separate processes
    """

    def __init__(self, sm_id, sim_id, sim_dir, cle_error_pub=None, source_path=None):
        """
        Creates a new uninitialized ExperimentStateMachineInstance

        :param sm_id: the unique identifier of the state machine
        :param sim_id: the simulation id
        :param sim_dir: absolute path to the simulation directory
        :param cle_error_pub: optional publisher used to publish CLEError messages when the
            state machine code contains syntax errors
        :param source_path: path to the python source of the state machine.
        """
        self.__sm_id = sm_id
        self.sim_id = sim_id
        self.sim_dir = sim_dir
        self.error_publisher = cle_error_pub
        self.has_error = False
        self.__sm_path = source_path
        self.__sm_process = None
        self.__start_proxy = None
        self.__stop_proxy = None
        self.__state_proxy = None
        self.__restart_proxy = None
        self.__result_proxy = None
        self.__result = None
        self.__is_started = False

    @property
    def is_started(self):
        """
        Gets a value indicating whether the state machine has been started
        """
        # A state machine whose process is gone cannot be started anymore
        if not self.is_running:
            self.__is_started = False
        return self.__is_started

    @property
    def sm_id(self):
        """
        Gets the unique state machine identifier specified upon creation.

        :return: The unique state machine identifier.
        """
        return self.__sm_id

    @property
    def sm_path(self):
        """
        Gets or sets the path to the state machine code
        """
        return self.__sm_path

    @sm_path.setter
    def sm_path(self, value):
        """
        Gets or sets the path to the state machine code

        If the state machine process is running, the new code is syntax checked first and the
        process is only replaced when the check succeeds. Otherwise the path is stored before the
        check, so that it is available even if the code fails syntax checking.

        :param value: The new state machine code path
        :raise: SyntaxError if the code at the new path does not parse
        """
        if self.is_running:
            self._check_syntax(value)
            # If we came here, no syntax errors or type errors are in the file, so we can proceed.
            # The error flag must be cleared BEFORE re-initializing: a stale flag left by an
            # earlier failed update would otherwise make initialize_sm refuse the valid code.
            self.has_error = False
            self.request_termination()
            self.wait_termination()
            self.__sm_path = value
            self.initialize_sm()
        else:
            self.__sm_path = value
            # check for error AFTER having set the sm path
            # we need the path even if the code fails syntax checking
            self._check_syntax(value)
            self.has_error = False

    def _check_syntax(self, sm_path):
        """
        Checks the state machine code in the file at sm_path for syntax errors.
        In case of errors, sets the has_error instance variable to True and
        raises a SyntaxError exception. The error is published through the error publisher
        if one is available, otherwise it is logged.

        :param sm_path: The state machine code path to be checked
        :raise: SyntaxError
        """
        try:
            with open(sm_path) as source_file:
                ast.parse(source_file.read(), sm_path)
        except SyntaxError as e:
            if self.error_publisher:
                sm_id_str = str(self.__sm_id)
                error_msg = "SyntaxError (Line {0})".format(e.lineno)
                self.error_publisher.publish(
                    CLEError(severity=CLEError.SEVERITY_ERROR,
                             sourceType=CLEError.SOURCE_TYPE_STATE_MACHINE,
                             errorType="Compile",
                             message=error_msg,
                             functionName=sm_id_str,
                             lineNumber=e.lineno,
                             offset=e.offset,
                             lineText=e.text,
                             fileName=sm_path)
                )
            else:
                logger.error(
                    "No Publisher available. Can't publish a SyntaxError for state machine %s",
                    self.__sm_id)

            self.has_error = True
            raise

    @property
    def is_running(self):
        """
        Gets for the encapsulated state machine whether its process is running or not

        :return: True if the state machine process is running, False otherwise.
        """
        return self.__sm_process is not None and self.__sm_process.poll() is None

    def poll(self):
        """
        Queries the return code of the current state machine process

        :return: None if there is no process or it is still running, its return code otherwise
        """
        if self.__sm_process is None:
            return None
        return self.__sm_process.poll()

    @property
    def result(self):
        """
        Gets the result of the state machine

        The result is queried from the running state machine process and cached once the
        query succeeded.
        """
        if self.__result is None and self.is_running:
            response = self.__result_proxy()
            if response.success:
                self.__result = response.message
        return self.__result

    @staticmethod
    def __check_result(response):
        """
        Checks the Trigger result tuple and logs any error message to the logging mechanism

        :param response: The result tuple from a Trigger service
        :return: The success flag of the response
        """
        if not response.success:
            logger.error(response.message)
        return response.success

    def start_execution(self, fail_if_started=True):
        """
        Starts the execution of the state machine by calling the start service of its process.

        :param fail_if_started: If True, the state machines raises an Exception in case the
            state machine was started previously
        :raise: Exception if the state machine is not initialized, has errors, or is already
            started while fail_if_started is True
        """
        if not self.is_running:
            raise Exception("State machine {0} must be initialized first.".format(self.sm_id))
        if self.__is_started:
            if fail_if_started:
                raise Exception("State machine {0} is already started.".format(self.sm_id))
            logger.info("State machine %s is already started.", self.sm_id)
            return
        if self.has_error:
            raise Exception("State machine {0} has errors: it cannot be started".format(self.sm_id))
        self.__is_started = self.__check_result(self.__start_proxy())

    def initialize_sm(self):
        """
        Loads the specified script file and initializes the encapsulated state machine.

        A new runner process is spawned and this method blocks until all of its control
        services are available.

        :raise: Exception if the state machine is already initialized or has errors
        """
        if self.is_running:
            raise Exception("State machine is already initialized.")
        if self.has_error:
            raise Exception("State machine has errors: it cannot be initialized")
        logger.info("Starting state machine process for state machine %s", self.sm_id)
        cur_dir = os.path.split(__file__)[0]
        runner = os.path.join(cur_dir, "StateMachineRunner.py")
        args = [python_interpreter, runner, "--name", str(self.sm_id),
                "--source", str(self.sm_path)]
        # Add this to enable pydevd debug: ,"-p","localhost","50003"]
        if self.sim_id is not None:
            args.append('--id')
            args.append(str(self.sim_id))

        env_sm = os.environ.copy()
        if self.sim_dir is not None:
            resource_path = os.path.join(self.sim_dir, 'resources')
            # TODO: do we really need it in PATH?
            for variable in ('PATH', 'PYTHONPATH'):
                # The variable may be unset (or empty) in the parent environment; in that case
                # the resources folder becomes its only entry instead of raising a KeyError.
                inherited = env_sm.get(variable)
                if inherited:
                    env_sm[variable] = resource_path + os.pathsep + inherited
                else:
                    env_sm[variable] = resource_path

        self.__sm_process = subprocess.Popen(
            args, stdout=sys.stdout, stderr=sys.stderr, env=env_sm)

        self.__start_proxy = rospy.ServiceProxy(SERVICE_START(self.sm_id), Trigger)
        self.__state_proxy = rospy.ServiceProxy(SERVICE_STATE(self.sm_id), Trigger)
        self.__stop_proxy = rospy.ServiceProxy(SERVICE_STOP(self.sm_id), Trigger)
        self.__restart_proxy = rospy.ServiceProxy(SERVICE_RESTART(self.sm_id), Trigger)
        self.__result_proxy = rospy.ServiceProxy(SERVICE_RESULT(self.sm_id), Trigger)

        self.__start_proxy.wait_for_service(MAX_START_TIMEOUT)
        self.__state_proxy.wait_for_service(MAX_START_TIMEOUT)
        self.__stop_proxy.wait_for_service(MAX_START_TIMEOUT)
        self.__restart_proxy.wait_for_service(MAX_START_TIMEOUT)
        self.__result_proxy.wait_for_service(MAX_START_TIMEOUT)

        self.__is_started = False
        # A result cached from a previous process does not belong to the new one
        self.__result = None
        logger.info("State machine process for state machine %s started.", self.sm_id)

    def restart(self):
        """
        Restarts the state machine

        If no process is running, a new one is initialized. Otherwise the restart service of the
        running process is called; if that call reports a failure, the process is terminated and
        a new one is initialized.
        """
        if not self.is_running:
            self.initialize_sm()
            return
        # Drop the result cached for the run that is being restarted
        self.__result = None
        response = self.__restart_proxy()
        if not response.success:
            logger.error(response.message)
            self.request_termination()
            self.wait_termination()
            self.initialize_sm()

    def request_termination(self):
        """
        Requests termination of the encapsulated state machine.

        As the termination happens asynchronously this method will return immediately after
        setting the request flag. If the state machine has already finished execution the
        method simply returns.
        """
        if self.is_running:
            self.__check_result(self.__stop_proxy())

    def wait_termination(self, timeout=MAX_STOP_TIMEOUT):
        """
        Blocks until the encapsulated state machine process has terminated.

        If the process is still running, a SIGINT is sent to it right away and we wait for at
        most the given timeout. If this does not help, a SIGTERM is sent and we wait again.
        Finally, we send a SIGKILL and wait for the process to complete. If there is no running
        process, the method returns immediately.

        :param timeout: Maximum waiting time in seconds after each of SIGINT and SIGTERM
        """
        if not self.is_running:
            return

        wait_thread = threading.Thread(target=self.__sm_process.wait)
        wait_thread.start()
        # Thread.isAlive() was removed in Python 3.9; is_alive() works on all versions
        if wait_thread.is_alive():
            # We are kindly asking you to stop, please
            logger.info("Shutting down state machine %s. Sending SIGINT", self.sm_id)
            self.__sm_process.send_signal(signal.SIGINT)
            wait_thread.join(timeout)
            if wait_thread.is_alive():
                # Just stop now
                logger.info("State machine process %s still alive. Sending SIGTERM", self.sm_id)
                self.__sm_process.terminate()
                wait_thread.join(timeout)
                if wait_thread.is_alive():
                    logger.info("Killing the state machine process %s.", self.sm_id)
                    # Well, then take this one
                    self.__sm_process.kill()
                    wait_thread.join()

    def shutdown(self):
        """
        Shuts down this state machine by requesting its termination if it is running
        """
        if self.is_running:
            self.request_termination()
def create_state_machine(sm_id, path):
    """
    Creates a state machine instance descriptor for the given state machine

    The returned instance is not bound to a simulation (no simulation id and no simulation
    directory are set) and is not initialized.

    :param sm_id: The id of the state machine
    :param path: The path to the state machine source code
    :return: A state machine descriptor
    :raise: IOError (OSError) if the source file cannot be opened
    """
    # Fail early if the source file cannot be opened, as the previous implementation did
    with open(path, "r"):
        pass
    # The constructor signature is (sm_id, sim_id, sim_dir, cle_error_pub, source_path). The
    # path has to be passed as source_path; passing the source text positionally would store
    # it as the simulation id and leave the instance without a source path.
    return StateMachineInstance(sm_id, None, None, source_path=path)