Instantiating a Computational Graph in Python

This page describes how to create a Computational Graph, instantiating Input, Output and Functional nodes and connecting them with edges. The created graph can be afterwards executed by the EventLoop. For this purpose, a set of Python decorators are provided; their use is described below.

The main decorator is @FunctionalNode, which takes a Python function and creates a Functional Node from it.

The rest of the decorators always decorate a @FunctionalNode. They allow to create Input and Output Nodes of the different available types and connect the decorated Functional Node to them.

All the decorators are defined in the nrp_core.event_loop Python module.

@FunctionalNode

Functional Nodes can be created from a Python function using the @FunctionalNode decorator. The Node will then execute this function every Event Loop cycle.

The code snippet below shows the creation of a simple Functional Node which just forward each of its two inputs to its two outputs.

@FunctionalNode(name="my_node", outputs=['output_1', 'output_2'], exec_policy=node_policies.functional_node.exec_policy.on_new_message)
def my_function(input_1, input_2):
    return [input_1, input_2]

The decorator has three arguments:

  • name : string used as the node id in the graph and must be therefore unique in the graph

  • outputs : a list of strings declaring the number and names of outputs of this node.

  • policy (optional): the execution policy of the node. Its value must be of type node_policies.functional_node.exec_policy, an enum with possible values: always and on_new_message. Its default value is on_new_message.

Each of the declared outputs can be connected to other nodes using decorators. Each of the input arguments of the function can be connected to other nodes too.

The decorated Python function can return either None or a list with length equal to the number of declared outputs in the Functional Node decorator. Otherwise a runtime error is triggered.

Returning None can be used in cases when the function can’t produce valid outputs.

@FromFunctionalNode

This decorator allows to connect one Functional Node input port to another Functional Node output port. It, therefore, creates an edge in the Computational Graph between two Functional Nodes.

The code below will create a Functional Node with one input, input_1, and one output output_1, and connect input_1 to an output Port with id output_2 from a node with id my_node.

@FromFunctionalNode(keyword='input_1', address='/my_node/output_2')
@FunctionalNode(name="my_node_2", outputs=['output_1'])
def my_function_2(input_1):
    return [input_1]

The decorator has two arguments:

  • keyword : id of the input port in the decorated functional node which will be connected.

  • address : address of the output port which will be connected.

The expected syntax for the output port address is /node_id/port_id. If a node with id node_id doesn’t exist or doesn’t have an output with id port_id, a runtime error will occur.

@FromEngine

This decorator creates an InputEngineNode and connects it to one of the decorated Functional Node inputs. It, therefore, creates a node and an edge in the Computational Graph.

The code below extends on a previous example by connecting input_1 in my_node to an output my_datapack of an InputEngineNode with id my_engine.

@FromEngine(keyword='input_1', address='/my_engine/my_datapack')
@FunctionalNode(name="my_node", outputs=['output_1', 'output_2'])
def my_function(input_1, input_2):
    return [input_1, input_2]

If my_engine doesn’t exist in the graph it is created. Then an Output Port my_datapack is registered with my_engine and datapack id my_datapack is added to the node requested datapacks. See here for more details.

The decorator has three arguments:

  • keyword : id of the input port in the decorated functional node which will be connected.

  • address : address of the output port which will be connected.

  • cache_policy (optional): the message cache policy of the node. Its value must be of type node_policies.input_node.msg_cache, an enum with possible values: clear and keep. Its default value is keep.

@ToEngine

This decorator creates an OutputEngineNode and connects it to one of the decorated Functional Node outputs. It works in a very similar way to the previous one.

The code below extends the previous example by connecting output_1 in my_node to an OutputEngineNode with id my_engine.

@ToEngine(keyword="output_1", address="/my_engine")
@FromEngine(keyword='input_1', address='/my_engine/my_datapack')
@FunctionalNode(name="my_node", outputs=['output_1', 'output_2'])
def my_function(input_1, input_2):
    return [input_1, input_2]

The decorator has two arguments:

  • keyword : id of the output port in the decorated functional node which will be connected.

  • address : address of the input port which will be connected.

It can be noted that from the example above, according to the descriptions given, two nodes with the same id, my_engine, would be created. Since node ids must be unique in the graph and in order to still allow using the same Engine name for both Input and Output Engine node ids, when an OutputEngineNode is created with a particular id, it internally adds the postscript "_output" to it. In the same way, InputEngineNodes add ‘“_input”’ to their ids. In this way, from the example above three nodes will be created with ids: my_engine_input, my_engine_output and my_node.

