Source code for pynrp.rosbridge_handler

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END
"""
An interface to handle rosbridge communication
"""

import roslibpy
import logging
from multiprocessing import Process, Pipe
from threading import Thread
import time

_CONNECTION_TIMEOUT = 10  # seconds before stop waiting for rosbridge connection


[docs]class RosBridgeConnectError(Exception): """ Type of error risen when there are connection issues """ pass
# TODO: add unit tests
[docs]class RosBridgeHandler(object): """ Provides a convenient interface to connect to RosBridge and handle properly connection error, disconnections, etc """ def __init__(self): """ Creates an instance of this class """ self._logger = logging.getLogger('RosBridgeHandler') self._is_init = False self._was_closed = False self._client = None self._topics = {} @property def is_init(self): """ Returns True if the handler has been initialized """ return self._is_init @is_init.setter def is_init(self, value): """ Setter method for is_init property :param value: boolean value for is_init property """ self._is_init = value
[docs] def initialize(self, host, port=None): """ Initializes the handler if it is not initialized yet and it was not initialized before. Creates a RosBridge client and connects it to the server. Raises RosBridgeConnectError in case of connection errors. :param host: string representing the ip of the RosBridge server :param port: (optional) integer corresponding to the RosBridge server port. None by default. """ # Check flags if self._is_init: self._logger.info('Rosbridge client already initialized') return elif self._was_closed: self._logger.info('Re-initializing a Rosbridge client is not allowed. it is not ' 'recommended to initialized any other client in the same process. ' 'Use class RosBridgeHandlerProcess if you need to do that') return has_failed = False # Connect to rosbridge server try: self._client = roslibpy.Ros(host=host, port=port) self._client.run(_CONNECTION_TIMEOUT) # pylint: disable=broad-except except Exception: has_failed = True else: if not self._client.is_connected: has_failed = True if has_failed: raise RosBridgeConnectError('Rosbridge client could not connect to server') self._is_init = True
[docs] def subscribe_topic(self, name, ros_type, callback): """ Subscribes a callback to a Ros topic :param name: string corresponding to the topic address :param ros_type: string corresponding to the topic type :param callback: function object to be called upon new message """ if not self._is_init: self._logger.info('Rosbridge client was not correctly initialized. Could not subscribe ' 'to: %s', name) return self._topics[name] = roslibpy.Topic(self._client, name, ros_type) self._topics[name].subscribe(callback)
[docs] def unsubscribe_topic(self, name): """ Unsubscribes a callback to a Ros topic :param name: string corresponding to the topic address """ if name in self._topics: self._topics[name].unsubscribe() else: self._logger.info('RosBridge client was not subscribed to: %s', name)
[docs] def close(self): """ Closes the RosBridge connection. After this the handler can not be initialized again. Furthermore, it is not recommended to initialized any other client in the same process. Use class RosBridgeHandlerProcess if you need to do that. """ if not self._is_init: self._logger.info('Rosbridge client was not correctly initialized. Cannot be closed') return for name in self._topics: self.unsubscribe_topic(name) self._client.close() self._was_closed = True self._is_init = False self._logger.info('RosBridge client was closed correctly')
# TODO: add unit tests
[docs]class RosBridgeHandlerProcess(RosBridgeHandler): """ Extension of RosBridgeHandler that runs the RosBridge client in a separate process. This is convenient when starting more than one client from the same process is required. RosBridgeHandler can give problems in this case due to limitations in roslibpy / twisted for handling re-connections and multiple connections. """ def __init__(self): """ Creates an instance of this class """ super().__init__() self._pipe = None self._rb_process = None self._rb_proxy = None self._in_process = False self._callbacks = {}
[docs] def initialize(self, host, port=None): """ Initializes the handler if it is not initialized yet and it was not initialized before. Creates a RosBridge client and connects it to the server. Raises RosBridgeConnectError in case of connection errors. :param host: string representing the ip of the RosBridge server :param port: (optional) integer corresponding to the RosBridge server port. None by default. """ if self.is_init: self._logger.info('Rosbridge client already initialized') return self._pipe = Pipe() # starts process with rosbridge client self._rb_process = Process(target=self._event_loop_process, args=(host, port,)) self._rb_process.start() # starts thread handling callbacks self._rb_proxy = Thread(target=self._event_loop_proxy) self._rb_proxy.start() # waits for initialization is completed n = 0 while not self.is_init and n < _CONNECTION_TIMEOUT: n += 1 time.sleep(1) if not self.is_init: raise RosBridgeConnectError('Rosbridge client could not connect to server')
def _event_loop_process(self, host, port=None): """ Method handling the communication with the RosBridge server from a separate process :param host: string representing the ip of the RosBridge server :param port: (optional) integer corresponding to the RosBridge server port. None by default. """ # everything is executed inside of the forked process from now on self._in_process = True self.is_init = False # start rosbridge client try: super().initialize(host, port) # pylint: disable=broad-except except Exception as e: self._logger.info('failure connecting to RosBridge server: %s', e) self._pipe[1].send(['error']) return else: self._pipe[1].send(['init']) # listen for proxy requests def make_callback(_msg): return lambda x: self._pipe[1].send([_msg[1], x]) try: while True: msg = self._pipe[1].recv() if msg[0] == 'close': self.close() self._pipe[1].send(['close']) break elif msg[0] == 'subscribe': self.subscribe_topic(msg[1], msg[2], make_callback(msg)) elif msg[0] == 'unsubscribe': self.unsubscribe_topic(msg[1]) # pylint: disable=broad-except except Exception as e: self._logger.info('Exception in RosBridge client process: %s', e) self._pipe[1].send(['error']) def _event_loop_proxy(self): """ Method handling the communication from the RosBridge client process in a separate thread """ while True: try: msg = self._pipe[0].recv() if msg[0] in self._callbacks: self._callbacks[msg[0]](msg[1]) elif msg[0] == 'init': self.is_init = True elif msg[0] == 'close': break elif msg[0] == 'error': self.is_init = False raise RosBridgeConnectError('Failure in RosBridge client process') except EOFError: self._logger.info('connection closed') break
[docs] def subscribe_topic(self, name, ros_type, callback): """ Subscribes a callback to a Ros topic :param name: string corresponding to the topic address :param ros_type: string corresponding to the topic type :param callback: function object to be called upon new message """ if not self._is_init: self._logger.info('Rosbridge client was not correctly initialized. Could not subscribe ' 'to: %s', name) return if self._in_process: super().subscribe_topic(name, ros_type, callback) else: # tell process to subscribe to topic self._callbacks[name] = callback self._pipe[0].send(['subscribe', name, ros_type])
[docs] def unsubscribe_topic(self, name): """ Unsubscribes a callback to a Ros topic :param name: string corresponding to the topic address """ if not self._is_init: self._logger.info('Rosbridge client was not correctly initialized. Could not ' 'unsubscribe from: %s', name) return if self._in_process: super().unsubscribe_topic(name) else: # tell process to unsubscribe from topic self._pipe[0].send(['unsubscribe', name])
[docs] def close(self): """ Closes the RosBridge connection. After this the handler can not be initialized again. Furthermore, it is not recommended to initialized any other client in the same process. Use class RosBridgeHandlerProcess if you need to do that. """ if not self._is_init: self._logger.info('Rosbridge client was not correctly initialized. Cannot be closed') return if self._in_process: super().close() else: # tell process to shutdown self._pipe[0].send(['close']) self._callbacks = {} self.is_init = False