DataTransfer Engine

The DataTransfer Engine provides data logging and streaming capabilities to NRP-core. It enables users to log experiment data to a file for further offline analysis or to stream it over the network for, e.g., remote data visualization and experiment monitoring.

The engine’s implementation is based on the Protobuf over gRPC communication protocol and thus accepts Protobuf DataPacks.

In transceiver functions, users can fetch datapacks from other engines and send them to the DataTransfer Engine, either directly or after processing them, to have them logged or streamed. The engine’s JSON configuration specifies which datapacks the engine accepts and how each one is handled (logged, streamed, or both). See Engine Configuration Parameters below for details on the configuration parameters.

The simulation configuration file examples/husky_braitenberg/simulation_config_data_transfer.json contains a complete experiment that uses the DataTransfer Engine.

Enabling Data Streaming with MQTT

As mentioned above, besides logging to files, the DataTransfer Engine can stream datapacks over the network. Streaming uses the MQTT protocol.

To enable data streaming from NRP-core, the Paho MQTT library must be installed as described in Installation Instructions. In addition, the Engine must connect to an MQTT broker, whose address is set with the MQTTBroker parameter of the engine configuration.

If you need your own MQTT broker, you can quickly start one with Docker using the commands below. Run them from the NRP-core source root folder.

sudo docker pull eclipse-mosquitto
sudo docker run -it -p 1883:1883 -v ${PWD}/examples/husky_braitenberg/mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto

The resulting MQTT broker listens on port 1883. To use a different port, set it in examples/husky_braitenberg/mosquitto.conf (or in another Mosquitto configuration file), and update the -p port mapping of the docker run command and the MQTTBroker engine parameter to match.
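To check that a broker is reachable at the address given in the MQTTBroker parameter (for example the default localhost:1883), a small standard-library Python sketch such as the one below can be used. The helper names are hypothetical, and the sketch assumes the address has the host:port form of the default value. A successful TCP connection only shows that something is listening on that port, not that it is an MQTT broker.

```python
import socket
from typing import Tuple


def parse_broker_address(address: str, default_port: int = 1883) -> Tuple[str, int]:
    """Split a 'host:port' string (e.g. the MQTTBroker value) into host and port.

    Assumption: the address follows the 'host:port' form of the default
    'localhost:1883'; if no port is given, default_port is used.
    """
    host, sep, port = address.rpartition(":")
    if not sep:
        # No colon at all: the whole string is the host name.
        return address, default_port
    return host, int(port)


def broker_reachable(address: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the broker address succeeds."""
    host, port = parse_broker_address(address)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, unknown host, or timeout.
        return False
```

For example, broker_reachable("localhost:1883") should return True once the Docker container started above is running.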

DataPacks

The DataTransfer Engine processes every incoming datapack and, depending on the engine configuration, tries to save its “data” field to a file, send it to an MQTT broker, or both.
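The per-datapack decision is driven by the dumps list of the engine configuration (see Engine Configuration Parameters). The Python sketch below mirrors that lookup under the documented schema: each dump item has a required name and optional file and network flags that default to false, and datapacks not listed in dumps are not transferred. It illustrates the configuration logic only and is not the engine's implementation; the function name and the datapack names are hypothetical.

```python
from typing import Dict, List, Set


def destinations(dumps: List[Dict], datapack_name: str) -> Set[str]:
    """Return where a datapack goes: a subset of {"file", "network"}.

    Follows the 'dumps' configuration: 'file' and 'network' default to False,
    and a datapack that is not listed in 'dumps' is not transferred at all.
    """
    for item in dumps:
        if item["name"] == datapack_name:
            targets = set()
            if item.get("file", False):
                targets.add("file")
            if item.get("network", False):
                targets.add("network")
            return targets
    return set()


# Hypothetical dumps list with placeholder datapack names.
dumps = [
    {"name": "joints", "file": True},
    {"name": "camera", "network": True, "file": True},
]
print(sorted(destinations(dumps, "camera")))  # ['file', 'network']
```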

Protobuf DataPacks containing any Protobuf message type can be sent to the DataTransfer Engine for logging or streaming, with the following limitation: message types with repeated fields cannot be logged to files, although they can still be streamed.

Additionally, two Protobuf message types with special formatting support for logging to a file are provided: Dump.String and Dump.ArrayFloat. Thanks to this support, Dump.ArrayFloat can be logged even though it has repeated fields. See below for more details.

Dump Protobuf message types

syntax = "proto3";

package Dump;

/*
 * Data going to Data Stream Engine as string
 */

message String
{
    string string_stream  = 5;
}

message ArrayFloat
{
    repeated float float_stream = 5;
    repeated uint32 dims = 6;
}


// EOF
  • Dump.String has a single field, string_stream, of type string.

  • Dump.ArrayFloat can be used to transfer arrays, as explained below.

Logging arrays with Dump.ArrayFloat

The Dump.ArrayFloat Protobuf message contains two fields:

  • float_stream: a repeated float field containing the array data.

  • dims: a repeated unsigned integer (uint32) field specifying the dimensions of the array.

The number of elements in the dims field specifies the number of dimensions of the array, and the value of each element indicates the number of elements of the array in that dimension.

The DataTransfer Engine formats arrays of one or two dimensions. If dims has exactly two elements, the array is treated as 2-dimensional: the first element gives the number of rows and the second the number of columns (r and c in the explanation below). In every other case (dims not set, or with one, three, or more elements), float_stream is formatted as a 1-dimensional array; formatting of arrays with three or more dimensions is not supported.
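The rule for interpreting dims can be summarized in a few lines of Python. This is a sketch of the documented behavior, not the engine's code, and the function name is hypothetical. It takes a plain list; the repeated dims field of a Python-generated Dump.ArrayFloat message should work as well, since that field supports len() and iteration.

```python
from typing import Optional, Sequence, Tuple


def array_layout(dims: Sequence[int]) -> Optional[Tuple[int, int]]:
    """Decide how a Dump.ArrayFloat message is laid out when logged.

    Returns (rows, cols) when 'dims' has exactly two elements, and None when
    the array is treated as 1-dimensional: 'dims' unset (empty), with one
    element, or with three or more elements.
    """
    if len(dims) == 2:
        rows, cols = dims
        return rows, cols
    return None


print(array_layout([2, 3]))     # (2, 3): 2 rows, 3 columns
print(array_layout([]))         # None: dims not set, 1-D
print(array_layout([2, 3, 4]))  # None: 3-D is not supported, falls back to 1-D
```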

When the engine logs a 2-dimensional Dump.ArrayFloat message to a file, it prints r lines, each containing c elements of float_stream. If float_stream has more than r * c elements, the excess elements are truncated and not printed; if it has fewer, the remaining positions of the array are printed empty.
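The row-by-row layout, including truncation and empty padding, can be illustrated with the following Python sketch of the 2-dimensional case. The comma separator and the use of an empty string for each missing position are assumptions made for illustration, not the engine's documented output format, and the function name is hypothetical; inspect a generated .data file to see the actual formatting.

```python
from typing import List, Sequence


def format_2d(float_stream: Sequence[float], rows: int, cols: int, sep: str = ",") -> List[str]:
    """Lay out float_stream as 'rows' lines of 'cols' entries each.

    Values beyond rows * cols are dropped (truncated); if float_stream is
    shorter than rows * cols, the missing positions are left empty.
    The separator is an assumption, not the engine's documented format.
    """
    lines = []
    for r in range(rows):
        cells = []
        for c in range(cols):
            i = r * cols + c  # row-major index into float_stream
            cells.append(str(float_stream[i]) if i < len(float_stream) else "")
        lines.append(sep.join(cells))
    return lines


print(format_2d([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0], 2, 3))
# ['1.0,2.0,3.0', '4.0,5.0,6.0']  (7.0 is truncated)
print(format_2d([1.0, 2.0, 3.0, 4.0], 2, 3))
# ['1.0,2.0,3.0', '4.0,,']  (two empty positions)
```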

Examples of constructing Dump.ArrayFloat are given in the file examples/husky_braitenberg_dump/tf_1.py.

Logging and streaming address

When logging to a file is enabled, datapacks are always logged to a file <dataDirectory>/<timestamp>/<datapack_name>.data, where <dataDirectory> is a configuration parameter of the Engine.
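For offline analysis, the log files of a given datapack can be collected with a glob over this path pattern. The Python sketch below is hypothetical (find_logs and the datapack name joints are placeholders) and makes no assumption about the timestamp format; it returns the file for the datapack from every <timestamp> subdirectory that contains one, sorted by path.

```python
from pathlib import Path
from typing import List


def find_logs(data_directory: str, datapack_name: str) -> List[Path]:
    """Return every <dataDirectory>/<timestamp>/<datapack_name>.data file.

    A datapack may appear under several <timestamp> subdirectories, so the
    result can hold more than one file. Results are sorted by path.
    """
    return sorted(Path(data_directory).glob(f"*/{datapack_name}.data"))


# "data" is the default dataDirectory; "joints" is a hypothetical datapack name.
for path in find_logs("data", "joints"):
    print(path)
```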

When data streaming with MQTT is enabled, datapacks are published as serialized Protobuf objects to the MQTT topic <MQTTPrefix>/nrp_simulation/<simulationID>/data/<datapack_name>, where <MQTTPrefix> and <simulationID> are configuration parameters of the Engine. Whenever the Engine is reset, a message with the text “reset” is published to that topic.
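A monitoring client has to build the topic name from the same parameters and recognize the reset message. The Python sketch below does both; the helper names and example values are hypothetical. It concatenates the parts exactly as in the pattern above and does not cover how the engine handles an unset MQTTPrefix. The payload of a regular message is a serialized Protobuf object; whether it is a whole DataPackMessage or only the contained data depends on the streamDataPackMessage parameter.

```python
def data_topic(mqtt_prefix: str, simulation_id: str, datapack_name: str) -> str:
    """Build <MQTTPrefix>/nrp_simulation/<simulationID>/data/<datapack_name>."""
    return f"{mqtt_prefix}/nrp_simulation/{simulation_id}/data/{datapack_name}"


def is_reset(payload: bytes) -> bool:
    """True if the message is the text "reset" sent when the Engine is reset.

    Assumption: the text arrives as plain ASCII bytes with no terminator.
    """
    return payload == b"reset"


# Hypothetical values: MQTTPrefix "nrp", simulationID "0", datapack "joints".
print(data_topic("nrp", "0", "joints"))  # nrp/nrp_simulation/0/data/joints
```

With the Paho MQTT Python client, the topic string can be passed to Client.subscribe(), and is_reset() can be applied to msg.payload in the on_message callback before trying to parse the message as Protobuf.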

The list of available data topics, along with their data types, is published to the dedicated topic <MQTTPrefix>/nrp_simulation/<simulationID>/data in the following format:

[
  {
    "topic": "<MQTTPrefix>/nrp_simulation/<simulationID>/data/<datapack_name1>",
    "type": "<Data.Type1>"
  },
  {
    "topic": "<MQTTPrefix>/nrp_simulation/<simulationID>/data/<datapack_name2>",
    "type": "<Data.Type2>"
  },
  ...
]

Upon initialization of the data topic (i.e., before the simulation begins), the type value is left blank. It is subsequently updated once the engine determines the data type, which typically occurs when the first message for that topic is sent.
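A subscriber to the data topic can decode this list with the standard json module. In the sketch below, the function names and the example payload (topic names, the nrp prefix, and the type string) are hypothetical. A blank or missing type is treated as "not known yet", following the paragraph above.

```python
import json
from typing import Dict, List


def parse_topic_list(payload: bytes) -> Dict[str, str]:
    """Map each data topic to its announced type; "" means not known yet."""
    return {entry["topic"]: entry.get("type", "") for entry in json.loads(payload)}


def typed_topics(topic_types: Dict[str, str]) -> List[str]:
    """Topics whose type has been filled in (typically after their first message)."""
    return sorted(topic for topic, type_name in topic_types.items() if type_name)


# Hypothetical payload as it might arrive on <MQTTPrefix>/nrp_simulation/<simulationID>/data
example = b'''[
  {"topic": "nrp/nrp_simulation/0/data/joints", "type": "Dump.ArrayFloat"},
  {"topic": "nrp/nrp_simulation/0/data/camera", "type": ""}
]'''
topics = parse_topic_list(example)
print(typed_topics(topics))  # ['nrp/nrp_simulation/0/data/joints']
```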

Additionally, a welcome message is published once to the topic <MQTTPrefix>/nrp_simulation/<simulationID>/welcome.

Engine Configuration Parameters

This Engine type’s parameters are defined in the DataTransfer schema (listed in the Schema section below), which in turn is based on the EngineBase and EngineGRPC schemas and inherits all of their parameters.

To use this engine in an experiment, set EngineType to “datatransfer_grpc_engine”.

  • Parameters inherited from the EngineBase schema:

| Name | Description | Type | Default | Required | Array |
|---|---|---|---|---|---|
| EngineName | Name of the engine | string | | X | |
| EngineType | Engine type. Used by | string | | X | |
| EngineProcCmd | Engine process launch command | string | | | |
| EngineProcStartParams | Engine process start parameters | string | [] | | X |
| EngineEnvParams | Engine process environment parameters | string | [] | | X |
| EngineLaunchCommand | | object | {"LaunchType":"BasicFork"} | | |
| EngineTimestep | Engine timestep in seconds | number | 0.01 | | |
| EngineCommandTimeout | Engine timeout in seconds: how long to wait for the engine's runStep to complete. Zero or negative values mean no timeout | number | 0.0 | | |

  • Parameters inherited from the EngineGRPC schema:

| Name | Description | Type | Default | Required | Array |
|---|---|---|---|---|---|
| ServerAddress | gRPC server address. If this address is already in use, simulation initialization fails | string | localhost:9004 | | |

  • Parameters specific to this engine type:

| Name | Description | Type | Default | Required | Array |
|---|---|---|---|---|---|
| MQTTBroker | Address of the MQTT broker | string | localhost:1883 | | |
| dataDirectory | Path to the directory where log files are stored | string | data | | |
| MQTTPrefix | Prefix added to MQTT topics published by this Engine | string | | | |
| simulationID | Simulation identifier added to MQTT topics published by this Engine | string | 0 | | |
| streamDataPackMessage | If true, the engine streams whole DataPackMessages; if false, it streams only the data they contain | boolean | true | | |
| dumps | List of datapacks to transfer | dumpItem | [] | | X |

  • dumpItem: elements of the dumps array above:

| Name | Description | Type | Default | Required | Array |
|---|---|---|---|---|---|
| name | Name of the datapack to transfer | string | | X | |
| network | Whether the datapack is sent to the network (MQTT) | boolean | false | | |
| file | Whether the datapack is logged to a file | boolean | false | | |
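As an illustration, the Python snippet below builds one engine entry for a simulation configuration file and prints it as JSON. All values except EngineType are hypothetical placeholders; parameters that are left out, such as ServerAddress, dataDirectory, and streamDataPackMessage, take the defaults listed in the tables above.

```python
import json

# Hypothetical DataTransfer engine entry. Only EngineType has a fixed value;
# the engine name, prefix, and datapack names are placeholders.
datatransfer_engine = {
    "EngineType": "datatransfer_grpc_engine",
    "EngineName": "datatransfer_engine",
    "MQTTBroker": "localhost:1883",
    "MQTTPrefix": "nrp",
    "simulationID": "0",
    "dumps": [
        {"name": "joints", "file": True},                   # logged to a file only
        {"name": "camera", "network": True},                # streamed over MQTT only
        {"name": "reward", "file": True, "network": True},  # logged and streamed
    ],
}

print(json.dumps(datatransfer_engine, indent=2))
```

A dump item with neither flag set is accepted by the schema but, since both flags default to false, that datapack would be neither logged nor streamed.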

Schema

As explained above, the DataTransfer Engine schema inherits from the EngineBase and EngineGRPC schemas; the inherited parameters are included through the $ref entry of the allOf list. The complete configuration schema of this engine is shown below:

{"engine_datatransfer_base" : {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Data Transfer Base",
    "description": "Data Transfer Engine configuration schema. Configuration for all DataTransfer engine implementations inherit from this one",
    "$id": "#DataTransfer",
    "definitions": {
      "dumpItem":{
        "type": "object",
        "properties": {
          "name":{
            "type": "string",
            "description": "Name of the datapack for transfer"
          },
          "network": {
            "type": "boolean",
            "default": false,
            "description": "Trigger, if the datapack should be sent to network"
          },
          "file": {
            "type": "boolean",
            "default": false,
            "description": "Trigger, if the datapack should be sent to file"
          }
        },
        "required": ["name"]
      }
    },
    "allOf": [
      { "$ref": "json://nrp-core/engines/engine_comm_protocols.json#/engine_grpc" },
      {
        "properties": {
          "MQTTBroker": {
            "type": "string",
            "default": "localhost:1883",
            "description": "Address of the MQTT broker"
          },
          "dataDirectory": {
            "type": "string",
            "default": "data",
            "description": "Path to the storage of file streams."
          },
          "MQTTPrefix": {
            "type": "string",
            "description": "prefix to be added to MQTT topics published by this Engine"
          },
          "simulationID": {
            "type": "string",
            "default": "0",
            "description": "Simulation identifier to be added to MQTT topics published by this Engine"
          },
          "streamDataPackMessage" : {
            "type": "boolean",
            "default": true,
            "description": "if true the engine will stream DataPackMessages, if false it will stream their contained data"
          },
          "dumps": {
            "type": "array",
            "items": { "$ref": "#/engine_datatransfer_base/definitions/dumpItem" },
            "uniqueItems": true,
            "default": [],
            "description": "List of datapacks for transfer"
          },
          "EngineType": { "enum": ["datatransfer_grpc_engine"] },
          "ProtobufPackages": { "default": ["Dump"]}
        }
      }
    ]
  }
}
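Besides listing parameters, the schema declares defaults, requires a name in every dump item, and requires the dumps items to be unique. The Python sketch below applies these rules to a configuration dictionary using only the standard library. It is a hypothetical helper for illustration, covers only the properties declared in this schema (not those inherited through engine_grpc), and is no substitute for validating the configuration with a JSON Schema draft-07 validator.

```python
import copy
from typing import Any, Dict

# Defaults declared in the "properties" block of the schema above.
ENGINE_DEFAULTS: Dict[str, Any] = {
    "MQTTBroker": "localhost:1883",
    "dataDirectory": "data",
    "simulationID": "0",
    "streamDataPackMessage": True,
    "dumps": [],
    "ProtobufPackages": ["Dump"],
}
# Defaults declared in the "dumpItem" definition.
DUMP_ITEM_DEFAULTS: Dict[str, Any] = {"network": False, "file": False}


def with_defaults(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return a checked copy of config with the schema defaults filled in."""
    result = copy.deepcopy(config)
    for key, value in ENGINE_DEFAULTS.items():
        result.setdefault(key, copy.deepcopy(value))

    # EngineType is required (EngineBase) and restricted by the enum in this schema.
    if result.get("EngineType") != "datatransfer_grpc_engine":
        raise ValueError('EngineType must be "datatransfer_grpc_engine"')

    seen = []
    for item in result["dumps"]:
        if "name" not in item:  # "required": ["name"]
            raise ValueError(f"dump item without a name: {item}")
        if item in seen:  # "uniqueItems": true, items compared as written
            raise ValueError(f"duplicate dump item: {item}")
        seen.append(item)

    # Fill in the per-item defaults only after the checks above.
    result["dumps"] = [{**DUMP_ITEM_DEFAULTS, **item} for item in result["dumps"]]
    return result
```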