# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# ---LICENSE-END
"""
This module represents the configuration of a running simulation
"""
from __future__ import division
from builtins import object
import os
import logging
from enum import Enum
from pyxb import ValidationError, NamespaceError
from hbp_nrp_cle.robotsim.RobotManager import Robot
from hbp_nrp_commons.sim_config.SimConfUtil import SimConfUtil
from hbp_nrp_commons.workspace.Settings import Settings
from hbp_nrp_commons.generated import bibi_api_gen as bibi_parser, exp_conf_api_gen as exc_parser
from hbp_nrp_cleserver.bibi_config.bibi_configuration_script import (get_all_neurons_as_dict)
from hbp_nrp_cleserver.bibi_config.bibi_configuration_script import (generate_tf, get_tf_name)
from cle_ros_msgs.srv import CreateNewSimulationRequest
__author__ = 'Hossain Mahmud'
logger = logging.getLogger('SimConfig')
[docs]class Version(Enum): # pragma: no cover
"""
Enumeration for exc and bibi version
"""
CURRENT = "Current"
[docs]class SimulationType(Enum): # pragma: no cover
"""
Enumeration for different simulation type
"""
NEST_SYNC = 0x11000001
NEST_DIST = 0x11000002
SPINNAKER_SYNC = 0x11000003
NENGO_SYNC = 0x11000004
NENGO_LOIHI_SYNC = 0x11000005
ROBOT_ROS_SYNC = 0x11000006
NEST_DIRECT_SYNC = 0x11000007
[docs]class ResourceType(Enum): # pragma: no cover
"""
Enumeration for model types. Robot resources are enhanced in a different structure
"""
MODEL = 0x11100001
ROBOT = 0x11100002
BRAIN = 0x11100003
ENVIRONMENT = 0x11100005
[docs]class ResourcePath(object): # pragma: no cover
"""
Path place holder
"""
def __init__(self, rel_path, located_at=None):
"""
Initializes a resource path
:param string rel_path: relative path
:param string located_at: from where the rel_path is relative
"""
self.rel_path = rel_path
self.abs_path = None if located_at is None else os.path.join(located_at, rel_path)
[docs]class ExperimentResource(object): # pragma: no cover
"""
Generalized model description of experiment resources. Can be BRAIN, WORLD, or any other MODEL
"""
def __init__(self, resource_type, resource_rel_path, model_name=None, located_at=None):
"""
Initializes an experiment resource
:param string resource_type: type of the resource
:param string resource_rel_path: relative path of the resource
:param string located_at: location of the resources (where relative paths are valid)
"""
self.resource_type = resource_type
self.resource_path = ResourcePath(resource_rel_path, located_at)
self.model = model_name
class _TF(object): # pragma: no cover
"""
Model to store TF information
"""
def __init__(self, name, code, src=None, active=False, priority=0):
"""
Initialize a transfer function object
:param name: name of the transfer function
:param code: compiled (restricted) source code of transfer function
:param src: source code (plain text) of transfer function
:param active: active state of the transfer function
:param priority: specifies execution order of the transfer function. Transfer functions with
higher priority are executed first.
"""
self.name = name
self.code = code
self.src = src
self.active = active
try:
self.priority = int(priority)
except (ValueError, TypeError):
self.priority = 0
[docs]class SimConfig(object):
"""
Abstraction of exc and bibi configuration
"""
def __init__(self, exc_abs_path, **params):
"""
Initialize a sim config object
:param exc_abs_path: absolute path to experiment configuration (exc) file
:param par: dictionary of any other required params
"""
# ground truth
self._sim_dir = os.path.dirname(exc_abs_path)
self._exc_path = ResourcePath(os.path.basename(exc_abs_path), self.sim_dir)
# base assembly
self._exc_dom = None
self._bibi_dom = None
self._sim_id = params.get('sim_id', None)
self._token = params.get('token')
self._ctx_id = params.get('ctx_id')
# gazebo assembly
self._timeout = params.get('timeout', None)
self._timeout_type = params.get('timeout_type', None)
self._gzserver_host = params.get('gzserver_host', 'local')
self._experiment_id = params.get('experiment_id', None)
self._reservation = params.get('reservation', None)
self._rng_seed = params.get('rng_seed', None)
self._playback_path = params.get('playback_path', None)
self._gazebo_lockstep = False
self._physics_engine = None
self._gzweb = None
self._ros_launcher = None
self._gazebo_recorder = None
# exc info
self.exc_version = None
# bibi info
self._populations_dict = {}
self._tfs = []
# paths
self._bibi_path = None
self._brain_abs_path = None
self._robot_models = {}
self._brain_model = None
self._world_model = None
self._simulation_type = None
self._num_brain_processes = params.get('brain_processes', None)
self._profiler = params.get('profiler', CreateNewSimulationRequest.PROFILER_DISABLED)
# paths from system config
self._model_paths = [self._sim_dir, os.path.join(self._sim_dir, 'assets')]
# rostopic recording
self._record_ros_topics = None
self.initialize()
[docs] def initialize(self):
"""
Initialize data members of the sim config
"""
self._read_exc_and_bibi_dom_objects()
self._read_dom_data()
self._model_paths.append(os.path.join(self._sim_dir, 'robots'))
if Settings.nrp_models_directory is not None:
self._model_paths.append(Settings.nrp_models_directory)
def _read_exc_and_bibi_dom_objects(self):
"""
Parse experiment and bibi and return the DOM objects
"""
# Read exc
with open(self._exc_path.abs_path) as excFile:
try:
self._exc_dom = exc_parser.CreateFromDocument(excFile.read())
except ValidationError as ve:
raise Exception("Could not parse experiment config {0} due to validation "
"error: {1}".format(self._exc_path.abs_path, str(ve)))
self._bibi_path = ResourcePath(
self._exc_dom.bibiConf.src, self.sim_dir)
logger.info("Bibi absolute path: %s", self._bibi_path.abs_path)
# Read bibi
with open(self._bibi_path.abs_path) as bibiFile:
try:
self._bibi_dom = bibi_parser.CreateFromDocument(
bibiFile.read())
except ValidationError as ve:
raise Exception("Could not parse brain configuration {0:s} due to validation "
"error: {1:s}".format(self._bibi_path.abs_path, str(ve)))
except NamespaceError as ne:
# first check to see if the BIBI file appears to have a valid namespace
namespace = str(ne).split(" ", 1)[0]
if not namespace.startswith("http://schemas.humanbrainproject.eu/SP10") or \
not namespace.endswith("BIBI"):
raise Exception("Unknown brain configuration file format for: {0:s} with "
"namespace: {1:s}".format(self._bibi_path.abs_path, namespace))
# notify the user that their file is out of date
raise Exception("The BIBI file for the requested experiment is out of date and no "
"longer supported. Please contact neurorobotics@humanbrainproject."
"eu with the following information for assistance in updating this"
" file.\n\nExperiment Configuration:\n\tName: {0:s}\n\tBIBI: {1:s}"
"\n\tVersion: {2:s}".format(self._exc_dom.name,
self._bibi_path.abs_path, namespace))
# set config version based of something
self.exc_version = Version.CURRENT
[docs] def get_world_model(self):
"""
it gets the data necessary to create a world model
"""
model_name = None
rel_path = self._exc_dom.environmentModel.src
if self._exc_dom.environmentModel.model:
model_name = self._exc_dom.environmentModel.model
rel_path = os.path.join(model_name, rel_path)
self._world_model = ExperimentResource(
resource_type=ResourceType.ENVIRONMENT,
resource_rel_path=rel_path,
model_name=model_name,
located_at=self._sim_dir
)
[docs] def get_brain_model(self):
"""
it gets the data necessary to create the brain model
"""
if self._bibi_dom.brainModel:
brain_model_name = None
if self._bibi_dom.brainModel.model:
brain_model_name = self._bibi_dom.brainModel.model
rel_path = os.path.join(brain_model_name, self._bibi_dom.brainModel.file)
else:
rel_path = self._bibi_dom.brainModel.file
self._brain_model = ExperimentResource(
resource_type=ResourceType.BRAIN,
resource_rel_path=rel_path,
model_name=brain_model_name,
located_at=self.sim_dir
)
def _read_dom_data(self):
"""
Populate different models
"""
# if user did not provide the number of brain processes in ROSPY request, use from bibi
if self._num_brain_processes is None:
self._num_brain_processes = self._exc_dom.bibiConf.processes
try:
self._simulation_type = {
None: SimulationType.NEST_SYNC, # default
bibi_parser.SimulationMode.SynchronousNestSimulation: SimulationType.NEST_SYNC,
bibi_parser.SimulationMode.SynchronousSpinnakerSimulation:
SimulationType.SPINNAKER_SYNC,
bibi_parser.SimulationMode.SynchronousNengoSimulation: SimulationType.NENGO_SYNC,
bibi_parser.SimulationMode.SynchronousNengoLoihiSimulation: \
SimulationType.NENGO_LOIHI_SYNC,
bibi_parser.SimulationMode.SynchronousRobotRosNest: SimulationType.ROBOT_ROS_SYNC,
bibi_parser.SimulationMode.SynchronousDirectNestSimulation:
SimulationType.NEST_DIRECT_SYNC,
}[self._bibi_dom.mode]
except KeyError:
raise Exception("Unsupported multi-process simulation mode requested: {}"
.format(str(self._bibi_dom.mode)))
# Rename multi-process NEST for convenience
if self._num_brain_processes > 1 and self._simulation_type is SimulationType.NEST_SYNC:
self._simulation_type = SimulationType.NEST_DIST
# Environment model
self.get_world_model()
# gazebo's lockstep update feature
self._gazebo_lockstep = (self._exc_dom.gazeboLockstep
if self._exc_dom.gazeboLockstep is not None else False)
logger.info("Gazebo's lockstep mode set to %s", self._gazebo_lockstep)
# physics engine
self._physics_engine = (self._exc_dom.physicsEngine
if self._exc_dom.physicsEngine is not None else 'ode')
logger.info("Using physics engine %s", self._physics_engine)
# Robot model. Read all bodyModel tag(s)
self._read_robot_models()
# Brain model
self.get_brain_model()
# Populations
if self._bibi_dom.brainModel and self._bibi_dom.brainModel.populations:
self._populations_dict = get_all_neurons_as_dict(
self._bibi_dom.brainModel.populations)
# Transfer functions
for _tf in self._bibi_dom.transferFunction:
code = generate_tf(_tf, self.sim_dir)
name = get_tf_name(code)
src = _tf.src if _tf.src else None # must be not None and not ""
priority = _tf.priority if _tf.priority else 0
active = bool(_tf.active) if _tf.active else False
self._tfs.append(_TF(name, code, src, active, priority))
def _read_robot_models(self):
"""
Populate _robot_models. To be used in the robot manager
"""
for elem in self._bibi_dom.bodyModel:
if elem.robotId is None:
elem.robotId = 'robot'
elif not elem.robotId or elem.robotId in self._robot_models:
raise Exception(
"Multiple bodyModels with no or same robot id found")
pose = None
for rpose in self._exc_dom.environmentModel.robotPose:
if (not rpose.robotId) or rpose.robotId == elem.robotId:
pose = SimConfUtil.convertXSDPosetoPyPose(rpose)
if not hasattr(elem, "model"):
raise Exception(
"No robotModelName is provided in bibi.bodyModel.model")
robot_model = elem.model
# Robot has specialized info. Create direct Robot object instead of ExperimentResource
self._robot_models[elem.robotId] = Robot(
rid=elem.robotId,
# store the relative path for the time being
sdf_abs_path=elem.value(),
display_name=elem.robotId,
pose=pose,
roslaunch_abs_path=None,
model=robot_model) # gets updated during the Assembly initialization
[docs] def get_populations_dict(self):
"""
:return: dict containing all the populations
"""
return self._populations_dict
[docs] def gzbridge_setting(self, name, default):
"""
Gets the gzbridge setting
"""
try:
s = self._exc_dom.gzbridgesettings
val = getattr(s, name)
val = type(default)(val)
# pylint: disable=broad-except
except Exception:
val = default
return repr(val)
@property
def exc_dom(self):
"""
Restrict external access of exc_dom
:raises exception: while accessing DOM object directly
"""
# pylint: disable=no-self-use
raise AttributeError(
"Direct use of exc DOM object inside CLE is forbidden")
@property
def bibi_dom(self):
"""
Restrict external access of bibi_dom
:raises exception: while accessing DOM object directly
"""
# pylint: disable=no-self-use
raise AttributeError(
"Direct use of bibi DOM object inside CLE is forbidden")
@property
def profiler(self):
"""
Returns the profiler mode for this simulation
"""
return self._profiler
@property
def token(self):
"""
Returns the token assigned to this simulation
"""
return self._token
@property
def ctx_id(self):
"""
Returns the context id assigned to this simulation
"""
return self._ctx_id
@property
def sim_id(self):
"""
Gets the simulation id
"""
return self._sim_id
@property
def sim_dir(self):
"""
Gets the simulation directory
"""
return self._sim_dir
@property
def timeout(self):
"""
Gets the simulation directory
"""
return self._timeout
@property
def timeout_type(self):
"""
Gets the simulation directory
"""
return self._timeout_type
@property
def exc_path(self):
"""
Gets the simulation exc file path
"""
return self._exc_path
@property
def bibi_path(self):
"""
Gets the simulation bibi file ResourcePath
"""
return self._bibi_path
@property
def playback_path(self):
"""
Gets the playback path
"""
return self._playback_path
@property
def simulation_type(self):
"""
Gets the playback path
"""
return self._simulation_type
@property
def gzserver_host(self):
"""
Gets the gzserver host location
"""
return self._gzserver_host
@property
def reservation(self):
"""
Gets the reservation for cluster (typically a ssh-able hostname, etc.)
"""
return self._reservation
@property
def experiment_id(self):
"""
Gets the experiment id (typically the folder name in the storage server)
"""
return self._experiment_id
@property
def model_paths(self):
"""
Gets the model paths
"""
return self._model_paths
@property
def world_model(self):
"""
Gets the environment model info
"""
return self._world_model
@property
def robot_models(self):
"""
Gets the robot models
"""
return self._robot_models
@property
def brain_model(self):
"""
Gets the robot models
"""
return self._brain_model
@property
def transfer_functions(self):
"""
Gets the list of transfer functions
"""
return self._tfs
@property
def gazebo_lockstep(self):
"""
Gets gazebo_lockstep argument
"""
return self._gazebo_lockstep
@property
def physics_engine(self):
"""
Gets the physics_engine
"""
return self._physics_engine
@property
def ros_launch_abs_path(self):
"""
Gets the ros_launch absolute path
"""
return self._exc_dom.rosLaunch.src if self._exc_dom.rosLaunch else None
@property
def ext_robot_controller(self):
"""
Gets the ext_robot_controller relative path
"""
return self._bibi_dom.extRobotController
@property
def num_brain_processes(self):
"""
Gets the number of brain processes
"""
return self._num_brain_processes
@property
def rng_seed(self):
"""
Gets the number of brain processes
"""
return self._rng_seed if self._rng_seed else self._exc_dom.rngSeed
@property
def retina_config(self):
"""
Gets the retina config file (if provided)
"""
confs = [
conf.src for conf in self._bibi_dom.configuration if conf.type == 'retina']
if confs:
return confs[0]
return None
@property
def timestep(self):
"""
Gets the number of brain processes
"""
return None if self._bibi_dom.timestep is None else float(self._bibi_dom.timestep) / 1000.0
@property
def record_ros_topics(self):
"""
Gets the rostopics that shall be recorded with rosbag in addition to the common ones
"""
return self._exc_dom.recordRosTopics