# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# ---LICENSE-END
"""
Setup, build, and launch a distributed Nest instance that will spawn the CLE and
requested brain processes.
"""
from builtins import object
from hbp_nrp_distributed_nest.launch.host.LocalLauncher import LocalLauncher
from hbp_nrp_distributed_nest.launch.MPILauncher import MPILauncher
from hbp_nrp_distributed_nest.launch.DaintLauncher import DaintLauncher
import os
import random
import sys
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class NestLauncher(object):
    """
    Setup, build, and launch a distributed Nest instance that will spawn the CLE and
    requested brain processes.

    NOTE: This class intentionally does not inherit SimulationServer
    (even though it is an implementation of it) in order to avoid duplicate notificators
    """

    def __init__(self, sim_config):
        """
        Store the simulation configuration so that its parameters can be propagated
        to the remote hosts.

        :param sim_config: the simulation configuration object. This class reads its
                           gzserver_host, reservation, timeout, timeout_type, rng_seed,
                           exc_path.abs_path, _sim_id, _token, experiment_id, profiler
                           and num_brain_processes attributes.
        """
        self._sim_config = sim_config

        # host specific launch configuration/allocation
        self._launcher = None

        # the MPI process launcher for the CLE and brain processes
        self.mpilauncher = None

    # we should call the except_hook when something goes wrong in the simulation,
    # but currently we don't
    # pylint: disable=unused-argument
    def initialize(self, except_hook):
        """
        Construct the launch configuration that will spawn CLE + brain processes
        on distributed hosts, then build, deploy and launch it.

        :param except_hook: currently unused (see the comment above this method).
        :raises Exception: if the configured gzserver_host is anything but 'local'.
        """
        # TODO: figure out why this replace was done. environment_file was a param
        # nrp_models_path = os.environ.get('NRP_MODELS_DIRECTORY').rstrip('/')
        # self._env_file = environment_file.replace(nrp_models_path, '$NRP_MODELS_DIRECTORY')
        sim_config = self._sim_config

        # create a host specific launcher
        if sim_config.gzserver_host == 'local':
            self._launcher = LocalLauncher()
        else:
            raise Exception('Unsupported server host {}, cannot configure distributed launch!'
                            .format(sim_config.gzserver_host))

        # command line argument friendly versions of timeout and reservation arguments
        # the receiving processes must understand how to convert these back
        reservation_str = sim_config.reservation if sim_config.reservation else ''
        timeout_str = str(sim_config.timeout).replace(' ', '_')

        # a falsy seed (None or 0) is replaced by a freshly drawn random one
        rng_str = (sim_config.rng_seed if sim_config.rng_seed
                   else random.randint(1, sys.maxsize))

        # construct the actual MPI launcher with the process that determines if the CLE or
        # standalone brain should be launched
        # TODO: Find way to send simconfig object directly to the DistributedNestProcess
        args = ['--exdconf={}'.format(os.path.realpath(sim_config.exc_path.abs_path)),
                '--gzserver-host={}'.format(sim_config.gzserver_host),
                '--reservation={}'.format(reservation_str),
                '--sim-id={}'.format(sim_config._sim_id),
                '--timeout={}'.format(timeout_str),
                '--timeout_type={}'.format(sim_config.timeout_type),
                '--rng-seed={}'.format(rng_str),
                '--token={}'.format(sim_config._token),
                '--experiment_id={}'.format(sim_config.experiment_id),
                '--profiler={}'.format(sim_config.profiler)]
        exe = '{python} -u -m hbp_nrp_distributed_nest.launch.main {args}'\
              .format(python=sys.executable, args=' '.join(args))

        logger.info("Initializing MPI launcher")
        self.mpilauncher = MPILauncher(exe)

        # build and deploy configuration
        self._build()

    def _build(self):
        """
        Perform launcher and MPI build and deployment, can be invoked by subclasses after their
        implementation specific initialize.
        """
        # deploy the generated configuration files / launch scripts to the target host
        self._launcher.deploy()

        # construct the actual MPI launcher based on the deployed configuration
        self.mpilauncher.add_host(self._launcher.hostname,
                                  self._launcher.host_tmpdir,
                                  self._sim_config.num_brain_processes)

        # construct the mpi command line with the above host/launch information
        self.mpilauncher.build()

        # for error propagation reasons, we have to launch and init the MPI processes to emulate
        # the behavior of the single process launcher, if the mpirun command fails or the CLE/brain
        # processes fail then the error will be properly propagated
        logger.info("Launching MPI")
        self.mpilauncher.launch()

    def run(self):
        """
        Runs the assembled simulation
        """
        self.mpilauncher.run()

    def shutdown(self):
        """
        Shutdown all spawned processes and cleanup temporary files.

        Every cleanup step is attempted even if an earlier one raises; the exception
        is still propagated to the caller once the remaining steps have run. A launcher
        reference is only cleared after its shutdown succeeded.
        """
        try:
            # terminate the mpirun command (if it is still running)
            if self.mpilauncher is not None:
                self.mpilauncher.shutdown()
                self.mpilauncher = None
        finally:
            try:
                # perform any launcher host specific cleanup
                if self._launcher:
                    self._launcher.shutdown()
                    self._launcher = None
            finally:
                # finally, cleanup the roscore and any registrations launched by the above
                os.system("echo 'y' | timeout -s SIGKILL 10s rosnode cleanup >/dev/null 2>&1")