# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# ---LICENSE-END
"""
Implementation of the closed loop engine.
"""
__author__ = 'LorenzoVannucci'
import time
import logging
import threading
from concurrent.futures import Future
from hbp_nrp_cle.cle.CLEInterface import IClosedLoopControl, ForcedStopException
from hbp_nrp_cle.cle.CLEInterface import BrainTimeoutException
from hbp_nrp_cle.cle.__helper import get_tf_elapsed_times
from hbp_nrp_cle.robotsim.GazeboHelper import GazeboHelper
import hbp_nrp_cle as cle
logger = logging.getLogger('hbp_nrp_cle')
# pylint: disable=R0902
# the attributes are reasonable in this case
[docs]class DeterministicClosedLoopEngine(IClosedLoopControl):
"""
Implementation of the closed loop engine that runs Transfer functions (TF) sequentially
to brain simulation (B) and world simulation (W). Therefore, we are to some degree sure that
the TFs work on a reproducible state.
W is run in parallel with B. TF sequentially to those. i.e. seq(par(W,B), TF)
Notes about synchronization:
W, B start at cle.clock == 0 and they are asked to simulate one timestep.
After their completion, TF is run with t == cle.clock + timestep
so to be in sync with B and W, which have already simulated one timestep.
Thus, from the TF point of view, TF_t == B_t == W_t.
e.g. rospy.get_time() (which is in sync with W_t) should be equal to TF_t.
"""
# default simulation timestep in seconds (20 ms)
DEFAULT_TIMESTEP = 0.02
def __init__(self,
robot_control_adapter,
robot_comm_adapter,
brain_control_adapter,
brain_comm_adapter,
transfer_function_manager,
external_module_manager,
dt):
"""
Create an instance of the cle.
:param robot_control_adapter: an instance of IRobotContolAdapter
:param robot_comm_adapter: an instance of IRobotCommunicationAdapter
:param brain_control_adapter: an instance of IBrainContolAdapter
:param brain_comm_adapter: an instance of IBrainCommunicationAdapter
:param transfer_function_manager: an instance of ITransferFunctionManager
:param dt: The CLE time step in seconds
"""
self.rca = robot_control_adapter
self.rca_future = None
self.rcm = robot_comm_adapter
self.bca = brain_control_adapter
self.bcm = brain_comm_adapter
self.tfm = transfer_function_manager
self.ema = external_module_manager
# default timestep in seconds
self.timestep_s = dt
# stop event ask the main loop to stop
self.stop_event = threading.Event()
self.stop_event.clear()
# stopped event indicates that the main loop has stopped
self.stopped_event = threading.Event()
self.stopped_event.set()
# global clock
cle.clock = 0.0
self.initialized = False
# time measurements
# The real time is measured this way: each time play is called
# start_time is set to the current clock time. Each time the stop
# function is called (pausing the simulation) elapsed_time is
# incremented with (stop time - start_time). Thus, the real time is
# elapsed_time if paused
# elapsed_time + (current time - start_time) if running
self.start_time = 0.0
self.elapsed_time = 0.0
self._rca_elapsed_time = 0.0
self._bca_elapsed_time = 0.0
self._bca_step_time = 0.0
self.__network_file = None
self.__network_configuration = None
self.__initial_robots_poses = None
# This is to be eventually removed, as communications towards gazebo should
# be done from inside the RosControlAdapter
self.gazebo_helper = GazeboHelper()
self.__start_thread = None
self.__start_future = None
self.start_cb = lambda f: None
self.initial_models = None
self.initial_lights = None
self.sim_timeout_callback = lambda: None
[docs] def initialize(self, brain_file=None, configuration=None):
"""
Initializes the closed loop engine.
:param brain_file: A python PyNN script containing the neural network definition
:param configuration: A set of populations
"""
self.rca.initialize()
self.bca.initialize()
self.tfm.initialize('tfnode')
self.ema.initialize()
cle.clock = 0.0
self.start_time = 0.0
self.elapsed_time = 0.0
self.initialized = True
if brain_file:
self.__network_file = brain_file
self.__network_configuration = configuration
try:
self.load_brain(brain_file, configuration)
except BrainTimeoutException:
logger.info(
"Timeout occurs during loading the brain: %r", brain_file)
except Exception as e: # pylint: disable=broad-except
logger.exception(
"Compiling Error during loading the brain(%s): %r", brain_file, e)
@property
def is_initialized(self):
"""
Returns True if the simulation is initialized, False otherwise.
"""
return self.initialized
[docs] def load_populations(self, populations):
"""
load new populations into the brain
:param populations: A dictionary indexed by population names and
containing neuron indices. Neuron indices can be defined by
lists of integers or slices. Slices are either python slices or
dictionaries containing 'from', 'to' and 'step' values.
"""
if self.initialized:
if self.running:
self.stop()
if self.bca.is_alive():
self.bca.shutdown()
logger.info("Loading new populations")
self.bca.load_populations(populations)
self.tfm.hard_reset_brain_devices()
[docs] def load_brain(self, brain_file, brain_populations=None):
"""
Creates a new brain in the running simulation
:param brain_file: A python PyNN script or an h5 file
containing the neural network definition
:param brain_populations: A (optional) dictionary indexed by population names and
containing neuron indices. Neuron indices can be defined by
lists of integers or slices. Slices are either python slices or
dictionaries containing 'from', 'to' and 'step' values.
"""
if self.initialized:
if self.running:
self.stop()
if self.bca.is_alive():
self.bca.shutdown()
logger.info("Recreating brain from file %s", brain_file)
self.bca.load_brain(brain_file, brain_populations)
logger.info("Resetting TFs")
self.tfm.hard_reset_brain_devices()
[docs] def run_step(self, timestep_s):
"""
Runs both simulations for the given time step in seconds.
:param timestep_s: simulation time, in seconds
:return: Updated simulation time, otherwise -1
"""
# robot simulation
logger.debug("Run step: Robot simulation.")
self.rca_future = self.rca.run_step_async(timestep_s)
self.rcm.refresh_buffers(cle.clock)
# brain simulation
logger.debug("Run step: Brain simulation")
start = time.time()
self.bca.run_step(timestep_s * 1000.0)
self._bca_step_time = time.time() - start
self.bcm.refresh_buffers(cle.clock)
self._bca_elapsed_time += time.time() - start
# wait for all thread to finish
logger.debug("Run step: waiting on Control thread")
try:
f = self.rca_future
f.result()
self._rca_elapsed_time += f.end - f.start
except ForcedStopException:
logger.warn("Run step: Simulation was brutally stopped.")
return -1
self.ema.run_step()
# transfer functions
logger.debug("Run step: Transfer functions")
self.tfm.run_tfs(cle.clock + timestep_s)
# update clock
cle.clock += timestep_s
logger.debug("Run step: done!")
return cle.clock
[docs] def shutdown(self):
"""
Shuts down both simulations.
"""
self.stop_event.set()
self.stopped_event.wait(5)
self.rcm.shutdown()
self.bcm.shutdown()
self.rca.shutdown()
self.bca.shutdown()
self.ema.shutdown()
[docs] def start(self):
"""
Starts the orchestrated simulations
"""
if self.__start_future is None:
self.__start_future = Future()
if self.start_cb is not None:
self.start_cb(self.__start_future)
self.__start_thread = threading.Thread(target=self.loop, name='CLE_THREAD')
self.__start_thread.setDaemon(True)
self.__start_future.set_running_or_notify_cancel()
self.__start_thread.start()
return True
else:
logger.warning("CLE is already running")
return False
[docs] def loop(self):
"""
Starts the orchestrated simulations.
This function does not return (starts an infinite loop).
"""
logger.info("Simulation loop started")
try:
try:
self.stop_event.clear()
self.stopped_event.clear()
self.start_time = time.time()
while not self.stop_event.isSet():
self.run_step(self.timestep_s)
self.sim_timeout_callback()
self.__start_future.set_result(None)
finally:
logger.info("Simulation loop ended")
self.elapsed_time += time.time() - self.start_time
self.__start_thread = None
self.stopped_event.set()
# pylint: disable=broad-except
# we change the order intentionally (first finally then catch) because
# before throwing an custom exception (brainRuntimeException), the stopped event
# has to be set in order to finish the stop function flow.
except Exception as e:
logger.exception(e)
self.__start_future.set_exception(e)
finally:
self.__start_future = None
[docs] def stop(self, forced=False, in_loop=False):
"""
Stops the orchestrated simulations. Also waits for the current
simulation step to end.
Must be called on a separate thread from the start one, as an example
by a threading.Timer (except when in_loop is set).
:param forced: If set, the CLE instance cancels pending tasks
:param in_loop: If set, do not wait for multi-threading signals of pending tasks.
It must be set if and only if the function is called by the CLE main loop's thread.
:except Exception: throws an exception if the CLE was forced to stop but could not be
stopped
"""
self.stop_event.set()
if in_loop:
# since the method has been called after run_step() inside loop(), we are sure that
# the bca and rca are not running.
return
if forced:
if self.rca_future is not None and self.rca_future.running():
self.stopped_event.wait(5)
if self.rca_future.running():
self.rca_future.set_exception(ForcedStopException())
self.wait_step(timeout=self.bca.get_Timeout())
if not self.stopped_event.isSet():
raise Exception("The simulation loop could not be completed")
else:
self.wait_step()
[docs] def reset(self):
"""
Reset the orchestrated simulations (stops them before resetting).
"""
self.stop()
self.rca.reset()
self.bca.reset()
self.tfm.reset()
cle.clock = 0.0
self.start_time = 0.0
self.elapsed_time = 0.0
self._rca_elapsed_time = 0.0
self._bca_elapsed_time = 0.0
logger.info("CLE reset")
[docs] def reset_world(self, sdf_world_string=""):
"""
Reset the world to a given configuration, or to the initial one if none is passed.
:param sdf_world_string: the new environment to be set in the world simulator
(default value is the empty string for compatibility with the ROS message).
"""
if sdf_world_string:
models, lights = self.gazebo_helper.parse_world_string(sdf_world_string)
else:
# backward compatibility
# when working without collab, reset to the initial state
models = self.initial_models
lights = self.initial_lights
self.stop()
self.rca.reset_world(models, lights)
logger.info("CLE world reset")
# TODO: reset brain should just reset the brain, not allow to load a new one.
# For that purpose there is already load_brain.
[docs] def reset_brain(self, brain_file=None, populations=None):
"""
Reloads the brain and resets the transfer function.
If no parameter is specified, it reloads the initial brain.
:param brain_file: A python PyNN script containing the neural network definition
:param populations: A set of populations
"""
if not self.initialized:
return
if brain_file is not None:
self.load_brain(brain_file, populations)
elif self.__network_file is not None:
self.load_brain(self.__network_file, self.__network_configuration)
logger.info("CLE Brain reset")
@property
def simulation_time(self): # -> float64
"""
Get the current simulation time.
"""
return cle.clock
@property
def running(self):
"""
Gets a future indicating whether the simulation is running
"""
return self.__start_future is not None and self.__start_future.running()
@property
def real_time(self): # -> float64
"""
Get the total execution time.
"""
if self.running:
return self.elapsed_time + time.time() - self.start_time
return self.elapsed_time
@property
def initial_robots_poses(self):
"""
Get the robots poses at the beginning of the simulation.
"""
return self.__initial_robots_poses
@initial_robots_poses.setter
def initial_robots_poses(self, value):
"""
Set the robots poses at the beginning of the simulation.
Usually performed after CLE initialization.
:param value: The new value for the initial robot pose.
"""
self.__initial_robots_poses = value
[docs] def reset_robot_poses(self):
"""
Set the robots in the simulation to their initial pose.
"""
for rid, pose in self.__initial_robots_poses.items():
self.gazebo_helper.set_model_pose(str(rid), pose)
self.tfm.reset()
[docs] def tf_elapsed_time(self):
"""
Gets the time share of the Transfer Functions
"""
return get_tf_elapsed_times(self.tfm)
[docs] def brainsim_elapsed_time(self):
"""
Gets the time share of the brain simulation
"""
return self._bca_elapsed_time
[docs] def robotsim_elapsed_time(self):
"""
Gets the time share of the robot simulation
"""
return self._rca_elapsed_time
[docs] def wait_step(self, timeout=None):
"""
Wait for the currently running simulation step to end.
:param timeout: The maximum amount of time (in seconds) to wait for the end of this step
"""
self.stopped_event.wait(timeout)