Source code for hbp_nrp_cle.tf_framework

"""
This package contains the classes to specify transfer functions and connect them to communication
adapters of both the neuronal simulator and the world simulator
"""

from builtins import next

from builtins import range
from future.utils import raise_
from six import PY3

from hbp_nrp_excontrol.restricted_python import _inplacevar_
from hbp_nrp_cle.tf_framework._TfApi import TfApi

TF_API = TfApi()


[docs]class UserCodeException(Exception): """ General exception class returning a meaningful message to the ExD frontend when user code fails to be loaded or run. :param message: message that needs to be forwarded to the frontend. :param error_type: Type of error (like 'CLE Error') """ def __init__(self, message, error_type): super(UserCodeException, self).__init__() self.error_type = error_type self.message = message def __str__(self): return "{0} ({1})".format(repr(self.message), self.error_type)
[docs]class BrainParameterException(Exception): """ Exception raised when a brain connection parameter fails to create the underlying adapter :param source: the exception source :param message: the brain """ def __init__(self, message): super(BrainParameterException, self).__init__() self.message = message
[docs]class TFException(UserCodeException): """ Exception class used to return a meaningful message to ExD front-end in case the update of TF's user code fails. :param tf_name: name of the TF updated by the user. :param message: message that needs to be forwarded to the front-end. """ def __init__(self, tf_name, message, error_type): super(TFException, self).__init__(message, error_type) self.tf_name = tf_name def __str__(self): return "{0}: {1} ({2})".format(self.tf_name, repr(self.message), self.error_type)
[docs]class TFRunningException(UserCodeException): """ Exception class used to communicate the TF with the TransferFunction manager :param tf_name: name of the TF updated by the user. :param message: message that needs to be forwarded to the front-end. """ def __init__(self, message): super(TFRunningException, self).__init__( message, 'TF Running Exception')
[docs]class TFLoadingException(TFException): """ Exception class used to return a meaningful message to ExD front-end in case the loading of a TF with updated user code fails. :param tf_name: name of the TF updated by the user. :param message: message that needs to be forwarded to the front-end. """ def __init__(self, tf_name, message): super(TFLoadingException, self).__init__( tf_name, message, 'TF Loading Exception')
from . import config from ._PropertyPath import PropertyPath, RangeSegment, CustomSegment
[docs]def resolve_brain_variable(var): """ Resolves the given brain variable for the current brain :param var: The brain variable :return: If the variable does not depend on the neural network, it is returned unchanged. Otherwise, it is resolved for the current neural network """ if isinstance(var, PropertyPath): return var.select(config.brain_root, config.active_node.brain_adapter) return var
from hbp_nrp_cle.brainsim.BrainInterface import IFixedSpikeGenerator, \ ILeakyIntegratorAlpha, ILeakyIntegratorExp, IPoissonSpikeGenerator, IRawSignal, \ IDCSource, IACSource, INCSource, IPopulationRate, ISpikeRecorder, ISpikeInjector import logging logger = logging.getLogger(__name__) # alias _Facade module needed by set_transfer_function import sys nrp = sys.modules[__name__] # CLE restricted python environment # TODO(Luc): create a module for it from RestrictedPython.PrintCollector import PrintCollector if PY3: from RestrictedPython.Guards import guarded_iter_unpack_sequence, guarded_unpack_sequence from operator import getitem # pylint: disable=unused-import from ._Neuron2Robot import Neuron2Robot, MapSpikeSink, MapSpikeSource from ._Robot2Neuron import Robot2Neuron, MapRobotPublisher, \ MapRobotSubscriber from ._MapRobotServiceProxy import MapRobotServiceProxy from hbp_nrp_cle.tf_framework._TransferFunction import TransferFunction, FlawedTransferFunction from hbp_nrp_cle.tf_framework._CSVRecorder import MapCSVRecorder, CSVRecorder from hbp_nrp_cle.tf_framework._NeuronMonitor import NeuronMonitor from hbp_nrp_cle.tf_framework._GlobalData import MapVariable, GLOBAL, TRANSFER_FUNCTION_LOCAL from hbp_nrp_cle.tf_framework._CleanableTransferFunctionParameter \ import ICleanableTransferFunctionParameter from . import _TransferFunctionManager, _NeuronSelectors from ._TransferFunctionInterface import ITransferFunctionManager from hbp_nrp_cle.robotsim.RobotInterface import Topic, IRobotCommunicationAdapter import std_msgs.msg from std_msgs.msg import Float32, Int32, String import cle_ros_msgs.msg import geometry_msgs.msg from geometry_msgs.msg import Point, Pose, Quaternion import sensor_msgs.msg import gazebo_msgs.msg import dynamic_reconfigure.msg import tf2_msgs.msg import hbp_nrp_cle.tf_framework.tf_lib from hbp_nrp_excontrol.logs import clientLogger from cv_bridge import CvBridge import cv2 import numpy as np __author__ = 'Georg Hinkel' _getattr_ = getattr _getitem_ = getitem _getiter_ = iter _print_ = PrintCollector if PY3: # required by RestrictedPython # https://restrictedpython.readthedocs.io/en/python3_update/usage/index.html#necessary-setup __metaclass__ = type _iter_unpack_sequence_ = guarded_iter_unpack_sequence _unpack_sequence_ = guarded_unpack_sequence def _cle_write_guard(): """ Defines the write guard for execution of user code in restricted mode. """ def guard(ob): """ No guard at all """ return ob return guard cle_write_guard = _cle_write_guard() _write_ = cle_write_guard leaky_integrator_alpha = ILeakyIntegratorAlpha leaky_integrator_exp = ILeakyIntegratorExp fixed_frequency = IFixedSpikeGenerator poisson = IPoissonSpikeGenerator dc_source = IDCSource ac_source = IACSource nc_source = INCSource population_rate = IPopulationRate spike_recorder = ISpikeRecorder injector = ISpikeInjector raw_signal = IRawSignal brain = PropertyPath()
[docs]def nrange(start, stop, step=None): """ Defines a range of neurons :param start: The start of the range :param stop: The stop of the range :param step: The step of the range """ return RangeSegment(start, stop, step)
[docs]def resolve(fun): """ Resolves the given function when the neural network is available :param fun: The function that selects the item from the network """ return CustomSegment(fun)
[docs]def map_neurons(neuron_range, mapping): """ Maps the given range to neurons using the provided mapping :param neuron_range: A range that can be iterated :param mapping: A mapping function or lambda """ return _NeuronSelectors.MapNeuronSelector(neuron_range, mapping)
[docs]def chain_neurons(*neuron_selectors): """ Chains the given neuron selectors :param neuron_selectors: The neuron selectors """ return _NeuronSelectors.ChainNeuronSelector(neuron_selectors)
[docs]def initialize(name): # -> None: """ Initializes and starts the TF node :param name: The name of the TF node """ config.active_node.initialize(name)
[docs]def set_nest_adapter(nest_adapter): # -> None: """ Sets the brainsim adapter. :param nest_adapter: The brainsim adapter .. WARNING:: Must be executed before tf node initialization """ config.active_node.brain_adapter = nest_adapter
[docs]def set_robot_adapter(robot_adapter): # -> None: """ Sets the robot adapter. :param robot_adapter: The robot adapter .. WARNING:: Must be executed before tf node initialization """ config.active_node.robot_adapter = robot_adapter
[docs]def start_new_tf_manager(): """ Start a new transfer function manager """ config.active_node = _TransferFunctionManager.TransferFunctionManager() config.csv_recorders = [] config.brain_populations = None config.brain_root = None config.brain_source = None
[docs]def get_transfer_functions(flawed=True): """ Get all the transfer functions if flawed is True, only (R2N, N2R, Silent) otherwise :return: All the transfer functions if flawed is True, only (R2N, N2R, Silent) otherwise. """ proper_tfs = config.active_node.n2r + \ config.active_node.r2n + config.active_node.silent return proper_tfs + config.active_node.flawed if flawed else proper_tfs
[docs]def get_flawed_transfer_function(name): """ Get the flawed transfer function with the given name :param name: The name of the flawed transfer function :return: The flawed transfer function with the given name """ return next((f_tf for f_tf in config.active_node.flawed if f_tf.name == name), None)
[docs]def get_transfer_function(name): """ Get the transfer function with the given name :param name: The name of the transfer function :return: The transfer function with the given name """ return next((tf for tf in get_transfer_functions() if tf.name == name), None)
[docs]def activate_transfer_function(tf, activate): """ Set the activation state of the transfer function In case of errors the change is not applied. :param tf: the tf to (de-)activate :param activate: a boolean value denoting the new activation state """ config.active_node.activate_tf(tf, activate)
[docs]def get_brain_source(): """ Get the source of the brain (if loaded from a python file). Otherwise, returns None. :return: The source of the brain model """ return config.brain_source
[docs]def get_brain_populations(): """ Get the brain populations as a dictionary If the brain model is not loaded, the function returns None. :return: A dictionary containing the brain populations The dictionary keys are population names and its values are one of the following types: list, or a 'slice' dictionary of the following form {'from': 1, 'to': 10, 'step': 1}. """ return config.brain_populations
[docs]def dump_csv_recorder_to_files(): """ Find out all CSV recorders and dump their values to CSV files. :return: an array containing a string with the CSV filename, an array containing the CSV headers separated by a comma and an array containing the CSV values """ result = [] for tf in get_transfer_functions(flawed=False): for param in tf.params[1:]: if isinstance(param, CSVRecorder): name = param.get_csv_recorder_name() headers = param.get_csv_headers() values = param.cleanup() result.append([name, headers, values]) return result
[docs]def clean_csv_recorders_files(): """ Clean out all CSV recorders generated files. """ for tf in get_transfer_functions(flawed=False): for param in tf.params[1:]: if isinstance(param, CSVRecorder): param.cleanup()
[docs]def delete_transfer_function(name): """ Delete a transfer function. If the transfer function does not exist, nothing will happen. :param name: The name of the transfer function :return: True if the transfer function is correctly deleted. False if the transfer function does not exist. """ tf = get_transfer_function(name) is_flawed_deleted = False if delete_flawed_transfer_function(name): is_flawed_deleted = True if tf in config.active_node.n2r: config.active_node.n2r.remove(tf) elif tf in config.active_node.r2n: config.active_node.r2n.remove(tf) elif tf in config.active_node.silent: config.active_node.silent.remove(tf) else: return is_flawed_deleted tf.unregister() brain_adapter = config.active_node.brain_adapter robot_adapter = config.active_node.robot_adapter for i in range(1, len(tf.params)): if tf.params[i] in brain_adapter.detector_devices: brain_adapter.unregister_spike_sink(tf.params[i]) elif tf.params[i] in robot_adapter.published_topics: robot_adapter.unregister_publish_topic(tf.params[i]) elif tf.params[i] in brain_adapter.generator_devices: brain_adapter.unregister_spike_source(tf.params[i]) elif tf.params[i] in robot_adapter.subscribed_topics: robot_adapter.unregister_subscribe_topic(tf.params[i]) if isinstance(tf.params[i], ICleanableTransferFunctionParameter): tf.params[i].cleanup() return True
[docs]def delete_flawed_transfer_function(name): """ Delete a flawed transfer function. If the transfer function does not exist, nothing will happen. :param name: The name of the transfer function :return: True if the transfer function is correctly deleted. False if the transfer function does not exist. """ result = True tf = get_flawed_transfer_function(name) if tf: config.active_node.flawed.remove(tf) else: result = False return result
[docs]def set_transfer_function(new_source, new_code, new_name, activation=True, priority=None): """ Apply transfer function changes made by a client :param new_source: Transfer function's updated source :param new_code: Compiled code of the updated source :param new_name: Transfer function's updated name :param activation: Activation state of the transfer function :param priority: execution order of the transfer function. Transfer functions with higher priority are executed first. """ # pylint: disable=broad-except try: # pylint: disable=exec-used exec(new_code) tf = get_transfer_function(new_name) if not isinstance(tf, TransferFunction): raise Exception("Transfer function has no decorator specifying its type") config.active_node.initialize_tf(tf, activation) except Exception as e: tb = sys.exc_info()[2] logger.error("Error while loading new transfer function") logger.exception(e) delete_transfer_function(new_name) raise_(TFLoadingException(new_name, str(e)), None, tb) # we set the new source in an attribute because inspect.getsource won't work after exec # indeed inspect.getsource is based on a source file object # see findsource in http://www.opensource.apple.com/source/python/python-3/python/Lib/inspect.py tf.source = new_source if priority is not None: tf.priority = priority
[docs]def set_flawed_transfer_function(source, name="NO_NAME", error=None): """ Creates a new flawed transfer function, i.e. a TF that is not valid due to some error in the source code. The user will correct it at a later stage (e.g. during an experiment) :param source: the source code :param name: The name of the transfer function :param error: the Exception raised during the compilation/loading of the code """ config.active_node.flawed.append( FlawedTransferFunction(name, source, error))
start_new_tf_manager()