Source code for hbp_nrp_distributed_nest.launch.NestBrainProcess

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END
"""
A distributed brain process that can be launched standalone on remote hosts.
"""
from builtins import object
from hbp_nrp_cle.brainsim import config

import pyNN.nest as sim
import nest
import hbp_nrp_cle.tf_framework.config as tf_config
from collections import OrderedDict

from hbp_nrp_cle.brainsim.pynn.PyNNControlAdapter import PyNNControlAdapter
from hbp_nrp_cle.brainsim.pynn_nest.PyNNNestCommunicationAdapter import PyNNNestCommunicationAdapter
from hbp_nrp_cle.cle.ClosedLoopEngine import ClosedLoopEngine

from hbp_nrp_cle.brainsim.pynn_nest.devices.__NestDeviceGroup import PyNNNestDevice
from hbp_nrp_cle.tf_framework._CleanableTransferFunctionParameter import (
    ICleanableTransferFunctionParameter)

from mpi4py import MPI
from hbp_nrp_cle.brainsim import COMM_NRP
import traceback


class NestBrainProcess(object):
    """
    A distributed Nest brain process that can be launched standalone on remote hosts.

    The process mirrors the brain, populations and transfer-function devices created by the
    CLE process and advances the brain in lock-step with it. It is driven entirely by
    messages received on ``COMM_NRP`` with tag :attr:`MPI_MSG_TAG`.
    """

    # tag to listen for MPI configuration/command messages
    MPI_MSG_TAG = 100

    def __init__(self, sim_config):
        """
        Nest will automatically allocate the brain in a round-robin fashion under the hood,
        we do not need to do anything explicitly.

        :param sim_config: A sim config object that is "similar" to the one used for the CLE
                           process; ``rng_seed``, ``timestep``, ``brain_model`` and
                           ``get_populations_dict()`` are read from it.
        """
        self._sim_config = sim_config

        # set the RNG seed before initializing any PyNN interfaces in the brain controller
        config.rng_seed = sim_config.rng_seed

        # integration timestep between simulators, in seconds (default to CLE value)
        self._timestep = (ClosedLoopEngine.DEFAULT_TIMESTEP if sim_config.timestep is None
                          else sim_config.timestep)

        # spawn CLE components that will handle loading the brain file and interfaces
        self._brain_controller = PyNNControlAdapter(sim)
        self._brain_controller.initialize()
        self._brain_controller.load_brain(sim_config.brain_model.resource_path.abs_path)
        self._brain_controller.load_populations(sim_config.get_populations_dict())

        # spawn the communication adapter for use as needed
        self._brain_communicator = PyNNNestCommunicationAdapter()
        self._brain_communicator.initialize()

        # created devices, keyed by the 'timestep' value the CLE sends with each TF, so
        # that a later 'DeleteTF' command can find and remove them
        self.devices = OrderedDict()

        # status variables for this process
        self._ready = False
        self._running = False

    def run(self):  # pylint: disable=too-many-branches
        """
        Blocking run loop for this brain process.

        First accept any transfer function configuration via MPI messages and then block
        running the brain until terminated externally by the CLE shutting down.

        Messages are read from ``COMM_NRP`` (any source, tag :attr:`MPI_MSG_TAG`) and are
        either command dictionaries (``{'command': 'ConnectTF' | 'DeleteTF' | 'LoadBrain' |
        'SetStatus', ...}``) or the control strings ``'ready'``, ``'step'``, ``'shutdown'``
        and ``'abort'``.

        :raises Exception: on an unknown message, on a command dictionary without a
                           ``'command'`` key, or when the CLE requests an ``'abort'``.
        """
        # listen for transfer function creation messages until the CLE tells us to start
        self._ready = False
        self._running = True

        # run until shutdown or an Exception is raised
        while self._running:

            # block and read messages from the CLE
            data = COMM_NRP.recv(source=MPI.ANY_SOURCE, tag=NestBrainProcess.MPI_MSG_TAG)

            # new transfer function, dictionary of parameters
            if isinstance(data, dict):

                if 'command' not in data:
                    raise Exception('Remote brain process received unknown message: %s'
                                    % str(data))
                command = data['command']

                # create and connect a TF
                if command == 'ConnectTF':
                    self._connect_tf(data)

                # delete a previously created TF
                elif command == 'DeleteTF':
                    self._delete_tf(data)

                # load/reload the brain and populations
                elif command == 'LoadBrain':
                    self._load_brain(data)

                # handle updates that use Nest directly (PyNNNest adapters)
                elif command == 'SetStatus':
                    nest.SetStatus(data['ids'], data['params'])

                # NOTE(review): any other command value is silently ignored, whereas an
                # unknown string message (below) aborts the process - confirm intended

            # command and control string from the CLE
            elif data == 'ready':

                # TF loading is complete, we can start the simulation
                self._ready = True

                # notify all processes start at the same time (all Nest commands barrier
                # themselves)
                COMM_NRP.Barrier()

            # step the simulation, commanded by the CLE
            elif data == 'step':

                # run the coordinated simulation step, timestep is converted s -> msec
                self._brain_controller.run_step(self._timestep * 1000.0)
                self._brain_communicator.refresh_buffers(0.0)

            # CLE shutdown, abandon the recv loop which may block MPI_ABORT commands
            elif data == 'shutdown':

                # handle shutdown event from CLE, no barrier to prevent blocking on abort
                self._running = False
                # self._brain_controller.shutdown()
                # self._brain_communicator.shutdown()

            # CLE abort, abandon the recv loop which may block MPI_ABORT commands
            elif data == 'abort':

                # handle abort event from CLE, no barrier to prevent blocking on abort
                self._running = False
                # self._brain_controller.shutdown()
                # self._brain_communicator.shutdown()

                raise Exception('Raising exception to ABORT distributed NEST process: {}'.format(
                    str(COMM_NRP.Get_rank())
                ))

            # unknown message, this is a critical failure since this should never happen
            # fully abort and log the condition
            else:
                raise Exception('Remote brain process received unknown message: %s'
                                % str(data))

    def _connect_tf(self, params):
        """
        Reflect a transfer function connection made on the CLE side by performing the same
        connection on this side.

        :param params: The connectivity/synapse parameters passed by the CLE. The keys read
                       here are ``'assemblies'``, ``'type'``, ``'device'``, ``'params'`` and
                       ``'timestep'``.
        """

        def get_population_from_label(pop, label):
            """
            Given an input population and a label returns a population with the specified
            label. This can be the population itself or its root (grandparent), or None.
            """
            if pop.label == label:
                return pop
            if isinstance(pop, sim.PopulationView) and pop.grandparent.label == label:
                return pop.grandparent
            return None

        def get_population_from_brain(label):
            """
            Finds a population in the brain with the specified label, or returns None.
            """
            brain = tf_config.brain_root.__dict__

            # check if label in brain
            if label in brain:
                return brain[label]

            # check if label in circuit populations
            if 'circuit' in brain:
                circuit = brain['circuit']
                pops = circuit.populations if isinstance(circuit, sim.Assembly) else [circuit]
                for pop in pops:
                    found = get_population_from_label(pop, label)
                    # compare against None rather than relying on the truthiness of PyNN
                    # population objects
                    if found is not None:
                        return found

            # could not find population
            return None

        # each assembly entry is indexed as (population label, optional PopulationView
        # selector); build one PyNN object (population, view or Assembly) per assembly
        populations = []
        for assembly in params['assemblies']:
            members = []
            for entry in assembly:
                population = get_population_from_brain(entry[0])
                if population is not None and entry[1] is not None:
                    population = sim.PopulationView(population, entry[1])
                members.append(population)

            if len(members) == 1:
                populations.append(members[0])
            else:
                populations.append(sim.Assembly(*members))

        brain_pop = populations[0] if len(populations) == 1 else populations

        # perform the actual device creation/connection
        if params['type'] == 'sink':
            device = self._brain_communicator.register_spike_sink(
                brain_pop, params['device'], **params['params'])
        else:
            device = self._brain_communicator.register_spike_source(
                brain_pop, params['device'], **params['params'])

        # mark the timestamp from CLE process for tracking between processes
        device.timestep = params['timestep']

        # mark the device as MPI-aware, only used by Nest-specific devices; when a list of
        # populations was passed the returned object is expected to expose its member
        # devices via `devices`; objects without `devices` are simply skipped
        if not isinstance(brain_pop, list) and isinstance(device, PyNNNestDevice):
            device.mpi_aware = True
        else:
            for child in getattr(device, 'devices', ()):
                if isinstance(child, PyNNNestDevice):
                    child.mpi_aware = True

        # store the device in a way that we can easily retrieve later
        self.devices[device.timestep] = device

    def _delete_tf(self, params):
        """
        Disconnect the specified device and remove it from tracking.

        Unknown ``'timestep'`` values are ignored.

        :param params: The device parameters passed by the CLE; only ``'timestep'`` is read.
        """
        device = self.devices.pop(params['timestep'], None)
        if device is not None:

            if device in self._brain_communicator.generator_devices:
                self._brain_communicator.unregister_spike_source(device)
            else:
                self._brain_communicator.unregister_spike_sink(device)

            if isinstance(device, ICleanableTransferFunctionParameter):
                device.cleanup()

    def _load_brain(self, params):
        """
        Load/reload the brain file and population definitions.

        This is a no-op until the process has received the 'ready' message. Errors while
        loading are printed and otherwise ignored (the CLE process handles them), but
        interpreter-level exceptions such as KeyboardInterrupt/SystemExit propagate.

        :param params: The brain parameters passed by the CLE; ``'file'`` and
                       ``'populations'`` are read.
        """
        # ignore any commands during simulation construction
        if not self._ready:
            return

        # mirror the CLE brain loading order, the CLE process will handle errors
        # preserve the brain communicator if there is an error in the brain to mirror the CLE
        try:
            # discard any previously loaded brain and load the new spec
            self._brain_controller.shutdown()
            self._brain_controller.load_brain(params['file'])
            self._brain_controller.load_populations(params['populations'])

            # reinitialize the communication adapter to connect to the "new" brain
            self._brain_communicator.shutdown()
            self._brain_communicator.initialize()

            # remove the list of old TF devices, no need to shut them down
            self.devices.clear()

        # best-effort: report brain errors without killing the process, but do not swallow
        # KeyboardInterrupt/SystemExit as a bare except would
        except Exception:  # pylint: disable=broad-except
            traceback.print_exc()