# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# ---LICENSE-END
"""
This module contains the start script of a state machine process
"""
# pylint: disable=redefined-builtin
from builtins import object
import argparse
import os
import signal
import sys
import traceback
import logging
from threading import Thread
import importlib.util
import importlib.machinery
from warnings import warn
import rospy
from cle_ros_msgs.srv import Trigger
from cle_ros_msgs.msg import CLEError
from hbp_nrp_excontrol.__internals import SERVICE_STATE, SERVICE_RESTART, SERVICE_START, \
SERVICE_STOP, SERVICE_RESULT, ERROR_PUB, MAX_STOP_TIMEOUT
__author__ = 'Georg Hinkel'
logger = logging.getLogger(__name__)
[docs]class StateMachineRunner(object):
"""
Class encapsulating a running application to expose a state machine
"""
def __init__(self, sm_id, path):
"""
Initializes the current application with the given state machine id and source path
:param sm_id: The id of the current state machine
:param path: The path to the python source code
"""
self.__sm_id = sm_id
self.__sm_path = path
self.__sm_start_service = None
self.__sm_stop_service = None
self.__sm_restart_service = None
self.__sm_state_service = None
self.__sm_result_service = None
self.__error_publisher = None
self.__sm = None
self.__sm_result = None
self.__sm_thread = None
self.__result = None
[docs] def init(self):
"""
Initializes the current state machine runner
"""
self.__sm_start_service = rospy.Service(
SERVICE_START(self.__sm_id), Trigger,
self.start
)
self.__sm_stop_service = rospy.Service(
SERVICE_STOP(self.__sm_id), Trigger,
self.stop
)
self.__sm_restart_service = rospy.Service(
SERVICE_RESTART(self.__sm_id), Trigger,
self.restart
)
self.__sm_state_service = rospy.Service(
SERVICE_STATE(self.__sm_id), Trigger,
self.get_state
)
self.__sm_result_service = rospy.Service(
SERVICE_RESULT(self.__sm_id), Trigger,
self.get_result
)
self.__error_publisher = rospy.Publisher(ERROR_PUB, CLEError, queue_size=10)
logger.info("State machine services running for '%s'", self.__sm_id)
[docs] def send_error(self, msg, error_type,
severity=CLEError.SEVERITY_ERROR,
line_number=-1, offset=-1, line_text=""):
"""
Sends an error message to clients
:param msg: The error message
:param severity: The severity of the error
:param error_type: The error type, e.g. "Compile"
:param line_number: The line number where the error occurred
:param offset: The offset
:param line_text: The text of the line causing the error
"""
if severity >= CLEError.SEVERITY_ERROR:
logger.exception("Error in State Machines (%s): %s", error_type, msg)
self.__error_publisher.publish(
CLEError(severity, CLEError.SOURCE_TYPE_STATE_MACHINE, error_type, msg,
self.__sm_id, line_number, offset, line_text, self.__sm_path))
# pylint: disable=unused-argument
[docs] def initialize_sm(self, request):
"""
Initializes the current state machine
:param request: The mandatory ROS request parameter
"""
logger.info("Loading state machine code")
try:
loader = importlib.machinery.SourceFileLoader("sm_" + self.__sm_id, self.__sm_path)
spec = importlib.util.spec_from_loader(loader.name, loader)
sm_module = importlib.util.module_from_spec(spec)
loader.exec_module(sm_module)
except SyntaxError as e:
self.send_error(msg="SyntaxError in {} (Line {}): {}"
.format(self.__sm_id, e.lineno, str(e)),
error_type="Compile", line_number=e.lineno,
offset=e.offset, line_text=e.text)
return [False, str(e)]
except NameError as e:
try:
cl, _, tb = sys.exc_info()
error_class = cl.__name__
lineno = traceback.extract_tb(tb)[-1][1]
finally:
del tb # as recommended in the docs
self.send_error(msg="{0} in {1} (Line {2}): {3}"
.format(error_class, self.__sm_id, lineno, str(e)),
error_type="Compile", line_number=lineno)
return [False, str(e)]
except AttributeError as e:
try:
cl, _, tb = sys.exc_info()
error_class = cl.__name__
lineno = traceback.extract_tb(tb)[-1][1]
finally:
del tb # as recommended in the docs
self.send_error(msg="{0} in {1} (Line {2}): {3}"
.format(error_class, self.__sm_id, lineno, str(e)),
error_type="Compile", line_number=lineno)
return [False, str(e)]
except Exception as e: # pylint:disable=broad-except
self.send_error(msg="Error loading state machine {0}: {1}"
.format(self.__sm_id, str(e)),
error_type="Loading")
return [False, str(e)]
if hasattr(sm_module, "state_machine"):
self.__sm = sm_module.state_machine
elif hasattr(sm_module, "sm"):
self.__sm = sm_module.sm
else:
self.__sm = None
return [False, "State machine could not be found"]
return [True, ""]
def __execute(self):
"""
Executes the state machine catching exceptions
"""
# pylint: disable=broad-except
logger.info("Executing state machine")
try:
self.__result = str(self.__sm.execute())
except Exception as e:
logger.exception(e)
self.send_error(msg=str(e), error_type="Runtime")
# pylint: disable=unused-argument
[docs] def start(self, request):
"""
Starts the current state machine
:param request: The mandatory ROS request parameter
"""
if self.__sm is None:
logger.warning("State machine has not been initialized yet. Initializing it now.")
warn("State machine has not been initialized yet. Initializing it now.")
(result, message) = self.initialize_sm(request)
if not result:
return [False, message]
if self.__sm is None:
logger.warning("State machine could not be found. Cancelling start request")
return [False, "State machine could not be found. Cancelling start request"]
logger.info("Starting state machine")
if self.__sm_thread is not None and self.__sm_thread.isAlive():
self.__sm_thread.join(MAX_STOP_TIMEOUT)
if self.__sm_thread.isAlive():
return [False, "Cannot start state machine: "
"Failed to terminate previous state machine"]
self.__sm_thread = Thread(target=self.__execute)
self.__sm_thread.setDaemon(True)
self.__sm_thread.start()
return [True, ""]
def __halt(self):
"""
Halts the current state machine
:param request: The mandatory ROS request parameter
"""
logger.info("Stopping state machine '%s'", self.__sm_id)
if self.__sm is not None:
self.__sm.request_preempt()
self.__sm_thread.join(MAX_STOP_TIMEOUT)
if self.__sm_thread.isAlive():
return False
return True
# pylint: disable=unused-argument
[docs] def stop(self, request):
"""
Stops the current state machine
:param request: The mandatory ROS request parameter
"""
logger.info("Shutting down state machine services for '%s'", self.__sm_id)
self.__sm_start_service.shutdown()
self.__sm_stop_service.shutdown()
self.__sm_restart_service.shutdown()
self.__sm_state_service.shutdown()
self.__sm_result_service.shutdown()
self.__error_publisher.unregister()
if self.__halt():
return [True, ""]
return [False, "State machine did not terminate"]
# pylint: disable=unused-argument
[docs] def restart(self, request):
"""
Restarts the current state machine
:param request: The mandatory ROS request parameter
"""
if not self.__halt():
logger.warning("Stopping SM %s didn't work. Process must be killed.", self.__sm_id)
return [False, "Could not restart state machine. "
"Kill this process and create a new one!"]
self.initialize_sm(request)
self.start(request)
return [True, ""]
# pylint: disable=unused-argument
[docs] @staticmethod
def get_state(request):
"""
Gets the current state machines current state
:param request: The mandatory ROS request parameter
"""
raise NotImplementedError()
[docs] def get_result(self, request):
"""
Gets the result of the state machine
"""
result = self.__result
return [result is not None, str(result)]
def __except_hook(ex_type, value, ex_traceback):
"""
Logs the unhandled exception
:param ex_type: The exception type
:param value: The exception value
:param ex_traceback: The traceback
"""
logger.critical("Unhandled exception of type %s: %s", ex_type, value)
logger.exception(ex_traceback)
[docs]def set_up_logger(logfile_name, verbose=False):
"""
Configure the root logger of the CLE application
:param: logfile_name: name of the file created to collect logs
:param: verbose: increase logging verbosity
"""
# We initialize the logging in the startup of the whole CLE application.
# This way we can access the already set up logger in the children modules.
# Also the following configuration can later be easily stored in an external
# configuration file (and then set by the user).
log_format = '%(asctime)s [%(threadName)-12.12s] [%(name)-12.12s] [%(levelname)s] %(message)s'
try:
file_handler = logging.FileHandler(logfile_name)
file_handler.setFormatter(logging.Formatter(log_format))
logging.root.addHandler(file_handler)
except (AttributeError, IOError, TypeError) as _:
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(log_format))
logging.root.addHandler(console_handler)
logger.warning("Could not write to specified logfile or no logfile specified, "
"logging to stdout now!")
logging.root.setLevel(logging.DEBUG if verbose else logging.INFO)
sys.excepthook = __except_hook
# pylint: disable=unused-argument
[docs]def print_full_stack_trace(sig, frame): # pragma: no cover
"""
Log the stack trace of all the threads
:param sig: The received signal
:param frame: The current stack frame
"""
logger.warning("*** STACKTRACE - START ***")
code = []
# pylint: disable=protected-access
for threadId, stack in sys._current_frames().items():
code.append("# ThreadID: %s" % threadId)
for filename, lineno, name, line in traceback.extract_stack(stack):
code.append('File: "%s", line %d, in %s' % (filename,
lineno, name))
if line:
code.append(" %s" % (line.strip()))
for line in code:
logger.warning(line)
logger.warning("*** STACKTRACE - END ***")
[docs]def main(): # pragma: no cover
"""
Runs the state machine runner
"""
if os.environ["ROS_MASTER_URI"] == "":
raise Exception("You should run ROS first.")
signal.signal(signal.SIGUSR1, print_full_stack_trace)
parser = argparse.ArgumentParser()
parser.add_argument("-n", "--name", dest="name", help="specify experiment id")
parser.add_argument("-s", "--source", dest="source", help="The path to the"
" state machine source code")
parser.add_argument('--logfile', dest='logfile', help='specify the state machine logfile')
parser.add_argument('-i', '--id', dest='id',
help="specify the id under which the state machine is run")
parser.add_argument("-v", "--verbose", help="Increase output verbosity", dest="verbose")
parser.add_argument('-p', '--pycharm',
dest='pycharm',
help='debug with pyCharm. IP adress and port are needed.',
nargs='+')
args = parser.parse_args()
if args.pycharm:
# pylint: disable=import-error
import pydevd
pydevd.settrace(args.pycharm[0],
port=int(args.pycharm[1]),
stdoutToServer=True,
stderrToServer=True)
import hbp_nrp_excontrol
if args.id:
hbp_nrp_excontrol.sim_id = args.id
hbp_nrp_excontrol.sm_id = args.name
# pylint: disable=global-statement
global logger
logger = logging.getLogger(args.name)
log_level = rospy.INFO
if args.verbose:
log_level = rospy.DEBUG
rospy.init_node("sm_" + args.name, log_level=log_level)
set_up_logger(args.logfile, args.verbose)
# pylint: disable=broad-except
try:
sm = StateMachineRunner(args.name, args.source)
sm.init()
hbp_nrp_excontrol.error_cb = sm.send_error
rospy.spin()
except Exception as e:
logger.exception(str(e))
if __name__ == "__main__": # pragma: no cover
main()