DataTransfer Engine

The DataTransfer Engine brings data logging and streaming capabilities to NRP-core. It enables users to log experiment data to file for further offline analysis or to stream it over the network for, e.g., remote data visualization and experiment monitoring.

The engine implementation is based on the Protobuf over gRPC communication protocol and, therefore, accepts Protobuf DataPacks.

In transceiver functions, users can fetch datapacks from other engines and send them to the DataTransfer Engine, either directly or after processing them, where they will be logged or streamed. Which datapacks the engine accepts and how they are processed (i.e. whether they are logged, streamed, or both) is specified in the engine JSON configuration. See below for more details about the engine configuration parameters.
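The per-datapack routing decision described above can be sketched as follows. This is a hypothetical helper, not part of NRP-core: it only mirrors the "dumps" configuration semantics (each entry names a datapack and has "file" and "network" flags that default to false). The datapack names are made up, and treating an unlisted datapack as "not accepted" is an assumption of this sketch.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical "dumps" entries, shaped like the dumpItem objects of the
# engine JSON configuration ("network" and "file" default to false).
DUMPS: List[Dict] = [
    {"name": "actors", "network": True, "file": True},
    {"name": "camera", "network": True},
    {"name": "wheels", "file": True},
]


def route(datapack_name: str, dumps: List[Dict]) -> Optional[Tuple[bool, bool]]:
    """Return (log_to_file, stream_to_mqtt) for a datapack.

    Returns None when the datapack is not listed in the configuration,
    which this sketch assumes means the engine does not accept it.
    """
    for item in dumps:
        if item["name"] == datapack_name:
            return item.get("file", False), item.get("network", False)
    return None


for name in ("actors", "camera", "wheels", "lidar"):
    print(name, route(name, DUMPS))
```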

In the folder examples/husky_braitenberg_dump, interested users can find a complete experiment using the DataTransfer Engine.

Enabling Data Streaming with MQTT

As mentioned above, besides logging to file, the DataTransfer Engine can stream datapacks over the network. The MQTT protocol is used for this purpose.

In order to enable data streaming from NRP-core, the Paho MQTT library must be installed beforehand, as described in the Installation Instructions. Additionally, the Engine needs to connect to an MQTT broker, whose address must be specified in the engine configuration.

If you need to bring up your own MQTT broker, the commands below quickly set one up and run it with Docker. They should be run from the NRP-core source root folder.

sudo docker pull eclipse-mosquitto
sudo docker run -it -p 1883:1883 -v ${PWD}/examples/husky_braitenberg/mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto

The resulting MQTT broker will listen on port 1883. If a different port is desired, it can be specified in examples/husky_braitenberg/mosquitto.conf or in any other mosquitto configuration file; in that case, adjust the -p 1883:1883 port mapping of the docker run command accordingly.

DataPacks

The DataTransfer Engine processes every incoming datapack and tries to save its “data” field to file and/or send it to an MQTT broker, depending on the experiment configuration.

Protobuf DataPacks, containing any protobuf message type, can be sent to the DataTransfer Engine for logging or streaming, with some limitations. Protobuf message types that have repeated fields cannot be logged to files, but can still be streamed.

Additionally, two Protobuf message types with special formatting support for logging to file are provided: Dump.String and Dump.ArrayFloat. See below for more details.

Dump Protobuf message types

syntax = "proto3";

package Dump;

/*
 * Data going to Data Stream Engine as string
 */

message String
{
    string string_stream  = 5;
}

message ArrayFloat
{
    repeated float float_stream = 5;
    repeated uint32 dims = 6;
}


// EOF
  • Dump.String has a single field string_stream of string type.

  • Dump.ArrayFloat can be used to transfer arrays as explained below.

Logging arrays with Dump.ArrayFloat

The Dump.ArrayFloat Protobuf message contains two fields:

  • float_stream: repeated float field containing the array data.

  • dims: repeated integer field specifying the dimensions of the array.

The number of elements in the dims field gives the number of dimensions of the array, and the value of each element gives the size of the array along that dimension.
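As an illustration of the float_stream/dims convention, the hypothetical helper below flattens a rectangular 2D list into the two field values. It lays elements out row by row, which is consistent with the row-wise logging behaviour described in this section; the function name is an assumption and not part of NRP-core. With a generated Dump.ArrayFloat message (here called msg), the results could be assigned through the standard protobuf repeated-field method, e.g. msg.float_stream.extend(stream) and msg.dims.extend(dims).

```python
from typing import List, Sequence, Tuple


def to_array_float_fields(rows: Sequence[Sequence[float]]) -> Tuple[List[float], List[int]]:
    """Flatten a rectangular 2D list into (float_stream, dims).

    Elements are laid out row by row: the first dims[1] values of
    float_stream form the first row, the next dims[1] the second, etc.
    """
    n_rows = len(rows)
    n_cols = len(rows[0]) if n_rows else 0
    if any(len(r) != n_cols for r in rows):
        raise ValueError("all rows must have the same length")
    float_stream = [float(x) for r in rows for x in r]
    dims = [n_rows, n_cols]  # [r, c]: number of rows, number of columns
    return float_stream, dims


stream, dims = to_array_float_fields([[1, 2, 3], [4, 5, 6]])
print(dims)    # [2, 3]
print(stream)  # [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
```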

The DataTransfer Engine supports arrays of one or two dimensions. If dims is not set or has one element, the array is treated as 1-dimensional. If it has two elements, the array is treated as 2-dimensional; in this case the first element indicates the number of rows of the array and the second the number of columns (r and c in the explanation below). Formatting of arrays with three or more dimensions is not supported: if dims has any number of elements other than 2, float_stream is formatted as a 1-dimensional array.

When the engine is requested to log a Dump.ArrayFloat message to file, r lines are printed to the log file, each containing c elements from float_stream. If float_stream holds more than r * c elements, the remaining data (not fitting into the r rows of c columns) is truncated and not printed. If it holds fewer, the remaining cells of the array are left empty.
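The formatting rules above (1D fallback, truncation, empty padding) can be sketched as a pure function. This is only a model of the described behaviour, not the engine's C++ implementation: the comma separator and printing a 1-dimensional array on a single line are assumptions of this sketch.

```python
from typing import List, Optional, Sequence

SEPARATOR = ","  # assumption: the delimiter used by the engine may differ


def format_array_float(float_stream: Sequence[float],
                       dims: Optional[Sequence[int]] = None) -> List[str]:
    """Return the lines representing a Dump.ArrayFloat in a log file."""
    values = [str(v) for v in float_stream]
    if dims is None or len(dims) != 2:
        # Anything other than exactly two dims is treated as 1-dimensional;
        # this sketch assumes all elements are printed on one line.
        return [SEPARATOR.join(values)]
    rows, cols = dims
    lines = []
    for r in range(rows):
        start = r * cols
        # Take c values for this row. Indices past the end of float_stream
        # become empty cells; values beyond r * c are never read (truncated).
        cells = [values[i] if i < len(values) else "" for i in range(start, start + cols)]
        lines.append(SEPARATOR.join(cells))
    return lines


print(format_array_float([1.0, 2.0, 3.0, 4.0, 5.0], [2, 2]))  # ['1.0,2.0', '3.0,4.0']
print(format_array_float([1.0, 2.0, 3.0], [2, 2]))            # ['1.0,2.0', '3.0,']
print(format_array_float([1.0, 2.0, 3.0], [1, 2, 3]))         # ['1.0,2.0,3.0']
```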

The file examples/husky_braitenberg_dump/tf_1.py contains some examples of constructing Dump.ArrayFloat messages.

Logging and streaming address

When logging to file is enabled, datapacks are always logged to a file <dataDirectory>/<timestamp>/<datapack_name>.data, where <dataDirectory> is a configuration parameter of the Engine.
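For tools that read the logged data back, the path pattern above can be rebuilt as shown below. The helper name is hypothetical, and since the format of the <timestamp> directory is not specified here, the value used in the example is a placeholder.

```python
from pathlib import PurePosixPath


def log_file_path(data_directory: str, timestamp: str, datapack_name: str) -> PurePosixPath:
    """Build <dataDirectory>/<timestamp>/<datapack_name>.data."""
    return PurePosixPath(data_directory) / timestamp / f"{datapack_name}.data"


# "data" is the default dataDirectory; "TIMESTAMP" is a placeholder.
print(log_file_path("data", "TIMESTAMP", "actors"))  # data/TIMESTAMP/actors.data
```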

When Data Streaming with MQTT is enabled, datapacks are published as serialized protobuf objects to the MQTT topic nrp/<simulationID>/data/<datapack_name>. Whenever the Engine is reset, a message with the text “reset” is published to that topic. The type of the streamed protobuf message is published once to the topic nrp/<simulationID>/data/<datapack_name>/type. The data topic address (i.e. nrp/<simulationID>/data/<datapack_name>) is also published once to the topic nrp/<simulationID>/data.

Additionally, a welcome message is published once to the topic nrp/<simulationID>/welcome, and the experiment simulation time is published every timestep to the topic nrp/<simulationID>/time.

MQTT messages published only once, i.e. the welcome message, datapack data topic addresses and datapack types, are always retained.
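A subscriber needs to know which topics exist for a given simulationID and datapack. The hypothetical helper below lists them, flagging the messages that are published only once (and are therefore retained). The reset check simply compares the payload with the text “reset”; the helper names are assumptions, not an NRP-core API.

```python
from typing import Dict, Tuple


def mqtt_topics(simulation_id: str, datapack_name: str) -> Dict[str, Tuple[str, bool]]:
    """Return {kind: (topic, published_once)} for one datapack.

    Messages flagged published_once are the ones the engine retains.
    """
    base = f"nrp/{simulation_id}"
    data_topic = f"{base}/data/{datapack_name}"
    return {
        "welcome": (f"{base}/welcome", True),   # welcome message
        "time": (f"{base}/time", False),        # simulation time, every timestep
        "data_index": (f"{base}/data", True),   # address of each data topic
        "type": (f"{data_topic}/type", True),   # streamed protobuf message type
        "data": (data_topic, False),            # serialized datapacks and "reset"
    }


def is_reset(payload: bytes) -> bool:
    """Recognise the plain-text reset notification sent on a data topic."""
    return payload == b"reset"


# "0" is the default simulationID; "actors" is a made-up datapack name.
for kind, (topic, once) in mqtt_topics("0", "actors").items():
    print(f"{kind}: {topic} (published once: {once})")
```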

Engine Configuration Parameters

The parameters of this engine type are defined in the DataTransfer schema (listed below), which in turn is based on the EngineBase and EngineGRPC schemas and thus inherits all parameters from them.

To use this engine in an experiment, set EngineType to “datatransfer_grpc_engine”.

  • Parameters inherited from the EngineBase schema:

Name | Description | Type | Default | Required | Array
EngineName | Name of the engine | string | - | X | -
EngineType | Engine type. Used by | string | - | X | -
EngineProcCmd | Engine Process Launch command | string | - | - | -
EngineProcStartParams | Engine Process Start Parameters | string | [] | - | X
EngineEnvParams | Engine Process Environment Parameters | string | [] | - | X
EngineLaunchCommand | - | object | {"LaunchType":"BasicFork"} | - | -
EngineTimestep | Engine Timestep in seconds | number | 0.01 | - | -
EngineCommandTimeout | Engine Timeout (in seconds). It tells how long to wait for the completion of the engine runStep. 0 or negative values are interpreted as no timeout | number | 0.0 | - | -

  • Parameters inherited from the EngineGRPC schema:

Name | Description | Type | Default | Required | Array
ServerAddress | gRPC Server address. Should this address already be in use, simulation initialization will fail | string | localhost:9004 | - | -

  • Parameters specific to this engine type:

Name | Description | Type | Default | Required | Array
MQTTBroker | Address of the MQTT broker | string | localhost:1883 | - | -
dataDirectory | Path to the storage of file streams | string | data | - | -
simulationID | A simulation identifier to be added to MQTT topics published by this Engine | string | 0 | - | -
streamDataPackMessage | If true the engine will stream DataPackMessages, if false it will stream their contained data | boolean | true | - | -
dumps | List of datapacks for transfer | dumpItem | [] | - | X

  • dumpItem: elements of the dumps array above:

Name | Description | Type | Default | Required | Array
name | Name of the datapack for transfer | string | - | X | -
network | If true, the datapack is streamed over the network (to the MQTT broker) | boolean | false | - | -
file | If true, the datapack is logged to file | boolean | false | - | -
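The parameters above can be combined into an engine configuration object. The sketch below is a hypothetical builder that fills in the defaults from the tables and schema in this section (writing defaults explicitly is optional) and normalises each dumpItem. The engine name and datapack name are made up, and where this object sits inside a full experiment configuration is not covered here.

```python
import json
from typing import Dict, List


def make_datatransfer_config(engine_name: str, dumps: List[Dict], **overrides) -> Dict:
    """Return a DataTransfer Engine configuration object as a dict."""
    config = {
        "EngineName": engine_name,
        "EngineType": "datatransfer_grpc_engine",
        # Defaults taken from the parameter tables and schema of this section.
        "ServerAddress": "localhost:9004",
        "MQTTBroker": "localhost:1883",
        "dataDirectory": "data",
        "simulationID": "0",
        "streamDataPackMessage": True,
        "ProtobufPackages": ["Dump"],
    }
    config.update(overrides)
    # Each dumpItem requires "name"; "network" and "file" default to false.
    config["dumps"] = [
        {"name": d["name"], "network": d.get("network", False), "file": d.get("file", False)}
        for d in dumps
    ]
    return config


cfg = make_datatransfer_config("datatransfer_engine",
                               [{"name": "actors", "file": True}],
                               simulationID="42")
print(json.dumps(cfg, indent=2))
```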

Schema

As explained above, the schema used by the DataTransfer engine inherits from EngineBase and EngineGRPC schemas. A complete schema for the configuration of this engine is given below:

{"engine_datatransfer_base" : {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Data Transfer Base",
    "description": "Data Transfer Engine configuration schema. Configuration for all DataTransfer engine implementations inherit from this one",
    "$id": "#DataTransfer",
    "definitions": {
      "dumpItem":{
        "type": "object",
        "properties": {
          "name":{
            "type": "string",
            "description": "Name of the datapack for transfer"
          },
          "network": {
            "type": "boolean",
            "default": false,
            "description": "Trigger, if the datapack should be sent to network"
          },
          "file": {
            "type": "boolean",
            "default": false,
            "description": "Trigger, if the datapack should be sent to file"
          }
        },
        "required": ["name"]
      }
    },
    "allOf": [
      { "$ref": "json://nrp-core/engines/engine_comm_protocols.json#/engine_grpc" },
      {
        "properties": {
          "MQTTBroker": {
            "type": "string",
            "default": "localhost:1883",
            "description": "Address of the MQTT broker"
          },
          "dataDirectory": {
            "type": "string",
            "default": "data",
            "description": "Path to the storage of file streams."
          },
          "simulationID": {
            "type": "string",
            "default": "0",
            "description": "A simulation identifier to be added to MQTT topics published by this Engine"
          },
          "streamDataPackMessage" : {
            "type": "boolean",
            "default": true,
            "description": "if true the engine will stream DataPackMessages, if false it will stream their contained data"
          },
          "dumps": {
            "type": "array",
            "items": { "$ref": "#/engine_datatransfer_base/definitions/dumpItem" },
            "uniqueItems": true,
            "default": [],
            "description": "List of datapacks for transfer"
          },
          "EngineType": { "enum": ["datatransfer_grpc_engine"] },
          "ProtobufPackages": { "default": ["Dump"]}
        }
      }
    ]
  }
}