.. index:: pair: page; DataTransfer Engine .. _doxid-datatransfer_engine: DataTransfer Engine =================== The DataTransfer Engine brings data logging and streaming capabilities to NRP-core. It enables users to log experiment data to file for further offline anaylisis or to stream it over the network for, e.g., remote data visualization and experiment monitoring. The engine implementation is based on the :ref:`Protobuf over gRPC communication protocol ` and, therefore, accepts :ref:`Protobuf DataPacks `. In transceiver functions, users can fetch datapacks from other engines and process them or send them directly to the DataTransfer Engine where they will be logged or streamed. Which datapacks are accepted by the engine and how are they processed (i.e. whether they are logged or streamed or both) is specified in the engine json configuration. See :ref:`below ` for more details about the engine configuration parameters. In the folder *examples/husky_braitenberg_dump* insterested users can find a complete experiment using the DataTransfer Engine. .. _doxid-datatransfer_engine_1datatransfer_engine_streaming: Enabling Data Streaming with MQTT ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ As commented above, besides from logging to file, the DataTransfer Engine allows to stream datapacks over the network. For this purpose the MQTT protocol is used. In order to enable data streaming from NRP-core, the Paho MQTT library must be installed beforehand as described in :ref:`Installation Instructions `. Additionally, the Engine will need to connect to an MQTT broker and the latter's address specified in the engine configuration. In case of needing to bring up your own MQTT broker, below are provided commands to quickly set one up and run it from docker. The commands should be run from the NRP-core source root folder. .. ref-code-block:: cpp sudo docker pull eclipse-mosquitto sudo docker run -it -p 1883:1883 -v ${PWD}/examples/husky_braitenberg/mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto The resulting MQTT broker will be listening to port *1883*. If a different port is wished, it can be specified in *examples/husky_braitenberg/mosquitto.conf* or in other mosquitto configuration file. .. _doxid-datatransfer_engine_1datatransfer_engine_datapacks: DataPacks ~~~~~~~~~ The DataTransfer Engine processes every incoming datapack and tries to save its "data" field to file and/or send it to and MQTT broker, depending on the experiment configuration. :ref:`Protobuf DataPacks `, containing any protobuf message type, can be sent to the DataTransfer Engine for logging or streaming, with some limitations. Protobuf message types that have *repeated* fields cannot be logged to files, but can still be streamed. Additionally, two Protobuf message types are provided with special formatting support for logging into the file: *Dump.String* and *Dump.ArrayFloat*. See below for more details .. _doxid-datatransfer_engine_1dump_proto: Dump Protobuf message types --------------------------- .. ref-code-block:: cpp syntax = "proto3"; package Dump; /* * Data going to Data Stream Engine as string */ message String { string string_stream = 5; } message ArrayFloat { repeated float float_stream = 5; repeated uint32 dims = 6; } // EOF * *Dump.String* has a single field *string_stream* of string type. * *Dump.ArrayFloat* can be used to transfer arrays as explained below. .. _doxid-datatransfer_engine_1dump_array_float: Logging arrays with Dump.ArrayFloat +++++++++++++++++++++++++++++++++++ The *Dump.ArrayFloat* Protobuf message contains two fields: * float_stream: repeated float field containing the array data. * dims: repeated integer field specifying the dimensions of the array. The number of elements in the *dims* field specifies the number of dimensions of the array, and the value of each element the number of elements of the array in that dimension. The DataTransfer engine supports arrays of one or two dimensions. If *dims* is not set or has one element, the array is treated as 1-dimensional. If it has two, the array is treated as 2-dimensional. In this case the first element indicates the number of rows of the array and the second element the number of columns (*r* and *c* in the explanation below). The formatting of 3- or more-dimensional arrays is not supported, if *dims* has a number of elements other than 2, then *float_stream* is formatted as a 1-dimensional array. When the engine is requested to log to file a *Dump.ArrayFloat* message *r* lines are printed to the logged file, each of them containing *c* elements from *float_stream*. In case the size of *float_stream* is greater than the specified dimensions (i.e. than r \* c), the remaining data (not fitting into the number of rows multiplied by the number of columns) is truncated and not printed. In case the total size of *float_stream* is smaller, then the remaining "space" of the array is printed empty. In the file *examples/husky_braitenberg_dump/tf_1.py* are given some examples examples for constructing *Dump.ArrayFloat*. .. _doxid-datatransfer_engine_1logging_address: Logging and streaming address ----------------------------- When logging to file is enabled, datapacks are always logged to a file ``//.data``, where is a configuration parameter of the Engine. When Data Streaming with MQTT is enabled, datapacks are published to an *nrp//data/* MQTT topic as serialized protobuf objects. Whenever the Engine is reset, a message with the text "reset" is published through that topic. The type of the streamed protobuf message is published once to the topic *nrp//data/<_data_name>/type*. The data topic address (i.e. *nrp//data/<_data_name>*) is also published once to the topic *nrp//data*. Aditionally, a welcome message is published once to the topic *nrp//welcome* and the experiment simulation time is published every timestep to the topic *nrp//time*. MQTT messages published only once, i.e. the welcome message, datapack data topic addresses and datapack types, are always retained. .. _doxid-datatransfer_engine_1engine_datatransfer_config_section: Engine Configuration Parameters ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ This Engine type parameters are defined in DataTransfer schema (listed :ref:`here `), which in turn is based on :ref:`EngineBase ` and :ref:`EngineGRPC ` schemas and thus inherits all parameters from them. To use this engine in an experiment, set ``EngineType`` to **"datatransfer_grpc_engine"**. * Parameters inherited from :ref:`EngineBase ` schema: ===================== =================================================================================================================================================== ====== ========================== ======== ===== Name Description Type Default Required Array ===================== =================================================================================================================================================== ====== ========================== ======== ===== EngineName Name of the engine string X EngineType Engine type. Used by string X EngineProcCmd Engine Process Launch command string EngineProcStartParams Engine Process Start Parameters string [] X EngineEnvParams Engine Process Environment Parameters string [] X EngineLaunchCommand object {"LaunchType":"BasicFork"} EngineTimestep Engine Timestep in seconds number 0.01 EngineCommandTimeout Engine Timeout (in seconds). It tells how long to wait for the completion of the engine runStep. 0 or negative values are interpreted as no timeout number 0.0 ===================== =================================================================================================================================================== ====== ========================== ======== ===== * Parameters inherited from the :ref:`EngineGRPC ` schema: ============= =============================================================================================== ====== ============== ======== ===== Name Description Type Default Required Array ============= =============================================================================================== ====== ============== ======== ===== ServerAddress gRPC Server address. Should this address already be in use, simulation initialization will fail string localhost:9004 ============= =============================================================================================== ====== ============== ======== ===== * Parameters specific to this engine type: ===================== ============================================================================================= ======== ============== ======== ===== Name Description Type Default Required Array ===================== ============================================================================================= ======== ============== ======== ===== MQTTBroker Address of the MQTT broker string localhost:1883 dataDirectory Path to the storage of file streams string data simulationID A simulation identifier to be added to MQTT topics published by this Engine string 0 streamDataPackMessage If true the engine will stream DataPackMessages, if false it will stream their contained data boolean true dumps List of datapacks for transfer dumpItem [] X ===================== ============================================================================================= ======== ============== ======== ===== * dumpItem: elements of the *dumps* array above: ======= ================================================== ======= ======= ======== ===== Name Description Type Default Required Array ======= ================================================== ======= ======= ======== ===== name Name of the datapack for transfer string X network Trigger, if the datapack should be sent to network boolean false file Trigger, if the datapack should be sent to file boolean false ======= ================================================== ======= ======= ======== ===== .. _doxid-datatransfer_engine_1engine_datatransfer_schema: Schema ~~~~~~ As explained above, the schema used by the DataTransfer engine inherits from :ref:`EngineBase ` and :ref:`EngineGRPC ` schemas. A complete schema for the configuration of this engine is given below: .. ref-code-block:: cpp {"engine_datatransfer_base" : { "$schema": "http://json-schema.org/draft-07/schema#", "title": "Data Transfer Base", "description": "Data Transfer Engine configuration schema. Configuration for all DataTransfer engine implementations inherit from this one", "$id": "#DataTransfer", "definitions": { "dumpItem":{ "type": "object", "properties": { "name":{ "type": "string", "description": "Name of the datapack for transfer" }, "network": { "type": "boolean", "default": false, "description": "Trigger, if the datapack should be sent to network" }, "file": { "type": "boolean", "default": false, "description": "Trigger, if the datapack should be sent to file" } }, "required": ["name"] } }, "allOf": [ { "$ref": "json://nrp-core/engines/engine_comm_protocols.json#/engine_grpc" }, { "properties": { "MQTTBroker": { "type": "string", "default": "localhost:1883", "description": "Address of the MQTT broker" }, "dataDirectory": { "type": "string", "default": "data", "description": "Path to the storage of file streams." }, "simulationID": { "type": "string", "default": "0", "description": "A simulation identifier to be added to MQTT topics published by this Engine" }, "streamDataPackMessage" : { "type": "boolean", "default": true, "description": "if true the engine will stream DataPackMessages, if false it will stream their contained data" }, "dumps": { "type": "array", "items": { "$ref": "#/engine_datatransfer_base/definitions/dumpItem" }, "uniqueItems": true, "default": [], "description": "List of datapacks for transfer" }, "EngineType": { "enum": ["datatransfer_grpc_engine"] }, "ProtobufPackages": { "default": ["Dump"]} } } ] } }