Source code for hbp_nrp_backend.cle_interface.ROSCLEClient

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END
"""
Takes care of making the appropriate ROS call(s) to control a simulation.
On the other side of ROS, the calls are handled by ROSCLEServer.py
"""

from builtins import object
import logging
import rospy
# This package comes from the catkin package ROSCLEServicesDefinitions
# in the GazeboRosPackages repository.
from cle_ros_msgs import srv, msg
from hbp_nrp_backend.cle_interface import SERVICE_SIM_RESET_ID, \
    SERVICE_GET_TRANSFER_FUNCTIONS, SERVICE_EDIT_TRANSFER_FUNCTION, \
    SERVICE_ACTIVATE_TRANSFER_FUNCTION, SERVICE_ADD_TRANSFER_FUNCTION, \
    SERVICE_DELETE_TRANSFER_FUNCTION, SERVICE_SET_BRAIN, SERVICE_GET_BRAIN, \
    SERVICE_GET_POPULATIONS, SERVICE_SET_POPULATIONS, \
    SERVICE_GET_CSV_RECORDERS_FILES, SERVICE_SIM_EXTEND_TIMEOUT_ID, \
    SERVICE_SIMULATION_RECORDER, \
    SERVICE_CONVERT_TRANSFER_FUNCTION_RAW_TO_STRUCTURED, \
    SERVICE_ADD_ROBOT, SERVICE_GET_ROBOTS, SERVICE_DEL_ROBOT, SERVICE_SET_EXC_ROBOT_POSE, \
    SERVICE_PREPARE_CUSTOM_MODEL

import hbp_nrp_commons

import sys
python_version_major = sys.version_info.major

python3 = python_version_major > 2

__author__ = "Lorenzo Vannucci, Daniel Peppicelli, Georg Hinkel"
logger = logging.getLogger(__name__)


