# Source code for hbp_nrp_distributed_nest.launch.NestLauncher

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END
"""
Setup, build, and launch a distributed Nest instance that will spawn the CLE and
requested brain processes.
"""

from builtins import object
from hbp_nrp_distributed_nest.launch.host.LocalLauncher import LocalLauncher
from hbp_nrp_distributed_nest.launch.MPILauncher import MPILauncher
from hbp_nrp_distributed_nest.launch.DaintLauncher import DaintLauncher

import os
import random
import sys

import logging
logger = logging.getLogger(__name__)


class NestLauncher(object):
    """
    Setup, build, and launch a distributed Nest instance that will spawn the CLE and
    requested brain processes.

    NOTE: This class intentionally does not inherit SimulationServer (even though it is an
    implementation of it) in order to avoid duplicate notificators
    """

    def __init__(self, sim_config):
        """
        Store the simulation configuration so that its parameters can be propagated to the
        remote hosts.

        :param sim_config: the simulation configuration object. This class reads the
                           following attributes from it: gzserver_host, reservation,
                           timeout, timeout_type, rng_seed, exc_path.abs_path, experiment_id,
                           profiler, num_brain_processes and the protected _sim_id / _token.
        """

        self._sim_config = sim_config

        # host specific launch configuration/allocation
        self._launcher = None

        # the MPI process launcher for the CLE and brain processes
        self.mpilauncher = None

    # we should call the except_hook when something goes wrong in the simulation,
    # but currently we don't
    # pylint: disable=unused-argument
    def initialize(self, except_hook):
        """
        Construct the launch configuration that will spawn CLE + brain processes
        on distributed hosts, then build, deploy and launch it.

        :param except_hook: currently unused (see the comment above this method).
        :raises Exception: if the configured gzserver_host is anything other than 'local'.
        """

        # TODO: figure out why this replace was done. environment_file was a param
        # nrp_models_path = os.environ.get('NRP_MODELS_DIRECTORY').rstrip('/')
        # self._env_file = environment_file.replace(nrp_models_path, '$NRP_MODELS_DIRECTORY')

        # create a host specific launcher
        if self._sim_config.gzserver_host == 'local':
            self._launcher = LocalLauncher()
        else:
            raise Exception('Unsupported server host {}, cannot configure distributed launch!'
                            .format(self._sim_config.gzserver_host))

        # command line argument friendly versions of timeout and reservation arguments
        # the receiving processes must understand how to convert these back
        reservation_str = self._sim_config.reservation if self._sim_config.reservation else ''
        timeout_str = str(self._sim_config.timeout).replace(' ', '_')

        # a falsy seed (None or 0) is replaced by a freshly drawn random seed
        rng_str = (self._sim_config.rng_seed if self._sim_config.rng_seed
                   else random.randint(1, sys.maxsize))

        # construct the actual MPI launcher with the process that determines if the CLE or
        # standalone brain should be launched
        # TODO: Find way to send simconfig object directly to the DistributedNestProcess
        args = ['--exdconf={}'.format(os.path.realpath(self._sim_config.exc_path.abs_path)),
                '--gzserver-host={}'.format(self._sim_config.gzserver_host),
                '--reservation={}'.format(reservation_str),
                '--sim-id={}'.format(self._sim_config._sim_id),
                '--timeout={}'.format(timeout_str),
                '--timeout_type={}'.format(self._sim_config.timeout_type),
                '--rng-seed={}'.format(rng_str),
                '--token={}'.format(self._sim_config._token),
                '--experiment_id={}'.format(self._sim_config.experiment_id),
                '--profiler={}'.format(self._sim_config.profiler)]
        exe = '{python} -u -m hbp_nrp_distributed_nest.launch.main {args}'\
              .format(python=sys.executable, args=' '.join(args))

        logger.info("Initializing MPI launcher")
        self.mpilauncher = MPILauncher(exe)

        # build and deploy configuration
        self._build()

    def _build(self):
        """
        Perform launcher and MPI build and deployment, can be invoked by subclasses after their
        implementation specific initialize.
        """

        # deploy the generated configuration files / launch scripts to the target host
        self._launcher.deploy()

        # construct the actual MPI launcher based on the deployed configuration
        self.mpilauncher.add_host(self._launcher.hostname,
                                  self._launcher.host_tmpdir,
                                  self._sim_config.num_brain_processes)

        # construct the mpi command line with the above host/launch information
        self.mpilauncher.build()

        # for error propagation reasons, we have to launch and init the MPI processes to emulate
        # the behavior of the single process launcher, if the mpirun command fails or the CLE/brain
        # processes fail then the error will be properly propagated
        logger.info("Launching MPI")
        self.mpilauncher.launch()

    def run(self):
        """
        Runs the assembled simulation
        """
        self.mpilauncher.run()

    def shutdown(self):
        """
        Shutdown all spawned processes and cleanup temporary files.

        Every cleanup step is attempted even if an earlier one raises; the exception is
        still propagated to the caller once the remaining steps have run. A launcher
        reference is only cleared after its shutdown completed successfully.
        """

        try:
            # terminate the mpirun command (if it is still running)
            if self.mpilauncher is not None:
                self.mpilauncher.shutdown()
                self.mpilauncher = None
        finally:
            try:
                # perform any launcher host specific cleanup
                if self._launcher:
                    self._launcher.shutdown()
                    self._launcher = None
            finally:
                # finally, cleanup the roscore and any registrations launched by the above
                os.system("echo 'y' | timeout -s SIGKILL 10s rosnode cleanup >/dev/null 2>&1")