@RosSubscriber

This decorator creates an InputROSNode and connects it to one of the decorated Functional Node inputs. It creates, then, a node and an edge in the Computational Graph.

The code below extends on the previous example by connecting input_2 in my_node to an output port /my_topic of an InputROSNode with id /my_topic.

@RosSubscriber(keyword="input_2", address="/my_topic", type=nrp_core.data.nrp_ros.std_msgs.String)
@ToEngine(keyword="output_1", address="/my_engine/my_datapack")
@FromEngine(keyword='input_1', address='/my_engine/my_datapack')
@FunctionalNode(name="my_node", outputs=['output_1', 'output_2'])
def my_function(input_1, input_2):
    return [input_1, input_2]

The decorator has five arguments:

  • keyword : as in other decorators, specifies the name of the Functional Node input port that the decorator connects.

  • address : tells the ROS topic to subscribe to.

  • type : the ROS message type that is received through this ROS topic.

  • cache_policy (optional): the message cache policy of the node. Its value must be of type node_policies.input_node.msg_cache, an enum with possible values: clear and keep. Its default value is keep.

  • publish_policy (optional): the message publish policy of the node. Its value must be of type node_policies.input_node.msg_publish, an enum with possible values: last and all. Its default value is last.

In this case, a new InputROSNode is created to subscribe to each different topic. From each @RosSubscriber decorator a node with id address is created, and an OuputPort with id also address is added to it.

The value used in the type argument of the decorator must be a class from nrp_core.data.nrp_ros Python module. These are the ROS message types supported in NRP-core. For more information about which are the message types supported by default see here. For information about how to compile NRP-core to support additional ROS message types see here.

@RosPublisher

This decorator creates an OutputROSNode and connects it to one of the decorated Functional Node outputs. It adds a node and an edge to the Computational Graph.

The code below extends on a previous example by connecting input_1 in my_node_2 to an input port /my_topic_2 of an OutputROSNode with id /my_topic_2.

@RosPublisher(keyword="output_1", address="/my_topic_2", type=nrp_core.data.nrp_ros.std_msgs.String)
@FromFunctionalNode(keyword='input_1', address='/my_node/output_2')
@FunctionalNode(name="my_node_2", outputs=['output_1'])
def my_function_2(input_1):
    return [input_1]

The decorator has the next arguments:

  • keyword : specifies the name of the Functional Node output port that the decorator connects.

  • address : tells the ROS topic to publish to.

  • type : the ROS message type the ROS topic this node publishes too.

  • compute_period (optional): ComputePeriod of the node, 1 by default.

  • publish_from_cache (optional): publishFromCache property of the node, false by default.

It is not possible to subscribe and publish to the same topic from a graph. Attempting to do this will result in having two nodes with the same id, which is not allowed.

@FromSpiNNaker

This decorator creates an InputSpiNNakerNode and connects it to one of the decorated Functional Node inputs. It, therefore, creates a node and an edge in the Computational Graph.

The code below connects input_1 in my_node to the spiking output of a Population pop_1 defined in SpiNNaker PyNN code. Note that the label of the Population is the import part, not the Python variable name. Also note that the Population must be activated for live output for this to be recieved by the function.

import pyNN.spiNNaker as sim
sim.setup(1.0)
my_pop_1 = sim.Population(5, sim.IF_curr_exp(), label="pop_1")
sim.external_devices.activate_live_output_for(my_pop_1, notify=False)

@FromSpiNNaker(keyword='input_1', address='pop_1')
@FunctionalNode(name="my_node", outputs=[])
def my_function(input_1):
    print(f"Spikes received from {input_1['label']} at time {input_1['time']}:"
          f" {input_1['neuron_ids']}")
    return []

The decorator has four arguments:

  • keyword : specifies the name of the Functional Node input port that the decorator connects.

  • address : the SpiNNaker population to receive spikes or voltage from.

  • cache_policy (optional): the message cache policy of the node. Its value must be of type node_policies.input_node.msg_cache, an enum with possible values: clear and keep. Its default value is keep.

  • publish_policy (optional): the message publish policy of the node. Its value must be of type node_policies.input_node.msg_publish, an enum with possible values: last and all. Its default value is last.

