# Source code for hbp_nrp_distributed_nest.launch.DistributedCLEProcess

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END
"""
This module contains the CLE process logic for the simulation assembly
"""
from __future__ import print_function
from __future__ import absolute_import

from builtins import range
import os
import argparse
import logging

from .DistributedCLESimulationAssembly import DistributedCLESimulationAssembly

# Handle to the simulation assembly of this process. It stays None until
# launch_cle() constructs the assembly, and launch_cle() checks it again when
# shutting down.
simulation = None

# Module-level logger named after this module.
logger = logging.getLogger(__name__)


# pylint: disable=too-many-statements, too-many-locals
def _build_argument_parser():  # pragma: no cover
    """
    Create the command line parser for the distributed CLE process.

    :return: An argparse.ArgumentParser accepting the options used by launch_cle.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--exdconf', dest='exd_file',
                        help='specify the ExDConfiguration file', required=True)
    parser.add_argument('--gzserver-host', dest='gzserver_host',
                        help='the gzserver target host', required=True)
    parser.add_argument('--reservation', dest='reservation', default=None,
                        help='cluster resource reservation', required=False)
    parser.add_argument('--sim-id', dest='sim_id', type=int,
                        help='the simulation id to use', required=True)
    parser.add_argument('--timeout', dest='timeout',
                        help='the simulation default time allocated', required=True)
    parser.add_argument('--timeout_type', dest='timeout_type',
                        help='the time type for timeout', required=True)
    parser.add_argument('--rng-seed', dest='rng_seed',
                        help='the global experiment RNG seed', required=True)
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='increase output verbosity')
    parser.add_argument('--token', dest='token',
                        help='specify the user Token', required=True)
    parser.add_argument('--experiment_id', dest='experiment_id',
                        help='specify the experiment id', required=True)
    parser.add_argument('--profiler', dest='profiler',
                        help='enabling the profiler in the CLE', required=True)
    return parser


def _notify_other_ranks(comm, message, tag=100):  # pragma: no cover
    """
    Send a message to every process of the communicator except the calling one.

    :param comm: The MPI communicator to use (must offer Get_rank, Get_size and send).
    :param message: The object to send to each of the other ranks.
    :param tag: The MPI message tag. The default of 100 is the magic number shared with the
                receiving side, see the FIXME in launch_cle [NRRPLT-4858].
    """
    own_rank = comm.Get_rank()
    for rank in range(comm.Get_size()):
        if rank != own_rank:
            comm.send(message, dest=rank, tag=tag)


def launch_cle(argv):  # pragma: no cover
    """
    Launch the distributed CLE process. Processes the command line and handles all shutdown
    events to terminate other MPI processes.

    The command line is parsed, the ROS node and the simulation assembly are initialized, the
    other MPI processes are told that configuration is complete ('ready') and the simulation
    is run. If anything raises an Exception, the other MPI processes are sent 'abort' and
    COMM_NRP.Abort(-1) is called. On the way out the simulation (if one was created) is shut
    down and the other MPI processes are sent 'shutdown'.

    :param argv: The command line arguments, handed unchanged to argparse's parse_args.
    """
    # the assembly is kept in the module-level variable so it is also visible during cleanup
    global simulation

    # import MPI here, must be done after Nest in subclass adapters
    from hbp_nrp_cle.brainsim import COMM_NRP

    try:
        try:
            # an unset variable is treated like an empty one so that the user gets the
            # explanatory message below instead of a bare KeyError
            if not os.environ.get("ROS_MASTER_URI"):
                raise Exception("You should run ROS first.")

            args = _build_argument_parser().parse_args(argv)

            # expand any parameters (e.g., NRP_EXPERIMENTS_DIRECTORY) in paths
            args.exd_file = os.path.expandvars(args.exd_file)

            # simplified launch process below from ROSCLESimulationFactory.py
            # avoid circular dependency by importing here
            import rospy
            from hbp_nrp_cleserver.server import ROS_CLE_NODE_NAME
            from hbp_nrp_cleserver.server.ROSCLESimulationFactory import set_up_logger
            from hbp_nrp_commons.sim_config.SimConfig import SimConfig

            # reconfigure the logger to stdout as done in ROSCLESimulationFactory.py otherwise
            # all output will be trapped by the ROS logger after the first ROS node is
            # initialized
            rospy.init_node(ROS_CLE_NODE_NAME, anonymous=True)
            set_up_logger(None, args.verbose)

            # parse the timeout string command line argument into a valid datetime
            if args.timeout_type == "real":
                import dateutil.parser as datetime_parser
                timeout_parsed = datetime_parser.parse(args.timeout.replace('_', ' '))
            else:
                timeout_parsed = int(args.timeout)

            # check the reservation argument, if empty default to None
            if args.reservation == '':
                args.reservation = None

            sim_config = SimConfig(args.exd_file,
                                   sim_id=args.sim_id,
                                   gzserver_host=args.gzserver_host,
                                   reservation=args.reservation,
                                   timeout=timeout_parsed,
                                   timeout_type=args.timeout_type,
                                   playback_path=None,
                                   context_id=None,
                                   token=args.token,
                                   experiment_id=args.experiment_id,
                                   brain_processes=None,
                                   rng_seed=int(args.rng_seed),
                                   profiler=int(args.profiler))

            # construct the simulation assembly
            simulation = DistributedCLESimulationAssembly(sim_config)
            simulation.initialize(None)
            if simulation.cle_server is None:
                raise Exception("Error in cle_function_init. Cannot start simulation.")

            # FIXME: This should be done more cleanly within the adapter, see [NRRPLT-4858]
            # the tag is a magic number to avoid circular build/release dependency for now but
            # this will be removed when the referenced bug is fixed

            # notify MPI processes that configuration is complete
            _notify_other_ranks(COMM_NRP, 'ready')
            COMM_NRP.Barrier()

            logger.info('Starting CLE.')
            simulation.run()  # This is a blocking call

        except Exception as e:  # pylint: disable=broad-except

            # if running through MPI, catch Exception and terminate below to ensure brain
            # processes are also killed
            # NOTE: the exception itself is formatted rather than e.message, which does not
            # exist on Python 3 and would make this handler fail before the abort is sent
            logger.error('CLE aborted with message %s, terminating.', e)
            # if no logger
            print('[ MPI ] CLE aborted with message {}, terminating.'.format(e))
            logger.exception(e)

            _notify_other_ranks(COMM_NRP, 'abort')

            print('[ MPI ] ABORTing distributed CLE process: {}'.format(
                str(COMM_NRP.Get_rank())))
            COMM_NRP.Abort(-1)

        finally:

            # always attempt to shutdown the CLE launcher and release resources
            if simulation:
                logger.info('Shutting down CLE.')
                simulation.shutdown()
                logger.info('Shutdown complete, terminating.')

    finally:

        # terminate the spawned brain processes
        # send a shutdown message in case the brain processes are in a recv loop at startup
        # since they seem to block and ignore the Abort command until receiving a message;
        # doing this in an outer finally makes sure it is also sent when the command line is
        # rejected by argparse (SystemExit) or when the simulation shutdown itself fails
        _notify_other_ranks(COMM_NRP, 'shutdown')