# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# ---LICENSE-END
"""
A Wrapper for NrpCore clients instances making it amenable to be controlled interactively.
It adds cooperative execution via thread synchronization and checks on simulation timeouts
"""
import logging
import threading
from time import time_ns as now_ns
from typing import List, Optional, Type
from hbp_nrp_commons.workspace.settings import Settings
import hbp_nrp_simserver.server.experiment_configuration as exp_conf_utils
import hbp_nrp_simserver.server as simserver
logger = logging.getLogger(__name__)
[docs]class NrpCoreWrapper:
NRP_CORE_DEFAULT_ADDRESS_PORT = 'localhost:5345' # TODO from some configuration instead?
def __init__(self,
nrp_core_class: Type[simserver.NrpCoreClientClass],
sim_id: str,
exp_config_file: str,
exp_config: exp_conf_utils.type_class, # experiment-related params
paused_event: threading.Event,
stopped_event: threading.Event):
self.sim_id = sim_id
self.__exp_config_file = exp_config_file
self.__paused_event = paused_event
self.__stopped_event = stopped_event
# exp_config is assumed to be valid
self.__timeout_s: float = float(exp_config.SimulationTimeout)
self.__timestep_s: float = float(exp_config.SimulationTimestep)
# keep internal simulation time in timestep units
self.__max_timesteps: int = int(self.__timeout_s / self.__timestep_s)
self.__timesteps_count: int = 0
# Track real time as follows
# right before nrp_client run_loop is run, start_time is set to the current clock time.
# When it's completed, elapsed_time is incremented by (stop_time - start_time).
# Thus, the real time is:
# - elapsed_time when paused
# - elapsed_time + (now - start_time) when running
# time unit is ns since we use time.time_ns()
self.__start_time: int = 0 # nsecs
self.__elapsed_time: int = 0 # nsecs
self.is_running: bool = False
# datatransfer_engine's needs sim_id to use in topics' naming.
# We pass "sim_id" overriding its "simulationID" parameter in its configuration
# But we need to find the position of its configuration in the exp_config.EngineConfigs list
data_engine_index: int = exp_conf_utils.engine_index(exp_config,
"datatransfer_grpc_engine")
data_engine_conf_prefix_str: str = f"EngineConfigs.{data_engine_index}"
conf_overrides: List[str] = [""] # empty string to ease string joining with str.join()
conf_overrides.append(f'{data_engine_conf_prefix_str}.simulationID="{sim_id}"')
# We override the MQTTBroker address if specified in workspace.Settings via Env Vars
if not Settings.is_mqtt_broker_default:
mqtt_address_str = f"{Settings.mqtt_broker_host}:{Settings.mqtt_broker_port}"
conf_overrides.append(f'{data_engine_conf_prefix_str}.MQTTBroker="{mqtt_address_str}"')
# override MQTTPrefix (specified in workspace.Settings via Env Var) in any case (set/empty string)
conf_overrides.append(f'{data_engine_conf_prefix_str}.MQTTPrefix="{Settings.mqtt_topics_prefix}"')
conf_overrides_str = ' -o '.join(conf_overrides).strip()
nrp_core_args = [conf_overrides_str] # NOTE append nrp_core args e.g. "--cloglevel=trace"
nrp_core_args_str = " ".join(nrp_core_args)
# Configurations Assumptions:
# - address is 'localhost:5345'
# - current directory is the experiment directory
logger.debug("Instantiating nrp-core client: "
"%s(%s, config_file=%s, args=%s) ",
nrp_core_class.__name__,
self.NRP_CORE_DEFAULT_ADDRESS_PORT,
self.__exp_config_file, nrp_core_args_str)
# NOTE Change here when NrpCore API changes
self.__nrp_core_client_instance = nrp_core_class(self.NRP_CORE_DEFAULT_ADDRESS_PORT,
config_file=self.__exp_config_file,
args=nrp_core_args_str)
def _initialize(self):
self.__nrp_core_client_instance.initialize()
def _shutdown(self):
self.__nrp_core_client_instance.shutdown()
@property
def max_timesteps(self) -> int:
"""
return: configured timeout / configured timestep as int
"""
return self.__max_timesteps
@property
def simulation_time(self) -> float:
""""
:return: the simulation time in seconds as a float
"""
return float(self.__timesteps_count * self.__timestep_s) # in secs
@property
def simulation_time_remaining(self) -> float:
""""
:return: the simulation time remaining until the configured timeout in seconds as a float
"""
return float((self.__max_timesteps - self.__timesteps_count) * self.__timestep_s) # in secs
@property
def real_time(self) -> float:
""""
:return: The wall-clock elapsed execution time in secs as a float
"""
time_delta: int = (now_ns() - self.__start_time) if self.is_running else 0
return float((self.__elapsed_time + time_delta) * 1e-9)
[docs] def run_loop(self, num_iterations: int = 1, json_data: Optional[str] = None) -> Optional[dict]:
"""
Ask to advance the simulation of num_iterations timesteps.
In the case such number of iterations will result in the timeout being reached,
an NRPSimulationTimeout exception will be raised; the simulation won't be advanced.
See nrp_client.NrpCore.run_loop for further details.
:raises NRPSimulationTimeout: if running for num_iterations will exceed the configured
simulation timeout (i.e. curr_timestep + num_iterations > max_timestep)
:raises NRPStopExecution: when stopped_event is set.
:return: same as NrpCore.run_loop, i.e any JSON data passed in the response or None
"""
logger.debug("run_loop: waiting on paused event. Simulation ID '%s'", self.sim_id)
# set by NRPScriptRunner.pause() and cleared by NRPScriptRunner.start()
self.__paused_event.wait() # NOTE Waiting point
logger.debug("run_loop: wait on paused event over. Simulation ID '%s'", self.sim_id)
# check if we have been asked to stop
if self.__stopped_event.is_set():
logger.debug("run_loop: Stop event is set! raise NRPStopExecution. "
"Simulation ID '%s'", self.sim_id)
raise NRPStopExecution()
# Check simulation timeout boundary
# if __max_timesteps is not a multiple of num_iterations,
# the last (self.__max_timesteps % num_iterations) timesteps won't be executed.
# The other behaviour will require a change in the method signature.
# In fact, since it will be possible to run fewer timesteps than required,
# the user will have to know how many have been actually run.
# e.g.
# loops_actually_run = run_loop(requested_loops)
# 0 <= loops_actually_run <= requested_loops
if (self.__timesteps_count + num_iterations) > self.__max_timesteps:
raise NRPSimulationTimeout("The number of iteration requested will exceed the timeout")
self.__start_time = now_ns()
self.is_running = True
try:
# delegate run_loop to wrapped NrpCore instance
# NOTE This is subject to changes in NrpCore's API
loop_result: Optional[dict] = self.__nrp_core_client_instance.run_loop(num_iterations,
json_data)
finally:
# in case of run_loop raising an exception,
# we don't know how many iterations have been completed, so don't count them.
# the simulation has failed anyway
self.__elapsed_time += now_ns() - self.__start_time
self.is_running = False
# time keeping. Ideally, NrpCore should take care of it
self.__timesteps_count += num_iterations
logger.debug("run_loop: loop completed. Simulation ID '%s'", self.sim_id)
return loop_result
[docs] def stop(self):
raise NotImplementedError("stop() is not available")
[docs] def reset(self):
raise NotImplementedError("reset() is not available")
[docs] def shutdown(self):
raise NotImplementedError("shutdown() is not available")
[docs] def initialize(self):
raise NotImplementedError("initialize() is not available")
[docs]class NRPStopExecution(Exception):
"""
Raised by NrpCoreWrapped.run_loop when called in a stopped state
"""
pass
[docs]class NRPSimulationTimeout(Exception):
"""
Raised by NrpCoreWrapped.run_loop when as simulation timeout has been reached
"""
pass