For a complete example using Spinnaker node decorators see examples/event_loop_examples/husky_braitenberg_spinnaker. In this experiment can be seen how to use @FromSpiNNaker decorator to access neuron voltages from the Computational Graph.

@ToSpiNNaker

This decorator creates an OutputSpiNNakerNode and connects it to one of the decorated Functional Node outputs. It, therefore, creates a node and an edge in the Computational Graph.

The code below connects output_1 in my_node to a SpikeInjector Population pop_1 defined in SpiNNaker PyNN code. Note that the label of the Population is the import part, not the Python variable name.

import pyNN.spiNNaker as sim
sim.setup(1.0)
my_pop_1 = sim.Population(5, sim.SpikeInjector(), label="pop_1")
sim.external_devices.activate_live_output_for(my_pop_1, notify=False)

@ToSpiNNaker(keyword='output_1', address='pop_1')
@FunctionalNode(name="my_node", outputs=[output_1])
def my_function():
    out = NlohmannJson()
    out['neuron_ids'] = list(range(5))
    return [out]

The decorator has two arguments:

  • keyword : specifies the name of the Functional Node output port that the decorator connects.

  • address : the SpiNNaker population to send spikes to or update the rates of.

  • compute_period (optional): ComputePeriod of the node, 1 by default.

  • publish_from_cache (optional): publishFromCache property of the node, false by default.

In the aforementioned experiment located at examples/event_loop_examples/husky_braitenberg_spinnaker can be seen how to use @ToSpiNNaker decorator to control the rate of a SpikeSourcePoisson.

@MQTTSubscriber

This decorator creates an InputMQTTNode and connects it to one of the decorated Functional Node inputs. It creates, then, a node and an edge in the Computational Graph.

The code below subscribes to an MQTT topic ‘test_topic’ and connects incoming msgs to ‘my_function’ ‘mqtt_in’ argument. These messages are printed out inside of the function and a new message published to ‘test_topic’ from the function return value. In this way, in the created Computational Graph, the node ‘mqtt_str’ prints out its own published message in a loop.

@MQTTSubscriber(keyword='mqtt_in', address="test_topic", type=str)
@MQTTPublisher(keyword="mqtt_out", address="test_topic", type=str)
@FunctionalNode(name="mqtt_str", outputs=["mqtt_out"], exec_policy=node_policies.functional_node.exec_policy.always)
def my_function(mqtt_in):
    if mqtt_in is not None:
        print('Message from mqtt str: {}'.format(mqtt_in))

    return ["Hi there!"]

The decorator has five arguments:

  • keyword : as in other decorators, specifies the name of the Functional Node input port that the decorator connects.

  • address : tells the MQTT topic to subscribe to.

  • type : the data type incoming MQTT messages will be converted to (see here).

  • cache_policy (optional): the message cache policy of the node. Its value must be of type node_policies.input_node.msg_cache, an enum with possible values: clear and keep. Its default value is keep.

  • publish_policy (optional): the message publish policy of the node. Its value must be of type node_policies.input_node.msg_publish, an enum with possible values: last and all. Its default value is last.

Below are listed the possible valid values for the type argument in the decorator:

  • ‘str’: incoming messages payload is passed to the connected function as a string

  • ‘nrp_core.data.nrp_json.NlohmannJson’: payload is passed as JSON objects

  • Any of the protobuf message types compiled with NRP-Core. These can be found under the Python module: ‘nrp_core.data.nrp_protobuf’.

  • ‘nrp_core.data.nrp_json.JsonDataPack’

  • Any of the protobuf datapack types defined in ‘nrp_core.data.nrp_protobuf’

‘str’ is always safe to use, but in case of using any of the other types a runtime error will occur if incoming MQTT message can’t be converted to that type. See the experiment ‘examples/event_loop_examples/mqtt_simple’ for examples of mqtt decorators using each of the supported types.

@MQTTPublisher

This decorator creates an OutputMQTTNode and connects it to one of the decorated Functional Node outputs. It adds a node and an edge to the Computational Graph.

The code below replicates the one above but using ‘NlohmannJson’ data type.

from nrp_core.data.nrp_json import NlohmannJson