[docs]def fallback_retval(val): """ This function is intended to be used as a decorator for the methods calling CLE services. It intercepts a ROSCLEClientException during the execution of a ROSServiceWrapper and returns the desired value instead. :param val: the fallback return value for the decorated method """ def decorator(func): """ The decorator function returned once the wanted fallback return value is set :param func: the function to decorate. """ def f(*args, **kwargs): """ The function actually executed instead of the decorated one. """ try: return func(*args, **kwargs) except ROSCLEClientException: return val return f return decorator
[docs]class ROSCLEClientException(Exception): """ Exception within the CLE client """ def __init__(self, *args, data=None): super().__init__(*args) self.data = data
[docs]class ROSCLEServiceWrapper(object): """ Wraps the behaviour of a standard ROS service, throwing a detailed ROSCLEClientException in case of invalid client or ROS exceptions. """ ROS_SERVICE_TIMEOUT = 180 def __init__(self, service_name, service_class, ros_cle_client, invalidate_on_failure=True): """ :param service_name: the name of the ROS service. :param service_class: the class of the ROS service parameter. :param ros_cle_client: the ROSCLEClient instance creating the wrapper. :param invalidate_on_failure (default=False): a boolean value deciding whether the ROSCLEClient should be invalidated in case of failure (True) or not (False). """ self.__handler = None self.__ros_cle_client = ros_cle_client self.__invalidate_on_failure = invalidate_on_failure try: logger.info("Connecting to ROS service %s", service_name) self.__handler = rospy.ServiceProxy(service_name, service_class) self.__handler.wait_for_service(timeout=self.ROS_SERVICE_TIMEOUT) except rospy.ROSException: # According to the documentation, only a timeout will raise a generic 'ROSException'. # http://docs.ros.org/api/rospy/html/rospy-module.html#wait_for_service message = f"Timeout while connecting to the CLE (waiting on {service_name})." logger.error(message) raise ROSCLEClientException(message) def __call__(self, *args, **kwargs): try: return self.__handler(*args, **kwargs) except (rospy.ServiceException, rospy.exceptions.ROSInterruptException) as e: if self.invalidate_on_failure: self.__ros_cle_client.valid = False self.__ros_cle_client.invalid_reason = "a previous communication error" message = f"Error executing service \"{self.__handler.resolved_name}\"," \ f"unable to communicate with the CLE.\nError: {str(e)}" logger.error(message) raise ROSCLEClientException(message) @property def handler(self): """ Property getter for the __handler attribute """ return self.__handler @property def ros_cle_client(self): """ Property getter for the __ros_cle_client attribute """ return self.__ros_cle_client @property def invalidate_on_failure(self): """ Property getter for the __invalidate_on_failure attribute """ return self.__invalidate_on_failure
[docs]class ROSCLEClient(object): """ Client around the ROS controlled Closed Loop Engine. """ def __init__(self, sim_id): """ Create the wrapper client :param sim_id: The simulation id """ self.valid = True self.__cle_reset = ROSCLEServiceWrapper( SERVICE_SIM_RESET_ID(sim_id), srv.ResetSimulation, self) self.__cle_extend_timeout = ROSCLEServiceWrapper( SERVICE_SIM_EXTEND_TIMEOUT_ID(sim_id), srv.ExtendTimeout, self) self.__cle_get_transfer_functions = ROSCLEServiceWrapper( SERVICE_GET_TRANSFER_FUNCTIONS(sim_id), srv.GetTransferFunctions, self) self.__cle_add_transfer_function = ROSCLEServiceWrapper( SERVICE_ADD_TRANSFER_FUNCTION(sim_id), srv.AddTransferFunction, self) self.__cle_activate_transfer_function = ROSCLEServiceWrapper( SERVICE_ACTIVATE_TRANSFER_FUNCTION(sim_id), srv.ActivateTransferFunction, self) self.__cle_edit_transfer_function = ROSCLEServiceWrapper( SERVICE_EDIT_TRANSFER_FUNCTION(sim_id), srv.EditTransferFunction, self) self.__cle_convert_transfer_function_raw_to_structured = ROSCLEServiceWrapper( SERVICE_CONVERT_TRANSFER_FUNCTION_RAW_TO_STRUCTURED(sim_id), srv.ConvertTransferFunctionRawToStructured, self) self.__cle_delete_transfer_function = ROSCLEServiceWrapper( SERVICE_DELETE_TRANSFER_FUNCTION(sim_id), srv.DeleteTransferFunction, self) self.__cle_get_brain = ROSCLEServiceWrapper(SERVICE_GET_BRAIN(sim_id), srv.GetBrain, self) self.__cle_set_brain = ROSCLEServiceWrapper(SERVICE_SET_BRAIN(sim_id), srv.SetBrain, self) self.__cle_get_populations = ROSCLEServiceWrapper(SERVICE_GET_POPULATIONS(sim_id), srv.GetPopulations, self) self.__cle_set_populations = ROSCLEServiceWrapper(SERVICE_SET_POPULATIONS(sim_id), srv.SetPopulations, self) self.__cle_get_CSV_recorders_files = ROSCLEServiceWrapper( SERVICE_GET_CSV_RECORDERS_FILES(sim_id), srv.GetCSVRecordersFiles, self) self.__simulation_recorder = ROSCLEServiceWrapper( SERVICE_SIMULATION_RECORDER(sim_id), srv.SimulationRecorder, self) self.__cle_get_robots = ROSCLEServiceWrapper( SERVICE_GET_ROBOTS(sim_id), srv.GetRobots, self) self.__cle_add_robot = ROSCLEServiceWrapper(SERVICE_ADD_ROBOT(sim_id), srv.AddRobot, self) self.__cle_del_robot = ROSCLEServiceWrapper( SERVICE_DEL_ROBOT(sim_id), srv.DeleteRobot, self) self.__cle_set_robot_init_pose = ROSCLEServiceWrapper( SERVICE_SET_EXC_ROBOT_POSE(sim_id), srv.ChangePose, self) self.__cle_prepare_custom_model = ROSCLEServiceWrapper( SERVICE_PREPARE_CUSTOM_MODEL(sim_id), srv.Resource, self) self.__stop_reason = None
[docs] def stop_communication(self, reason): """ Tells the client to stop all communication to the simulation server because of the given reason :param reason: The reason why no more communication should be performed """ self.__stop_reason = reason
[docs] def reset(self, reset_type, world_sdf=None, brain_path=None, populations=None): """ Reset the simulation. :param reset_type: Denotes the kind of reset the user wants to perform, details about :param world_sdf: The world sdf :param brain_path: The brain path for the reset :param populations: To populations for the reset reset types and details are given in the ResetSimulation service request message. """ if self.__stop_reason is not None: raise ROSCLEClientException(self.__stop_reason) # TODO: Uniform response from ROS CLE services so that this could be done directly # in the wrapper class # cancel any simulation recording in progress, discard all if this is a full reset # since the recorders/playback will break with simulator time reset self.__simulation_recorder(srv.SimulationRecorderRequest.CANCEL) if reset_type == srv.ResetSimulationRequest.RESET_FULL: self.__simulation_recorder(srv.SimulationRecorderRequest.RESET) if populations is not None: for pop in populations: _x = pop.name if python3 or isinstance(_x, str): _x = _x.encode('utf-8') else: _x = str(_x) pop.name = _x if pop.step <= 0: pop.step = 1 resp = self.__cle_reset(reset_type=reset_type, world_sdf=world_sdf if world_sdf is not None else "", brain_path=brain_path if brain_path is not None else "", populations=populations if populations is not None else []) if not resp.success: raise ROSCLEClientException(resp.error_message)
[docs] def get_simulation_brain(self): """ Get the brain of the running simulation :return: dict with brain_data, brain_type, brain_populations and data_type """ if self.__stop_reason is not None: raise ROSCLEClientException(self.__stop_reason) return self.__cle_get_brain()
[docs] def set_simulation_brain(self, brain_type='py', data=None, data_type='text', brain_populations=None): """ Set the brain of the running simulation (will pause the simulation) :param brain_type: Type of the brain file ('h5' or 'py') :param data: Contents of the brain file. Encoding given in field data_type :param data_type: Type of the data field ('text' or 'base64') :return: response of the cle """ if self.__stop_reason is not None: raise ROSCLEClientException(self.__stop_reason) return self.__cle_set_brain(brain_type, data_type, data, brain_populations)
ReplaceBehaviorEnum = hbp_nrp_commons.enum('ASK_USER', 'REPLACE', 'NO_REPLACE')
[docs] def set_simulation_populations(self, brain_type, brain_populations, data_type='text', change_population=ReplaceBehaviorEnum.REPLACE): """ Set the populations of the running simulation :param brain_type: Type of the brain file ('h5' or 'py') :param brain_populations: Contents of the brain file. Encoding given in field data_type :param data_type: Type of the data field ('text' or 'base64') :param change_population: a flag to select an action on population name change, currently possible values are: 0 = ReplaceBehaviorEnum.ASK_USER ask permission to replace 1 = ReplaceBehaviorEnum.REPLACE replace old name with a new one 2 = ReplaceBehaviorEnum.NO_REPLACE proceed with no replace action :return: response of the cle """ if self.__stop_reason is not None: raise ROSCLEClientException(self.__stop_reason) return self.__cle_set_populations(brain_type, brain_populations, data_type, change_population)
[docs] @fallback_retval(([], [])) def get_simulation_transfer_functions(self): """ Get the simulation transfer functions and their activation status :returns: A tuple of parallel arrays of: - strings containing the source code of the transfer functions - their activation status """ if self.__stop_reason is not None: raise ROSCLEClientException(self.__stop_reason) response = self.__cle_get_transfer_functions() return response.transfer_functions, response.active
[docs] @fallback_retval(False) def delete_simulation_transfer_function(self, transfer_function_name): """ Delete the given transfer function :param transfer_function_name: Name of the transfer function to delete :return: True if the call to ROS is successful, False otherwise """ if self.__stop_reason is not None: raise ROSCLEClientException(self.__stop_reason) return self.__cle_delete_transfer_function(transfer_function_name).success
[docs] def edit_simulation_transfer_function(self, transfer_function_name, transfer_function_source): """ Edit the simulation transfer function's source code. :param transfer_function_name: Name of the transfer function to modify :param transfer_function_source: Source code of the transfer function :returns: "" if the call to ROS is successful, a string containing an error message otherwise """ if self.__stop_reason is not None: raise ROSCLEClientException(self.__stop_reason) return self.__cle_edit_transfer_function(transfer_function_name, transfer_function_source) \ .error_message
[docs] def add_simulation_transfer_function(self, transfer_function_source): """ Add a new simulation transfer function source code. :param transfer_function_source: Source code of the transfer function :returns: "" if the call to ROS is successful, a string containing an error message otherwise """ if self.__stop_reason is not None: raise ROSCLEClientException(self.__stop_reason) return self.__cle_add_transfer_function(transfer_function_source).error_message
[docs] def activate_simulation_transfer_function(self, transfer_function_name, activate_transfer_function): # pragma: no cover """ Set the activation status of a transfer function :param transfer_function_name: the name of the tf to be set :param activate_transfer_function: the desired new activation status :return: """ if self.__stop_reason is not None: raise ROSCLEClientException(self.__stop_reason) return self.__cle_activate_transfer_function(transfer_function_name, activate_transfer_function).error_message
[docs] def convert_transfer_function_raw_to_structured(self, transfer_function): """ Convert raw tf to structured tf :param transfer_function: the transfer function in raw format :return: a strucuted TF """ if self.__stop_reason is not None: raise ROSCLEClientException(self.__stop_reason) result = self.__cle_convert_transfer_function_raw_to_structured(transfer_function.name, transfer_function.source) return result
[docs] def get_populations(self): """ Gets the neurons of the brain in the simulation :return: populations as dict """ populations = self.__cle_get_populations() result = { 'populations': [ { 'name': p.name, 'neuron_model': p.neuron_model, 'parameters': ROSCLEClient.__convert_parameter_list(p.parameters), 'gids': p.gids, 'indices': p.indices } for p in populations.neurons ] } return result
@staticmethod def __convert_parameter_list(parameters): """ Gets the specified parameter list as dictionary :param parameters: A list of NeuronParameters :return: neuron parameters as list """ return [ { 'parameterName': p.parameterName, 'value': p.value } for p in parameters ]
[docs] @fallback_retval([]) def get_simulation_CSV_recorders_files(self): """ Get the file paths of simulation's CSV recorders :returns: An array of dictionaries {name: 'my_recorder_file.csv', temporary_path: 'my_recorder_file_path'} """ return self.__cle_get_CSV_recorders_files().files
[docs] def extend_simulation_timeout(self, timeout): """ Extend the simulation timeout :param timeout: new new simulation timeout :returns: "" if the call to ROS is successful """ return self.__cle_extend_timeout(str(timeout)).success
[docs] def command_simulation_recorder(self, command): """ Issue a request/command to the simulation recorder :param command: the command to issue, see SimulationRecorder.srv definition for types :return: boolean value/success and message if provided """ if self.__stop_reason is not None: raise ROSCLEClientException(self.__stop_reason) return self.__simulation_recorder(command)
[docs] @fallback_retval([]) def get_simulation_robots(self): """ Get the robots in a simulation :returns: List of robots in the simulation """ if self.__stop_reason is not None: raise ROSCLEClientException(self.__stop_reason) response = self.__cle_get_robots() return response.robots
[docs] @fallback_retval((False, "An error occurred while processing request.")) def add_simulation_robot(self, robot_id, robot_path, robot_model, is_custom=False, initial_pose=None): """ Add a new robot with the given information in the simulation :param robot_id: id of the robot to be added :param robot_path: name of the root folder :param robot_model: if custom, it says the name of the custom model template name if not custom, it is a relative path to the SDF or ZIP file :param is_custom: path to a custom robot (zip) :param initial_pose: initial pose of the robot :return: True if the call to ROS is successful, False otherwise """ if self.__stop_reason is not None: raise ROSCLEClientException(self.__stop_reason) pose = None if initial_pose: pose = msg.Pose(x=initial_pose.get('x', 0), y=initial_pose.get('y', 0), z=initial_pose.get('z', 0), roll=initial_pose.get('roll', 0), pitch=initial_pose.get('pitch', 0), yaw=initial_pose.get('yaw', 0)) response = self.__cle_add_robot( msg.RobotInfo(robot_id=robot_id, robot_path=robot_path, robot_model=robot_model, is_custom=is_custom, pose=pose)) return response.success, response.error_message
[docs] @fallback_retval((False, "An error occurred while processing request.")) def delete_simulation_robot(self, robot_id): """ Deletes a robot from the simulation :param robot_id: id of the robot to be added :return: True if the call to ROS is successful, False otherwise """ if self.__stop_reason is not None: raise ROSCLEClientException(self.__stop_reason) response = self.__cle_del_robot(robot_id) return response.success, response.error_message
[docs] @fallback_retval((False, "An error occurred while processing request.")) def set_simulation_robot_initial_pose(self, robot_id, new_pose): """ Update initial robot pose of a give robot :param robot_id: id of the robot to be added :param new_pose: new initial pose of the robot :return: True if the call to ROS is successful, False otherwise """ if not new_pose: return False, "New pose cannot be None" if self.__stop_reason is not None: raise ROSCLEClientException(self.__stop_reason) pose = msg.Pose(x=new_pose.get('x', 0), y=new_pose.get('y', 0), z=new_pose.get('z', 0), roll=new_pose.get('roll', 0), pitch=new_pose.get('pitch', 0), yaw=new_pose.get('yaw', 0)) response = self.__cle_set_robot_init_pose(robot_id, pose) return response.success, response.error_message
[docs] @fallback_retval((False, "An error occurred while processing request.")) def download_file_in_simdir(self, resource_type, file_rel_path): """ Download a file into the simulation directory :param resource_type: type of the file. robots, environments, or brain :param file_rel_path: relative path to the file :return: True if the call to ROS is successful, False otherwise """ response = self.__cle_prepare_custom_model(resource_type, file_rel_path, "download") return response.success, response.error_message