Source code for hbp_nrp_distributed_nest.launch.DaintLauncher

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END

"""
Helper class to build and execute a formatted srun command
"""
from builtins import object
import logging
import os
import subprocess
import time

import rospy
import rosnode
from std_msgs.msg import String
import json

from hbp_nrp_cle import config

logger = logging.getLogger(__name__)

class DaintLauncher(object):
    """
    Constructs and executes the MPI (srun) command in the Daint environment.
    """

    def __init__(self, executable):
        self._exe = executable
        self._cmd = None
        self._process = None
        self._launched = False
        self._gazebo_master_uri = None
        self._status_sub = None

    # TODO: change NestLauncher and remove this method
    def add_host(self, hostname, tmpdir, processes=1):
        """
        Ignore add_host on Daint; Slurm (srun) manages host allocation internally.

        :param hostname: ignored
        :param tmpdir: ignored
        :param processes: ignored
        """
        logger.info("Ignoring add_host for {} in {} for {} processes".format(
            hostname, tmpdir, processes))

    def build(self):
        """
        Construct the srun/sarus command line string used to launch the MPI processes.
        """
        import socket

        def get_ip():
            """
            :return: the public interface's IP address
            """
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                # it doesn't even have to be reachable
                s.connect(('10.255.255.255', 1))
                ip = s.getsockname()[0]
            # pylint: disable=broad-except
            except:
                # fall back to the local host
                ip = '127.0.0.1'
            finally:
                s.close()
            return ip

        # Reconstruct Daint nodes' $HOME
        # WARNING: both $HOME and $USER have been overridden in the image
        # daint_user = os.popen('id -un').read().rstrip().lstrip()
        daint_user = subprocess.check_output(['id', '-un']).rstrip().lstrip()
        daint_home = os.path.join('/users', daint_user)
        daint_shared_dir = os.environ.get('DAINT_SHARED_DIRECTORY', daint_home)

        # Clear log files
        os.popen('echo > {shared_dir}/srun.log'.format(shared_dir=daint_shared_dir))
        os.popen('echo > {shared_dir}/mpi.log'.format(shared_dir=daint_shared_dir))

        # build the sarus (container) part of the command
        _sarus = '{binary} --verbose run --mpi ' \
                 '--mount=type=bind,source={user_home},destination={user_home} ' \
                 '--mount=source={log_dir},dst={log_dir},type=bind ' \
                 'hbpneurorobotics/nrp:daint ' \
                 '{hbp}/BrainSimulation/mpi.sh {log_dir} {ros_master} {mpi_command}'.format(
                     binary='/apps/daint/system/opt/sarus/1.0.1/bin/sarus',
                     home=os.environ.get('HOME'),   # /home2/bbpnrsoa
                     hbp=os.environ.get('HBP'),     # /home2/bbpnrsoa/nrp/src
                     user_home=daint_home,          # /users/<cscs_username>
                     log_dir=daint_shared_dir,      # e.g. daint_home or /scratch/snx3000/<cscs_username>
                     ros_master=get_ip(),
                     mpi_command=self._exe)

        self._cmd = 'srun -v -C gpu -N 2 -n 2 ' \
                    '--chdir {wdir} --jobid {jobid} {sarus}'.format(
                        wdir=daint_shared_dir,
                        jobid=os.environ.get('SLURM_JOB_ID'),
                        sarus=_sarus)
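
    # Illustrative sketch (not in the original source): with DAINT_SHARED_DIRECTORY and
    # SLURM_JOB_ID set, build() above assembles a command of roughly this shape; the
    # bracketed values are placeholders, the real ones come from the environment:
    #
    #   srun -v -C gpu -N 2 -n 2 --chdir /users/<cscs_username> --jobid <SLURM_JOB_ID> \
    #       /apps/daint/system/opt/sarus/1.0.1/bin/sarus --verbose run --mpi \
    #       --mount=type=bind,source=/users/<cscs_username>,destination=/users/<cscs_username> \
    #       --mount=source=/users/<cscs_username>,dst=/users/<cscs_username>,type=bind \
    #       hbpneurorobotics/nrp:daint \
    #       $HBP/BrainSimulation/mpi.sh /users/<cscs_username> <backend_ip> <mpi_command>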

    def launch(self):
        """
        Launch the srun command and wait for successful startup of the CLE. Blocks until
        the CLE publishes completion on the status topic or the srun command aborts.
        """
        if not self._cmd:
            raise Exception('No command set for MPI processes, build() was not called!')

        # provide extra ssh flags if ssh is used on the vizcluster to ensure we can spawn
        # other ssh sessions / processes for Gazebo/etc.
        env_vars = dict(os.environ, HYDRA_LAUNCHER_EXTRA_ARGS="-M -K")

        # Spawn the srun command; we need to do a few special things because mvapich2 will
        # send SIGSTOP (like a ctrl+z or suspend) to its parent process when executing and this
        # would cause the entire Python stack to stop and create <defunct> processes.
        logger.info("Spawning MPI processes: {}".format(self._cmd))
        self._process = subprocess.Popen(self._cmd.split(),      # no shell to SIGSTOP/hang
                                         preexec_fn=os.setsid,   # create a new session/tree
                                         stdin=subprocess.PIPE,  # hangs without valid stdin
                                         stdout=subprocess.PIPE,
                                         stderr=subprocess.PIPE,
                                         env=env_vars)           # environment variables

        # subscribe for loading status updates from the CLE, wait for the completion message
        self._status_sub = rospy.Subscriber('/ros_cle_simulation/status', String, self._on_status)

        # wait until the CLE subprocess initializes properly or fails; the conditions are:
        #   failure if the process exits (bad command, failure to launch, etc.)
        #   success if the CLE publishes a status update indicating loading is complete
        #
        # wait for srun and the sarus container to start, with a timeout of 120 seconds;
        # _launched becomes True once the ROSCLEServer is initialized and publishes a
        # message on /ros_cle_simulation/status
        n = 0
        while self._process.poll() is None and not self._launched and n < 120:
            n += 1
            time.sleep(1)

        # launch has failed or timed out, propagate an error to the user
        if not self._launched:
            if self._process.poll() is None:
                # still running after the timeout; stop it so its pipes can be read without blocking
                self._process.kill()
            o = self._process.stdout.read()
            e = self._process.stderr.read()
            print('PROCESS END: {}'.format(n))
            if o:
                print('--------- STDOUT -------')
                print(o)
                print('--------- STDOUT -------')
            if e:
                print('--------- ERROR -------')
                print(e)
                print('--------- ERROR -------')
            raise Exception(
                'ABORTING: Distributed launch failure. '
                'Timed out while waiting for /ros_cle_simulation/status service to come alive. '
                'Please contact neurorobotics@humanbrainproject.eu if this problem persists.'
            )

        # wait for the gzserver script to execute or time out after 60 seconds
        start = time.time()
        while not self._gazebo_master_uri and time.time() - start < 60:
            # ensure gzbridge is running and accessible in deployed configurations
            self._check_gzbridge()
            time.sleep(1)

        # gzserver launch has failed, propagate an error to the user
        if not self._gazebo_master_uri:
            raise Exception(
                'ABORTING: Distributed launch failure.\n'
                'Timed out while waiting for the /gazebo node to come alive. '
                'Please contact neurorobotics@humanbrainproject.eu if this problem persists.'
            )

    def _on_status(self, msg):
        """
        Listen for CLE status messages, parsing the JSON format that is also sent to the
        frontend progress bar. Wait for completion of the CLE loading task and mark the
        MPI process as successfully launched.

        :param msg: The ROS message to parse.
        """
        status = json.loads(msg.data)
        logger.info(str(status))

        # ignore status bar messages, they do not tell us whether a launch has failed
        # since the loading task will always be set to 'done' when it aborts
        if 'progress' in status:
            return

        # received a simulation status "tick", everything has launched successfully
        logger.info("===== MARKING LAUNCHED to be TRUE =====")
        self._launched = True

        # disconnect the status subscriber
        self._status_sub.unregister()

    def _check_gzbridge(self):
        """
        gzbridge cannot be launched on remote CLE nodes since they will not be reachable by
        clients that are configured and able to reach the backend machines. If Gazebo is
        launched on a remote node (i.e. not a local install), wait for the /gazebo ROS node
        to appear and start gzbridge on this host (a backend server).
        """
        if self._gazebo_master_uri:
            return

        # request the IP of the Gazebo node; the result will be -1 if it is not found
        res, _, ip = rosnode.get_api_uri(rospy.get_master(), '/gazebo', True)
        logger.info("===== gzbridge /gazebo ip from rospy {}".format(str(ip)))
        if res == -1:
            return

        # replace the ROS port with the Gazebo port, configure the environment, and run gzbridge
        self._gazebo_master_uri = ip[0:ip.rfind(':') + 1] + '11345'

        # only need to start gzbridge if running in a deployed configuration
        if '127.0.0.1' not in self._gazebo_master_uri:
            # this emulates the functionality in hbp_nrp_cleserver/server/LocalGazebo.py but we
            # cannot import and reuse it due to circular dependencies
            os.environ['GAZEBO_MASTER_URI'] = self._gazebo_master_uri
            logger.info("===== GAZEBO_MASTER_URI var: {}".format(self._gazebo_master_uri))
            logger.info("===== GAZEBO_MASTER_URI env: {}".format(
                os.environ['GAZEBO_MASTER_URI']))
            os.system(config.config.get('gzbridge', 'restart-cmd'))
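
    # Illustrative example (not in the original source) of the URI rewrite performed in
    # _check_gzbridge(): the port of the /gazebo node's ROS API URI is replaced by the
    # default Gazebo master port 11345 (the host value below is a placeholder):
    #
    #   ip                       ->  'http://148.187.0.1:39123'
    #   self._gazebo_master_uri  ->  'http://148.187.0.1:11345'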

    def run(self):
        """
        Block until the srun command exits and check the return code to determine whether it
        terminated successfully or aborted. Named "run" for backwards compatibility with the
        naming convention of the other launchers.
        """
        if not self._process or not self._launched:
            raise Exception('No MPI process launched, cannot run until launch() is called.')

        # wait until the process terminates, draining and logging its output; communicate()
        # reads stdout/stderr so the child cannot block on a full pipe
        out, err = self._process.communicate()
        logger.info("===== STDOUT from subprocess.Popen =====\n{}".format(str(out)))
        logger.info("===== STDERR from subprocess.Popen =====\n{}".format(str(err)))

        # if the launch fails or is killed externally, propagate an Exception to notify the
        # simulation factory to shut down since the launch/run has failed
        if self._process.returncode != 0:
            raise Exception('Distributed MPI runtime failure, aborting.\nPlease contact '
                            'neurorobotics@humanbrainproject.eu if this problem persists.')

    def shutdown(self):
        """
        Attempt to forcefully shut down the srun command if it is still running and has not
        cleanly shut itself down. Guaranteed to be called after launch success or failure.
        """
        # try to terminate the srun command; srun exits naturally when all of its spawned
        # child processes exit or are killed, so this isn't strictly necessary
        if self._process:
            try:
                self._process.kill()
            except OSError:
                # the process has already cleanly terminated
                pass

        # terminate the gzbridge process/websocket if we started it above
        if self._gazebo_master_uri and '127.0.0.1' not in self._gazebo_master_uri:
            os.system(config.config.get('gzbridge', 'stop-cmd'))

        # reset all instance variables to prevent reuse of this launcher
        self._gazebo_master_uri = None
        self._launched = False
        self._process = None
        self._cmd = None
        self._exe = None
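
# --- Usage sketch (not part of the original module) ---
# A minimal, hypothetical example of the intended launcher life-cycle; the executable
# string and the surrounding SLURM/ROS environment are assumptions:
#
#   launcher = DaintLauncher('<mpi_nest_command>')  # hypothetical MPI command string
#   launcher.add_host('ignored-host', '/tmp')       # no-op on Daint, kept for API parity
#   launcher.build()                                # assemble the srun + sarus command
#   try:
#       launcher.launch()                           # spawn srun, wait for the CLE status
#       launcher.run()                              # block until the MPI job terminates
#   finally:
#       launcher.shutdown()                         # kill srun and stop gzbridge if needed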