# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# ---LICENSE-END
"""
Represents an implementation of the robot communication adapter actually
using ROS
"""
from __future__ import division
from hbp_nrp_cle.robotsim.RobotInterface import IRobotCommunicationAdapter, \
Topic, PreprocessedTopic, IRobotSubscribedTopic, IRobotPublishedTopic, \
Service, IRobotServiceProxy
from hbp_nrp_cle.tf_framework._TransferFunctionManager import TransferFunctionManager
import rospy
import rosservice
import logging
import rosgraph.masterapi as master
logger = logging.getLogger(__name__)
__author__ = 'GeorgHinkel'
# Receive buffer size for subscriber topics (in bytes). Same as ROS default.
DEFAULT_SUB_BUFFER_SIZE = 65536
# Queue size for subscriber topics.
# ROS docs suggest either 1 or None, the latter denoting an infinite queue.
DEFAULT_SUB_QUEUE_SIZE = 1
# Queue size for publisher topics.
DEFAULT_PUB_QUEUE_SIZE = 10
[docs]class RosCommunicationAdapter(IRobotCommunicationAdapter):
"""
Represents a robot communication adapter actually using ROS
"""
[docs] def get_topic_type(self, topic_name, update_when_miss=True):
"""
Gets the type for the given ROS topic
:param topic_name: The name of the topic
:param update_when_miss: If set, the function will try to update the cached topic
types if the topic cannot be found
"""
for name, type_str in self.__topic_types:
if name == topic_name:
t = "?"
mod = "?"
try:
mod, t = type_str.split('/')
return getattr(__import__(mod).msg, t)
except AttributeError:
raise Exception("The type {0} is not present in package {1}".format(t, mod))
except ValueError:
raise Exception("The format of the topic type {0} is not supported"
.format(type_str))
if update_when_miss:
self.__refresh_topic_types()
return self.get_topic_type(topic_name, False)
return None
def __init__(self):
"""
Create a new RosCommunicationAdapter
"""
IRobotCommunicationAdapter.__init__(self)
self.__topic_types = []
self.__refresh_topic_types()
def __refresh_topic_types(self):
"""
Updates the topic types
"""
m = master.Master('masterapi')
self.__topic_types = m.getTopicTypes()
[docs] def initialize(self, name):
"""
Initializes this robot communication adapter
:param name: The name of this node
"""
try:
rospy.init_node(name)
logger.info("Robot communication adapter initialized")
except rospy.exceptions.ROSException:
logger.warn("ROS node already initialized with another name")
[docs] def create_topic_publisher(self, topic, **config):
"""
Creates a publisher object for the given topic
:param topic: The topic
:param config: Additional configuration for the publisher
:param queue_size: ROS Publisher queue_size parameter, please refer to ROS documentation
:return: A publisher object
"""
if isinstance(topic, PreprocessedTopic):
return RosPublishedPreprocessedTopic(topic, **config)
elif isinstance(topic, str):
topic_type = self.get_topic_type(topic)
if topic_type is None:
raise Exception("The type of topic {0} is unknown. Please specify it explicitly"
.format(topic))
topic = Topic(topic, topic_type)
return RosPublishedTopic(topic, **config)
[docs] def create_topic_subscriber(self, topic, **config):
"""
Creates the subscription object for the given topic
:param topic: The topic
:param config: Additional configuration for the subscriber
:param queue_size: ROS Subscriber queue_size parameter, please refer to ROS documentation
:param buff_size: ROS Subscriber buff_size parameter, please refer to ROS documentation
:return: A subscription object
"""
if isinstance(topic, PreprocessedTopic):
return RosSubscribedPreprocessedTopic(topic,
config.get('initial_value', None),
**config)
elif isinstance(topic, str):
topic_type = self.get_topic_type(topic)
if topic_type is None:
raise Exception("The type of topic {0} is unknown. Please specify it explicitly"
.format(topic))
topic = Topic(topic, topic_type)
return RosSubscribedTopic(topic,
config.get('initial_value', None),
**config)
[docs] def create_service_proxy(self, service, **config):
"""
Creates a service proxy object for the given service
:param service: The service
:param config: Additional configuration for the service proxy
:return: A service proxy object
"""
return RosServiceProxy(service, **config)
@property
def is_alive(self): # pylint: disable=R0201
"""
Gets a value indicating whether the robot simulation is still alive
"""
return not rospy.is_shutdown()
[docs] def refresh_buffers(self, t):
"""
Resets the changed bit for all subscribers
:param t: The world simulation time in milliseconds
"""
for subscriber in self.subscribed_topics:
subscriber.reset_changed()
[docs] def shutdown(self):
"""
Closes any connections created by the adapter
"""
for publisher in self.published_topics:
publisher._unregister() # pylint: disable=protected-access
for subscriber in self.subscribed_topics:
subscriber._unregister() # pylint: disable=protected-access
[docs]class RosPublishedTopic(IRobotPublishedTopic):
"""
Represents a robot topic publisher actually using ROS
"""
def __init__(self, topic, **config):
"""
Creates a new robot topic publisher
:param topic: The topic where data should be sent to
:param config: Additional configuration for the publisher
:param **queue_size: ROS Publisher queue_size parameter, please refer to ROS documentation
"""
self.__lastSent = None
assert isinstance(topic, Topic)
logger.info("ROS publisher created: topic name = %s, topic type = %s",
topic.name, topic.topic_type)
self.__pub = rospy.Publisher(topic.name, topic.topic_type,
queue_size=config.get('queue_size', DEFAULT_PUB_QUEUE_SIZE))
[docs] def send_message(self, value):
"""
Sends a message
:param value: The message to be sent (the type must match the topic type)
"""
import struct
def test_float(val):
"""
tests if it can be packaged as float. This operation is performed by ROS
Publishers before publishing
:param val: input float number
"""
if isinstance(val, float):
try:
struct.pack('<f', val)
except OverflowError:
return False
return True
try:
state = []
for v in value.__getstate__():
# replaces v by maximum float which can be packaged. More info here:
# https://mail.python.org/pipermail/python-list/2000-March/058582.html
state += [v if test_float(v) else 3.4e38]
value.__setstate__(state)
except AttributeError:
value = value if test_float(value) else 3.4e38
# if value != self.__lastSent:
if self.__pub is not None:
self.__pub.publish(value)
self.__lastSent = value
logger.debug("ROS message published: topic value = %s",
value)
else:
logger.error("Trying to publish messages on an unregistered topic")
def _unregister(self):
"""
Unregister the Topic. After this call, nobody can publish
anymore.
"""
if self.__pub:
self.__pub.unregister()
self.__pub = None
[docs]class RosPublishedPreprocessedTopic(RosPublishedTopic):
"""
Represents a robot topic publisher actually using ROS
"""
def __init__(self, topic, **config):
"""
Creates a new robot topic publisher
:param topic: The topic where data should be sent to
:param config: Additional configuration for the subscriber
"""
super(RosPublishedPreprocessedTopic, self).__init__(topic, **config)
assert isinstance(topic, PreprocessedTopic)
self.__pre_processor = topic.pre_processor
[docs] def send_message(self, value):
"""
Sends a message
:param value: The message to be sent
"""
to_send = self.__pre_processor(value)
super(RosPublishedPreprocessedTopic, self).send_message(to_send)
[docs]class RosSubscribedTopic(IRobotSubscribedTopic):
"""
Represents a robot topic subscriber actually using ROS
"""
def __init__(self, topic, initial_value, **config):
"""
Initializes a new subscriber for the given topic
:param topic: The topic that is subscribed
:param initial_value: The initial value for the subscriber
:param config: Additional configuration for the subscriber
:param **queue_size: ROS Subscriber queue_size parameter, please refer to ROS documentation
:param **buff_size: ROS Subscriber buff_size parameter, please refer to ROS documentation
"""
self.__changed = False
self.__value = initial_value
self.__tfs = []
assert isinstance(topic, Topic)
self.__subscriber = rospy.Subscriber(topic.name, topic.topic_type,
self._callback,
queue_size=config.get('queue_size',
DEFAULT_SUB_QUEUE_SIZE),
buff_size=config.get('buff_size',
DEFAULT_SUB_BUFFER_SIZE))
logger.info("ROS subscriber created: topic name = %s, topic type = %s",
topic.name, topic.topic_type)
[docs] def register_tf_trigger(self, tf):
"""
Registers to trigger the provided TF in case a new value appears
:param tf: The transfer function
"""
if tf not in self.__tfs:
self.__tfs.append(tf)
def _callback(self, data):
"""
This method is called whenever new data is available from ROS
:param data: The incoming data on this topic
"""
logger.debug("ROS subscriber callback")
self.__changed = True
self.__value = data
t = rospy.get_time()
for tf in self.__tfs:
TransferFunctionManager.run_tf(tf, t)
@property
def changed(self):
"""
Indicates whether the current value of this subscriber has changed
since the last iteration
"""
return self.__changed
[docs] def reset_changed(self):
"""
Resets the changed status of the current subscriber
"""
self.__changed = False
@property
def value(self):
"""
Gets the last value received by this ROS subscribed topic
"""
return self.__value
[docs] def reset(self, transfer_function_manager):
"""
Gets a reset subscriber
:param transfer_function_manager: The transfer function manager in which the subscribed
topic is contained
"""
self.reset_changed()
self.__value = None
return self
def _unregister(self):
"""
Detaches the subscribed topic from ROS
"""
if self.__subscriber:
self.__subscriber.unregister()
self.__subscriber = None
[docs]class RosSubscribedPreprocessedTopic(RosSubscribedTopic):
"""
Represents a robot topic subscriber with a preprocessor
"""
def __init__(self, topic, initial_value, **config):
"""
Creates a new preprocessing topic subscriber
:param topic: The topic that is subscribed
:param initial_value: The initial value for the subscriber
:param config: Additional configuration for the subscriber
"""
super(RosSubscribedPreprocessedTopic, self).__init__(topic, initial_value, **config)
assert isinstance(topic, PreprocessedTopic)
self.__pre_processor = topic.pre_processor
def _callback(self, data):
"""
This method is called whenever new data is available from ROS
:param data: The incoming data
"""
pre_processed = self.__pre_processor(data)
super(RosSubscribedPreprocessedTopic, self)._callback(pre_processed)
[docs]class RosServiceProxy(IRobotServiceProxy):
"""
Represents a robot service proxy actually using ROS
"""
def __init__(self, service, initial_response=None, persistent=False, headers=None):
"""
Initializes a new service proxy for the given service
:param service: The service that is requested
:param initial_response: The initial value of the service proxy response
:param persistent: Persistence of Service.
Look at rospy.ServiceProxy documentation for details
:param headers: Service headers.
Look at rospy.ServiceProxy documentation for details
"""
self.__response_value = initial_response
self.__request_args = []
self.__request_kwargs = {}
assert isinstance(service, Service)
services = rosservice.get_service_list()
if service.name not in services:
raise Exception("Service {} not available".format(service.name))
self.__service_proxy = rospy.ServiceProxy(service.name, service.service_type,
persistent=persistent, headers=headers)
logger.info("ROS service responder created: service name = %s, service type = %s",
service.name, service.service_type)
def __call__(self, *args, **kwargs):
"""Calling the RosServiceProxy returns the service response"""
if args or kwargs:
self.set_request_args(*args, **kwargs)
return self.update_value()
[docs] def set_request_args(self, *args, **kwargs):
"""
Update service proxy request arguments
"""
self.__request_args = args
self.__request_kwargs = kwargs
[docs] def update_value(self, *args, **kwargs):
"""
Update service response value
:return: Returns Service Response
"""
if args or kwargs:
self.set_request_args(*args, **kwargs)
if self.__service_proxy is not None:
self.__response_value = \
self.__service_proxy.call(*self.__request_args, **self.__request_kwargs)
return self.__response_value
else:
logger.error("Trying to send request on an unavailable service")
return None
@property
def value(self):
"""
Gets the last value received by this ROS subscribed service
"""
return self.__response_value
[docs] def reset(self, transfer_function_manager):
"""
Gets a reset subscriber
:param transfer_function_manager: The transfer function manager in which the subscribed
service is contained
"""
self.__response_value = None
return self
def _unregister(self):
"""
Detaches the service
"""
if self.__service_proxy is not None:
self.__service_proxy.close()
self.__service_proxy = None