# Source code for hbp_nrp_cleserver.server.CLEGazeboSimulationAssembly

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END
"""
This module contains the abstract base class of a simulation assembly using the CLE and Gazebo
"""
import io

from RestrictedPython import compile_restricted
from hbp_nrp_backend import NRPServicesGeneralException
from hbp_nrp_backend.storage_client_api.StorageClient import Model
from hbp_nrp_commons.sim_config.SimConfig import ResourceType
from hbp_nrp_commons.workspace.SimUtil import SimUtil
from hbp_nrp_cleserver.server.GazeboSimulationAssembly import GazeboSimulationAssembly
from hbp_nrp_cle.externalsim.ExternalModuleManager import ExternalModuleManager
from hbp_nrp_commons.ZipUtil import ZipUtil
from hbp_nrp_backend import get_date_and_time_string

# These imports start NEST.
from hbp_nrp_cleserver.server.ROSCLEServer import ROSCLEServer
from hbp_nrp_cle.cle.ClosedLoopEngine import DeterministicClosedLoopEngine, ClosedLoopEngine
import hbp_nrp_cle.tf_framework as tfm
import hbp_nrp_cle.brainsim.config as brainconfig
from cle_ros_msgs import srv
from hbp_nrp_cle.cle.DeterministicClosedLoopEngineProfiler import \
    DeterministicClosedLoopEngineProfiler

import logging
import os
import sys
import subprocess

logger = logging.getLogger(__name__)


