# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# ---LICENSE-END
"""
This module contains the class which encapsulates the dynamically loaded smach state machines
and provides the routines for their execution in parallel.
"""
# pylint: disable=redefined-builtin
from builtins import object
import sys
import os
import signal
import threading
import logging
import subprocess
import ast
import rospy
from cle_ros_msgs.srv import Trigger
from cle_ros_msgs.msg import CLEError
from hbp_nrp_excontrol.__internals import MAX_START_TIMEOUT, MAX_STOP_TIMEOUT, \
SERVICE_START, SERVICE_RESTART, SERVICE_STATE, SERVICE_STOP, SERVICE_RESULT
__author__ = 'Sebastian Krach, Georg Hinkel'

logger = logging.getLogger(__name__)

# Interpreter used to launch the state machine runner in a child process.
python_interpreter = sys.executable
if python_interpreter.endswith(("uwsgi", "uwsgi-core")):  # pragma: no cover
    # When started through uWSGI, sys.executable returns the uwsgi executable and not the python
    # interpreter (see https://github.com/unbit/uwsgi/issues/670 for details).
    # Unless NRP_MODULE_HOME says otherwise, we assume that the interpreter is in the PATH.
    python_interpreter = "python"
    home = os.environ.get('NRP_MODULE_HOME')
    if home is not None:
        # NRP_MODULE_HOME is set, so we take the python interpreter from there
        python_interpreter = os.path.join(home, "bin", "python")
