.. index:: pair: page; Instantiating a Computational Graph in Python
.. _doxid-python_graph:

Instantiating a Computational Graph in Python
=============================================

This page describes how to create a Computational Graph by instantiating :ref:`Input `, :ref:`Output ` and :ref:`Functional ` nodes and connecting them with edges. The resulting graph can then be executed by the :ref:`EventLoop `.

For this purpose, a set of Python decorators is provided; their use is described below. The main decorator is ``@FunctionalNode``, which takes a Python function and creates a :ref:`Functional Node ` from it. All other decorators are applied on top of a ``@FunctionalNode``. They create :ref:`Input ` and :ref:`Output ` Nodes of the different :ref:`available types ` and connect the decorated Functional Node to them.

All the decorators are defined in the ``nrp_core.event_loop`` Python module.

.. _doxid-python_graph_1functional_node_decorator:

@FunctionalNode
~~~~~~~~~~~~~~~

:ref:`Functional Nodes ` can be created from a Python function using the ``@FunctionalNode`` decorator. The Node then executes this function in every Event Loop cycle.

The code snippet below creates a simple Functional Node which just forwards each of its two inputs to its two outputs.

.. ref-code-block:: cpp

    @FunctionalNode(name="my_node", outputs=['output_1', 'output_2'], exec_policy=node_policies.functional_node.exec_policy.on_new_message)
    def my_function(input_1, input_2):
        return [input_1, input_2]

The decorator has three arguments:

* ``name`` : string used as the node ``id`` in the graph; it must therefore be unique in the graph.
* ``outputs`` : a list of strings declaring the number and names of the outputs of this node.
* ``exec_policy`` (optional): the :ref:`execution policy ` of the node. Its value must be of type ``node_policies.functional_node.exec_policy``, an enum with possible values: ``always`` and ``on_new_message``. Its default value is ``on_new_message``.

Each of the declared outputs can be connected to other nodes using decorators. Each of the input arguments of the function can be connected to other nodes too.

The decorated Python function can return either *None* or a list whose length equals the number of outputs declared in the Functional Node decorator. Any other return value triggers a runtime error. Returning *None* is useful when the function can't produce valid outputs.

.. _doxid-python_graph_1from_functional_node_decorator:

@FromFunctionalNode
~~~~~~~~~~~~~~~~~~~

This decorator connects an input port of one Functional Node to an output port of another Functional Node. It therefore creates an edge in the Computational Graph between two Functional Nodes.

The code below creates a Functional Node with one input, ``input_1``, and one output, ``output_1``, and connects ``input_1`` to an output :ref:`Port ` with id ``output_2`` from a node with id ``my_node``.

.. ref-code-block:: cpp

    @FromFunctionalNode(keyword='input_1', address='/my_node/output_2')
    @FunctionalNode(name="my_node_2", outputs=['output_1'])
    def my_function_2(input_1):
        return [input_1]

The decorator has two arguments:

* ``keyword`` : id of the input port in the decorated functional node which will be connected.
* ``address`` : address of the output port which will be connected.

The expected syntax for the output port address is ``/node_id/port_id``. If a node with id ``node_id`` doesn't exist or doesn't have an output with id ``port_id``, a runtime error will occur.

.. _doxid-python_graph_1from_engine_decorator:

@FromEngine
~~~~~~~~~~~

This decorator creates an :ref:`InputEngineNode ` and connects it to one of the decorated Functional Node inputs. It therefore creates a node and an edge in the Computational Graph.

The code below extends a previous example by connecting ``input_1`` in ``my_node`` to an output ``my_datapack`` of an :ref:`InputEngineNode ` with id ``my_engine``.

.. ref-code-block:: cpp

    @FromEngine(keyword='input_1', address='/my_engine/my_datapack')
    @FunctionalNode(name="my_node", outputs=['output_1', 'output_2'])
    def my_function(input_1, input_2):
        return [input_1, input_2]

If ``my_engine`` doesn't exist in the graph, it is created. Then an Output :ref:`Port ` ``my_datapack`` is registered with ``my_engine``, and the datapack id ``my_datapack`` is added to the datapacks requested by the node. See :ref:`here ` for more details.

The decorator has three arguments:

* ``keyword`` : id of the input port in the decorated functional node which will be connected.
* ``address`` : address of the output port which will be connected.
* ``cache_policy`` (optional): the :ref:`message cache policy ` of the node. Its value must be of type ``node_policies.input_node.msg_cache``, an enum with possible values: ``clear`` and ``keep``. Its default value is ``keep``.

.. _doxid-python_graph_1to_engine_decorator:

@ToEngine
~~~~~~~~~

This decorator creates an :ref:`OutputEngineNode ` and connects it to one of the decorated Functional Node outputs. It works in a very similar way to the previous one.

The code below extends the previous example by connecting ``output_1`` in ``my_node`` to an :ref:`OutputEngineNode ` with id ``my_engine``.

.. ref-code-block:: cpp

    @ToEngine(keyword="output_1", address="/my_engine")
    @FromEngine(keyword='input_1', address='/my_engine/my_datapack')
    @FunctionalNode(name="my_node", outputs=['output_1', 'output_2'])
    def my_function(input_1, input_2):
        return [input_1, input_2]

The decorator has two arguments:

* ``keyword`` : id of the output port in the decorated functional node which will be connected.
* ``address`` : address of the input port which will be connected.

Note that, following the descriptions given so far, the example above would create two nodes with the same id, ``my_engine``.
Since node ids must be unique in the graph, and in order to still allow the same Engine name to be used for both Input and Output Engine node ids, an :ref:`OutputEngineNode ` created with a particular id internally appends the suffix ``"_output"`` to it. In the same way, InputEngineNodes append ``"_input"`` to their ids. As a result, the example above creates three nodes with ids ``my_engine_input``, ``my_engine_output`` and ``my_node``.

.. _doxid-python_graph_1ros_subscriber_decorator:

@RosSubscriber
~~~~~~~~~~~~~~

This decorator creates an :ref:`InputROSNode ` and connects it to one of the decorated Functional Node inputs. It therefore creates a node and an edge in the Computational Graph.

The code below extends the previous example by connecting ``input_2`` in ``my_node`` to an output port ``/my_topic`` of an :ref:`InputROSNode ` with id ``/my_topic``.

.. ref-code-block:: cpp

    @RosSubscriber(keyword="input_2", address="/my_topic", type=nrp_core.data.nrp_ros.std_msgs.String)
    @ToEngine(keyword="output_1", address="/my_engine")
    @FromEngine(keyword='input_1', address='/my_engine/my_datapack')
    @FunctionalNode(name="my_node", outputs=['output_1', 'output_2'])
    def my_function(input_1, input_2):
        return [input_1, input_2]

The decorator has five arguments:

* ``keyword`` : as in other decorators, specifies the name of the Functional Node input port that the decorator connects.
* ``address`` : the ROS topic to subscribe to.
* ``type`` : the ROS message type that is received through this ROS topic.
* ``cache_policy`` (optional): the :ref:`message cache policy ` of the node. Its value must be of type ``node_policies.input_node.msg_cache``, an enum with possible values: ``clear`` and ``keep``. Its default value is ``keep``.
* ``publish_policy`` (optional): the :ref:`message publish policy ` of the node. Its value must be of type ``node_policies.input_node.msg_publish``, an enum with possible values: ``last`` and ``all``. Its default value is ``last``.

A new :ref:`InputROSNode ` is created for each distinct topic subscribed to. Each ``@RosSubscriber`` decorator creates a node whose ``id`` is the value of ``address`` and adds to it an OutputPort whose id is also ``address``.

The value used in the ``type`` argument of the decorator must be a class from the ``nrp_core.data.nrp_ros`` Python module. These are the ROS message types supported in NRP-core. For more information about which message types are supported by default, see :ref:`here `. For information about how to compile NRP-core to support additional ROS message types, see :ref:`here `.

.. _doxid-python_graph_1ros_publisher_decorator:

@RosPublisher
~~~~~~~~~~~~~

This decorator creates an :ref:`OutputROSNode ` and connects it to one of the decorated Functional Node outputs. It adds a node and an edge to the Computational Graph.

The code below extends a previous example by connecting ``output_1`` in ``my_node_2`` to an input port ``/my_topic_2`` of an :ref:`OutputROSNode ` with id ``/my_topic_2``.

.. ref-code-block:: cpp

    @RosPublisher(keyword="output_1", address="/my_topic_2", type=nrp_core.data.nrp_ros.std_msgs.String)
    @FromFunctionalNode(keyword='input_1', address='/my_node/output_2')
    @FunctionalNode(name="my_node_2", outputs=['output_1'])
    def my_function_2(input_1):
        return [input_1]

The decorator has the following arguments:

* ``keyword`` : specifies the name of the Functional Node output port that the decorator connects.
* ``address`` : the ROS topic to publish to.
* ``type`` : the ROS message type published to this ROS topic.
* ``compute_period`` (optional): :ref:`ComputePeriod ` of the node, 1 by default.
* ``publish_from_cache`` (optional): :ref:`publishFromCache ` property of the node, false by default.

It is not possible to subscribe and publish to the same topic from a graph. Attempting to do so would result in two nodes with the same id, which is not allowed.

.. _doxid-python_graph_1from_spinnaker_decorator:

@FromSpiNNaker
~~~~~~~~~~~~~~

This decorator creates an :ref:`InputSpiNNakerNode ` and connects it to one of the decorated Functional Node inputs. It therefore creates a node and an edge in the Computational Graph.

The code below connects ``input_1`` in ``my_node`` to the spiking output of a Population ``pop_1`` defined in SpiNNaker PyNN code. Note that the label of the Population is the important part, not the Python variable name. Also note that the Population must be activated for live output for the spikes to be received by the function.

.. ref-code-block:: cpp

    import pyNN.spiNNaker as sim

    sim.setup(1.0)
    my_pop_1 = sim.Population(5, sim.IF_curr_exp(), label="pop_1")
    sim.external_devices.activate_live_output_for(my_pop_1, notify=False)

    @FromSpiNNaker(keyword='input_1', address='pop_1')
    @FunctionalNode(name="my_node", outputs=[])
    def my_function(input_1):
        print(f"Spikes received from {input_1['label']} at time {input_1['time']}:"
              f" {input_1['neuron_ids']}")
        return []

The decorator has four arguments:

* ``keyword`` : specifies the name of the Functional Node input port that the decorator connects.
* ``address`` : the SpiNNaker population to receive spikes or voltage from.
* ``cache_policy`` (optional): the :ref:`message cache policy ` of the node. Its value must be of type ``node_policies.input_node.msg_cache``, an enum with possible values: ``clear`` and ``keep``. Its default value is ``keep``.
* ``publish_policy`` (optional): the :ref:`message publish policy ` of the node. Its value must be of type ``node_policies.input_node.msg_publish``, an enum with possible values: ``last`` and ``all``. Its default value is ``last``.

For a complete example using SpiNNaker node decorators, see ``examples/event_loop_examples/husky_braitenberg_spinnaker``. This experiment shows how to use the ``@FromSpiNNaker`` decorator to access neuron voltages from the Computational Graph.

.. _doxid-python_graph_1to_spinnaker_decorator:

@ToSpiNNaker
~~~~~~~~~~~~

This decorator creates an :ref:`OutputSpiNNakerNode ` and connects it to one of the decorated Functional Node outputs. It therefore creates a node and an edge in the Computational Graph.

The code below connects ``output_1`` in ``my_node`` to a SpikeInjector Population ``pop_1`` defined in SpiNNaker PyNN code. Note that the label of the Population is the important part, not the Python variable name.

.. ref-code-block:: cpp

    import pyNN.spiNNaker as sim
    from nrp_core.data.nrp_json import NlohmannJson

    sim.setup(1.0)
    my_pop_1 = sim.Population(5, sim.SpikeInjector(), label="pop_1")
    sim.external_devices.activate_live_output_for(my_pop_1, notify=False)

    @ToSpiNNaker(keyword='output_1', address='pop_1')
    @FunctionalNode(name="my_node", outputs=['output_1'])
    def my_function():
        out = NlohmannJson()
        out['neuron_ids'] = list(range(5))
        return [out]

The decorator has four arguments:

* ``keyword`` : specifies the name of the Functional Node output port that the decorator connects.
* ``address`` : the SpiNNaker population to send spikes to or update the rates of.
* ``compute_period`` (optional): :ref:`ComputePeriod ` of the node, 1 by default.
* ``publish_from_cache`` (optional): :ref:`publishFromCache ` property of the node, false by default.

The aforementioned experiment located at ``examples/event_loop_examples/husky_braitenberg_spinnaker`` shows how to use the ``@ToSpiNNaker`` decorator to control the rate of a SpikeSourcePoisson.

.. _doxid-python_graph_1mqtt_subscriber_decorator:

@MQTTSubscriber
~~~~~~~~~~~~~~~

This decorator creates an :ref:`InputMQTTNode ` and connects it to one of the decorated Functional Node inputs. It therefore creates a node and an edge in the Computational Graph.

The code below subscribes to an MQTT topic ``test_topic`` and connects incoming messages to the ``mqtt_in`` argument of ``my_function``. These messages are printed inside the function, and a new message is published to ``test_topic`` from the function's return value.
In this way, in the created Computational Graph, the node ``mqtt_str`` prints out its own published message in a loop.

.. ref-code-block:: cpp

    @MQTTSubscriber(keyword='mqtt_in', address="test_topic", type=str)
    @MQTTPublisher(keyword="mqtt_out", address="test_topic", type=str)
    @FunctionalNode(name="mqtt_str", outputs=["mqtt_out"], exec_policy=node_policies.functional_node.exec_policy.always)
    def my_function(mqtt_in):
        if mqtt_in is not None:
            print('Message from mqtt str: {}'.format(mqtt_in))

        return ["Hi there!"]

The decorator has five arguments:

* ``keyword`` : as in other decorators, specifies the name of the Functional Node input port that the decorator connects.
* ``address`` : the MQTT topic to subscribe to.
* ``type`` : the data type incoming MQTT messages will be converted to (see :ref:`here `).
* ``cache_policy`` (optional): the :ref:`message cache policy ` of the node. Its value must be of type ``node_policies.input_node.msg_cache``, an enum with possible values: ``clear`` and ``keep``. Its default value is ``keep``.
* ``publish_policy`` (optional): the :ref:`message publish policy ` of the node. Its value must be of type ``node_policies.input_node.msg_publish``, an enum with possible values: ``last`` and ``all``. Its default value is ``last``.

The valid values for the ``type`` argument of the decorator are listed below:

* ``str``: the payload of incoming messages is passed to the connected function as a string
* ``nrp_core.data.nrp_json.NlohmannJson``: the payload is passed as :ref:`JSON objects `
* Any of the protobuf message types compiled with NRP-core. These can be found under the Python module ``nrp_core.data.nrp_protobuf``.
* ``nrp_core.data.nrp_json.JsonDataPack``
* Any of the protobuf datapack types defined in ``nrp_core.data.nrp_protobuf``

``str`` is always safe to use. With any of the other types, a runtime error will occur if an incoming MQTT message can't be converted to that type.
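
The difference between ``str`` and the structured types can be illustrated with a small standalone sketch. It does not use NRP-core: ``convert_payload`` is a hypothetical helper, and a plain ``dict`` parsed with the standard ``json`` module stands in for ``NlohmannJson``. It only aims to show why a text payload is always accepted as ``str`` while a structured type may be rejected.

.. code-block:: python

    import json


    def convert_payload(payload: bytes, target_type):
        """Hypothetical stand-in for the conversion done on incoming messages."""
        text = payload.decode("utf-8")
        if target_type is str:
            # A text payload is handed over unchanged.
            return text
        if target_type is dict:
            # dict is used here in place of NlohmannJson (assumption).
            try:
                return json.loads(text)
            except json.JSONDecodeError as err:
                # Mirrors the runtime error described above.
                raise RuntimeError(f"payload is not valid JSON: {err}") from err
        raise TypeError(f"unsupported target type: {target_type!r}")


    print(convert_payload(b"Hi there!", str))
    print(convert_payload(b'{"hi": "there"}', dict))

Passing ``b"Hi there!"`` with ``dict`` as the target type raises ``RuntimeError`` in this sketch, since the payload is not valid JSON.
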
See the experiment ``examples/event_loop_examples/mqtt_simple`` for examples of MQTT decorators using each of the supported types.

.. _doxid-python_graph_1mqtt_publisher_decorator:

@MQTTPublisher
~~~~~~~~~~~~~~

This decorator creates an :ref:`OutputMQTTNode ` and connects it to one of the decorated Functional Node outputs. It adds a node and an edge to the Computational Graph.

The code below replicates the one above but uses the ``NlohmannJson`` data type.

.. ref-code-block:: cpp

    from nrp_core.data.nrp_json import NlohmannJson

    @MQTTSubscriber(keyword='mqtt_in', address="test_topic_json", type=NlohmannJson)
    @MQTTPublisher(keyword="mqtt_out", address="test_topic_json", type=NlohmannJson)
    @FunctionalNode(name="mqtt_json", outputs=["mqtt_out"], exec_policy=node_policies.functional_node.exec_policy.always)
    def my_function(mqtt_in):
        if mqtt_in is not None:
            print('Message from mqtt json: {}'.format(mqtt_in))

        out = NlohmannJson()
        out['hi'] = 'there'
        return [out]

The decorator has the following arguments:

* ``keyword`` : specifies the name of the Functional Node output port that the decorator connects.
* ``address`` : the MQTT topic to publish to.
* ``type`` : the data type that published MQTT messages will be converted from (see :ref:`here `).
* ``compute_period`` (optional): :ref:`ComputePeriod ` of the node, 1 by default.
* ``publish_from_cache`` (optional): :ref:`publishFromCache ` property of the node, false by default.

.. _doxid-python_graph_1clock_decorator:

@Clock
~~~~~~

This decorator creates an :ref:`InputClockNode ` and connects it to one of the decorated Functional Node inputs. It adds a node and an edge to the Computational Graph.

The code below creates a Functional Node which reads the current system clock and prints it out.

.. ref-code-block:: cpp

    @Clock('clock_in')
    @FunctionalNode(name="clock_example", outputs=[], exec_policy=node_policies.functional_node.exec_policy.always)
    def my_function(clock_in):
        print("current system time: {}".format(clock_in))
        return []

As explained in this :ref:`section `, depending on whether the Computational Graph is managed by an :ref:`EventLoop ` or an :ref:`FTILoop `, the :ref:`InputClockNode ` streams either the current value of the internal (real-time) clock of the :ref:`EventLoop `, relative to the time point at which the loop was started, or, when using an :ref:`FTILoop `, the current simulation time.

The decorator has one argument:

* ``keyword`` : specifies the name of the Functional Node input port that the decorator connects.

.. _doxid-python_graph_1iteration_decorator:

@Iteration
~~~~~~~~~~

This decorator creates an :ref:`InputIterationNode ` and connects it to one of the decorated Functional Node inputs. It adds a node and an edge to the Computational Graph.

The code below creates a Functional Node which reads the current loop iteration number and prints it out.

.. ref-code-block:: cpp

    @Iteration('iteration_in')
    @FunctionalNode(name="iteration_example", outputs=[], exec_policy=node_policies.functional_node.exec_policy.always)
    def my_function(iteration_in):
        print("current loop iteration number: {}".format(iteration_in))
        return []

Whether the Computational Graph is managed by an :ref:`EventLoop ` or an :ref:`FTILoop `, the :ref:`InputIterationNode ` streams the number of times the loop has been run.

The decorator has one argument:

* ``keyword`` : specifies the name of the Functional Node input port that the decorator connects.

.. _doxid-python_graph_1python_graph_example:

Putting the Examples Together
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To conclude the overview of the available Computational Graph decorators, let's put together the different examples shown in the sections above:

.. ref-code-block:: cpp

    @RosSubscriber(keyword="input_2", address="/my_topic", type=nrp_core.data.nrp_ros.std_msgs.String)
    @ToEngine(keyword="output_1", address="/my_engine")
    @FromEngine(keyword='input_1', address='/my_engine/my_datapack')
    @FunctionalNode(name="my_node", outputs=['output_1', 'output_2'])
    def my_function(input_1, input_2):
        return [input_1, input_2]

    @RosPublisher(keyword="output_1", address="/my_topic_2", type=nrp_core.data.nrp_ros.std_msgs.String)
    @FromFunctionalNode(keyword='input_1', address='/my_node/output_2')
    @FunctionalNode(name="my_node_2", outputs=['output_1'])
    def my_function_2(input_1):
        return [input_1]

This script, when loaded by the :ref:`EventLoop `, will create a graph with six nodes:

* two input nodes: ``my_engine_input``, ``/my_topic``
* two output nodes: ``my_engine_output``, ``/my_topic_2``
* two functional nodes: ``my_node``, ``my_node_2``

Five edges will be added to the graph:

* From ``my_engine_input`` to ``my_node``
* From ``/my_topic`` to ``my_node``
* From ``my_node`` to ``my_node_2``
* From ``my_node`` to ``my_engine_output``
* From ``my_node_2`` to ``/my_topic_2``

When the graph is configured, the nodes in the graph are divided into four layers (see :ref:`here ` for more details), each containing the following nodes:

#. ``my_engine_input``, ``/my_topic``
#. ``my_node``
#. ``my_node_2``
#. ``my_engine_output``, ``/my_topic_2``

Ports required to connect the different Nodes are also created. The resulting graph is depicted in the figure below:

.. image:: computational_graph_example.png
    :width: 1177
    :align: center
    :alt: Computational Graph created from the example Python script

Each time the graph is executed (i.e. in every Event Loop cycle), the nodes in each of the layers are executed sequentially and in order.
The following behavior is expected: if a new message has been published to the ROS topic ``/my_topic`` in the last Event Loop cycle, it is sent to ``my_node``, which in turn forwards it to ``my_node_2``, and it is finally published to the ``/my_topic_2`` ROS topic. The ``my_engine_input`` node publishes the latest available datapack with id ``my_datapack`` from Engine ``my_engine`` to the input port ``input_1`` of ``my_node``. The datapack is then sent back to the same Engine through the node ``my_engine_output``.

As a final comment, the order in which the decorators are added in the Python script is arbitrary, with one exception: the ``@FunctionalNode`` decorator must be placed directly above the Python function which will be executed.

For more examples of how the presented decorators are used to create Computational Graphs, take a look at the three example experiments provided in the ``examples/event_loop_examples`` folder.
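
The four layers listed for the combined example can be reproduced with a short standalone sketch. It is not the NRP-core implementation. It assumes that input nodes occupy the first layer, that output nodes occupy the last one, and that functional nodes are ordered by their longest chain of functional predecessors. The function name ``assign_layers`` and its arguments are hypothetical.

.. code-block:: python

    from collections import defaultdict


    def assign_layers(inputs, functionals, outputs, edges):
        """Group node ids into execution layers (illustrative assumption only)."""
        functional_set = set(functionals)
        preds = defaultdict(list)
        for src, dst in edges:
            preds[dst].append(src)

        depth = {}

        def functional_depth(node):
            # Longest chain of functional predecessors; the graph must be acyclic.
            if node not in depth:
                parents = [p for p in preds[node] if p in functional_set]
                depth[node] = 1 + max(
                    (functional_depth(p) for p in parents), default=0)
            return depth[node]

        by_depth = defaultdict(list)
        for node in functionals:
            by_depth[functional_depth(node)].append(node)

        layers = [list(inputs)]                       # input nodes first
        layers += [by_depth[d] for d in sorted(by_depth)]
        layers.append(list(outputs))                  # output nodes last
        return layers


    # Nodes and edges of the combined example on this page.
    edges = [
        ("my_engine_input", "my_node"),
        ("/my_topic", "my_node"),
        ("my_node", "my_node_2"),
        ("my_node", "my_engine_output"),
        ("my_node_2", "/my_topic_2"),
    ]
    layers = assign_layers(
        inputs=["my_engine_input", "/my_topic"],
        functionals=["my_node", "my_node_2"],
        outputs=["my_engine_output", "/my_topic_2"],
        edges=edges,
    )
    for number, layer in enumerate(layers, start=1):
        print(number, layer)

Under these assumptions, ``my_engine_output`` lands in the last layer even though it is fed directly by ``my_node``, which matches the layer listing given for the example.
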