Source code for hbp_nrp_cle.robotsim.RosControlAdapter

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END
#!/usr/bin/env python
"""Implementation of the robot control adapter using ros and gazebo"""
from __future__ import division
from past.utils import old_div

from hbp_nrp_cle.robotsim.AsynchronousServiceProxy import AsynchonousRospyServiceProxy
from hbp_nrp_cle.robotsim.GazeboHelper import GazeboHelper
from hbp_nrp_cle.robotsim.RobotInterface import IRobotControlAdapter
import rospy
import math
# pylint: disable=E0611
from gazebo_msgs.srv import GetPhysicsProperties, GetWorldProperties, \
    SetPhysicsProperties, AdvanceSimulation
from std_srvs.srv import Empty
from geometry_msgs.msg import Point, Pose, Quaternion
import logging


logger = logging.getLogger(__name__)
# Info messages sent to this logger will be forwarded as notifications
user_notifications_logger = logging.getLogger('hbp_nrp_cle.user_notifications')

__author__ = 'NinoCauli'


[docs]class RosControlAdapter(IRobotControlAdapter): """ Represents a robot simulation adapter actually using ROS """ def __init__(self): self.__persistent_services = [] rospy.wait_for_service('/gazebo/get_physics_properties') self.__get_physics_properties = rospy.ServiceProxy( '/gazebo/get_physics_properties', GetPhysicsProperties, persistent=True) self.__persistent_services.append(self.__get_physics_properties) rospy.wait_for_service('/gazebo/get_world_properties') self.__get_world_properties = rospy.ServiceProxy( '/gazebo/get_world_properties', GetWorldProperties, persistent=True) self.__persistent_services.append(self.__get_world_properties) rospy.wait_for_service('/gazebo/set_physics_properties') self.__set_physics_properties = rospy.ServiceProxy( '/gazebo/set_physics_properties', SetPhysicsProperties, persistent=True) self.__persistent_services.append(self.__set_physics_properties) rospy.wait_for_service('/gazebo/pause_physics') self.__pause_client = rospy.ServiceProxy('/gazebo/pause_physics', Empty, persistent=True) self.__persistent_services.append(self.__pause_client) rospy.wait_for_service('/gazebo/reset_sim') self.__reset = rospy.ServiceProxy('/gazebo/reset_sim', Empty, persistent=True) self.__persistent_services.append(self.__reset) rospy.wait_for_service('/gazebo/end_world') self.__endWorld = rospy.ServiceProxy('/gazebo/end_world', Empty, persistent=True) self.__persistent_services.append(self.__endWorld) rospy.wait_for_service('/gazebo/advance_simulation') self.__advance_simulation = AsynchonousRospyServiceProxy( '/gazebo/advance_simulation', AdvanceSimulation, persistent=True) self.__persistent_services.append(self.__advance_simulation) self.__time_step = 0.0 self.__robots = {} self.__is_initialized = False self.gazebo_helper = GazeboHelper()
[docs] def initialize(self): """ Initializes the world simulation control adapter """ if not self.__is_initialized: physics = self.__get_physics_properties() paused = physics.pause if not paused: self.__pause_client() self.__time_step = physics.time_step self.__is_initialized = True logger.info("Robot control adapter initialized") return self.__is_initialized
@property def time_step(self): """ Gets the physics simulation time step in seconds :param dt: The physics simulation time step in seconds :return: The physics simulation time step in seconds """ return self.__time_step
[docs] def set_time_step(self, time_step): """ Sets the physics simulation time step in seconds :param time_step: The physics simulation time step in seconds :return: True, if the physics simulation time step is updated, otherwise False """ physics = self.__get_physics_properties() success = self.__set_physics_properties( time_step, physics.max_update_rate, physics.gravity, physics.ode_config) if success: self.__time_step = time_step logger.info("new time step = %f", self.__time_step) else: logger.warn("impossible to set the new time step") return success
@property def is_paused(self): """ Queries the current status of the physics simulation :return: True, if the physics simulation is paused, otherwise False """ physics = self.__get_physics_properties() paused = physics.pause return paused @property def is_alive(self): """ Queries the current status of the world simulation :return: True, if the world simulation is alive, otherwise False """ logger.debug("Getting the world properties to check if we are alive") world = self.__get_world_properties() success = world.success return success
[docs] def run_step_async(self, dt): """ Runs the world simulation for the given CLE time step in seconds :param dt: The CLE time step in seconds """ # implement modulo to avoid floating point precision quirks in math.fmod or from direct % # without this explicit fix, values such as 0.999 % 0.001 != 0.0 due to float accuracy # https://en.wikipedia.org/wiki/Modulo_operation : a - (n * int(a/n)) r = dt - (self.__time_step * math.floor(old_div(dt, self.__time_step))) if r == 0.0: steps = int(old_div(dt, self.__time_step)) logger.debug("Advancing simulation") return self.__advance_simulation(steps) else: logger.error("dt is not multiple of the physics time step") raise ValueError("dt is not multiple of the physics time step")
[docs] def run_step(self, dt): """ Runs the world simulation for the given CLE time step in seconds :param dt: The CLE time step in seconds """ return self.run_step_async(dt).result()
[docs] def shutdown(self): """ Shuts down the world simulation """ logger.info("Shutting down the world simulation") for service in self.__persistent_services: service.close() logger.info("Robot control adapter stopped")
# Do not call endWorld here, it makes Gazebo Stop !
[docs] def reset(self): """ Resets the physics simulation """ logger.info("Resetting the world simulation") self.__reset()
[docs] def set_robots(self, robots): """ Sets the list of robots """ self.__robots = robots
[docs] def reset_world(self, models, lights): """ Resets the world (robot excluded) to the state described by models and lights :param models: A dictionary containing pairs `(model_name: {'model_sdf': sdf, 'model_state_sdf': sdf})`. :param lights: A dictionary containing pairs (light_name: light sdf). """ logger.debug("Resetting the Environment") # MODELS # get the list of models' name currently the sim from gazebo world_properties = self.__get_world_properties() # robot model doesn't belong to the environment, so discard them from the active set # we assume that the robot name contains the 'robot' substring active_model_set = \ {m_name for m_name in world_properties.model_names if m_name not in self.__robots} original_model_set = frozenset(models.keys()) logger.debug("active_model_set: %s", active_model_set) logger.debug("original_model_set: %s", original_model_set) # LIGHTS # get from gazebo the name of the lights in the scene world_lights = self.gazebo_helper.get_lights_name_proxy() # filter out sun from the world lights active_lights_set = \ {l_name for l_name in world_lights.light_names if 'sun' not in l_name} original_lights_set = frozenset(lights) logger.debug("active_lights_set: %s", active_lights_set) logger.debug("original_lights_set: %s", original_lights_set) # delete LIGHTS for light_name in active_lights_set: user_notifications_logger.info("Deleting: %s", light_name) self.gazebo_helper.delete_light_proxy(light_name) # delete MODELS for model_name in active_model_set: user_notifications_logger.info("Deleting: %s", model_name) # delete_model temporarily disabled NRRPLT-7855 # self.gazebo_helper.delete_model_proxy(model_name) initial_pose = Pose() initial_pose.position = Point(0, 0, 0) initial_pose.orientation = Quaternion(0, 0, 0, 1) # respawn LIGHTS for light_name in original_lights_set: user_notifications_logger.info("Loading: %s", light_name) self.gazebo_helper.spawn_entity_proxy(light_name, lights[light_name], "", initial_pose, "") # respawn MODELS for model_name in original_model_set: user_notifications_logger.info("Loading: %s", model_name) self.gazebo_helper.spawn_entity_proxy(model_name, models[model_name]['model_sdf'], "", initial_pose, "")