class StateMachineInstance(object):
    """
    The class that encapsulates experiment controlling state machines and manages their execution
    in separate processes
    """

    def __init__(self, sm_id, sim_id, sim_dir, cle_error_pub=None, source_path=None):
        """
        Creates a new uninitialized ExperimentStateMachineInstance

        :param sm_id: the unique identifier of the state machine
        :param sim_id: the simulation id (may be None, in which case no id is passed to the
            runner process)
        :param sim_dir: absolute path to the simulation directory (may be None, in which case
            no resources directory is added to the environment of the runner process)
        :param cle_error_pub: optional publisher used to report syntax errors as CLEError
            messages; if omitted, syntax errors are only logged
        :param source_path: path to the python source of the state machine.
        """
        self.__sm_id = sm_id
        self.sim_id = sim_id
        self.sim_dir = sim_dir
        self.error_publisher = cle_error_pub
        self.has_error = False
        self.__sm_path = source_path
        self.__sm_process = None
        self.__start_proxy = None
        self.__stop_proxy = None
        self.__state_proxy = None
        self.__restart_proxy = None
        self.__result_proxy = None
        self.__result = None
        self.__is_started = False

    @property
    def is_started(self):
        """
        Gets a value indicating whether the state machine has been started

        The started flag is cleared as soon as the process is found not to be running.
        """
        if not self.is_running:
            self.__is_started = False
        return self.__is_started

    @property
    def sm_id(self):
        """
        Gets the unique state machine identifier specified upon creation.

        :return: The unique state machine identifier.
        """
        return self.__sm_id

    @property
    def sm_path(self):
        """
        Gets or sets the path to the state machine code
        """
        return self.__sm_path

    @sm_path.setter
    def sm_path(self, value):
        """
        Gets or sets the path to the state machine code

        If the state machine process is running, the new code is syntax-checked first and the
        process is only replaced when the check passes. Otherwise the path is stored even if the
        check fails.

        :param value: The new state machine code path
        :raise: SyntaxError if the code at the given path does not parse
        """
        if self.is_running:
            self._check_syntax(value)
            # If we came here, no syntax errors or type errors are in the file, so we can proceed.
            # The error flag of a previously rejected update must be cleared BEFORE
            # re-initializing, otherwise initialize_sm() refuses to start the new process after
            # the old one has already been terminated.
            self.has_error = False
            self.request_termination()
            self.wait_termination()
            self.__sm_path = value
            self.initialize_sm()
        else:
            self.__sm_path = value
            # check for error AFTER having set the sm path
            # we need the path even if the code fails syntax checking
            self._check_syntax(value)
            self.has_error = False

    def _check_syntax(self, sm_path):
        """
        Checks the state machine code in the file at sm_path for syntax errors.
        In case of errors, sets the has_error instance variable to True and
        raises a SyntaxError exception.

        The error is published through the error publisher if one was given, otherwise it is
        only logged.

        :param sm_path: The state machine code path to be checked
        :raise: SyntaxError
        """
        try:
            with open(sm_path) as source_file:
                ast.parse(source_file.read(), sm_path)
        except SyntaxError as e:
            if self.error_publisher:
                sm_id_str = str(self.__sm_id)
                error_msg = "SyntaxError (Line {0})".format(e.lineno)
                self.error_publisher.publish(
                    CLEError(severity=CLEError.SEVERITY_ERROR,
                             sourceType=CLEError.SOURCE_TYPE_STATE_MACHINE,
                             errorType="Compile", message=error_msg,
                             functionName=sm_id_str, lineNumber=e.lineno,
                             offset=e.offset, lineText=e.text, fileName=sm_path)
                )
            else:
                logger.error(
                    "No Publisher available. Can't publish a SyntaxError for state machine %s",
                    self.__sm_id)
            self.has_error = True
            raise

    @property
    def is_running(self):
        """
        Gets for the encapsulated state machine whether its process is running or not

        :return: True if the state machine process is running, False otherwise.
        """
        return self.__sm_process is not None and self.__sm_process.poll() is None

    def poll(self):
        """
        Queries the return code of the current state machine process

        :return: The return code of the process, or None if there is no process or it has not
            terminated yet
        """
        if self.__sm_process is None:
            return None
        return self.__sm_process.poll()

    @property
    def result(self):
        """
        Gets the result of the state machine

        The result is requested from the running process and cached once the request succeeds.

        :return: The result message, or None if no result has been obtained
        """
        if self.__result is None and self.is_running:
            response = self.__result_proxy()
            if response.success:
                self.__result = response.message
        return self.__result

    @staticmethod
    def __check_result(response):
        """
        Checks the Trigger result tuple and logs any error message to the logging mechanism

        :param response: The result tuple from a Trigger service
        :return: The success flag of the response
        """
        if not response.success:
            logger.error(response.message)
        return response.success

    def start_execution(self, fail_if_started=True):
        """
        Starts the execution of the state machine through the start service of its process.

        :param fail_if_started: If True, the state machines raises an Exception in case the state
            machine was started previously
        :raise: Exception if the state machine is not initialized, has errors, or is already
            started while fail_if_started is True
        """
        if not self.is_running:
            raise Exception("State machine {0} must be initialized first.".format(self.sm_id))
        if self.__is_started:
            if fail_if_started:
                raise Exception("State machine {0} is already started.".format(self.sm_id))
            logger.info("State machine %s is already started.", self.sm_id)
            return
        if self.has_error:
            raise Exception("State machine {0} has errors: it cannot be started".format(self.sm_id))
        self.__is_started = self.__check_result(self.__start_proxy())

    def initialize_sm(self):
        """
        Loads the specified script file and initializes the encapsulated state machine.

        Spawns the StateMachineRunner script located next to this module in a child process and
        waits until all of its control services are available.

        :raise: Exception if the state machine is already initialized or has errors
        """
        if self.is_running:
            raise Exception("State machine is already initialized.")
        if self.has_error:
            raise Exception("State machine has errors: it cannot be initialized")

        logger.info("Starting state machine process for state machine %s", self.sm_id)
        cur_dir = os.path.split(__file__)[0]
        runner = os.path.join(cur_dir, "StateMachineRunner.py")
        args = [python_interpreter, runner, "--name", str(self.sm_id), "--source",
                str(self.sm_path)]  # Add this to enable pydevd debug: ,"-p","localhost","50003"]
        if self.sim_id is not None:
            args.append('--id')
            args.append(str(self.sim_id))

        env_sm = os.environ.copy()
        if self.sim_dir is not None:
            resource_path = os.path.join(self.sim_dir, 'resources')
            # TODO: do we really need it in PATH?
            for variable in ('PATH', 'PYTHONPATH'):
                # Either variable may be unset (PYTHONPATH frequently is); prepend the resources
                # directory without failing and without leaving a dangling separator.
                current = env_sm.get(variable)
                if current:
                    env_sm[variable] = resource_path + os.pathsep + current
                else:
                    env_sm[variable] = resource_path

        self.__sm_process = subprocess.Popen(
            args, stdout=sys.stdout, stderr=sys.stderr, env=env_sm)

        self.__start_proxy = rospy.ServiceProxy(SERVICE_START(self.sm_id), Trigger)
        self.__state_proxy = rospy.ServiceProxy(SERVICE_STATE(self.sm_id), Trigger)
        self.__stop_proxy = rospy.ServiceProxy(SERVICE_STOP(self.sm_id), Trigger)
        self.__restart_proxy = rospy.ServiceProxy(SERVICE_RESTART(self.sm_id), Trigger)
        self.__result_proxy = rospy.ServiceProxy(SERVICE_RESULT(self.sm_id), Trigger)

        self.__start_proxy.wait_for_service(MAX_START_TIMEOUT)
        self.__state_proxy.wait_for_service(MAX_START_TIMEOUT)
        self.__stop_proxy.wait_for_service(MAX_START_TIMEOUT)
        self.__restart_proxy.wait_for_service(MAX_START_TIMEOUT)
        self.__result_proxy.wait_for_service(MAX_START_TIMEOUT)

        self.__is_started = False
        logger.info("State machine process for state machine %s started.", self.sm_id)

    def restart(self):
        """
        Restarts the state machine

        If the process is not running it is simply initialized. Otherwise the restart service is
        called; if that fails, the process is terminated and initialized again.
        """
        if not self.is_running:
            self.initialize_sm()
            return
        response = self.__restart_proxy()
        if not response.success:
            logger.error(response.message)
            self.request_termination()
            self.wait_termination()
            self.initialize_sm()

    def request_termination(self):
        """
        Requests termination of the encapsulated state machine by calling its stop service.
        This method does not wait for the process to exit; use wait_termination for that.
        If the state machine process is not running the method simply returns.
        """
        if self.is_running:
            self.__check_result(self.__stop_proxy())

    def wait_termination(self, timeout=MAX_STOP_TIMEOUT):
        """
        Blocks until the encapsulated state machine process has terminated

        If the process is still alive when this method is called, a SIGINT is sent to it
        immediately and we wait for up to the given timeout. If this does not help, a SIGTERM is
        sent and we wait again. Finally, we send a SIGKILL and wait for the process to complete.

        :param timeout: Maximum waiting time in seconds after each of SIGINT and SIGTERM
        """
        if self.__sm_process is None or not self.is_running:
            return

        wait_thread = threading.Thread(target=self.__sm_process.wait)
        wait_thread.start()

        # Thread.isAlive() was removed in Python 3.9; is_alive() is available on Python 2 and 3.
        if wait_thread.is_alive():
            # We are kindly asking you to stop, please
            logger.info("Shutting down state machine %s. Sending SIGINT", self.sm_id)
            self.__sm_process.send_signal(signal.SIGINT)
            wait_thread.join(timeout)
        if wait_thread.is_alive():
            # Just stop now
            logger.info("State machine process %s still alive. Sending SIGTERM", self.sm_id)
            self.__sm_process.terminate()
            wait_thread.join(timeout)
        if wait_thread.is_alive():
            logger.info("Killing the state machine process %s.", self.sm_id)
            # Well, then take this one
            self.__sm_process.kill()
            wait_thread.join()

    def shutdown(self):
        """
        Shuts down this state machine by requesting its termination if it is running
        """
        if self.is_running:
            self.request_termination()
def create_state_machine(sm_id, path, sim_id=None, sim_dir=None):
    """
    Creates a state machine instance descriptor for the given state machine

    :param sm_id: The id of the state machine
    :param path: The path to the state machine source code
    :param sim_id: Optional simulation id handed to the state machine instance
    :param sim_dir: Optional absolute path to the simulation directory
    :return: A state machine descriptor
    :raise: IOError (OSError) if the source file cannot be opened for reading
    """
    # Fail early if the source cannot be read, as the previous implementation did by
    # reading the file.
    with open(path, "r"):
        pass
    # The instance takes the PATH of the source as source_path; the source text itself must not
    # be passed in the sim_id position.
    return StateMachineInstance(sm_id, sim_id, sim_dir, source_path=path)