Source code for hbp_nrp_distributed_nest.cle.DistributedPyNNCommunicationAdapter

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END
"""
Extensions of the base CLE PyNNCommunicationAdapter to communicate with distributed
processes. Maximum code reuse and minimal duplication where possible.
"""
from builtins import range
from hbp_nrp_cle.brainsim.pynn_nest.PyNNNestCommunicationAdapter import PyNNNestCommunicationAdapter
from hbp_nrp_cle.brainsim.pynn_nest.devices.__NestDeviceGroup import PyNNNestDevice

from hbp_nrp_distributed_nest.launch.NestBrainProcess import NestBrainProcess
import hbp_nrp_cle.tf_framework.config as tf_config

from hbp_nrp_cle.brainsim import COMM_NRP

import pyNN.nest as sim

import logging
import time

logger = logging.getLogger(__name__)


class DistributedPyNNCommunicationAdapter(PyNNNestCommunicationAdapter):
    """
    Represents a distributed Nest communication adapter for the neuronal simulation

    Every register/unregister request is first forwarded over COMM_NRP to all MPI ranks
    other than rank 0 and is then applied locally through the base class implementation.
    """

    def initialize(self):
        """
        Marks the adapter as initialized.
        """
        logger.info("Distributed Nest communication adapter initialized")
        super(DistributedPyNNCommunicationAdapter, self).initialize()

    def register_spike_source(self, populations, spike_generator_type, **params):
        """
        Intercepts default PyNNNestCommunicationAdapter request and notifies all other
        processes to do the same.

        :param populations: A reference to the populations to which the spike generator
                            should be connected
        :param spike_generator_type: A spike generator type (see documentation for a list
                                     of allowed values)
        :param params: A dictionary of configuration parameters
        :return: A communication object or a group of objects
        """
        # notify the remote processes before creating the local device
        ts = self.__notify_processes_register('source', populations,
                                              spike_generator_type, params)

        device = super(DistributedPyNNCommunicationAdapter, self). \
            register_spike_source(populations, spike_generator_type, **params)

        # remember the creation timestep, it identifies the device when unregistering
        device.timestep = ts

        # mark the device as MPI-aware, only used by Nest-specific devices
        if not isinstance(populations, list) and isinstance(device, PyNNNestDevice):
            setattr(device, 'mpi_aware', True)
        else:
            # a single device that is not Nest-specific has no 'devices' attribute; treat
            # it as having no members instead of failing after it was already registered
            for d in getattr(device, 'devices', ()):
                if isinstance(d, PyNNNestDevice):
                    setattr(d, 'mpi_aware', True)

        return device

    def register_spike_sink(self, populations, spike_detector_type, **params):
        """
        Intercepts default PyNNNestCommunicationAdapter request and notifies all other
        processes to do the same.

        :param populations: A reference to the populations which should be connected
                            to the spike detector
        :param spike_detector_type: A spike detector type (see documentation for a list
                                    of allowed values)
        :param params: A dictionary of configuration parameters
        :return: A Communication object or a group of objects
        """
        # notify the remote processes before creating the local device
        ts = self.__notify_processes_register('sink', populations,
                                              spike_detector_type, params)

        device = super(DistributedPyNNCommunicationAdapter, self). \
            register_spike_sink(populations, spike_detector_type, **params)

        # remember the creation timestep, it identifies the device when unregistering
        device.timestep = ts

        # mark the device as MPI-aware, only used by Nest-specific devices
        if isinstance(device, PyNNNestDevice):
            setattr(device, 'mpi_aware', True)

        return device

    def unregister_spike_source(self, device):
        """
        Disconnects and unregisters the given spike generator device and notifies all other
        processes to do the same.

        :param device: The spike generator device to deregister.
        """
        self.__notify_processes_unregister(device.timestep)
        super(DistributedPyNNCommunicationAdapter, self).unregister_spike_source(device)

    def unregister_spike_sink(self, device):
        """
        Disconnects and unregisters the given spike detector device and notifies all other
        processes to do the same.

        :param device: The spike detector device to deregister.
        """
        self.__notify_processes_unregister(device.timestep)
        super(DistributedPyNNCommunicationAdapter, self).unregister_spike_sink(device)

    @staticmethod
    def __notify_processes_register(kind, populations, device, params):
        """
        Notify remote MPI Brain Processes that they must complete this transfer function
        connection by duplicating the parameters for this device connection.

        :param kind: Either 'source' or 'sink'.
        :param populations: The target population to connect to.
        :param device: The device to create.
        :param params: The dictionary of adapter params to use.
        :return: The creation timestep (microseconds since the epoch) that was sent to the
                 remote processes to identify this device.
        """

        # timestamp the adapter creation, we have no other way of tracking it between
        # the CLE and remote processes
        timestep = int(round(time.time() * 1e6))

        # population information to send to the remote MPI nodes, we can't pickle Populations
        # directly and those references wouldn't be valid on the remote nodes anyway
        if not isinstance(populations, list):
            populations = [populations]

        def index_from_population_view(p):
            """
            Returns neuron indices of a PopulationView in the root Population and the root label
            """
            label = p.grandparent.label
            indices = p.index_in_grandparent(list(range(p.size)))
            return label, indices

        def index_from_assembly(a):
            """
            Returns neuron indices and labels for each population in an Assembly
            """
            return [index_from_population_view(p) if isinstance(p, sim.PopulationView)
                    else (p.label, None) for p in a.populations]

        # create a structure describing 'populations': one list of (label, indices) tuples
        # per entry, where indices is None for a whole Population; entries of any other
        # type contribute nothing
        assemblies = []
        for population in populations:
            if isinstance(population, sim.Population):
                assemblies.append([(population.label, None)])
            elif isinstance(population, sim.PopulationView):
                assemblies.append([index_from_population_view(population)])
            elif isinstance(population, sim.Assembly):
                assemblies.append(index_from_assembly(population))

        # propagate the synapse creation parameters to all remote nodes, they will run the same
        # connection/creation commands after receiving these messages, guaranteed to be
        # run from CLE MPI process 0 only
        for rank in range(1, COMM_NRP.Get_size()):
            COMM_NRP.send({'command': 'ConnectTF', 'type': kind, 'assemblies': assemblies,
                           'device': device, 'timestep': timestep,
                           'params': params},
                          dest=rank, tag=NestBrainProcess.MPI_MSG_TAG)

        # return the timestep for the device in this process
        return timestep

    @staticmethod
    def __notify_processes_unregister(timestep):
        """
        Notify remote MPI Brain Processes that they must delete transfer function
        connection by duplicating the calls in this process.

        :param timestep: The creation timestep of the TF to delete.
        """

        # propagate the deletion configuration to all other processes, guaranteed to be
        # run from CLE MPI process 0 only
        for rank in range(1, COMM_NRP.Get_size()):
            COMM_NRP.send({'command': 'DeleteTF', 'timestep': timestep},
                          dest=rank, tag=NestBrainProcess.MPI_MSG_TAG)