ROS wrapper for Gazebo Simulation Recorder plugin

from builtins import object

from hbp_nrp_cleserver.server import SERVICE_SIMULATION_RECORDER

from cle_ros_msgs import srv
from std_srvs.srv import Trigger, TriggerResponse
from gazebo_ros_replay_plugins.srv import Start

import rospy
import logging

logger = logging.getLogger(__name__)

[docs]class GazeboSimulationRecorder(object): """ A ROS wrapper to convert internal NRP/REST commands into Gazebo simulator recorder plugin actions. """ def __init__(self, sim_id, record_ros_topics=None): """ Initialize ROS service for handling command requests and service proxies for issuing commands to the Gazebo plugin. """ # list of ros topics to be recorded self.__record_ros_topics = record_ros_topics if record_ros_topics else [] # internal NRP interface from ROS CLE Client / REST interface self.__service_simulation_recorder = rospy.Service( SERVICE_SIMULATION_RECORDER(sim_id), srv.SimulationRecorder, self.__command ) # interfaces to the recorder plugin self.__recorder_start = rospy.ServiceProxy('/gazebo/recording/start', Start) self.__recorder_stop = rospy.ServiceProxy('/gazebo/recording/stop', Trigger) self.__recorder_cancel = rospy.ServiceProxy('/gazebo/recording/cancel', Trigger) self.__recorder_cleanup = rospy.ServiceProxy('/gazebo/recording/cleanup', Trigger) self.__recorder_state = rospy.ServiceProxy('/gazebo/recording/get_recording', Trigger)
[docs] def shutdown(self): """ Shutdown the internal NRP ROS handler and issues a cleanup command to the Gazebo plugin. """ # shutdown the internal ROS handler and ignore any further user commands"Shutting down simulation recorder services") self.__service_simulation_recorder.shutdown() # perform the actual recorder shutdown"Shutting down simulation recorder") self.__recorder_cleanup()
def __command(self, req): """ ROS service callback, handle command requests and route them to the Gazebo plugin. :param req: The SimulationRecorder request, see definition for details. :return SimulationRecorderResponse: with success of command and any status/error message. """ # call the appropriate service based on the request type if req.request_type == srv.SimulationRecorderRequest.STATE: resp = self.__recorder_state() elif req.request_type == srv.SimulationRecorderRequest.START: resp = self.__recorder_start([str(x) for x in self.__record_ros_topics]) elif req.request_type == srv.SimulationRecorderRequest.STOP: resp = self.__recorder_stop() elif req.request_type == srv.SimulationRecorderRequest.CANCEL: resp = self.__recorder_cancel() # reset the recorder by discarding any saved files elif req.request_type == srv.SimulationRecorderRequest.RESET: resp = self.__recorder_cleanup() # invalid request type, notify caller of failure else: resp = TriggerResponse() resp.success = False resp.message = "Invalid Simulation Recorder command: %s" % str(req.request_type) # populate our internal response based on the actual call return srv.SimulationRecorderResponse(value=resp.success, message=resp.message)