Source code for hbp_nrp_cle.externalsim.ExternalModule

"""
Includes the corresponding CLE class for external ROS modules.
"""
from builtins import object

__author__ = 'Omer Yilmaz'

import logging
import rospy
from iba_manager.srv import Initialize, RunStep, Shutdown
from hbp_nrp_cle.robotsim.GazeboHelper import TIMEOUT

logger = logging.getLogger('hbp_nrp_cle')


[docs]class ExternalModule(object): """ External ROS modules have initialize, run_step and shutdown methods. This class has the corresponding initialize, run_step and shutdown methods which triggers the external ones through ROS service proxies. Objects of this class is synchronized with the Deterministic Closed Loop Engine via ExternalModuleManager. """ def __init__(self, module_name): self.service_name = 'emi/' + module_name + '_module/' self.response = None rospy.wait_for_service(self.service_name + 'initialize', timeout=TIMEOUT) self.initialize_proxy = rospy.ServiceProxy( self.service_name + 'initialize', Initialize) rospy.wait_for_service(self.service_name + 'run_step', timeout=TIMEOUT) self.run_step_proxy = rospy.ServiceProxy( self.service_name + 'run_step', RunStep) rospy.wait_for_service(self.service_name + 'shutdown', timeout=TIMEOUT) self.shutdown_proxy = rospy.ServiceProxy( self.service_name + 'shutdown', Shutdown)
[docs] def initialize(self): """ This method triggers the initialize method served at the external module synchronously with the CLE. """ try: self.response = self.initialize_proxy.call() return self.response except rospy.ServiceException as e: logger.exception(self.service_name + 'initialize call failed: %s' % e)
[docs] def run_step(self): """ This method triggers the run_step method served at the external module synchronously with the CLE. """ try: return self.run_step_proxy.call() except rospy.ServiceException as e: logger.exception(self.service_name + 'run_step call failed: %s' % e)
[docs] def shutdown(self): """ This method triggers the shutdown method served at the external module synchronously with the CLE. """ try: shutdown_response = self.shutdown_proxy.call() self.initialize_proxy.close() self.run_step_proxy.close() self.shutdown_proxy.close() return shutdown_response except rospy.ServiceException as e: logger.exception(self.service_name + 'shutdown call failed: %s' % e)