# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# ---LICENSE-END
"""
This module contains the CLE process logic for the simulation assembly
"""
from __future__ import print_function
from __future__ import absolute_import
from builtins import range
import os
import argparse
import logging
from .DistributedCLESimulationAssembly import DistributedCLESimulationAssembly
# The simulation assembly owned by this process. It is assigned inside launch_cle() (declared
# there as a global) and, if set, shut down in that function's finally clause.
simulation = None
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# pylint: disable=too-many-statements, too-many-locals
def launch_cle(argv):  # pragma: no cover
    """
    Launch the distributed CLE process. Processes the command line, constructs and runs a
    DistributedCLESimulationAssembly and handles all shutdown events to terminate other MPI
    processes.

    Any Exception raised during setup or while running is logged, the other MPI ranks are sent
    an 'abort' message and COMM_NRP.Abort(-1) is called.

    :param argv: The list of command line arguments, passed as-is to
                 argparse.ArgumentParser.parse_args.
    """
    global simulation

    # import MPI here, must be done after Nest in subclass adapters
    from hbp_nrp_cle.brainsim import COMM_NRP

    try:
        # use .get() so that an unset variable yields the same helpful error as an empty one
        # instead of a bare KeyError
        if not os.environ.get("ROS_MASTER_URI"):
            raise Exception("You should run ROS first.")

        args = _build_argument_parser().parse_args(argv)

        # expand any parameters (e.g., NRP_EXPERIMENTS_DIRECTORY) in paths
        args.exd_file = os.path.expandvars(args.exd_file)

        # simplified launch process below from ROSCLESimulationFactory.py
        # avoid circular dependency by importing here
        import rospy
        from hbp_nrp_cleserver.server import ROS_CLE_NODE_NAME
        from hbp_nrp_cleserver.server.ROSCLESimulationFactory import set_up_logger
        from hbp_nrp_commons.sim_config.SimConfig import SimConfig

        # reconfigure the logger to stdout as done in ROSCLESimulationFactory.py otherwise all
        # output will be trapped by the ROS logger after the first ROS node is initialized
        rospy.init_node(ROS_CLE_NODE_NAME, anonymous=True)
        set_up_logger(None, args.verbose)

        timeout_parsed = _parse_timeout(args.timeout, args.timeout_type)

        # check the reservation argument, if empty default to None
        if args.reservation == '':
            args.reservation = None

        sim_config = SimConfig(args.exd_file,
                               sim_id=args.sim_id,
                               gzserver_host=args.gzserver_host,
                               reservation=args.reservation,
                               timeout=timeout_parsed,
                               timeout_type=args.timeout_type,
                               playback_path=None,
                               context_id=None,
                               token=args.token,
                               experiment_id=args.experiment_id,
                               brain_processes=None,
                               rng_seed=int(args.rng_seed),
                               profiler=int(args.profiler))

        # construct the simulation assembly from the configuration
        simulation = DistributedCLESimulationAssembly(sim_config)
        simulation.initialize(None)
        if simulation.cle_server is None:
            raise Exception("Error in cle_function_init. Cannot start simulation.")

        # FIXME: This should be done more cleanly within the adapter, see [NRRPLT-4858]
        # the tag is a magic number to avoid circular build/release dependency for now but
        # this will be removed when the referenced bug is fixed

        # notify MPI processes that configuration is complete
        _send_to_other_ranks(COMM_NRP, 'ready')
        COMM_NRP.Barrier()

        logger.info('Starting CLE.')
        simulation.run()  # this is a blocking call

    except Exception as e:  # pylint: disable=broad-except

        # if running through MPI, catch Exception and terminate below to ensure brain processes
        # are also killed
        # NOTE: format the exception itself; Python 3 exceptions have no `.message` attribute,
        # and failing here would skip the abort notification below
        logger.error('CLE aborted with message {}, terminating.'.format(e))
        # if no logger
        print('[ MPI ] CLE aborted with message {}, terminating.'.format(e))
        logger.exception(e)

        _send_to_other_ranks(COMM_NRP, 'abort')

        print('[ MPI ] ABORTing distributed CLE process: {}'.format(str(COMM_NRP.Get_rank())))
        COMM_NRP.Abort(-1)

    finally:

        # always attempt to shutdown the CLE launcher and release resources
        if simulation:
            logger.info('Shutting down CLE.')
            simulation.shutdown()
            logger.info('Shutdown complete, terminating.')

    # terminate the spawned brain processes
    # send a shutdown message in case the brain processes are in a recv loop at startup since they
    # seem to block and ignore the Abort command until receiving a message
    _send_to_other_ranks(COMM_NRP, 'shutdown')


def _build_argument_parser():
    """
    Create the command line parser for the distributed CLE process.

    :return: An argparse.ArgumentParser with all options of the CLE process registered.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--exdconf', dest='exd_file',
                        help='specify the ExDConfiguration file', required=True)
    parser.add_argument('--gzserver-host', dest='gzserver_host',
                        help='the gzserver target host', required=True)
    parser.add_argument('--reservation', dest='reservation', default=None,
                        help='cluster resource reservation', required=False)
    parser.add_argument('--sim-id', dest='sim_id', type=int,
                        help='the simulation id to use', required=True)
    parser.add_argument('--timeout', dest='timeout',
                        help='the simulation default time allocated', required=True)
    parser.add_argument('--timeout_type', dest='timeout_type',
                        help='the time type for timeout', required=True)
    parser.add_argument('--rng-seed', dest='rng_seed',
                        help='the global experiment RNG seed', required=True)
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='increase output verbosity')
    parser.add_argument('--token', dest='token',
                        help='specify the user Token', required=True)
    parser.add_argument('--experiment_id', dest='experiment_id',
                        help='specify the experiment id', required=True)
    parser.add_argument('--profiler', dest='profiler',
                        help='enabling the profiler in the CLE', required=True)
    return parser


def _parse_timeout(timeout, timeout_type):
    """
    Convert the timeout command line string into the value handed to SimConfig.

    :param timeout: The raw timeout string from the command line.
    :param timeout_type: The timeout type; "real" selects date parsing, anything else is
                         converted with int().
    :return: The result of dateutil's parser for "real" timeouts, otherwise an int.
    """
    if timeout_type == "real":
        # imported lazily, only this branch needs dateutil
        import dateutil.parser as datetime_parser
        # underscores stand in for spaces in the command line value
        return datetime_parser.parse(timeout.replace('_', ' '))
    return int(timeout)


def _send_to_other_ranks(comm, message, tag=100):
    """
    Send a message to every rank of the communicator except the calling one.

    :param comm: The communicator to use; must provide Get_rank, Get_size and send.
    :param message: The object to send to each other rank.
    :param tag: The message tag (100 is the magic number used by this launcher,
                see [NRRPLT-4858]).
    """
    own_rank = comm.Get_rank()
    for rank in range(comm.Get_size()):
        if rank != own_rank:
            comm.send(message, dest=rank, tag=tag)