Source code for hbp_nrp_distributed_nest.launch.MPILauncher

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END

"""
Helper class to build and execute a formatted mpirun command in the format:

  mpirun -envlist <vars> -np <proc> -host <hostname/ip> -wdir <temporary work dir> <command> : ...

where each of the hosts has a specific working directory with necessary config files already
in place. Also passes environment variables required for NRP/CLE execution.
"""
from builtins import object
import logging
import os
import subprocess

import rospy
import rosnode
from std_msgs.msg import String
import json

import time
import signal, psutil

from hbp_nrp_cle import config

logger = logging.getLogger(__name__)


[docs]class MPILauncher(object): """ Class constructs and executes the mpi launch command. """ def __init__(self, executable): self._exe = executable self._hosts = [] self._cmd = None self._process = None self._launched = False self._gazebo_master_uri = None
[docs] def add_host(self, hostname, tmpdir, processes=1): """ Add a target host to the mpi launch configuration. :param hostname: The remote host name or ip. :param tmpdir: A valid temporary directory on the remote host to launch in. :param processes: The number of processes for this host. """ self._hosts.append('-np {p} -host {h} -wdir {t}' .format(p=processes, h=hostname, t=tmpdir))
[docs] def build(self): """ Construct the mpirun command line string with all hosts provided. """ if len(self._hosts) == 0: raise Exception('No target host configurations specified for MPI processes!') # build the command with per host specific configuration hoststr = ' : '.join(['{h} {e}'.format(h=host, e=self._exe) for host in self._hosts]) self._cmd = 'mpirun -v {}'.format(hoststr)
[docs] def launch(self): """ Launch the mpirun command and wait for successful startup of the CLE. Blocks until the CLE publishes completion on the status topic or if the mpirun command aborts. """ if not self._cmd: raise Exception('No command set for MPI processes, build() was not called!') # provide extra ssh flags if ssh is used on the vizcluster to ensure we can spawn # other ssh sessions / processes for Gazebo/etc. env_vars = dict(os.environ, HYDRA_LAUNCHER_EXTRA_ARGS="-M -K") # Spawn the mpirun command, we need to do a few special things because mvapich2 will # send SIGSTOP (like a ctrl+v or suspend) to its parent process when executing and this # would cause the entire Python stack to stop and create <defunct> processes. logger.info("Spawning MPI processes: {}".format(self._cmd)) self._process = subprocess.Popen(self._cmd.split(), # no shell to SIGSTOP/hang preexec_fn=os.setsid, # create a new session/tree stdin=subprocess.PIPE, # hangs without valid stdin env=env_vars) # environment variables # wait until the CLE subprocess initializes properly or fails, the conditions are: # failure if process exits (bad command, failure to launch, etc.) # success if the CLE publishes a status update indicating loading is complete # subscribe for loading status updates from the CLE, wait for completion message status_sub = rospy.Subscriber('/ros_cle_simulation/status', String, self._on_status) # wait for the process to abort or be successfully launched while not self._process.poll() and not self._launched: # ensure gzbridge is running and accessible in deployed configurations self._check_gzbridge() # very short sleep to be as responsive as possible time.sleep(0.1) # disconnect the temporary status subscriber status_sub.unregister() # launch has failed, propagate an error to the user if not self._launched: raise Exception('Distributed MPI launch failure, aborting.\nPlease contact ' 'neurorobotics@humanbrainproject.eu if this problem persists.')
def _on_status(self, msg): """ Listen for CLE status messages, parse the JSON format that is also sent to the frontend progress bar. Wait for completion of the CLE loading task and mark the MPI process as successfully launched. :param msg: The ros message to parse. """ status = json.loads(msg.data) # ignore status bar messages, these don't give us information if a launch has failed # since the loading task will always be set to 'done' when it aborts if 'progress' in status: return # received a simulation status "tick", everything has launched successfully self._launched = True def _check_gzbridge(self): """ gzbridge cannot be launched on remote CLE nodes since they will not be reachable by clients that are configured and able to reach the backend machines. If Gazebo is launched on a remote node (e.g. not a local install), wait for the /gazebo ROS node to appear and start gzbridge on this host (a backend server). """ if self._gazebo_master_uri: return # request the ip of the Gazebo node, result will be -1 if not found res, _, ip = rosnode.get_api_uri(rospy.get_master(), '/gazebo', True) if res == -1: return # replace the ROS port with the Gazebo port, configure env, and run gzbridge self._gazebo_master_uri = ip[0:ip.rfind(':') + 1] + '11345' # only need to start the gzbridge if running in a deployed configuration if '127.0.0.1' not in self._gazebo_master_uri: # this emulates the functionality in hbp_nrp_cleserver/server/LocalGazebo.py but we # cannot import and reuse due to circular dependencies os.environ['GAZEBO_MASTER_URI'] = self._gazebo_master_uri os.system(config.config.get('gzbridge', 'restart-cmd')) def _any_child_status(self, status): try: parent = psutil.Process(self._process.pid) children = parent.children(recursive=True) return not children or any([ child.status() in status for child in parent.children(recursive=True) ]) # pylint: disable=broad-exception except Exception as ex: logger.info("Error while attempting to get the status of child processes \ of {} with pid {} whichi is {}" .format( parent.name(), parent.pid, parent.status())) logger.info("Error:{} with args{}").format( ex.message, ex.args) def _kill_children(self, signal = signal.SIGTERM): try: parent = psutil.Process(self._process.pid) children = parent.children(recursive=True) children.sort(key=lambda x: x.pid, reverse=True) for child in children: logger.info("Sending SIGTERM to {} with pid {} who is {}".format( child.name(), child.pid, child.status())) child.send_signal(signal.SIGTERM) # pylint: disable=broad-exception except Exception as ex: logger.info("Sending SIGTERM to mother proccess {} with pid {} who is {}".format( parent.name(), parent.pid, parent.status())) logger.info("Error:{} with args{}".format( ex.message, ex.args))
[docs] def run(self): """ Block until the mpirun command exits. Check the return code to determine if it was successful or aborted. Backwards compatibility in naming convention, must be run. """ if not self._process or not self._launched: raise Exception('No MPI process launched, cannot run until launch() is called.') while not self._any_child_status([psutil.STATUS_STOPPED, psutil.STATUS_ZOMBIE]): time.sleep(2) self._kill_children() if self._any_child_status([psutil.STATUS_SLEEPING, psutil.STATUS_RUNNING]): self._kill_children(signal.SIGKILL)
[docs] def shutdown(self): """ Attempt to forecfully shutdown the mpirun command if it is still running and has not cleanly shut itself down. Guaranteed to be called after launch success or failure. """ # try to terminate the mpirun command, mpirun will automatically exit nautrally when all # of its spawned child processes exit or are killed, so this isn't explicitly necessary if self._process: try: self._process.kill() except OSError: # the process has already cleanly terminated pass # terminate the gzbrige process/websocket if we started it above if self._gazebo_master_uri and '127.0.0.1' not in self._gazebo_master_uri: os.system(config.config.get('gzbridge', 'stop-cmd')) # reset all class variables to prevent class reuse self._gazebo_master_uri = None self._launched = False self._process = None self._cmd = None self._exe = None