class CLEGazeboSimulationAssembly(GazeboSimulationAssembly):
    # pylint: disable=no-self-use
    """
    This class assembles the simulation using the CLE.

    It is an abstract base: ``_create_robot_adapters`` and ``_create_brain_adapters``
    raise ``NotImplementedError`` and have to be provided by a concrete subclass.
    """

    def __init__(self, sim_config):
        """
        Creates a new simulation assembly to simulate an experiment using the CLE and Gazebo

        :param sim_config: config of the simulation to be managed
        """
        super(CLEGazeboSimulationAssembly, self).__init__(sim_config)
        # Created in _initialize; stays None until then
        self.cle_server = None

        # Profiler output lives in <sim_dir>/<_profilerBaseDir>/<_simProfilerDir>.
        # _simProfilerDir stays empty unless a profiler mode is selected in __load_cle.
        self._profilerBaseDir = 'profiler_data'
        self._simProfilerDir = ''
        self._sim_profiler_mode = sim_config.profiler

    def _initialize(self, except_hook):
        """
        Internally initialize the simulation

        The steps run in a fixed order: CLE server creation, resources directory setup,
        Gazebo start-up, textures, environment, robots, robot adapters, brain, CLE,
        transfer functions and finally the CSV logger.

        :param except_hook: A method that should be called when there is a critical error
        """
        # pylint: disable=too-many-locals

        # create the CLE server and lifecycle first to report any failures properly
        # initialize the cle server and services
        logger.info("Creating CLE Server")
        self.cle_server = ROSCLEServer(self.sim_config.sim_id, self.sim_config.timeout,
                                       self.sim_config.timeout_type,
                                       self.gzserver, self.ros_notificator)
        self.cle_server.setup_handlers(self)

        # Put the resources folder into the sys path for import
        try:
            SimUtil.makedirs(self._simResourcesDir)
            with open(os.path.join(self._simResourcesDir, '__init__.py'), 'w+'):
                pass  # make sure the __init__.py exists
        except IOError as err:
            # Best effort: a failure here is only logged, the path is still registered
            logger.info(
                "Failed to setup resource directory due to %s", err)
        # Undone again in the finally clause of _shutdown
        sys.path.insert(0, self._simResourcesDir)

        # start Gazebo simulator and bridge
        self._start_gazebo(extra_models=self.simAssetsDir + ':' + self.sim_dir)

        # load user textures in Gazebo
        self._load_textures()

        # load environment and robot models
        models, lights = self._load_environment(
            self.sim_config.world_model.resource_path.abs_path)

        # find robot
        self.robotManager.set_robot_dict(self.sim_config.robot_models)
        self._load_robot()

        # remember the pose of every robot so that the CLE knows the initial poses
        robot_poses = {rid: robot.pose
                       for rid, robot in self.robotManager.get_robot_dict().items()}

        # load robot adapters
        robotcomm, robotcontrol = self._create_robot_adapters()

        # load the brain
        braincontrol, braincomm, brainfile, brainconf = self._load_brain()

        # load external modules
        external_module_manager = ExternalModuleManager()

        # initialize the cle server and services
        logger.info("Preparing CLE Server")
        self.cle_server.cle = self.__load_cle(robotcontrol, robotcomm, braincontrol, braincomm,
                                              brainfile, brainconf, external_module_manager,
                                              robot_poses, models, lights)
        self.cle_server.prepare_simulation(except_hook)

        # load transfer functions
        self.__load_tfs()

        # Wait for the backend rendering environment to load (for any sensors/cameras)
        self._notify("Waiting for Gazebo simulated sensors to be ready")
        self.robotManager.scene_handler().wait_for_backend_rendering()

        # Spawns a new thread for the csv logger
        # pylint: disable=protected-access
        self.cle_server._csv_logger.initialize()

    def _load_environment(self, world_file_abs_path):
        """
        Loads the environment and robot in Gazebo

        :param world_file_abs_path: Path to the world sdf
        :return: The result of parsing the world file with the scene handler; the caller
                 unpacks it as a ``(models, lights)`` pair
        """
        self._notify("Loading experiment environment")
        return self.robotManager.scene_handler().parse_gazebo_world_file(world_file_abs_path)

    def _load_textures(self):
        """
        Loads custom textures in Gazebo

        Both the retrieval of the texture list and the loading itself are best effort:
        any failure is logged at info level and otherwise ignored.
        """
        self._notify("Loading textures")

        try:
            textures = self.storage_client.get_textures_list(
                self.sim_config.experiment_id, self.sim_config.token)
        except Exception:  # pylint: disable=broad-except
            logger.info("Non-existent textures or folder!")
            return  # ignore missing textures or texture folder

        try:
            if textures:
                self.robotManager.scene_handler().load_textures(textures)
        except Exception:  # pylint: disable=broad-except
            logger.info("Timeout while trying to load textures.")

    # pylint: disable-msg=too-many-branches
    def _load_robot(self):
        """
        Loads robots defined in the bibi and initializes any external controller

        If an external robot controller is configured, it is looked up in the model paths
        first and in the simulation directory second. When found, it is executed with the
        single argument ``start``; a positive exit code shuts the simulation down.
        """
        # Set retina config for the robotManager
        if self.sim_config.retina_config:
            self._notify("Configuring Retina Camera Plugin")
            self.robotManager.retina_config = self.sim_config.retina_config

        self._notify("Loading robots")
        self._prepare_simconfig_robot_models()
        for robot in self.robotManager.get_robot_dict().values():
            self.robotManager.initialize(robot)

        # load external robot controller
        controller_name = self.sim_config.ext_robot_controller
        if controller_name is None:
            return

        controller_path = SimUtil.find_file_in_paths(controller_name,
                                                     self.sim_config.model_paths)
        # The lookup result may be falsy when nothing was found (see _load_brain);
        # os.path.isfile(None) would raise a TypeError, so guard before calling it.
        found = bool(controller_path) and os.path.isfile(controller_path)

        if not found and self.sim_dir is not None:
            # fall back to the simulation directory
            controller_path = os.path.join(self.sim_dir, controller_name)
            found = os.path.isfile(controller_path)

        if not found:
            return

        self._notify("Loading external robot controllers")  # +1
        res = subprocess.call([controller_path, 'start'])
        if res > 0:
            logger.error(
                "The external robot controller could not be loaded")
            self.shutdown()

    def _create_robot_adapters(self):  # pragma: no cover
        """
        Creates the adapter components for the robot side

        :return: A tuple of the communication and control adapter for the robot side
        :raises NotImplementedError: always, subclasses have to override this method
        """
        raise NotImplementedError(
            "This method must be overridden in an implementation")

    def _load_brain(self):
        """
        Loads the neural simulator, interfaces, and configuration

        :return: A tuple ``(braincontrol, braincomm, brain_abs_path, neurons_config)``.
                 The last two entries are None when no brain model is configured.
        :raises NRPServicesGeneralException: if the brain file can neither be found at its
                 configured absolute path nor in the model paths
        """
        # Create interfaces to brain
        self._notify("Loading neural simulator")
        brainconfig.rng_seed = self.rng_seed
        braincomm, braincontrol = self._create_brain_adapters()

        self._notify("Loading brain and population configuration")
        if not self.sim_config.brain_model:
            return braincontrol, braincomm, None, None

        if self.sim_config.brain_model.model:
            self._extract_brain_zip()

        brain_abs_path = self.sim_config.brain_model.resource_path.abs_path
        brain_rel_path = self.sim_config.brain_model.resource_path.rel_path
        if not os.path.exists(brain_abs_path):
            logger.info(
                "Cannot find specified brain file %s in %s. "
                "Searching in default directories %s",
                brain_rel_path, self.sim_dir, str(self.sim_config.model_paths))
            brain_abs_path = SimUtil.find_file_in_paths(
                brain_rel_path, self.sim_config.model_paths)

            if not brain_abs_path:
                raise NRPServicesGeneralException(
                    "Could not find brain file: {}".format(brain_rel_path), "CLE Error")

            # remember where the brain file was actually found
            self.sim_config.brain_model.resource_path.abs_path = brain_abs_path

        neurons_config = self.sim_config.get_populations_dict()

        return braincontrol, braincomm, brain_abs_path, neurons_config

    def _extract_brain_zip(self):
        """
        Checks for validity, and extracts a zipped brain. First we
        make sure that the zip referenced in the bibi exists in the
        list of user brains, then we unzip it on the fly in the temporary
        simulation directory. After the extraction we also make sure to copy
        the .py from the experiment folder cause the user may have modified it

        :raises NRPServicesGeneralException: if the storage returns no data for the model
        """
        # pylint: disable=too-many-locals
        brain = Model(
            self.sim_config.brain_model.model,
            ResourceType.BRAIN)

        storage_brain_zip_data = self._storageClient.get_model(
            self.sim_config.token,
            self.sim_config.ctx_id, brain)

        # if the zip is not there, prompt the user to check his uploaded models
        if not storage_brain_zip_data:
            raise NRPServicesGeneralException(
                "Could not find selected brain model {name} in the list of uploaded models. "
                "Please make sure that it has been uploaded correctly".format(
                    name=self.sim_config.brain_model.model),
                "Zipped model retrieval failed")

        # Extract the archive into the simulation directory.
        # NOTE: the archive is extracted with flatten=False, i.e. it is NOT flattened
        # FixME: not sure exactly why flattening is required
        ZipUtil.extractall(zip_target=io.BytesIO(storage_brain_zip_data),
                           extract_to=self.sim_dir, overwrite=True, flatten=False)

        # copy back the .py from the experiment folder, cause we don't want the one
        # in the zip, cause the user might have made manual changes
        # TODO: verify if this still required and why only one file is copied
        brain_name = os.path.basename(
            self.sim_config.brain_model.resource_path.rel_path)

        self._storageClient.clone_file(
            brain_name, self.sim_config.token, self.sim_config.experiment_id)

    def _create_brain_adapters(self):  # pragma: no cover
        """
        Creates the adapter components for the neural simulator

        :return: A tuple of the communication and control adapter for the neural simulator
        :raises NotImplementedError: always, subclasses have to override this method
        """
        raise NotImplementedError(
            "This method must be overridden in an implementation")

    # pylint: disable=too-many-arguments, too-many-locals
    def __load_cle(self, ros_control, ros_comm, brain_control, brain_comm,
                   brain_file_path, neurons_config, external_module_manager,
                   robots_poses, models, lights):
        """
        Load the ClosedLoopEngine and initializes all interfaces

        :param ros_control: Robot Control Adapter to use
        :param ros_comm: Robot Communication Adapter to use
        :param brain_control: Brain Control Adapter to use
        :param brain_comm: Brain Communication Adapter to use
        :param brain_file_path: Accessible path to brain file
        :param neurons_config: Neuron configuration specified in the BIBI
        :param external_module_manager: External IBA Module Manager
        :param robots_poses: Initial poses of the robots in the scene
        :param models: Initial models loaded into the environment
        :param lights: Initial lights loaded into the environment
        :return: The initialized closed loop engine; the profiling variant is used when
                 the simulation was created with one of the profiler modes
        """
        # Needed in order to cleanup global static variables
        self._notify("Connecting brain simulator to robot")
        tfm.start_new_tf_manager()

        # Create transfer functions manager
        tf_manager = tfm.config.active_node

        # set adapters
        tf_manager.robot_adapter = ros_comm
        tf_manager.brain_adapter = brain_comm

        # integration timestep between simulators (default to CLE value); the configured
        # value is passed on as is, no unit conversion is performed here
        if self.sim_config.timestep is None:
            timestep_s = ClosedLoopEngine.DEFAULT_TIMESTEP
        else:
            timestep_s = self.sim_config.timestep

        ros_control.set_robots(self.robotManager.get_robot_dict())

        # initialize CLE
        self._notify("Initializing CLE")

        profiler_modes = (srv.CreateNewSimulationRequest.PROFILER_CLE_STEP,
                          srv.CreateNewSimulationRequest.PROFILER_CPROFILE)
        if self._sim_profiler_mode in profiler_modes:
            # creates a folder in sim_dir/self._profilerBaseDir
            # where to store profiler data and passes it to the CLE
            self._simProfilerDir = '_'.join(['profiler_data', get_date_and_time_string()])
            profile_tmp_dir = os.path.join(self.sim_dir,
                                           self._profilerBaseDir,
                                           self._simProfilerDir)
            SimUtil.makedirs(profile_tmp_dir)

            cle = DeterministicClosedLoopEngineProfiler(ros_control, ros_comm,
                                                        brain_control, brain_comm,
                                                        tf_manager, external_module_manager,
                                                        timestep_s,
                                                        self._sim_profiler_mode,
                                                        profile_tmp_dir)
        else:
            cle = DeterministicClosedLoopEngine(ros_control, ros_comm,
                                                brain_control, brain_comm,
                                                tf_manager, external_module_manager,
                                                timestep_s)

        if brain_file_path:
            cle.initialize(brain_file_path, neurons_config)
        else:
            cle.initialize()

        # Set initial poses
        cle.initial_robots_poses = robots_poses
        # Set initial models and lights
        cle.initial_models = models
        cle.initial_lights = lights

        return cle

    def __load_tfs(self):
        """
        Loads and connects all transfer functions

        A transfer function that fails to compile in restricted mode, or fails to load,
        is registered as flawed instead of aborting the loading of the remaining ones.
        """
        self._notify("Loading transfer functions")

        for tf in self.sim_config.transfer_functions:
            self._notify("Loading transfer function: {}".format(tf.name))
            # normalize the code to end with exactly one trailing newline
            tf.code = tf.code.strip() + "\n"
            logger.debug("TF: %s\n%s\n", tf.name, tf.code)

            try:
                new_code = compile_restricted(tf.code, '<string>', 'exec')
            # pylint: disable=broad-except
            except Exception as e:
                logger.error("Error while compiling the transfer function %s in restricted "
                             "mode with error %s", tf.name, str(e))
                tfm.set_flawed_transfer_function(tf.code, tf.name, e)
                continue

            try:
                tfm.set_transfer_function(
                    tf.code, new_code, tf.name, tf.active, tf.priority)
            except tfm.TFLoadingException as loading_e:
                logger.error(loading_e)
                tfm.set_flawed_transfer_function(tf.code, tf.name, loading_e)

    def _handle_gazebo_shutdown(self):
        """
        Handles the case that Gazebo was shut down
        """
        super(CLEGazeboSimulationAssembly, self)._handle_gazebo_shutdown()
        if self.cle_server is not None and self.cle_server.lifecycle is not None:
            # Set the simulation to halted
            self.cle_server.lifecycle.failed()

            # If not already stopped, free simulation resources
            self.cle_server.lifecycle.stopped()

    def run(self):
        """
        Runs the simulation
        """
        self.cle_server.run()

    def _shutdown(self, notifications):
        """
        Shutdown the CLE and any hooks before shutting down Gazebo

        Errors raised while shutting down are logged and not propagated. The resources
        directory is removed from ``sys.path`` in any case.

        :param notifications: A flag indicating whether notifications should be attempted to send
        """
        try:
            if notifications:
                self.ros_notificator.update_task(
                    "Shutting down Closed Loop Engine", update_progress=True, block_ui=False)

            self.robotManager.shutdown()
            # cle_server is still None if initialization never got as far as creating it
            if self.cle_server is not None:
                self.cle_server.shutdown()

            # copy cProfile stats to Storage
            if self._simProfilerDir:
                sim_profiler_path_from_exp = os.path.join(self._profilerBaseDir,
                                                          self._simProfilerDir)

                # create profiler data base directory if not existing
                self.storage_client.create_folder(self.sim_config.token,
                                                  self.sim_config.experiment_id,
                                                  self._profilerBaseDir)

                # create profiler data directory if not existing
                folder_uuid = self.storage_client.create_folder(
                    self.sim_config.token,
                    self.sim_config.experiment_id,
                    sim_profiler_path_from_exp)['uuid']

                profile_tmp_dir = os.path.join(self.sim_dir, sim_profiler_path_from_exp)
                for f_name in os.listdir(profile_tmp_dir):
                    f_path = os.path.join(profile_tmp_dir, f_name)
                    with open(f_path, 'rb') as f:
                        self.storage_client.create_or_update(
                            self.sim_config.token,
                            folder_uuid,
                            f_name,
                            f.read(),
                            'application/octet-stream')
        # pylint: disable=broad-except
        except Exception as e:
            logger.error("The cle server could not be shut down")
            logger.exception(e)
        finally:
            # Restore sys path. Make sure that all instances are removed (if present).
            # Compare by value: an identity check would miss equal but distinct strings.
            sys.path[:] = [path for path in sys.path if path != self._simResourcesDir]