.. index:: pair: page; DataTransfer Engine
.. _doxid-datatransfer_engine:

DataTransfer Engine
===================

The DataTransfer Engine provides data logging and streaming capabilities to NRP-core. It enables users to log experiment data to a file for further offline analysis or to stream it over the network for, e.g., remote data visualization and experiment monitoring.

The engine's implementation is based on the :ref:`Protobuf over gRPC communication protocol ` and thus accepts :ref:`Protobuf DataPacks `. In transceiver functions, users can fetch datapacks from other engines, process them, or send them directly to the DataTransfer Engine, where they will be logged or streamed.

Which datapacks are accepted by the engine and how they are processed (i.e., whether they are logged, streamed, or both) is specified in the engine's JSON configuration. See the :ref:`section below ` for more details on the engine configuration parameters.

In the simulation configuration file *examples/husky_braitenberg/simulation_config_data_transfer.json*, interested users can find a complete experiment using the DataTransfer Engine.

.. _doxid-datatransfer_engine_1datatransfer_engine_streaming:

Enabling Data Streaming with MQTT
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

As mentioned above, besides logging to a file, the DataTransfer Engine also allows streaming datapacks over the network. For this purpose, the MQTT protocol is used.

To enable data streaming from NRP-core, the Paho MQTT library must be installed as described in :ref:`Installation Instructions `. Additionally, the Engine will need to connect to an MQTT broker, and the broker's address must be specified in the engine configuration.

If you need to set up your own MQTT broker, commands are provided below to quickly set one up and run it using Docker. These commands should be run from the NRP-core source root folder.

.. ref-code-block:: cpp

    sudo docker pull eclipse-mosquitto
    sudo docker run -it -p 1883:1883 -v ${PWD}/examples/husky_braitenberg/mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto

The resulting MQTT broker will listen on port *1883*. If you wish to use a different port, you can specify it in *examples/husky_braitenberg/mosquitto.conf* or in another mosquitto configuration file.

.. _doxid-datatransfer_engine_1datatransfer_engine_datapacks:

DataPacks
~~~~~~~~~

The DataTransfer Engine processes every incoming datapack and tries to save its "data" field to a file and/or send it to an MQTT broker, depending on the experiment configuration.

:ref:`Protobuf DataPacks `, containing any protobuf message type, can be sent to the DataTransfer Engine for logging or streaming, with some limitations. Protobuf message types with *repeated* fields cannot be logged to files but can still be streamed. Additionally, two Protobuf message types are provided with special formatting support for logging to a file: *Dump.String* and *Dump.ArrayFloat*. See below for more details.

.. _doxid-datatransfer_engine_1dump_proto:

Dump Protobuf message types
---------------------------

.. ref-code-block:: cpp

    syntax = "proto3";

    package Dump;

    /*
     * Data going to Data Stream Engine as string
     */
    message String
    {
        string string_stream = 5;
    }

    message ArrayFloat
    {
        repeated float float_stream = 5;
        repeated uint32 dims = 6;
    }

    // EOF

* *Dump.String* has a single field *string_stream* of the string type.

* *Dump.ArrayFloat* can be used to transfer arrays as explained below.

.. _doxid-datatransfer_engine_1dump_array_float:

Logging arrays with Dump.ArrayFloat
+++++++++++++++++++++++++++++++++++

The *Dump.ArrayFloat* Protobuf message contains two fields:

* float_stream: a repeated float field containing the array data.

* dims: a repeated integer field specifying the dimensions of the array.

The number of elements in the *dims* field specifies the number of dimensions of the array, and the value of each element indicates the number of elements of the array in that dimension.

The DataTransfer engine supports arrays of one or two dimensions. If *dims* is not set or has one element, the array is treated as 1-dimensional. If it has two, the array is treated as 2-dimensional. In this case, the first element indicates the number of rows of the array and the second element the number of columns (*r* and *c* in the explanation below). The formatting of 3- or more-dimensional arrays is not supported; if *dims* has a number of elements other than 2, then *float_stream* is formatted as a 1-dimensional array.

When the engine is requested to log a *Dump.ArrayFloat* message to a file, *r* lines are printed, each containing *c* elements from *float_stream*. If the size of *float_stream* is greater than the specified dimensions (i.e., greater than r \* c), the excess data (not fitting into the number of rows multiplied by the number of columns) is truncated and not printed. If the total size of *float_stream* is smaller, then the remaining "space" of the array is printed empty.

Examples of constructing *Dump.ArrayFloat* are given in the file *examples/husky_braitenberg_dump/tf_1.py*.

.. _doxid-datatransfer_engine_1logging_address:

Logging and streaming address
-----------------------------

When logging to a file is enabled, datapacks are always logged to a file ``//.data``, where is a configuration parameter of the Engine.

When Data Streaming with MQTT is enabled, datapacks are published to an \*/nrp_simulation//data/\* MQTT topic as serialized protobuf objects. and are configuration parameters of the Engine. Whenever the Engine is reset, a message with the text "reset" is published to that topic.

The list of available data topics, along with their corresponding data types, is published to a dedicated topic: \*/nrp_simulation//data\*.
The format for this information is provided below:

.. ref-code-block:: cpp

    [
        {
            "topic": "/nrp_simulation//data/",
            "type": ""
        },
        {
            "topic": "/nrp_simulation//data/",
            "type": ""
        },
        ...
    ]

Upon initialization of the data topic (i.e., before the simulation begins), the ``type`` value is left blank. It is subsequently updated once the engine determines the data type, which typically occurs when the first message for that topic is sent.

Additionally, a welcome message is published once to the topic \*/nrp_simulation//welcome\*.

.. _doxid-datatransfer_engine_1engine_datatransfer_config_section:

Engine Configuration Parameters
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

This Engine type's parameters are defined in the DataTransfer schema (listed :ref:`here `), which in turn is based on the :ref:`EngineBase ` and :ref:`EngineGRPC ` schemas, inheriting all parameters from them.

To use this engine in an experiment, set ``EngineType`` to **"datatransfer_grpc_engine"**.

* Parameters inherited from :ref:`EngineBase ` schema:

===================== =================================================================================================================================================== ====== ========================== ======== =====
Name                  Description                                                                                                                                         Type   Default                    Required Array
===================== =================================================================================================================================================== ====== ========================== ======== =====
EngineName            Name of the engine                                                                                                                                  string                            X
EngineType            Engine type. Used by                                                                                                                                string                            X
EngineProcCmd         Engine Process Launch command                                                                                                                       string
EngineProcStartParams Engine Process Start Parameters                                                                                                                     string []                                  X
EngineEnvParams       Engine Process Environment Parameters                                                                                                               string []                                  X
EngineLaunchCommand                                                                                                                                                       object {"LaunchType":"BasicFork"}
EngineTimestep        Engine Timestep in seconds                                                                                                                          number 0.01
EngineCommandTimeout  Engine Timeout (in seconds). It tells how long to wait for the completion of the engine runStep. 0 or negative values are interpreted as no timeout number 0.0
===================== =================================================================================================================================================== ====== ========================== ======== =====

* Parameters inherited from the :ref:`EngineGRPC ` schema:

============= =============================================================================================== ====== ============== ======== =====
Name          Description                                                                                     Type   Default        Required Array
============= =============================================================================================== ====== ============== ======== =====
ServerAddress gRPC Server address. Should this address already be in use, simulation initialization will fail string localhost:9004
============= =============================================================================================== ====== ============== ======== =====

* Parameters specific to this engine type:

===================== ============================================================================================= ======== ============== ======== =====
Name                  Description                                                                                   Type     Default        Required Array
===================== ============================================================================================= ======== ============== ======== =====
MQTTBroker            Address of the MQTT broker                                                                    string   localhost:1883
dataDirectory         Path to the storage of file streams                                                           string   data
MQTTPrefix            Prefix to MQTT topics published by this Engine                                                string
simulationID          Simulation identifier to be added to MQTT topics published by this Engine                     string   0
streamDataPackMessage If true the engine will stream DataPackMessages, if false it will stream their contained data boolean  true
dumps                 List of datapacks for transfer                                                                dumpItem []                      X
===================== ============================================================================================= ======== ============== ======== =====

* dumpItem: elements of the *dumps* array above:

======= ================================================== ======= ======= ======== =====
Name    Description                                        Type    Default Required Array
======= ================================================== ======= ======= ======== =====
name    Name of the datapack for transfer                  string          X
network Trigger, if the datapack should be sent to network boolean false
file    Trigger, if the datapack should be sent to file    boolean false
======= ================================================== ======= ======= ======== =====

.. _doxid-datatransfer_engine_1engine_datatransfer_schema:

Schema
~~~~~~

As explained above, the schema used by the DataTransfer engine inherits from the :ref:`EngineBase ` and :ref:`EngineGRPC ` schemas. A complete schema for the configuration of this engine is provided below:

.. ref-code-block:: cpp

    {"engine_datatransfer_base" : {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "title": "Data Transfer Base",
        "description": "Data Transfer Engine configuration schema. Configuration for all DataTransfer engine implementations inherit from this one",
        "$id": "#DataTransfer",
        "definitions": {
            "dumpItem":{
                "type": "object",
                "properties": {
                    "name":{
                        "type": "string",
                        "description": "Name of the datapack for transfer"
                    },
                    "network": {
                        "type": "boolean",
                        "default": false,
                        "description": "Trigger, if the datapack should be sent to network"
                    },
                    "file": {
                        "type": "boolean",
                        "default": false,
                        "description": "Trigger, if the datapack should be sent to file"
                    }
                },
                "required": ["name"]
            }
        },
        "allOf": [
            { "$ref": "json://nrp-core/engines/engine_comm_protocols.json#/engine_grpc" },
            {
                "properties": {
                    "MQTTBroker": {
                        "type": "string",
                        "default": "localhost:1883",
                        "description": "Address of the MQTT broker"
                    },
                    "dataDirectory": {
                        "type": "string",
                        "default": "data",
                        "description": "Path to the storage of file streams."
                    },
                    "MQTTPrefix": {
                        "type": "string",
                        "description": "prefix to be added to MQTT topics published by this Engine"
                    },
                    "simulationID": {
                        "type": "string",
                        "default": "0",
                        "description": "Simulation identifier to be added to MQTT topics published by this Engine"
                    },
                    "streamDataPackMessage" : {
                        "type": "boolean",
                        "default": true,
                        "description": "if true the engine will stream DataPackMessages, if false it will stream their contained data"
                    },
                    "dumps": {
                        "type": "array",
                        "items": { "$ref": "#/engine_datatransfer_base/definitions/dumpItem" },
                        "uniqueItems": true,
                        "default": [],
                        "description": "List of datapacks for transfer"
                    },
                    "EngineType": { "enum": ["datatransfer_grpc_engine"] },
                    "ProtobufPackages": { "default": ["Dump"]}
                }
            }
        ]
    }}
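
To illustrate how the parameters in the schema above fit together, the following Python sketch assembles an engine configuration entry and fills in the ``dumpItem`` defaults (``network`` and ``file`` both default to ``false``; ``name`` is required). The helper functions, the engine name, and the datapack names are hypothetical and chosen only for illustration; they are not part of NRP-core.

```python
import json

# Defaults taken from the dumpItem definition in the schema above.
DUMP_ITEM_DEFAULTS = {"network": False, "file": False}


def normalize_dump_item(item):
    """Return a copy of a dumpItem with the schema defaults filled in.

    Hypothetical helper: it only mirrors the "required" and "default"
    keywords of the dumpItem definition.
    """
    if not isinstance(item.get("name"), str):
        raise ValueError("dumpItem requires a string 'name'")
    return {**DUMP_ITEM_DEFAULTS, **item}


def make_datatransfer_config(engine_name, dumps):
    """Build a DataTransfer engine entry (hypothetical helper, not NRP-core API)."""
    return {
        "EngineType": "datatransfer_grpc_engine",
        "EngineName": engine_name,
        "MQTTBroker": "localhost:1883",  # schema default
        "dataDirectory": "data",         # schema default
        "simulationID": "0",             # schema default
        "dumps": [normalize_dump_item(d) for d in dumps],
    }


# Engine and datapack names below are assumptions made for this example.
config = make_datatransfer_config(
    "datatransfer_engine",
    [
        {"name": "camera_log", "file": True},       # logged to file only
        {"name": "motor_stream", "network": True},  # streamed over MQTT only
    ],
)

print(json.dumps(config, indent=4))
```

The printed JSON object could serve as a starting point for the engine's entry in a simulation configuration file. A datapack listed with neither ``file`` nor ``network`` set to ``true`` keeps both defaults at ``false``.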