Source code for hbp_nrp_commons.sim_config.ConfigEditor

# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
# ---LICENSE-END

"""
This module provides support methods to manipulate exc and bibi files.
"""
from hbp_nrp_commons.generated import bibi_api_gen as bibi_parser, exp_conf_api_gen as exc_parser
import logging
from builtins import object
from builtins import range

from future import standard_library
standard_library.install_aliases()

__author__ = 'Hossain Mahmud'

logger = logging.getLogger(__name__)


[docs]class ConfigEditor(object): # pragma: no cover """ 'Friend' class for SimConfig to manipulate (in-memory and storage) exc and bibi """ def __init__(self, sim_config): self._sim_config = sim_config # Note: the rationale of separating this class from the SimConfig is to keep SimConfig # strictly as a model. But since we are restricting direct access to the dom objects # we had make exceptions here # pylint: disable=protected-access self._exc_dom = sim_config._exc_dom self._bibi_dom = sim_config._bibi_dom
[docs] def add_robotpose(self, robot_id, pose=None): """ Adds a <robotPose> tag in the exc :param robot_id: robotId attribute for the tag :param pose: A cle_ros_msgs.msgs.Pose object (defines an object's Euler pos and orientation) :return: """ # RobotPose is the type of <robotPose> defined in the exc schema (xsd) file in Experiments tag = exc_parser.RobotPose() tag.robotId = robot_id if pose is None: tag.x = 0.0 tag.y = 0.0 tag.z = 0.0 tag.roll = 0.0 tag.pitch = 0.0 tag.yaw = 0.0 else: tag.x = pose.x tag.y = pose.y tag.z = pose.z tag.roll = pose.roll tag.pitch = pose.pitch tag.yaw = pose.yaw self._exc_dom.environmentModel.append(tag) # HACK: Replace exc and bibi representation by reading from storage rp = self._exc_dom.environmentModel.robotPose self._hack_() self._exc_dom.environmentModel.robotPose = rp # END HACK # Update sim dir copy of the exc self._write_xml(self._exc_dom.toxml('utf-8'), self._sim_config.exc_path.abs_path)
[docs] def delete_robotpose(self, robot_id): """ Deletes <robotPose> tag in the exc where robotId is robot_id :param robot_id: robotId attribute for the tag :return: """ del_index = None for i in range(len(self._exc_dom.environmentModel.robotPose)): if self._exc_dom.environmentModel.robotPose[i].robotId == robot_id: del_index = i break if del_index is not None: del self._exc_dom.environmentModel.robotPose[del_index] # HACK: Replace exc and bibi representation by reading from storage rp = self._exc_dom.environmentModel.robotPose self._hack_() self._exc_dom.environmentModel.robotPose = rp # END HACK # Update sim dir copy of the exc self._write_xml(self._exc_dom.toxml('utf-8'), self._sim_config.exc_path.abs_path)
[docs] def update_robotpose(self, robot_id, pose): """ Edit <robotPose> tag in the exc where robotId is robot_id :param robot_id: robotId attribute for the tag :param pose: A cle_ros_msgs.msgs.Pose object (defines an object's Euler pos and orientation) :return: Tuple (True, SDF relative path) or (False, error message) to update config files """ if robot_id not in self._sim_config.robot_models: return False, "No robot exists with id equals {id}".format(id=robot_id) for tag in self._exc_dom.environmentModel.robotPose: if tag.robotId == robot_id: tag.x = pose.x tag.y = pose.y tag.z = pose.z tag.roll = pose.roll tag.pitch = pose.pitch tag.yaw = pose.yaw # HACK: Replace exc and bibi representation by reading from storage rp = self._exc_dom.environmentModel.robotPose self._hack_() self._exc_dom.environmentModel.robotPose = rp # END HACK # Update sim dir copy of the exc self._write_xml(self._exc_dom.toxml('utf-8'), self._sim_config.exc_path.abs_path) return True, "Tag updated successfully"
[docs] def add_bodymodel(self, robot_id, model_path, robot_model): """ Adds a <bodyModel> tag in the bibi :param robot_id: attribute robotId in the tag :param model_path: value() fo the tag :param robot_model_name: the name of the model is based the robot :return: """ # SDFWithPath is the type of <bodyModel> defined in the bibi schema (xsd) file tag = bibi_parser.SDFWithPath(model_path) tag.robotId = robot_id tag.model = robot_model if self._bibi_dom.bodyModel: self._bibi_dom.bodyModel.append(tag) else: self._bibi_dom.append(tag) # HACK: Replace exc and bibi representation by reading from storage bm = self._bibi_dom.bodyModel self._hack_() self._bibi_dom.bodyModel = bm # END HACK # Update sim dir copy of the bibi self._write_xml(self._bibi_dom.toxml('utf-8'), self._sim_config.bibi_path.abs_path)
[docs] def delete_bodymodel(self, robot_id): """ Deletes a <bodyModel> tag from the bibi :param robot_id: attribute robotId in the tag :return: """ del_index = None for i in range(len(self._bibi_dom.bodyModel)): if self._bibi_dom.bodyModel[i].robotId == robot_id: del_index = i break if del_index is not None: del self._bibi_dom.bodyModel[del_index] # HACK: Replace exc and bibi representation by reading from storage bm = self._bibi_dom.bodyModel self._hack_() self._bibi_dom.bodyModel = bm # END HACK # Update sim dir copy of the exc self._write_xml(self._bibi_dom.toxml('utf-8'), self._sim_config.bibi_path.abs_path)
def _prettify_xml(self, plain_text): """ Format a given xml text :param plain_text: xml text string :return: formatted xml string """ # pylint: disable=no-self-use import lxml return lxml.etree.tostring( lxml.etree.XML(plain_text), pretty_print=True, encoding='unicode' ) def _write_xml(self, plain_text, filename): """ Write xml into file. If the file exists, it overwrites the content. :param plain_text: xml text :param filename: absolute path to the file to write :return: Tuple (True, None) or (False, error) """ # pylint: disable=no-self-use, broad-except try: with open(filename, 'w') as f: try: f.write(self._prettify_xml(plain_text)) except IOError as e: return False, str(e) except Exception as e: return False, str(e) return True, None def _hack_(self): """ TODO: please remove me at earliest convenience HACK for the inconsistent proxy and backend behavior Replace exc and bibi representation by reading from storage Proxy might have changed something that backend is not aware of """ from hbp_nrp_backend.storage_client_api.StorageClient import StorageClient import urllib.request import urllib.parse import urllib.error client = StorageClient() latest_exc = client.get_file( self._sim_config.token, urllib.parse.quote_plus( self._sim_config.experiment_id), self._sim_config.exc_path.rel_path, by_name=True ) latest_bibi = client.get_file( self._sim_config.token, urllib.parse.quote_plus( self._sim_config.experiment_id), self._sim_config.bibi_path.rel_path, by_name=True ) try: self._exc_dom = exc_parser.CreateFromDocument(latest_exc) self._bibi_dom = bibi_parser.CreateFromDocument(latest_bibi) except Exception as ex: raise Exception("Something went horribly wrong while creating latest " "config objects with following exception {}".format(str(ex)))