# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# ---LICENSE-END
"""
Extensions of the base CLE PyNNCommunicationAdapter to communicate with distributed
processes. Maximum code reuse and minimal duplication where possible.
"""
from builtins import range
from hbp_nrp_cle.brainsim.pynn_nest.PyNNNestCommunicationAdapter import PyNNNestCommunicationAdapter
from hbp_nrp_cle.brainsim.pynn_nest.devices.__NestDeviceGroup import PyNNNestDevice
from hbp_nrp_distributed_nest.launch.NestBrainProcess import NestBrainProcess
import hbp_nrp_cle.tf_framework.config as tf_config
from hbp_nrp_cle.brainsim import COMM_NRP
import pyNN.nest as sim
import logging
import time
logger = logging.getLogger(__name__)
class DistributedPyNNCommunicationAdapter(PyNNNestCommunicationAdapter):
    """
    Represents a distributed Nest communication adapter for the neuronal simulation.

    Every register/unregister request is first broadcast to the other MPI ranks of
    COMM_NRP and then forwarded to the base PyNNNestCommunicationAdapter, so that
    all processes perform the same device creation/deletion.
    """

    def initialize(self):
        """
        Marks the adapter as initialized.
        """
        logger.info("Distributed Nest communication adapter initialized")
        super(DistributedPyNNCommunicationAdapter, self).initialize()

    def register_spike_source(self, populations, spike_generator_type, **params):
        """
        Intercepts default PyNNNestCommunicationAdapter request and notifies all other
        processes to do the same.

        :param populations: A reference to the populations to which the spike generator
                            should be connected
        :param spike_generator_type: A spike generator type (see documentation
                                     or a list of allowed values)
        :param params: A dictionary of configuration parameters
        :return: A communication object or a group of objects
        """
        ts = self.__notify_processes_register('source', populations,
                                              spike_generator_type, params)

        device = super(DistributedPyNNCommunicationAdapter, self). \
            register_spike_source(populations, spike_generator_type, **params)

        # remember the creation timestamp, it is the key used to unregister the device
        device.timestep = ts

        # mark the device as MPI-aware, only used by Nest-specific devices
        if not isinstance(populations, list) and isinstance(device, PyNNNestDevice):
            device.mpi_aware = True
        else:
            # Either a device group or a single non-Nest device. The latter may not
            # expose a 'devices' attribute, so fall back to an empty sequence instead
            # of raising AttributeError.
            for member in getattr(device, 'devices', ()):
                if isinstance(member, PyNNNestDevice):
                    member.mpi_aware = True

        return device

    def register_spike_sink(self, populations, spike_detector_type, **params):
        """
        Intercepts default PyNNNestCommunicationAdapter request and notifies all other
        processes to do the same.

        :param populations: A reference to the populations which should be connected
                            to the spike detector
        :param spike_detector_type: A spike detector type (see documentation
                                    for a list of allowed values)
        :param params: A dictionary of configuration parameters
        :return: A Communication object or a group of objects
        """
        ts = self.__notify_processes_register('sink', populations,
                                              spike_detector_type, params)

        device = super(DistributedPyNNCommunicationAdapter, self). \
            register_spike_sink(populations, spike_detector_type, **params)

        # remember the creation timestamp, it is the key used to unregister the device
        device.timestep = ts

        # mark the device as MPI-aware, only used by Nest-specific devices
        # NOTE(review): unlike register_spike_source, members of a device group are not
        # marked here -- confirm whether that asymmetry is intended.
        if isinstance(device, PyNNNestDevice):
            device.mpi_aware = True

        return device

    def unregister_spike_source(self, device):
        """
        Disconnects and unregisters the given spike generator device and notifies all other
        processes to do the same.

        :param device: The spike generator device to deregister.
        """
        self.__notify_processes_unregister(device.timestep)
        super(DistributedPyNNCommunicationAdapter, self).unregister_spike_source(device)

    def unregister_spike_sink(self, device):
        """
        Disconnects and unregisters the given spike detector device and notifies all other
        processes to do the same.

        :param device: The spike detector device to deregister.
        """
        self.__notify_processes_unregister(device.timestep)
        super(DistributedPyNNCommunicationAdapter, self).unregister_spike_sink(device)

    @staticmethod
    def __notify_processes_register(kind, populations, device, params):
        """
        Notify remote MPI Brain Processes that they must complete this transfer function
        connection by duplicating the parameters for this device connection.

        :param kind: Either 'source' or 'sink'.
        :param populations: The target population to connect to (a single object or a list).
        :param device: The device to create.
        :param params: The dictionary of adapter params to use.
        :return: The creation timestamp (wall-clock microseconds) identifying the device.
        """

        # timestamp the adapter creation, we have no other way of tracking it between
        # the CLE and remote processes
        timestep = int(round(time.time() * 1e6))

        # population information to send to the remote MPI nodes, we can't pickle Populations
        # directly and those references wouldn't be valid on the remote nodes anyway

        def index_from_population_view(p):
            """
            Returns neuron indices of a PopulationView in the root Population and the root label
            """
            label = p.grandparent.label
            indices = p.index_in_grandparent(list(range(p.size)))
            return label, indices

        def index_from_assembly(a):
            """
            Returns neuron indices and labels for each population in an Assembly
            """
            return [index_from_population_view(p) if isinstance(p, sim.PopulationView) else
                    (p.label, None) for p in a.populations]

        if not isinstance(populations, list):
            populations = [populations]

        # create a structure describing 'populations': one entry per population, each a
        # list of (root label, indices) tuples where indices is None for a full Population;
        # objects of any other type produce no entry
        assemblies = []
        for population in populations:
            if isinstance(population, sim.Population):
                assemblies.append([(population.label, None)])
            elif isinstance(population, sim.PopulationView):
                assemblies.append([index_from_population_view(population)])
            elif isinstance(population, sim.Assembly):
                assemblies.append(index_from_assembly(population))

        # propagate the synapse creation parameters to all remote nodes, they will run the same
        # connection/creation commands after receiving these messages, guaranteed to be
        # run from CLE MPI process 0 only
        for rank in range(1, COMM_NRP.Get_size()):
            COMM_NRP.send({'command': 'ConnectTF', 'type': kind, 'assemblies': assemblies,
                           'device': device, 'timestep': timestep,
                           'params': params},
                          dest=rank, tag=NestBrainProcess.MPI_MSG_TAG)

        # return the timestep for the device in this process
        return timestep

    @staticmethod
    def __notify_processes_unregister(timestep):
        """
        Notify remote MPI Brain Processes that they must delete transfer function
        connection by duplicating the calls in this process.

        :param timestep: The creation timestep of the TF to delete.
        """

        # propagate the deletion configuration to all other processes, guaranteed to be
        # run from CLE MPI process 0 only
        for rank in range(1, COMM_NRP.Get_size()):
            COMM_NRP.send({'command': 'DeleteTF', 'timestep': timestep},
                          dest=rank, tag=NestBrainProcess.MPI_MSG_TAG)