@MQTTSubscriber(keyword='mqtt_in', address="test_topic_json", type=NlohmannJson)
@MQTTPublisher(keyword="mqtt_out", address="test_topic_json", type=NlohmannJson)
@FunctionalNode(name="mqtt_json", outputs=["mqtt_out"], exec_policy=node_policies.functional_node.exec_policy.always)
def my_function(mqtt_in):
    if mqtt_in is not None:
        print('Message from mqtt json: {}'.format(mqtt_in))

    out = NlohmannJson()
    out['hi'] = 'there'

    return [out]
  • keyword : specifies the name of the Functional Node output port that the decorator connects.

  • address : the MQTT topic to publish to.

  • type : the data type that published MQTT messages will be converted from (see here).

  • compute_period (optional): ComputePeriod of the node, 1 by default.

  • publish_from_cache (optional): publishFromCache property of the node, false by default.

@Clock

This decorator creates an InputClockNode and connects it to one of the decorated Functional Node inputs. It adds a node and an edge to the Computational Graph.

The code below would create a Functional Node which reads the current system clock and prints it out.

@Clock('clock_in')
@FunctionalNode(name="clock_example", outputs=[], exec_policy=node_policies.functional_node.exec_policy.always)
def my_function(clock_in):
    print("current system time: {}".format(clock_in))
    return []

As explained in this section, depending on whether the Computational Graph is being managed by an EventLoop or an FTILoop, the InputClockNode will stream either the current value of the internal (real-time) clock of the EventLoop, wrt the time point in which the loop was started, or the current simulation time, in the case of using an FTILoop.

  • keyword : specifies the name of the Functional Node output port that the decorator connects.

@Iteration

This decorator creates an InputIterationNode and connects it to one of the decorated Functional Node inputs. It adds a node and an edge to the Computational Graph.

The code below would create a Functional Node which reads the current loop iteration number and prints it out.

@Iteration('iteration_in')
@FunctionalNode(name="iteration_example", outputs=[], exec_policy=node_policies.functional_node.exec_policy.always)
def my_function(iteration_in):
    print("current loop iteration number: {}".format(iteration_in))
    return []

Either if the Computational Graph is being managed by an EventLoop or an FTILoop, the InputIterationNode streams the number of times the loop has been run.

  • keyword : specifies the name of the Functional Node output port that the decorator connects.

Putting the Examples Together

To conclude the overview of the available Computational Graph decorators lets put together the different examples shown in the sections above:

@RosSubscriber(keyword="input_2", address="/my_topic", type=nrp_core.data.nrp_ros.std_msgs.String)
@ToEngine(keyword="output_1", address="/my_engine")
@FromEngine(keyword='input_1', address='/my_engine/my_datapack')
@FunctionalNode(name="my_node", outputs=['output_1', 'output_2'])
def my_function(input_1, input_2):
    return [input_1, input_2]

@RosPublisher(keyword="output_1", address="/my_topic_2", type=nrp_core.data.nrp_ros.std_msgs.String)
@FromFunctionalNode(keyword='input_1', address='/my_node/output_2')
@FunctionalNode(name="my_node_2", outputs=['output_1'])
def my_function_2(input_1):
    return [input_1]

This script, when loaded by the EventLoop, will create a graph with six nodes:

  • two input nodes: my_engine_input, /my_topic.

  • two output nodes: my_engine_output, /my_topic_2

  • two functional nodes: my_node, my_node_2

Five edges will be added to the graph:

  • From my_engine_input to my_node

  • From /my_topic to my_node

  • From my_node to my_node_2

  • From my_node to my_engine_output

  • From my_node_2 to /my_topic_2

When the graph is configured, the nodes in the graph are divided into four layers (see here for more details), each containing the following nodes:

  1. my_engine_input, /my_topic.

  2. my_node

  3. my_node_2

  4. my_engine_output, /my_topic_2

Ports required to connect the different Nodes are also created. The resulting graph is depicted in the figure below:

Computational Graph created from the example Python script

Each time the graph is executed (i.e. in every Event Loop cycle) the nodes in each of the layers are executed sequentially and in order. The next behavior is expected: If a new message has been published to the ROS topic /my_topic in the last event loop cycle, it will be sent to my_node, which will, in turn, forward it to my_node_2 and finally will be published to a /my_topic_2 ROS topic. my_engine_input node will publish the latest available datapack with id my_datapack from Engine my_engine to my_node input port input_1. The datapack will be sent back to the same Engine through the node my_engine_output.

As a final comment, the order in which the decorators are added in the Python script is arbitrary. Only the @FunctionalNode decorator must be placed directly above the Python function which will be executed.

For more examples on how the presented decorators are used to create Computational Graphs, take a look at the three example experiments provided in the examples/event_loop_examples folder.