# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# ---LICENSE-END
"""
Helper class to build and execute a formatted mpirun command in the format:
mpirun -envlist <vars> -np <proc> -host <hostname/ip> -wdir <temporary work dir> <command> : ...
where each of the hosts has a specific working directory with necessary config files already
in place. Also passes environment variables required for NRP/CLE execution.
"""
from builtins import object
import logging
import os
import subprocess
import rospy
import rosnode
from std_msgs.msg import String
import json
import time
import signal, psutil
from hbp_nrp_cle import config
logger = logging.getLogger(__name__)
[docs]class MPILauncher(object):
"""
Class constructs and executes the mpi launch command.
"""
def __init__(self, executable):
self._exe = executable
self._hosts = []
self._cmd = None
self._process = None
self._launched = False
self._gazebo_master_uri = None
[docs] def add_host(self, hostname, tmpdir, processes=1):
"""
Add a target host to the mpi launch configuration.
:param hostname: The remote host name or ip.
:param tmpdir: A valid temporary directory on the remote host to launch in.
:param processes: The number of processes for this host.
"""
self._hosts.append('-np {p} -host {h} -wdir {t}'
.format(p=processes, h=hostname, t=tmpdir))
[docs] def build(self):
"""
Construct the mpirun command line string with all hosts provided.
"""
if len(self._hosts) == 0:
raise Exception('No target host configurations specified for MPI processes!')
# build the command with per host specific configuration
hoststr = ' : '.join(['{h} {e}'.format(h=host, e=self._exe) for host in self._hosts])
self._cmd = 'mpirun -v {}'.format(hoststr)
[docs] def launch(self):
"""
Launch the mpirun command and wait for successful startup of the CLE. Blocks until the
CLE publishes completion on the status topic or if the mpirun command aborts.
"""
if not self._cmd:
raise Exception('No command set for MPI processes, build() was not called!')
# provide extra ssh flags if ssh is used on the vizcluster to ensure we can spawn
# other ssh sessions / processes for Gazebo/etc.
env_vars = dict(os.environ, HYDRA_LAUNCHER_EXTRA_ARGS="-M -K")
# Spawn the mpirun command, we need to do a few special things because mvapich2 will
# send SIGSTOP (like a ctrl+v or suspend) to its parent process when executing and this
# would cause the entire Python stack to stop and create <defunct> processes.
logger.info("Spawning MPI processes: {}".format(self._cmd))
self._process = subprocess.Popen(self._cmd.split(), # no shell to SIGSTOP/hang
preexec_fn=os.setsid, # create a new session/tree
stdin=subprocess.PIPE, # hangs without valid stdin
env=env_vars) # environment variables
# wait until the CLE subprocess initializes properly or fails, the conditions are:
# failure if process exits (bad command, failure to launch, etc.)
# success if the CLE publishes a status update indicating loading is complete
# subscribe for loading status updates from the CLE, wait for completion message
status_sub = rospy.Subscriber('/ros_cle_simulation/status', String, self._on_status)
# wait for the process to abort or be successfully launched
while not self._process.poll() and not self._launched:
# ensure gzbridge is running and accessible in deployed configurations
self._check_gzbridge()
# very short sleep to be as responsive as possible
time.sleep(0.1)
# disconnect the temporary status subscriber
status_sub.unregister()
# launch has failed, propagate an error to the user
if not self._launched:
raise Exception('Distributed MPI launch failure, aborting.\nPlease contact '
'neurorobotics@humanbrainproject.eu if this problem persists.')
def _on_status(self, msg):
"""
Listen for CLE status messages, parse the JSON format that is also sent to the frontend
progress bar. Wait for completion of the CLE loading task and mark the MPI process as
successfully launched.
:param msg: The ros message to parse.
"""
status = json.loads(msg.data)
# ignore status bar messages, these don't give us information if a launch has failed
# since the loading task will always be set to 'done' when it aborts
if 'progress' in status:
return
# received a simulation status "tick", everything has launched successfully
self._launched = True
def _check_gzbridge(self):
"""
gzbridge cannot be launched on remote CLE nodes since they will not be reachable by clients
that are configured and able to reach the backend machines. If Gazebo is launched on a
remote node (e.g. not a local install), wait for the /gazebo ROS node to appear and start
gzbridge on this host (a backend server).
"""
if self._gazebo_master_uri:
return
# request the ip of the Gazebo node, result will be -1 if not found
res, _, ip = rosnode.get_api_uri(rospy.get_master(), '/gazebo', True)
if res == -1:
return
# replace the ROS port with the Gazebo port, configure env, and run gzbridge
self._gazebo_master_uri = ip[0:ip.rfind(':') + 1] + '11345'
# only need to start the gzbridge if running in a deployed configuration
if '127.0.0.1' not in self._gazebo_master_uri:
# this emulates the functionality in hbp_nrp_cleserver/server/LocalGazebo.py but we
# cannot import and reuse due to circular dependencies
os.environ['GAZEBO_MASTER_URI'] = self._gazebo_master_uri
os.system(config.config.get('gzbridge', 'restart-cmd'))
def _any_child_status(self, status):
try:
parent = psutil.Process(self._process.pid)
children = parent.children(recursive=True)
return not children or any([
child.status() in status
for child in parent.children(recursive=True)
])
# pylint: disable=broad-exception
except Exception as ex:
logger.info("Error while attempting to get the status of child processes \
of {} with pid {} whichi is {}" .format(
parent.name(), parent.pid, parent.status()))
logger.info("Error:{} with args{}").format(
ex.message,
ex.args)
def _kill_children(self, signal = signal.SIGTERM):
try:
parent = psutil.Process(self._process.pid)
children = parent.children(recursive=True)
children.sort(key=lambda x: x.pid, reverse=True)
for child in children:
logger.info("Sending SIGTERM to {} with pid {} who is {}".format(
child.name(), child.pid, child.status()))
child.send_signal(signal.SIGTERM)
# pylint: disable=broad-exception
except Exception as ex:
logger.info("Sending SIGTERM to mother proccess {} with pid {} who is {}".format(
parent.name(), parent.pid, parent.status()))
logger.info("Error:{} with args{}".format(
ex.message,
ex.args))
[docs] def run(self):
"""
Block until the mpirun command exits. Check the return code to determine if it was
successful or aborted. Backwards compatibility in naming convention, must be run.
"""
if not self._process or not self._launched:
raise Exception('No MPI process launched, cannot run until launch() is called.')
while not self._any_child_status([psutil.STATUS_STOPPED, psutil.STATUS_ZOMBIE]):
time.sleep(2)
self._kill_children()
if self._any_child_status([psutil.STATUS_SLEEPING, psutil.STATUS_RUNNING]):
self._kill_children(signal.SIGKILL)
[docs] def shutdown(self):
"""
Attempt to forecfully shutdown the mpirun command if it is still running and has not
cleanly shut itself down. Guaranteed to be called after launch success or failure.
"""
# try to terminate the mpirun command, mpirun will automatically exit nautrally when all
# of its spawned child processes exit or are killed, so this isn't explicitly necessary
if self._process:
try:
self._process.kill()
except OSError: # the process has already cleanly terminated
pass
# terminate the gzbrige process/websocket if we started it above
if self._gazebo_master_uri and '127.0.0.1' not in self._gazebo_master_uri:
os.system(config.config.get('gzbridge', 'stop-cmd'))
# reset all class variables to prevent class reuse
self._gazebo_master_uri = None
self._launched = False
self._process = None
self._cmd = None
self._exe = None