Source code for hbp_nrp_cle.brainsim.pynn_spiNNaker.PyNNSpiNNakerControlAdapter

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END


"""
This module contains an adapted implementation of a neural controller for SpiNNaker
"""

import warnings

_spinnaker_not_installed_message = \
    "SpiNNaker is not installed. Please install SpiNNaker before you can use " \
    "the SpiNNaker support of the CLE"

try:
    from spinn_front_end_common.utilities.database.database_connection import DatabaseConnection
    from spinn_front_end_common.utilities import globals_variables
    from spinnman.exceptions import SpinnmanTimeoutException
    spinnaker_not_installed = False
except ImportError:
    warnings.warn(_spinnaker_not_installed_message)
    spinnaker_not_installed = True

from hbp_nrp_cle.brainsim.pynn.PyNNControlAdapter import PyNNControlAdapter, PyNNPopulationInfo
from hbp_nrp_cle.cle.CLEInterface import BrainRuntimeException
import logging
import time
from hbp_nrp_excontrol.logs import clientLogger
from threading import Thread, Condition

logger = logging.getLogger(__name__)


[docs]def all_ids(population): """ Returns all ids of the neurons :param population: The population """ # pylint: disable=protected-access return population._all_ids
[docs]class PySpiNNakerControlAdapter(PyNNControlAdapter): # pragma no cover """ An implementation to control a SpiNNaker board simulation synchronously """ def __init__(self, sim): """ :raise ImportError: When SpiNNaker support is not installed """ if spinnaker_not_installed: warnings.warn(_spinnaker_not_installed_message) raise ImportError(_spinnaker_not_installed_message) super(PySpiNNakerControlAdapter, self).__init__(sim) sim.Population.all = all_ids self._running = False self._ready = False self._exception = None self._ready_sync = Condition()
[docs] def initialize(self, **params): """ Initializes the neuronal simulator :param timestep: The timestep used for the neuronal simulation :param min_delay: The minimum delay :param max_delay: The maximum delay :param threads: The amount of threads that should be used to run the simulation :param rng_seeds: The rng seeds for the simulation :return: True if the simulator is initialized, otherwise False """ if 'timestep' not in params: params['timestep'] = 1.0 if 'min_delay' not in params: params['min_delay'] = 1.0 # store logging setup formatter = logging.root.handlers[0].formatter if logging.root.handlers else None logger.info("resetting spinnaker simulator") # note: this is a short-term workaround and # the underlying issue may get resolved in the future # for defails see # https://github.com/SpiNNakerManchester/sPyNNaker8/issues/373#issuecomment-595108523 # unset_simulator has been renamed to setup_for_unittest see: # SpiNNakerManchester/SpiNNFrontEndCommon/commit/a18a92060cf164ec61b4f738a4d165cffeaecdf4 globals_variables.setup_for_unittest() super(PySpiNNakerControlAdapter, self).initialize(**params) # restore logging setup for handler in logging.root.handlers: handler.setFormatter(formatter) if handler.filters: handler.removeFilter(handler.filters[-1]) self._running = False self._ready = False
def _notify_ready(self): """ Notify """ with self._ready_sync: self._ready = True self._ready_sync.notify_all() def _do_run(self, dt): """ Do run """ try: self._sim.external_devices.run_forever(dt) # pylint: disable=W0703 except Exception as e: logger.exception(e) with self._ready_sync: self._running = False self._ready = False self._exception = e self._ready_sync.notify_all()
[docs] def run_step(self, dt): if not self._running: self._running = True clientLogger.advertise("Brain is loading on to the Spinnaker Board...") connection = DatabaseConnection( start_resume_callback_function=self._notify_ready, local_port=None) self._sim.external_devices.add_database_socket_address( database_notify_host=None, database_notify_port_num=connection.local_port, database_ack_port_num=None) runner = Thread(target=self._do_run, args=[dt]) runner.start() with self._ready_sync: while not self._ready and self._exception is None: clientLogger.advertise("Brain is loading on to the SpiNNaker Board...") self._ready_sync.wait(1.0) # Wait just one more second to avoid issues with the continue time.sleep(1.0) clientLogger.advertise("Brain loading to the Spinnaker Board has been finished.") if self._exception is not None: raise BrainRuntimeException(str(self._exception)) else: sent = False retries = 3 while not sent and retries > 0: try: self._sim.external_devices.continue_simulation() sent = True except SpinnmanTimeoutException: retries -= 1 if retries == 0: raise # Wait a bit more if this happens time.sleep(0.5)
[docs] def shutdown(self): self._sim.external_devices.request_stop() super(PySpiNNakerControlAdapter, self).shutdown()
def _is_population(self, candidate): """ Determines whether the candidate is a population :param candidate: The candidate """ return isinstance(candidate, self._sim.Population) def _create_population_info(self, population, name): """ Creates a population info object for the given population :param population: The population :param name: The name of the population """ celltype = population.celltype parameters = dict(celltype.default_parameters) return PyNNPopulationInfo(population, name, parameters)
[docs] def get_Timeout(self): """ returns The maximum amount of time (in seconds) to wait for the end of this step """ return None