# Source code for hbp_nrp_simserver.server.mqtt_notifier

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END
"""
This module implements a MQTT Notifier interface for status/error messages.
"""

import contextlib
import json
import logging
from typing import Optional

import paho.mqtt.client as mqtt

from hbp_nrp_commons.workspace.settings import Settings

from . import TOPIC_STATUS, TOPIC_ERROR

logger = logging.getLogger(__name__)


class MQTTNotifier:
    """
    This class encapsulates publishing of state/errors/task status to the frontend/clients.

    On construction it connects to the MQTT broker and starts the client's
    message processing thread; call :meth:`shutdown` to release them.
    """

    DEFAULT_MQTT_CLIENT_ID = "mqtt_notifier"

    def __init__(self,
                 sim_id: int,
                 broker_hostname: str = Settings.DEFAULT_MQTT_BROKER_HOST,
                 broker_port: int = Settings.DEFAULT_MQTT_BROKER_PORT,
                 topics_prefix: str = Settings.DEFAULT_MQTT_TOPICS_PREFIX,
                 client_id: Optional[str] = DEFAULT_MQTT_CLIENT_ID):
        """
        Create the notifier, connect to the broker and start the network loop.

        :param sim_id: ID of the simulation, used to build the status and error topics.
        :param broker_hostname: Hostname of the MQTT broker to connect to.
        :param broker_port: Port of the MQTT broker to connect to.
        :param topics_prefix: Optional prefix; when non-empty it is prepended to both
                              topics (separated by '/') and to the client ID
                              (separated by '_').
        :param client_id: ID passed to the MQTT client.
        """
        self.sim_id: int = sim_id

        self.mqtt_broker_hostname: str = broker_hostname
        self.mqtt_broker_port: int = broker_port
        self.mqtt_topics_prefix: str = topics_prefix
        self.mqtt_client_id: Optional[str] = client_id

        self.status_topic: str = TOPIC_STATUS(self.sim_id)
        self.error_topic: str = TOPIC_ERROR(self.sim_id)

        if self.mqtt_topics_prefix:
            self.status_topic = f"{self.mqtt_topics_prefix}/{self.status_topic}"
            self.error_topic = f"{self.mqtt_topics_prefix}/{self.error_topic}"
            # prefix mqtt_client_id with mqtt_topics_prefix
            # TODO it should be prefixed by a globally unique sim_id (NRRPLT-8917)
            self.mqtt_client_id = f"{self.mqtt_topics_prefix}_{self.mqtt_client_id}"

        # task specific bookkeeping
        self.__current_task: Optional[str] = None
        self.__current_subtask_count: int = 0
        self.__current_subtask_index: int = 0

        # NOTE MQTTv5 requires clean_start=True parameter to connect
        # instead of clean_session=True here
        self.__mqtt_client: Optional[mqtt.Client] = mqtt.Client(self.mqtt_client_id,
                                                                clean_session=True)
        self.__mqtt_client.on_connect = self.__on_connect
        self.__mqtt_client.connect(host=self.mqtt_broker_hostname,
                                   port=self.mqtt_broker_port)
        self.__mqtt_client.loop_start()  # start message processing thread

        logger.info("MQTT notifier initialized. Simulation ID: '%s'", self.sim_id)

    def __on_connect(self, _client, _userdata, _flags, rc):
        """
        Callback registered as the client's ``on_connect`` handler.

        Logs the outcome of the connection attempt.

        :param rc: Connection result code handed over by the client.
                   NOTE(review): assumed to be the integer CONNACK code of the
                   paho-mqtt v1 callback signature (0 meaning success) -- confirm
                   against the installed paho-mqtt version.
        """
        if rc != 0:
            # a non-zero result means the broker did not accept the connection;
            # do not report it as a successful connection
            logger.error("Connection to MQTT broker at %s:%s with 'id' %s failed: %s. "
                         "Simulation ID: '%s'",
                         self.mqtt_broker_hostname, self.mqtt_broker_port,
                         self.mqtt_client_id, mqtt.connack_string(rc), self.sim_id)
            return

        logger.debug("Connected to MQTT broker at %s:%d with 'id' %s. Simulation ID: '%s'",
                     self.mqtt_broker_hostname, self.mqtt_broker_port,
                     self.mqtt_client_id, self.sim_id)

    def shutdown(self):
        """
        Shutdown all publishers, notification will no longer function after called.

        Calling it again after a completed shutdown is a no-op.
        """
        if self.__mqtt_client is None:
            # already shut down: nothing left to stop or disconnect
            logger.debug('MQTT notifier already shut down')
            return

        logger.info('Shutting down MQTT notifier')

        self.__mqtt_client.loop_stop()
        self.__mqtt_client.disconnect()
        self.__mqtt_client = None

    def publish_status(self, msg):
        """
        Publishes a state message on the status topic.

        Logs an error and publishes nothing when called after :meth:`shutdown`.

        :param msg: A string of formatted JSON to publish.
        """
        if self.__mqtt_client is None:
            logger.error('Attempting to publish state after shutdown!')
            return

        self.__mqtt_client.publish(self.status_topic, msg)

    def publish_error(self, error_msg):
        """
        Publishes an error message on the error topic.

        Logs an error and publishes nothing when called after :meth:`shutdown`.

        :param error_msg: A string of formatted JSON to publish.
        """
        if self.__mqtt_client is None:
            logger.error('Attempting to publish error after shutdown!')
            return

        logger.debug("Publishing an Error: '%s'", error_msg)
        self.__mqtt_client.publish(self.error_topic, error_msg)

    # TASK NOTIFIER

    def start_task(self, task_name, subtask_name, number_of_subtasks, block_ui=False):
        """
        Sends, on the MQTT status topic, a notification that a task is starting.
        This method will save the task name and the task size in class members so that
        it could be reused in subsequent call to the update_task method.

        If a previous task is still open, it is finished first (with a warning).

        :param task_name: Title of the task (example: initializing experiment).
        :param subtask_name: Title of the first subtask. Could be empty
                             (example: 'loading...').
        :param number_of_subtasks: Number of expected subsequent calls to
                                   update_task(_, True, _).
        :param block_ui: Indicate that the client should block any user interaction.
        """
        if self.__current_task is not None:
            logger.warning(
                "Previous task was not closed properly, closing it now.")
            self.finish_task()

        self.__current_task = task_name
        self.__current_subtask_count = number_of_subtasks

        message = {'progress': {'task': task_name,
                                'subtask': subtask_name,
                                'number_of_subtasks': number_of_subtasks,
                                'subtask_index': self.__current_subtask_index,
                                'block_ui': block_ui}}
        self.publish_status(json.dumps(message))

    def update_task(self, new_subtask_name, update_progress, block_ui=False):
        """
        Sends a status notification that the current task is updated with a new subtask.

        Logs a warning and does nothing when no task is currently open.

        :param new_subtask_name: Title of the new subtask. Could be empty
                                 (example: 'Loading Foo...').
        :param update_progress: Boolean indicating if the index of the current subtask
                                should be updated (usually yes).
        :param block_ui: Indicate that the client should block any user interaction.
        """
        if self.__current_task is None:
            logger.warning("Can't update a non existing task.")
            return

        if update_progress:
            self.__current_subtask_index += 1

        message = {'progress': {'task': self.__current_task,
                                'subtask': new_subtask_name,
                                'number_of_subtasks': self.__current_subtask_count,
                                'subtask_index': self.__current_subtask_index,
                                'block_ui': block_ui}}
        self.publish_status(json.dumps(message))

    def finish_task(self):
        """
        Sends a status notification that the current task is finished
        and resets the task bookkeeping.

        Logs a warning and does nothing when no task is currently open.
        """
        if self.__current_task is None:
            logger.warning("Can't finish a non existing task.")
            return

        message = {'progress': {'task': self.__current_task,
                                'done': True}}
        self.publish_status(json.dumps(message))

        self.__current_subtask_count = 0
        self.__current_subtask_index = 0
        self.__current_task = None

    @contextlib.contextmanager
    def task_notifier(self, task_name, subtask_name=None):
        """
        Task notifier context manager: starts a UI-blocking task on entry and
        finishes it on exit, even when the managed body raises.

        :param task_name: Title of the task.
        :param subtask_name: Title of the first subtask; an empty string is sent
                             when it is None or empty.
        """
        self.start_task(task_name,
                        subtask_name or "",
                        number_of_subtasks=0,
                        block_ui=True)
        try:
            yield
        finally:
            self.finish_task()