Source code for hbp_nrp_cleserver.server.GazeboSimulationRecorder

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END
"""
ROS wrapper for Gazebo Simulation Recorder plugin
"""

from builtins import object

from hbp_nrp_cleserver.server import SERVICE_SIMULATION_RECORDER

from cle_ros_msgs import srv
from std_srvs.srv import Trigger, TriggerResponse
from gazebo_ros_replay_plugins.srv import Start

import rospy
import logging

logger = logging.getLogger(__name__)


[docs]class GazeboSimulationRecorder(object): """ A ROS wrapper to convert internal NRP/REST commands into Gazebo simulator recorder plugin actions. """ def __init__(self, sim_id, record_ros_topics=None): """ Initialize ROS service for handling command requests and service proxies for issuing commands to the Gazebo plugin. """ # list of ros topics to be recorded self.__record_ros_topics = record_ros_topics if record_ros_topics else [] # internal NRP interface from ROS CLE Client / REST interface self.__service_simulation_recorder = rospy.Service( SERVICE_SIMULATION_RECORDER(sim_id), srv.SimulationRecorder, self.__command ) # interfaces to the recorder plugin self.__recorder_start = rospy.ServiceProxy('/gazebo/recording/start', Start) self.__recorder_stop = rospy.ServiceProxy('/gazebo/recording/stop', Trigger) self.__recorder_cancel = rospy.ServiceProxy('/gazebo/recording/cancel', Trigger) self.__recorder_cleanup = rospy.ServiceProxy('/gazebo/recording/cleanup', Trigger) self.__recorder_state = rospy.ServiceProxy('/gazebo/recording/get_recording', Trigger)
[docs] def shutdown(self): """ Shutdown the internal NRP ROS handler and issues a cleanup command to the Gazebo plugin. """ # shutdown the internal ROS handler and ignore any further user commands logger.info("Shutting down simulation recorder services") self.__service_simulation_recorder.shutdown() # perform the actual recorder shutdown logger.info("Shutting down simulation recorder") self.__recorder_cleanup()
def __command(self, req): """ ROS service callback, handle command requests and route them to the Gazebo plugin. :param req: The SimulationRecorder request, see definition for details. :return SimulationRecorderResponse: with success of command and any status/error message. """ # call the appropriate service based on the request type if req.request_type == srv.SimulationRecorderRequest.STATE: resp = self.__recorder_state() elif req.request_type == srv.SimulationRecorderRequest.START: resp = self.__recorder_start([str(x) for x in self.__record_ros_topics]) elif req.request_type == srv.SimulationRecorderRequest.STOP: resp = self.__recorder_stop() elif req.request_type == srv.SimulationRecorderRequest.CANCEL: resp = self.__recorder_cancel() # reset the recorder by discarding any saved files elif req.request_type == srv.SimulationRecorderRequest.RESET: resp = self.__recorder_cleanup() # invalid request type, notify caller of failure else: resp = TriggerResponse() resp.success = False resp.message = "Invalid Simulation Recorder command: %s" % str(req.request_type) # populate our internal response based on the actual call return srv.SimulationRecorderResponse(value=resp.success, message=resp.message)