# Source code for hbp_nrp_cle.cle.ClosedLoopEngine

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END
"""
Implementation of the closed loop engine.
"""

__author__ = 'Georg Hinkel'

import hbp_nrp_cle as cle
from hbp_nrp_cle.cle.DeterministicClosedLoopEngine import DeterministicClosedLoopEngine
import time
import logging
import threading
from hbp_nrp_cle.cle.CLEInterface import ForcedStopException

logger = logging.getLogger('hbp_nrp_cle')


# pylint: disable=R0902
# the attributes are reasonable in this case
class ClosedLoopEngine(DeterministicClosedLoopEngine):
    """
    Implementation of the closed loop engine that runs Transfer Functions (TF),
    brain simulation (B) and world simulation (W) concurrently for best effort
    performance.

    World simulation is run in parallel in a separate process; Transfer
    Functions and brain simulation are run as python Threads.

    Notes about synchronization:

    TF, B and W start at cle.clock == 0.
    The difference between clocks is within one CLE timestep, i.e.
    abs(TF_t - B_t) < timestep, abs(TF_t - W_t) < timestep.
    E.g. rospy.get_time() in a Transfer Function will, very likely, not return
    the same t which the TF has been called with (the t parameter). What is
    guaranteed is that the time difference stays within one CLE timestep.

    In fact, the components are waited on for step completion at the end of a
    simulated step (run_step method) and their relative intra-step speed
    depends on their respective workload and scheduling.
    """

    def __init__(self,
                 robot_control_adapter,
                 robot_comm_adapter,
                 brain_control_adapter,
                 brain_comm_adapter,
                 transfer_function_manager,
                 external_module_manager,
                 dt):
        """
        Create an instance of the cle.

        :param robot_control_adapter: an instance of IRobotControlAdapter
        :param robot_comm_adapter: an instance of IRobotCommunicationAdapter
        :param brain_control_adapter: an instance of IBrainControlAdapter
        :param brain_comm_adapter: an instance of IBrainCommunicationAdapter
        :param transfer_function_manager: an instance of ITransferFunctionManager
        :param external_module_manager: forwarded unchanged to the base class
            constructor
        :param dt: The CLE time step in seconds
        """
        super(ClosedLoopEngine, self).__init__(robot_control_adapter,
                                               robot_comm_adapter,
                                               brain_control_adapter,
                                               brain_comm_adapter,
                                               transfer_function_manager,
                                               external_module_manager,
                                               dt)

        # Worker thread running the TFs; created lazily by start().
        self.__tf_thread = None
        # Set by run_step() to release the TF thread for one step.
        self.__tf_start_event = threading.Event()
        # Set by the TF thread once the TFs of the current step have run.
        self.__tf_done_event = threading.Event()

        # indicates that the simulation is shutting down
        # (a new Event starts cleared, no explicit clear() is needed)
        self.shutdown_event = threading.Event()

        # indicates that the TFs loop is over
        self.tfs_stopped_event = threading.Event()

    def run_step(self, timestep_s):
        """
        Runs simulations and TFs for the given time step in seconds.

        The TF thread is released first, then the robot simulation is started
        asynchronously and the brain simulation is run in the calling thread.
        The method then waits for the robot simulation and for the TFs before
        advancing the global clock.

        :param timestep_s: simulation time, in seconds
        :return: Updated simulation time, otherwise -1 if the robot simulation
            step raised a ForcedStopException
        """
        # release the TF thread for this step
        self.__tf_start_event.set()

        # robot simulation
        self.rca_future = self.rca.run_step_async(timestep_s)
        self.rcm.refresh_buffers(cle.clock)

        # brain simulation (the brain adapter is given milliseconds)
        logger.debug("Run step: Brain simulation")
        start = time.time()
        self.bca.run_step(timestep_s * 1000.0)
        self._bca_step_time = time.time() - start
        self.bcm.refresh_buffers(cle.clock)
        self._bca_elapsed_time += time.time() - start

        # Wait for all threads to finish

        # - World simulator
        logger.debug("Run step: wait on robot simulation.")
        try:
            f = self.rca_future
            f.result()
            self._rca_elapsed_time += f.end - f.start
        except ForcedStopException:
            logger.warning("Run step: Simulation was brutally stopped.")
            return -1

        # - Transfer Functions
        logger.debug("Run step: wait on Transfer functions.")
        self.__tf_done_event.wait()
        self.__tf_done_event.clear()

        # update clock
        cle.clock += timestep_s

        logger.debug("Run_step: done !")
        return cle.clock

    def __run_tfs(self):
        """
        Runs the Transfer Functions. To be executed in a separate thread.

        The outer loop runs until shutdown_event is set; the inner loop runs
        one TF pass per __tf_start_event signal until stop_event is set.
        """
        # simulation loop, return when done
        while not self.shutdown_event.is_set():
            # step loop
            while not self.stop_event.is_set():
                # wait for step start
                self.__tf_start_event.wait()
                self.__tf_start_event.clear()
                try:
                    self.tfm.run_tfs(cle.clock)
                finally:
                    # always signal completion so run_step() cannot hang if a
                    # TF raises
                    self.__tf_done_event.set()

            # step is over
            self.tfs_stopped_event.set()
            # wait for the sim to be started again
            # NOTE(review): the event was set on the line above, so this wait
            # returns immediately; while stop_event remains set the outer loop
            # appears to spin. Confirm the intended pause behaviour.
            self.tfs_stopped_event.wait()

    def start(self):
        """
        Starts the orchestrated simulations.

        The TF thread is created only once, on the first call for which the
        base class start() returns a true value; tfs_stopped_event is cleared
        on every call.
        """
        if super(ClosedLoopEngine, self).start() and not self.__tf_thread:
            tf_thread = threading.Thread(target=self.__run_tfs,
                                         name="TFs_THREAD")
            # daemon thread: must not keep the interpreter alive on exit
            tf_thread.daemon = True
            self.__tf_thread = tf_thread
            tf_thread.start()

        # start/resume the TFs thread
        self.tfs_stopped_event.clear()

    def shutdown(self):
        """
        Shuts down the simulation.

        shutdown_event is set before delegating to the base class so that the
        TF thread's simulation loop can terminate.
        """
        self.shutdown_event.set()
        super(ClosedLoopEngine, self).shutdown()