# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# ---LICENSE-END
"""
A distributed brain process that can be launched standalone on remote hosts.
"""
from builtins import object
from hbp_nrp_cle.brainsim import config
import pyNN.nest as sim
import nest
import hbp_nrp_cle.tf_framework.config as tf_config
from collections import OrderedDict
from hbp_nrp_cle.brainsim.pynn.PyNNControlAdapter import PyNNControlAdapter
from hbp_nrp_cle.brainsim.pynn_nest.PyNNNestCommunicationAdapter import PyNNNestCommunicationAdapter
from hbp_nrp_cle.cle.ClosedLoopEngine import ClosedLoopEngine
from hbp_nrp_cle.brainsim.pynn_nest.devices.__NestDeviceGroup import PyNNNestDevice
from hbp_nrp_cle.tf_framework._CleanableTransferFunctionParameter import (
ICleanableTransferFunctionParameter)
from mpi4py import MPI
from hbp_nrp_cle.brainsim import COMM_NRP
import traceback
class NestBrainProcess(object):
    """
    A distributed Nest brain process that can be launched standalone on remote hosts.

    The brain is loaded on construction; :meth:`run` then executes the configuration and
    control messages received over MPI until a 'shutdown' or 'abort' message arrives.
    """

    # tag to listen for MPI configuration/command messages
    MPI_MSG_TAG = 100

    def __init__(self, sim_config):
        """
        Nest will automatically allocate the brain in a round-robin
        fashion under the hood, we do not need to do anything explicitly.

        :param sim_config: A sim config object that is "similar" to the one used for the
                           CLE process
        """
        self._sim_config = sim_config

        # set the RNG seed before initializing any PyNN interfaces in the brain controller
        config.rng_seed = sim_config.rng_seed

        # integration timestep between simulators, in seconds (default to CLE value)
        self._timestep = (ClosedLoopEngine.DEFAULT_TIMESTEP
                          if sim_config.timestep is None else sim_config.timestep)

        # spawn CLE components that will handle loading the brain file and interfaces
        self._brain_controller = PyNNControlAdapter(sim)
        self._brain_controller.initialize()
        self._brain_controller.load_brain(sim_config.brain_model.resource_path.abs_path)
        self._brain_controller.load_populations(sim_config.get_populations_dict())

        # spawn the communication adapter for use as needed
        self._brain_communicator = PyNNNestCommunicationAdapter()
        self._brain_communicator.initialize()

        # store created devices, to be used when we can add/delete them dynamically
        self.devices = OrderedDict()

        # status variables for this process
        self._ready = False
        self._running = False

    def run(self):
        """
        Blocking run loop for this brain process. First accept any transfer function configuration
        via MPI messages and then block running the brain until terminated externally by the CLE
        shutting down.

        :raises Exception: if an 'abort' message or a message that is not recognised is received
        """
        # listen for transfer function creation messages until the CLE tells us to start
        self._ready = False
        self._running = True

        # run until shutdown or an Exception is raised
        while self._running:

            # block and read messages from the CLE
            data = COMM_NRP.recv(source=MPI.ANY_SOURCE, tag=NestBrainProcess.MPI_MSG_TAG)

            # configuration command, dictionary of parameters
            if isinstance(data, dict):
                self._handle_command(data)

            # command and control string from the CLE
            elif data == 'ready':
                # TF loading is complete, we can start the simulation
                self._ready = True

                # notify all processes start at the same time (all Nest commands barrier
                # themselves)
                COMM_NRP.Barrier()

            # step the simulation, commanded by the CLE
            elif data == 'step':
                # run the coordinated simulation step
                self._brain_controller.run_step(self._timestep * 1000.0)  # msec
                self._brain_communicator.refresh_buffers(0.0)

            # CLE shutdown, abandon the recv loop which may block MPI_ABORT commands
            elif data == 'shutdown':
                # no barrier to prevent blocking on abort; the brain adapters are not shut
                # down here
                self._running = False

            # CLE abort, abandon the recv loop which may block MPI_ABORT commands
            elif data == 'abort':
                # no barrier to prevent blocking on abort; the brain adapters are not shut
                # down here
                self._running = False
                raise Exception('Raising exception to ABORT distributed NEST process: {}'.format(
                    str(COMM_NRP.Get_rank())
                ))

            # unknown message, this is a critical failure since this should never happen
            # fully abort and log the condition
            else:
                raise Exception('Remote brain process received unknown message: %s' % str(data))

    def _handle_command(self, data):
        """
        Execute a dictionary message received from the CLE.

        The 'command' entry selects the action: 'ConnectTF', 'DeleteTF', 'LoadBrain' or
        'SetStatus'. Any other command value is ignored.

        :param data: The message dictionary sent by the CLE.
        :raises Exception: if the message does not contain a 'command' entry
        """
        if 'command' not in data:
            raise Exception('Remote brain process received unknown message: %s' % str(data))

        command = data['command']

        # create and connect a TF
        if command == 'ConnectTF':
            self._connect_tf(data)

        # delete a previously created TF
        elif command == 'DeleteTF':
            self._delete_tf(data)

        # load/reload the brain and populations
        elif command == 'LoadBrain':
            self._load_brain(data)

        # handle updates that use Nest directly (PyNNNest adapters)
        elif command == 'SetStatus':
            nest.SetStatus(data['ids'], data['params'])

    @staticmethod
    def _get_population_from_label(pop, label):
        """
        Given an input population and a label returns a population with the specified label.
        This can be the population itself or its root (grandparent).

        :param pop: The population (or population view) to inspect.
        :param label: The label to look for.
        :return: The matching population, or None if neither the population nor its
                 grandparent carries the label.
        """
        if pop.label == label:
            return pop
        if isinstance(pop, sim.PopulationView) and pop.grandparent.label == label:
            return pop.grandparent
        return None

    @staticmethod
    def _get_population_from_brain(label):
        """
        Finds a population in the brain with a specified label.

        :param label: The label of the population to find.
        :return: The population, or None if it could not be found.
        """
        brain = tf_config.brain_root.__dict__

        # check if label in brain
        if label in brain:
            return brain[label]

        # check if label in circuit populations
        if 'circuit' in brain:
            circuit = brain['circuit']
            candidates = circuit.populations if isinstance(circuit, sim.Assembly) else [circuit]
            for candidate in candidates:
                found = NestBrainProcess._get_population_from_label(candidate, label)
                # compare against None explicitly so that the length of the population
                # does not decide whether it counts as found
                if found is not None:
                    return found

        # could not find population
        return None

    def _connect_tf(self, params):
        """
        Reflect a transfer function connection made on the CLE side by performing the same
        connection on this side.

        :param params: The connectivity/synapse parameters passed by the CLE. The entries
                       'assemblies', 'type', 'device', 'params' and 'timestep' are read.
        """
        # get the populations of neurons from params, one entry per requested assembly
        populations = []
        for assembly in params['assemblies']:
            members = []
            for spec in assembly:
                # spec[0] is the population label, spec[1] an optional selector for a view
                population = self._get_population_from_brain(spec[0])
                if population is not None and spec[1] is not None:
                    population = sim.PopulationView(population, spec[1])
                members.append(population)

            if len(members) == 1:
                populations.append(members[0])
            else:
                populations.append(sim.Assembly(*members))

        brain_pop = populations[0] if len(populations) == 1 else populations

        # perform the actual device creation/connection
        if params['type'] == 'sink':
            register = self._brain_communicator.register_spike_sink
        else:
            register = self._brain_communicator.register_spike_source
        device = register(brain_pop, params['device'], **params['params'])

        # mark the timestamp from CLE process for tracking between processes
        device.timestep = params['timestep']

        # mark the device as MPI-aware, only used by Nest-specific devices
        if not isinstance(brain_pop, list) and isinstance(device, PyNNNestDevice):
            device.mpi_aware = True
        else:
            # a device without a 'devices' attribute has no members to mark, skip it
            # instead of failing with an AttributeError
            for member in getattr(device, 'devices', ()):
                if isinstance(member, PyNNNestDevice):
                    member.mpi_aware = True

        # store the device in a way that we can easily retrieve later
        self.devices[device.timestep] = device

    def _delete_tf(self, params):
        """
        Disconnect the specified device and remove it from tracking.

        Nothing happens if no device is tracked under the given timestep.

        :param params: The device parameters passed by the CLE, the 'timestep' entry
                       identifies the device.
        """
        device = self.devices.pop(params['timestep'], None)
        if device is None:
            return

        if device in self._brain_communicator.generator_devices:
            self._brain_communicator.unregister_spike_source(device)
        else:
            self._brain_communicator.unregister_spike_sink(device)

        if isinstance(device, ICleanableTransferFunctionParameter):
            device.cleanup()

    def _load_brain(self, params):
        """
        Load/reload the brain file and population definitions.

        Errors raised while loading are printed and otherwise ignored.

        :param params: The brain parameters passed by the CLE, the 'file' and 'populations'
                       entries are read.
        """
        # ignore any commands during simulation construction
        if not self._ready:
            return

        # mirror the CLE brain loading order, the CLE process will handle errors
        # preserve the brain communicator if there is an error in the brain to mirror the CLE
        try:
            # discard any previously loaded brain and load the new spec
            self._brain_controller.shutdown()
            self._brain_controller.load_brain(params['file'])
            self._brain_controller.load_populations(params['populations'])

            # reinitialize the communication adapter to connect to the "new" brain
            self._brain_communicator.shutdown()
            self._brain_communicator.initialize()

            # remove the list of old TF devices, no need to shut them down
            self.devices.clear()

        # only swallow regular errors so that KeyboardInterrupt/SystemExit still propagate
        # pylint: disable=broad-except
        except Exception:
            traceback.print_exc()