# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# ---LICENSE-END
"""
This module represents the interfaces for the robot (world) simulation both in terms of data
exchange and control of the simulation
"""
from builtins import object
__author__ = 'GeorgHinkel'
# pylint: disable=W0613
[docs]class Service(object):
"""
Represents a reference to a robot service
"""
def __init__(self, name, service_type): # -> None:
"""
Create a new robot type reference
:param name: the name of the service
:param service_type: the type of the service
"""
self.__name = name
self.__type = service_type
@property
def name(self): # -> str:
"""
Gets the name of the service
"""
return self.__name
@property
def service_type(self): # -> type:
"""
Gets the type of the service
"""
return self.__type
def __repr__(self): # pragma: no cover
return self.__name + " : " + self.__type.__name__
[docs]class Topic(object):
"""
Represents a reference to a robot topic
"""
def __init__(self, name, topic_type): # -> None:
"""
Create a new robot type reference
:param name: the name of the topic
:param topic_type: the type of the topic
"""
self.__name = name
self.__type = topic_type
@property
def name(self): # -> str:
"""
Gets the name of the topic
"""
return self.__name
@property
def topic_type(self): # -> type:
"""
Gets the type of the topic
"""
return self.__type
def __repr__(self): # pragma: no cover
return self.__name + " : " + self.__type.__name__
[docs]class PreprocessedTopic(Topic):
"""
Represents a topic where pre-processing is applied to
"""
def __init__(self, name, topic_type, pre_processing):
"""
Creates a new pre-processing topic
:param name: The name of the topic
:param topic_type: The topic type
:param pre_processing: The pre-processing function
"""
super(PreprocessedTopic, self).__init__(name, topic_type)
self.__pre_processing = pre_processing
@property
def pre_processor(self): # -> func
"""
Gets the pre_processor
"""
return self.__pre_processing
def _unregister(self):
"""
INTERNAL USE ONLY: this should never be directly invoked by a user.
Unregister the Topic. After this call, nobody can publish or receive messages anymore.
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs]class IRobotPublishedTopic(object): # pragma: no cover
"""
Represents a communication object for a published robot topic
"""
[docs] def send_message(self, value): # -> None:
"""
Send a message to the robot topic represented by this instance
:param value: The message to be sent to the robot
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def reset(self, transfer_function_manager):
"""
Resets the published topic
:param transfer_function_manager: The transfer function manager the publisher belongs to
:return: The reset adapter
"""
return self
def _unregister(self):
"""
INTERNAL USE ONLY: this should never be directly invoked by a user.
Unregister the Topic. After this call, nobody can publish.
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs]class IRobotSubscribedTopic(object): # pragma: no cover
"""
Represents a communication object for a subscribed robot topic
"""
@property
def changed(self): # -> bool:
"""
Gets a value indicating whether the value of the subscribed topic has changed since the last
time step
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
@property
def value(self): # -> object:
"""
Gets the current value of the subscribed topic
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def reset(self, transfer_function_manager):
"""
Resets the subscribed topic
:param transfer_function_manager: The transfer function manager the subscriber belongs to
:return: The reset adapter
"""
return self
def _unregister(self):
"""
INTERNAL USE ONLY: this should never be directly invoked by a user.
Unregister the Topic. After this call, nobody can receive messages anymore.
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs]class IRobotServiceProxy(object): # pragma: no cover
"""
Represents a communication object for a service proxy
"""
[docs] def set_request_args(self, *args, **kwargs):
"""
Set the request call arguments
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def update_value(self, *args, **kwargs):
"""
Call service and update response value
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
@property
def value(self): # -> object:
"""
Gets the current value of the service response
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def reset(self, transfer_function_manager):
"""
Resets the service proxy
:param transfer_function_manager: The transfer function manager the service proxy belongs to
:return: The reset adapter
"""
return self
def _unregister(self):
"""
INTERNAL USE ONLY: this should never be directly invoked by a user.
Unregister the Service. After this call, nobody can use the service proxy anymore.
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs]class IRobotCommunicationAdapter(object): # pragma: no cover
"""
Represents the communication adapter to the robot
"""
def __init__(self): # -> None:
self.__published_topics = []
self.__subscribed_topics = []
self.__service_proxies = []
@property
def published_topics(self): # -> list:
"""
Gets the published topics for the robot communication adapter
:return: A hash table of the communication adapters published topics
"""
return self.__published_topics
@property
def subscribed_topics(self): # -> list:
"""
Gets the subscribed topics for the robot communication adapter
:return: A hash table of the communication adapters subscribed topics
"""
return self.__subscribed_topics
@property
def service_proxies(self): # -> list:
"""
Gets the service proxies for the robot communication adapter
:return: A hash table of the communication adapters service responses
"""
return self.__service_proxies
[docs] def register_subscribe_topic(self, topic, **kwargs): # -> IRobotSubscribedTopic:
"""
Requests a subscription object for the given topic
:param topic: The topic that should be subscribed
:param kwargs: Additional configuration parameters
:return: A subscription object that holds the current data
"""
subscriber = self.create_topic_subscriber(topic, **kwargs)
self.__subscribed_topics.append(subscriber)
return subscriber
[docs] def unregister_subscribe_topic(self, topic):
"""
Unregisters and removes the given subscriber topic object.
:param topic: The IRobotSubscribedTopic to unregister.
"""
topic._unregister() # pylint: disable=protected-access
if topic in self.__subscribed_topics:
self.__subscribed_topics.remove(topic)
[docs] def register_publish_topic(self, topic, **kwargs): # -> IRobotPublishedTopic:
"""
Requests a publisher object for the given topic
:param topic: The topic for which to create a publisher
:param kwargs: Additional configuration parameters
:return: A publisher communication object
"""
publisher = self.create_topic_publisher(topic, **kwargs)
self.__published_topics.append(publisher)
return publisher
[docs] def unregister_publish_topic(self, topic):
"""
Unregisters and removes the given publisher topic object.
:param topic: The IRobotPublishedTopic to unregister.
"""
topic._unregister() # pylint: disable=protected-access
if topic in self.__published_topics:
self.__published_topics.remove(topic)
[docs] def register_service_proxy(self, service, **kwargs): # -> IRobotServiceProxy:
"""
Requests a proxy object for the given service
:param service: The service to which a proxy should be established
:param kwargs: Additional configuration parameters
:return: A responder object that holds the current data
"""
service_proxy = self.create_service_proxy(service, **kwargs)
self.__service_proxies.append(service_proxy)
return service_proxy
[docs] def unregister_service_proxies(self, service_proxy):
"""
Unregisters and removes the given service proxy object.
:param service_proxy: The IRobotServiceProxy to unregister.
"""
service_proxy._unregister() # pylint: disable=protected-access
if service_proxy in self.__service_proxies:
self.__service_proxies.remove(service_proxy)
[docs] def create_topic_subscriber(self, topic, **config): # -> IRobotSubscribedTopic:
"""
Creates the subscription object for the given topic
:param topic: The topic
:param config: Additional configuration for the subscriber
:return: A subscription object
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def create_topic_publisher(self, topic, **config): # -> IRobotPublishedTopic:
"""
Creates a publisher object for the given topic
:param topic: The topic
:param config: Additional configuration for the publisher
:return: A publisher object
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def create_service_proxy(self, service, **config): # -> IRobotServiceProxy:
"""
Creates a service proxy object for the given service
:param service: The Service
:param config: Additional configuration for the service proxy
:return: A service proxy object
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def initialize(self, name): # -> None:
"""
Initializes the robot adapter
:param name: The name of the node
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def refresh_buffers(self, t): # -> None:
"""
Refreshes the subscribed topic buffers for the given simulation time
:param t: The simulation time
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def shutdown(self):
"""
Closes any connections created by the adapter
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs]class IRobotControlAdapter(object): # pragma: no cover
"""
Represents a control adapter for the world simulation
"""
@property
def is_alive(self): # -> bool:
"""
Queries the current status of the world simulation
:return: True, if the world simulation is alive, otherwise False
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
@property
def is_paused(self): # -> bool:
"""
Queries the current status of the physics simulation
:return: True, if the physics simulation is paused, otherwise False
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def initialize(self): # -> None:
"""
Initializes the world simulation control adapter
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def run_step(self, dt): # -> float64:
"""
Runs the world simulation for the given CLE time step in seconds
:param dt: The CLE time step in seconds
:return: Updated simulation time, otherwise -1
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def run_step_async(self, dt): # -> Future:
"""
Runs the world simulation for the given CLE time step in seconds in an asynchronous manner.
:param dt: The CLE time step in seconds
:return: a Future for the result or potential exceptions of the execution
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def shutdown(self): # -> None:
"""
Shuts down the world simulation
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
@property
def time_step(self): # -> float64:
"""
Gets the physics simulation time step in seconds
:param dt: The physics simulation time step in seconds
:return: The physics simulation time step in seconds
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def set_time_step(self, time_step): # -> Bool:
"""
Sets the physics simulation time step in seconds
:param dt: The physics simulation time step in seconds
:return: True, if the physics simulation time step is updated, otherwise False
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def set_robots(self, robots):
"""
Sets the list of robots
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def reset(self): # -> None:
"""
Resets the physics simulation
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")
[docs] def reset_world(self, models, lights): # -> None:
"""
Resets the world excluding the robot
:param models: A dictionary containing pairs model_name: model_sdf.
:param lights: A dictionary containing pairs light_name: light sdf.
"""
raise NotImplementedError("This method was not implemented in the concrete implementation")