Executing Engines Asynchronously and in Real-time with the Event Loop

In an FTILoop-based NRPCore experiment, a set of Engine processes run their own simulations and interact through a Computational Graph or Transceiver Functions managed by an FTILoop in a central NRPCoreSim process. This setup has two important characteristics: (1) the progress of the Engine simulations, the execution of the Computational Graph and the exchange of data between Engines are all fully synchronous and deterministic, orchestrated by the FTILoop in the Simulation Loop; (2) it is not possible to impose real-time constraints on the execution of the Simulation Loop, since only Simulation Time is relevant in the coordination of the different Engines.

As an alternative, NRPCore offers the possibility of executing each of the Engines in the experiment as an independent process managed by its own Event Loop. The Event Loop takes care of getting and setting datapacks and advancing the simulation at a fixed frequency, which is a property of the Event Loop configuration. In this setup the Computational Graph is also run by an Event Loop in the NRPCoreSim process, all communications are asynchronous, and each of the components in the experiment (Engines and NRPCoreSim) runs in real-time, i.e. executes its Event Loop at a fixed frequency.

This second setup is more suitable for interacting with real-time systems, such as robotic systems, from NRP-Core. When the ultimate goal is to exert some sort of control over, or to interact with, a real-time system, a practical workflow would be:

  1. Design a synchronous, FTILoop-based experiment in NRPCore in which the real-time system is modelled as one or more Engines. The controller is implemented as one or more Engines and a Computational Graph, or as a Computational Graph alone

  2. Use this setup, in which the experiment execution is synchronous, deterministic and reproducible, to train, tune or test the controller

  3. Port the experiment to a fully asynchronous, real-time setup. Tests, benchmarks or tuning can be performed here again before interacting with the real system

  4. Replace the simulated environment or system with the real one

This page explains how to perform step (3) of the workflow described above, i.e. how to either port an existing synchronous, FTILoop-based NRPCore experiment to a fully asynchronous, real-time setup or write a new one from scratch.

The main differences between an asynchronous experiment and a synchronous one are that:

  • Engines are not synchronized by an FTILoop. Instead, each of the Engines in the experiment is an independent process managed by its own Event Loop, which takes care of getting and setting datapacks and advancing the simulation at a fixed frequency. This frequency is a property of the Event Loop configuration

  • Engine processes are not started by NRPCoreSim using Engine launchers. They must be started independently

  • NRPCoreSim uses another Event Loop which runs a Computational Graph, again at a fixed frequency, to allow exchanging and transforming data between Engine processes

The sections below explain the implications of these points for the experiment configuration, code and runtime behavior.

Asynchronous Engines

All Engines available in NRPCore implement the Engine Interface, which allows an NRPCoreSim process to control the time progress of the Engine and to make requests for getting/setting datapacks using one of the communication protocols available in NRPCore: JSON over REST and Protobuf over gRPC. This interface is what allows an FTILoop to orchestrate the progress of, and the data exchange between, Engines in a fully synchronous fashion, through Transceiver Functions or a Computational Graph.

In the case of Protobuf over gRPC Engines and the Python JSON Engine, it is also possible to execute the Engine standalone, in real-time and asynchronously, i.e. independently from NRPCoreSim. In this case, the “body” of the Engine remains unaltered, i.e. the Engine configuration parameters, scripts and/or any other assets used by it don’t need to be modified. The main differences are:

  • For each Engine there is a dedicated executable which runs it standalone, in real-time and asynchronously using an Event Loop

  • The Engine configuration is augmented with Event Loop specific parameters

  • Datapacks are received and sent through MQTT topics instead of through RPCs as in the synchronous setup

Each of these points is explained in the sections below.

Executables

For each of the Engine types which can be run asynchronously there is a dedicated executable which launches it. They all behave in the same way: they read and parse a configuration file, connect to MQTT and start an Event Loop which processes incoming datapacks, advances the Engine and sends datapacks at a fixed frequency. See the section Example Experiments below for asynchronous example experiments for each of the supported Engines.

The asynchronous Engine executables expect two command line parameters:

  • “--config”: path to the configuration file

  • “--loglevel”: the desired console log level: ‘critical’, ‘fatal’, ‘error’, ‘warning’, ‘info’, ‘debug’
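The two parameters can be mirrored in a small argument parser. This is only an illustrative sketch: it is not the code of any NRPCore executable, the function name parse_engine_args is hypothetical, and the "info" default for the log level is an assumption, since the page does not state a default.

```python
import argparse

# Log levels listed in this page for the asynchronous Engine executables.
LOG_LEVELS = ["critical", "fatal", "error", "warning", "info", "debug"]


def parse_engine_args(argv=None):
    """Parse the two command line parameters described above (hypothetical helper)."""
    parser = argparse.ArgumentParser(
        description="Sketch of an asynchronous Engine command line")
    parser.add_argument("--config", required=True,
                        help="path to the configuration file")
    # The default value is an assumption made for this sketch only.
    parser.add_argument("--loglevel", default="info", choices=LOG_LEVELS,
                        help="desired console log level")
    return parser.parse_args(argv)


if __name__ == "__main__":
    args = parse_engine_args(["--config=engine_async_conf.json", "--loglevel=debug"])
    print(args.config, args.loglevel)
```

Both the “--name value” and “--name=value” forms are accepted by this parser; the latter is the form used in the “ExternalProcesses” example further down this page.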

Configuration

The configuration of an asynchronous Engine executable is defined in the EventLoopEngine schema, which contains the following parameters:

| Name | Description | Type | Default | Required | Array | Values |
|---|---|---|---|---|---|---|
| Timeout | Event loop timeout (in seconds). 0 means no timeout | integer | 0 | | | |
| Timestep | Time in seconds the event loop advances in each loop | number | 0.01 | | | |
| TimestepWarnThreshold | Threshold (in seconds) above which a warning message is printed at runtime every time the Event Loop can’t run at the frequency specified in the “Timestep” parameter | number | 0.001 | | | |
| EngineConfig | Configuration of the Engine run by the Event Loop | | | X | | |
| MQTTConfig | Configuration of the MQTT client used to send/receive datapacks | | | | | |
| ProcessLastMsg | If true, only the last message received through a topic during the last step is processed | bool | true | | | |
| DataQueueSize | Maximum number of messages received through a topic which are stored each step | integer | 1 | | | |

The parameter “EngineConfig” corresponds to the Engine configuration as used in a regular FTILoop-based NRPCore experiment configuration.
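To make the roles of “Timeout”, “Timestep” and “TimestepWarnThreshold” more concrete, here is a minimal sketch of a fixed-frequency loop. It is not the NRPCore implementation: the function run_event_loop and its arguments are hypothetical, and it assumes that a warning is due whenever one iteration exceeds the timestep by more than the threshold.

```python
import time


def run_event_loop(step, timestep=0.01, timeout=0, warn_threshold=0.001,
                   clock=time.monotonic, sleep=time.sleep):
    """Call step() once per timestep; return how many iterations overran.

    Hypothetical helper, not NRPCore code. timeout == 0 means no timeout,
    in which case the loop only ends when step() returns False.
    """
    start = clock()
    overruns = 0
    while True:
        loop_start = clock()
        if timeout > 0 and loop_start - start >= timeout:
            break
        # One iteration: process incoming datapacks, advance, publish.
        if step() is False:
            break
        elapsed = clock() - loop_start
        if elapsed > timestep + warn_threshold:
            # Assumed reading of TimestepWarnThreshold: warn on large overruns.
            overruns += 1
            print(f"warning: step took {elapsed:.4f} s, Timestep is {timestep} s")
        else:
            # Sleep away the rest of the period to hold the frequency.
            sleep(max(0.0, timestep - elapsed))
    return overruns
```

For instance, run_event_loop(lambda: None, timestep=0.01, timeout=1) would call the no-op step roughly 100 times in one second. The clock and sleep arguments exist only so that the loop can be exercised with a simulated clock.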

Data Exchange through MQTT topics

When run asynchronously, Engines use MQTT topics to exchange datapacks, instead of RPCs as in the synchronous setup.

For each datapack registered in the Engine, the Engine subscribes to an MQTT topic with address: “{engine_name}/set/{datapack_name}”. Messages received through this topic are converted into datapacks and set in the Engine using the same mechanism used in the synchronous setup for setting datapacks produced by TFs or the Computational Graph.

Likewise, each datapack registered in the Engine is updated after each Engine step and published through an MQTT topic with address: “{engine_name}/get/{datapack_name}”.
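The naming scheme can be summarised with a short sketch. The helper engine_topics is hypothetical and only formats strings according to the two address patterns above; it does not connect to an MQTT broker.

```python
def engine_topics(engine_name, datapack_names):
    """List the MQTT topics an asynchronous Engine would use (hypothetical helper).

    "subscribe" holds the topics the Engine listens on to receive datapacks,
    "publish" holds the topics on which it sends its updated datapacks.
    """
    return {
        "subscribe": [f"{engine_name}/set/{name}" for name in datapack_names],
        "publish": [f"{engine_name}/get/{name}" for name in datapack_names],
    }


if __name__ == "__main__":
    topics = engine_topics("gazebo", ["husky::back_left_joint"])
    print(topics["subscribe"])  # topics read by the Engine
    print(topics["publish"])    # topics written by the Engine
```

Note the direction of the names: “set” and “get” are expressed from the point of view of a client of the Engine, so any other process that wants to send a datapack to the Engine publishes to the “set” topic and reads results from the “get” topic.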

Adapting Experiments to Run Asynchronously

There are several aspects that need to be considered when porting an NRPCore experiment from synchronous to asynchronous:

  • the exchange of data between Engines and the NRPCoreSim process

  • the experiment configuration

  • the way of launching the experiment

  • the resulting runtime behavior

Each of them is discussed in the sections below.

Data Exchange and Experiment Code Changes

In this setup, data exchange between the NRPCoreSim central process and Engine processes happens asynchronously through MQTT topics. Also, running NRPCoreSim and exchanging data with Engines asynchronously can only be done with an Event Loop managing a Computational Graph. This means that:

  • NRPCoreSim must be configured to use an Event Loop instead of an FTILoop

  • Only a Computational Graph can be used in this case. If the original experiment uses TFs, they must be converted into Computational Graph Functional Nodes

The configuration part is explained in the following section.

The main changes in the experiment code come from the need to replace the @FromEngine and @ToEngine decorators in the Computational Graph Functional Nodes with @MQTTSubscriber and @MQTTPublisher, in order to shift from synchronous, RPC-based data exchange to asynchronous, MQTT-based data exchange. The Functional Node function bodies don’t need to be modified. If the original experiment uses Transceiver Functions, these can likewise be transformed into Computational Graph Functional Nodes without changing the TF function bodies, i.e. by changing only the decorators used. This process is planned to be explained in a separate guide. In the meantime, the example experiments ‘examples/husky_braitenberg_cg’, ‘examples/event_loop_examples/husky_braitenberg_cg’ and ‘examples/event_loop_examples/husky_braitenberg_async’ offer a good example of the same experiment implemented synchronously using TFs, synchronously using a Computational Graph and asynchronously.

Experiment Configuration

First, the original experiment configuration must be split into the NRPCoreSim configuration and a separate configuration file for each of the Engines participating in the experiment. Each of the Engine configurations in the “EngineConfigs” parameter of the original experiment configuration can be copied and pasted into the “EngineConfig” parameter of the corresponding asynchronous Engine configuration file.
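The copy step described above can be sketched as follows. The helper make_async_engine_configs is hypothetical (NRPCore does not ship such a tool as far as this page states), and the timestep value is something the user has to choose for each Engine.

```python
import copy
import json


def make_async_engine_configs(sync_config, timestep=0.01):
    """Build one asynchronous Engine configuration per "EngineConfigs" entry.

    Hypothetical helper: returns {engine_name: config}, where each config
    wraps a copy of the original Engine configuration in "EngineConfig".
    """
    async_configs = {}
    for engine_config in sync_config.get("EngineConfigs", []):
        async_configs[engine_config["EngineName"]] = {
            "Timestep": timestep,  # Event Loop specific parameter
            "EngineConfig": copy.deepcopy(engine_config),
        }
    return async_configs


if __name__ == "__main__":
    # Reduced synchronous configuration, used only for illustration.
    sync_config = {
        "SimulationName": "example",
        "EngineConfigs": [
            {"EngineType": "nest_json", "EngineName": "nest",
             "NestInitFileName": "braitenberg.py"}
        ],
    }
    for name, config in make_async_engine_configs(sync_config, 0.1).items():
        print(name)
        print(json.dumps(config, indent=4))
```

Each resulting dictionary could then be written to its own JSON file and passed to the corresponding asynchronous Engine executable. Other EventLoopEngine parameters, such as “MQTTConfig”, would still need to be added by hand where required.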

Also, as indicated in the section above, NRPCoreSim needs to use an Event Loop in order to run asynchronously. Read this guide for more information about how to do this.

Finally, NRPCoreSim needs to be configured to connect to MQTT. In the same guide there is information on how to do this.

Launching the Experiment

In a fully synchronous NRPCore experiment, NRPCoreSim is by default in charge of launching and shutting down the different Engine processes involved in the experiment. In the asynchronous case, these Engine processes can be used standalone, without the need for an NRPCoreSim process to control them. Thus, by default NRPCoreSim doesn’t launch them. If the user wants NRPCoreSim to manage the Engine processes in this case, this can still be done by using the experiment configuration parameter “ExternalProcesses”. See the section Example Experiments below for examples of how this can be done.

Runtime Experiment Behavior

The last point to take into consideration is that, even after following all the points described above and ending up with a working asynchronous version of your experiment, it might not behave exactly the same as its synchronous counterpart. This is normal and expected, given the asynchronicity and non-determinism added to the communications between the different Engines in the experiment and the NRPCoreSim process. After converting an experiment to run asynchronously, further tests and tuning might be required.

Example Experiments

Among the example experiments provided in NRPCore inside the ‘examples’ folder, there are three experiments with synchronous and asynchronous versions which can be used to better understand the conversion process. They are described below.

examples/event_loop_examples/husky_braitenberg_async

This is the asynchronous version of examples/event_loop_examples/husky_braitenberg_cg. Its configuration file is listed below:

{
    "SimulationName": "husky_braitenberg_cg",
    "SimulationDescription": "Adaptation of the example `husky_braitenberg` in which a Computational Graph (CG) is used to process and rely data between Engines.",
    "SimulationLoop": "EventLoop",
    "MQTTNode": {},
    "EngineConfigs": [
        {
            "EngineType": "nest_json",
            "EngineName": "nest",
            "NestInitFileName": "braitenberg.py",
            "EngineEnvParams": ["PYNEST_QUIET=1"]
        }
    ],
    "ComputationalGraph": ["cam_fn.py", "mot_fn.py", "brain_stimulation_fn.py"],
    "EventLoop": { "Timestep": 0.1},
    "ExternalProcesses" : [
        {
            "ProcCmd": "gzserver",
            "ProcStartParams": ["--config=engine_async_conf.json", "--loglevel=info", "--verbose", "-s", "NRPGazeboAsyncPlugin.so", "--seed", "0", "husky_world.sdf"]
        }
    ]
}

The key points to notice are:

  • “SimulationLoop”: “EventLoop” configures NRPCoreSim to use an EventLoop

  • “EventLoop”: { “Timestep”: 0.1} sets the timestep of NRPCoreSim to 0.1 seconds

  • The “ExternalProcesses” parameter starts the Gazebo Engine using the NRPGazeboAsyncPlugin.so system plugin, which will run the Engine asynchronously using an Event Loop

  • The configuration of the Gazebo Engine is located in a separate file, engine_async_conf.json :

{
    "Timestep": 0.1,
    "EngineConfig": {
        "EngineName": "gazebo",
        "GazeboWorldFile": "",
        "GazeboSDFModels": [
            {
                "Name": "husky",
                "File": "husky.sdf",
                "InitPose": "-0.009853 0.086704 0.187752 0 0 0"
            }

        ]
    }
}

As can be seen, the Engine configuration has its own “Timestep” parameter, which corresponds to the inverse of the Event Loop frequency. This is expected, since asynchronous Engines are meant to be independent components and thus do not inherit any configuration parameter from the NRPCoreSim configuration, even though they can be launched together with the latter by using the “ExternalProcesses” parameter.

The Functional Nodes defined in the files “cam_fn.py”, “mot_fn.py” and “brain_stimulation_fn.py” are almost identical to their counterparts in examples/event_loop_examples/husky_braitenberg_cg, with the exception that all @FromEngine decorators are replaced with @MQTTSubscriber decorators and all @ToEngine decorators are replaced with @MQTTPublisher decorators. For example, in “cam_fn.py” we find:

@MQTTSubscriber(keyword='camera', address="gazebo/get/husky::eye_vision_camera::camera", type=GazeboCameraDataPack)

and in examples/event_loop_examples/husky_braitenberg_cg :

@FromEngine(keyword='camera', address='/gazebo/husky::eye_vision_camera::camera')

In the same way, in “mot_fn.py” we find:

@MQTTPublisher(keyword="back_left_j", address="gazebo/set/husky::back_left_joint", type=GazeboJointDataPack)

and in examples/event_loop_examples/husky_braitenberg_cg :

@ToEngine(keyword="back_left_j", address="/gazebo")
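The address changes in these decorator pairs follow a regular pattern, which the sketch below captures. Both helper functions are hypothetical and are inferred only from the four decorator lines shown above; in particular, the datapack name for the publisher has to be supplied separately, because the @ToEngine address contains only the Engine name.

```python
def from_engine_to_mqtt_address(from_engine_address):
    """Map a @FromEngine address to the matching @MQTTSubscriber address.

    Hypothetical helper: "/{engine}/{datapack}" -> "{engine}/get/{datapack}".
    """
    engine_name, datapack_name = from_engine_address.lstrip("/").split("/", 1)
    return f"{engine_name}/get/{datapack_name}"


def to_engine_to_mqtt_address(to_engine_address, datapack_name):
    """Map a @ToEngine address to the matching @MQTTPublisher address.

    Hypothetical helper: "/{engine}" plus a datapack name ->
    "{engine}/set/{datapack}".
    """
    engine_name = to_engine_address.strip("/")
    return f"{engine_name}/set/{datapack_name}"


if __name__ == "__main__":
    print(from_engine_to_mqtt_address("/gazebo/husky::eye_vision_camera::camera"))
    print(to_engine_to_mqtt_address("/gazebo", "husky::back_left_joint"))
```

Besides the address, the MQTT decorators in the example also take a type argument (GazeboCameraDataPack, GazeboJointDataPack), which this sketch does not attempt to derive.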

examples/event_loop_examples/tf_exchange_async

The experiment examples/tf_exchange shows the use of the Python JSON Engine, the Python gRPC Engine and Transceiver Functions. examples/event_loop_examples/tf_exchange_async reimplements the same experiment with Engines and NRPCoreSim running asynchronously and using a Computational Graph instead of TFs.

The experiment includes two configuration files: simulation_confing_proto.json and simulation_confing_json.json. The former launches the experiment using the asynchronous Python gRPC Engine and the latter launches the Python JSON Engine version.

examples/event_loop_examples/opensim_control_async

This experiment is the asynchronous version of examples/pysim_examples/opensim_control. It exemplifies how Pysim Engines can also be run asynchronously.

gRPC Engines generated from template

The guide Creating a new Engine from template explains how to implement a new JSON over REST or Protobuf over gRPC Engine using the NRPCore tool create_new_engine.py.

When the tool is used to implement a gRPC Engine, an executable for running the Engine asynchronously is generated as well. The example experiment which comes with the new Engine includes a file simulation_config_async.json, which runs the Engine asynchronously in a very simple experiment and is convenient for testing.