# ---LICENSE-BEGIN - DO NOT CHANGE OR MOVE THIS HEADER
# This file is part of the Neurorobotics Platform software
# Copyright (C) 2014,2015,2016,2017 Human Brain Project
# https://www.humanbrainproject.eu
#
# The Human Brain Project is a European Commission funded project
# in the frame of the Horizon2020 FET Flagship plan.
# http://ec.europa.eu/programmes/horizon2020/en/h2020-section/fet-flagships
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# ---LICENSE-END
#!/usr/bin/env python
"""
This script contains common code to launch services on the Lugano viz cluster.
"""
from builtins import object
import re
import pexpect
import logging
import os
import datetime
import subprocess
# Info messages sent to the notificator will be forwarded as notifications.
# Referring to the logger by name avoids a CLE import dependency in commons.
notificator = logging.getLogger('hbp_nrp_cle.user_notifications')
logger = notificator.parent
# This is the method called by the pexpect object to log.
# pexpect passes extra keyword arguments that are unused here.
def _log_write(*args, **kwargs):  # pylint: disable=unused-argument
"""
Translates pexpect's log-to-file mechanism into calls to the Python logging module
"""
content = args[0]
# ignore other params; pexpect only uses one positional argument AFAIK
if content in [' ', '', '\n', '\r', '\r\n']:
return # don't log empty lines
for eol in ['\r\n', '\r', '\n']:
# remove ending EOL, the logger will add it anyway
content = re.sub(r'\%s$' % eol, '', content)
logger.info(content)  # call the logger info method with the reworked content
def _set_up_logger():
"""
Configure the application logger so that pexpect can use it as a logfile object
"""
# give the logger the methods required by pexpect
logger.write = _log_write
logger.flush = lambda: None
_set_up_logger()
class LuganoVizCluster(object):
"""
Represents a SLURM allocation instance running on the Lugano viz cluster.
pexpect is used extensively in this class because the Lugano machines are accessed
through remote ssh connections. Every time a remote command is launched,
expect() is used to match strings in the shell output in order to determine its status.
"""
# The -M option puts ssh into ControlMaster mode, which is necessary to let child ssh
# connections (such as the scp used for file transfer) reuse this connection
# The -K option enables GSSAPI (Kerberos) authentication and credential forwarding
CLUSTER_SSH = 'ssh -M -K bbpnrsoa@{node}.cscs.ch'
CLUSTER_SLURM_FRONTEND = 'bbpviz1'
CLUSTER_DIR_COPY = 'scp -r {src} bbpnrsoa@{node}.cscs.ch:{trg}'
# The SLURM salloc call allocates a node on the cluster. From the salloc man page:
#
# salloc - Obtain a SLURM job allocation (a set of nodes), execute a command, and then release
# the allocation when the command is finished.
# SYNOPSIS
# salloc [options] [<command> [command args]]
#
# -c, --cpus-per-task=<ncpus>
# Advise the SLURM controller that ensuing job steps will require ncpus number of processors
# per task. Without this option, the controller will just try to allocate one processor per
# task.
# For instance, consider an application that has 4 tasks, each requiring 3 processors. If
# our cluster is comprised of quad-processor nodes and we simply ask for 12 processors, the
# controller might give us only 3 nodes. However, by using the --cpus-per-task=3 option, the
# controller knows that each task requires 3 processors on the same node, and the controller
# will grant an allocation of 4 nodes, one for each of the 4 tasks.
#
# -I, --immediate[=<seconds>]
# Exit if resources are not available within the time period specified. If no argument is given,
# resources must be available immediately for the request to succeed. By default, --immediate is
# off, and the command will block until resources become available. Since this option's argument
# is optional, for proper parsing the single letter option must be followed immediately with the
# value and not include a space between them. For example "-I60" and not "-I 60".
#
# --gres=<list>
# Specifies a comma delimited list of generic consumable resources. The format of each entry on
# the list is "name[:count[*cpu]]". The name is that of the consumable resource. The count is the
# number of those resources with a default value of 1. The specified resources will be allocated
# to the job on each node allocated unless "*cpu" is appended, in which case the resources will
# be allocated on a per cpu basis. The available generic consumable resources are configurable
# by the system administrator. A list of available generic consumable resources will be printed
# and the command will exit if the option argument is "help". Examples of use
# include "--gres=gpus:2*cpu,disk=40G" and "--gres=help".
#
# -t, --time=<time>
# Set a limit on the total run time of the job allocation. If the requested time limit exceeds
# the partition's time limit, the job will be left in a PENDING state (possibly indefinitely).
# The default time limit is the partition's default time limit. When the time limit is reached,
# each task in each job step is sent SIGTERM followed by SIGKILL. The interval between signals
# is specified by the SLURM configuration parameter KillWait. A time limit of zero requests
# that no time limit be imposed. Acceptable time formats include "minutes", "minutes:seconds",
# "hours:minutes:seconds", "days-hours", "days-hours:minutes" and "days-hours:minutes:seconds".
#
# -p, --partition=<partition_names>
# Request a specific partition for the resource allocation. If not specified, the default
# behavior is to allow the slurm controller to select the default partition as designated by
# the system administrator. If the job can use more than one partition, specify their names
# in a comma separated list and the one offering earliest initiation will be used.
#
# -A, --account=<account>
# Charge resources used by this job to the specified account. The account is an arbitrary string.
# The account name may be changed after job submission using the scontrol command.
#
# --reservation=<name>
# Allocate resources for the job from the named reservation.
ALLOCATION_TIME = datetime.timedelta(hours=10)
DEALLOCATION_COMMAND = 'scancel %s'
CURRENT_NODES_COMMAND = 'squeue -u bbpnrsoa -t PENDING,RUNNING -h -o "%N"'
NODE_DOMAIN = '.cscs.ch'
# Timeout used for pexpect ssh connection calls.
TIMEOUT = 20
# Timeout used for pexpect calls that should return immediately (default pexpect timeout is 30
# seconds).
SMALL_TIMEOUT = 2
def __init__(self, processes, gpus=0, timezone=None, reservation=None):
# actual allocation info on cluster
self._allocation_process = None
self._job_ID = None
self._node = None
# Holds the state of the SLURM job. The states are defined in SLURM.
self._state = "UNDEFINED"
# common temporary directory, removed on deallocation
self._tmp_dir = None
# construct the allocation command based on given parameters
self._allocation_time = None
if timezone is not None:
self._allocation_time = datetime.datetime.now(
timezone) + LuganoVizCluster.ALLOCATION_TIME
if reservation is not None:
reservation = str(reservation)
else:
reservation = ''
LuganoVizCluster.ALLOCATION_COMMAND = (
"salloc --immediate=25" +
" --time=" + str(LuganoVizCluster.ALLOCATION_TIME) +
" --reservation=" + reservation +
" -p interactive -c " + str(processes) +
" --account=proj30 " +
(" --gres=gpu:" + str(gpus) if gpus > 0 else "")
)
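# Illustrative example of the resulting command (assuming processes=4, gpus=1 and
# no reservation; values are hypothetical, not taken from a real deployment):
#   salloc --immediate=25 --time=10:00:00 --reservation= -p interactive -c 4 --account=proj30  --gres=gpu:1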
def _spawn_ssh_SLURM_frontend(self):
"""
Return a pexpect object connected to the SLURM frontend.
SLURM (Simple Linux Utility for Resource Management) is the entry point to
allocate or manage jobs on the cluster.
"""
return self._spawn_ssh(self.CLUSTER_SLURM_FRONTEND)
def _spawn_ssh_node(self):
"""
Return a pexpect object connected to the allocated node.
"""
return self._spawn_ssh(self._node)
def _spawn_ssh(self, target):
"""
Return a pexpect object connected to the target node.
"""
ssh_SLURM_frontend_process = pexpect.spawn('bash',
logfile=logger)
ssh_SLURM_frontend_process.sendline(self.CLUSTER_SSH.format(node=target))
result = ssh_SLURM_frontend_process.expect(['[bbpnrsoa@%s ~]$' % target,
'password',
pexpect.TIMEOUT], self.TIMEOUT)
if result == 1:
raise Exception("SLURM front-end node can't be used without password.")
if result == 2:
raise Exception("Cannot connect to the SLURM front-end node.")
return ssh_SLURM_frontend_process
def _allocate_job(self, reuse_nodes=True):
"""
Allocate a new job on the cluster and keep the controlling ssh connection
(a pexpect object) in self._allocation_process. Calling 'exit' on this
connection cancels the allocation.
The reuse_nodes parameter specifies whether we may launch our process on
already allocated nodes.
"""
notificator.info("Requesting resources on the cluster")
self._allocation_process = self._spawn_ssh_SLURM_frontend()
# clear the buffer before issuing the commands, otherwise parsing the final
# list may fail with ssh banner header info filling the buffer
result = self._allocation_process.expect([pexpect.TIMEOUT,
'[bbpnrsoa@bbpviz1 ~]$'], self.TIMEOUT)
if result == 0:
raise Exception("Cannot connect to the SLURM front-end node.")
# if we cannot reuse nodes, generate a list of allocated nodes
used_nodes = None
if not reuse_nodes:
# determine which nodes are already in use; we can't currently support
# running multiple gzserver instances on the same backend
self._allocation_process.sendline(self.CURRENT_NODES_COMMAND)
result = self._allocation_process.expect([pexpect.TIMEOUT,
'[bbpnrsoa@bbpviz1 ~]$'], self.TIMEOUT)
if result == 0:
raise Exception("Cannot retrieve list of currently allocated SLURM nodes.")
# get the list of current nodes between our command and the new prompt
used_nodes = self._allocation_process.before.replace('\r', '') # replace all ^M
used_nodes = used_nodes.rsplit(self.CURRENT_NODES_COMMAND, 1)[1]
used_nodes = used_nodes.split('[bbpnrsoa@bbpviz1 ~]$', 1)[0].strip()
used_nodes = used_nodes.replace('\n', ',')
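# at this point used_nodes is a comma separated string such as 'bbpviz012,bbpviz013'
# (node names purely illustrative)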
# exclude any currently used nodes if requested, otherwise allow any
if used_nodes:
logger.info('Currently allocated bbpnrsoa nodes: %s' % used_nodes)
self._allocation_process.sendline(self.ALLOCATION_COMMAND + (' -x %s' % used_nodes))
else:
self._allocation_process.sendline(self.ALLOCATION_COMMAND)
result = self._allocation_process.expect(['Granted job allocation ([0-9]+)',
'Submitted batch job [0-9]+',
'error: spank-auks: cred forwarding failed',
'error: Unable to allocate resources',
'error: .+'])
if result == 2:
raise Exception("Kerberos authentication missing")
if result == 3:
raise Exception("No resources available on the cluster. "
"Try again later.")
if result == 4:
raise Exception("Job allocation failed: " + str(self._allocation_process.after))
# Find out which node has been allocated
self._job_ID = self._allocation_process.match.groups()[0]
self._allocation_process.sendline("scontrol show job " + str(self._job_ID))
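# scontrol prints fields such as 'JobState=RUNNING' and 'NodeList=bbpviz012'
# (node name illustrative); the job state and the node are parsed from this output below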
self._allocation_process.expect('JobState=[A-Z]+')
# 'after' holds the matched 'JobState=XXX' string; strip the 9 character 'JobState=' prefix
self._state = self._allocation_process.after[9:] # pylint: disable=unsubscriptable-object
if self._state != 'RUNNING':
raise Exception("Job is not running.")
self._allocation_process.expect(r' NodeList=(\w+)')
self._node = self._allocation_process.match.groups()[0]
def _deallocate_job(self):
"""
Deallocate a job previously allocated through _allocate_job. The idea is
that exiting from the terminal opened through the allocation causes SLURM
to complete the job.
"""
if self._allocation_process is not None:
self._allocation_process.sendline('exit')
self._allocation_process.terminate()
self._allocation_process = None
self._job_ID = None
self._state = "UNDEFINED"
self._node = None
def _copy_to_remote(self, local_path):
"""
Create a temporary directory on the remote node if needed and copy the given local files to it.
"""
# create a tmp dir on the remote host if we have not already
if not self._tmp_dir:
copy_process = self._spawn_ssh_node()
result = copy_process.expect(['[bbpnrsoa@%s ~]$' % self._node,
pexpect.TIMEOUT], self.TIMEOUT)
if result == 1:
raise Exception("Cannot connect to the node.")
copy_process.sendline('tmpdir=$(mktemp -d)')
copy_process.sendline('echo $tmpdir')
# Pexpect compiles strings to regular expressions
copy_process.expect(r'echo \$tmpdir')
copy_process.readline()
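# the readline above consumed the rest of the echoed command; the next line read below
# is the mktemp output, e.g. '/tmp/tmp.XXXXXXXXXX' (exact path illustrative)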
self._tmp_dir = copy_process.readline().rstrip()
copy_process.terminate()
# scp is executed as a separate local process; thanks to the ssh -M master connection
# established above, it can reuse the existing authenticated connection
scp_cmd = self.CLUSTER_DIR_COPY.format(node=self._node, src=local_path, trg=self._tmp_dir)
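# e.g. scp -r /some/local/model bbpnrsoa@bbpviz012.cscs.ch:/tmp/tmp.XXXXXXXXXX
# (paths and node name purely illustrative)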
logger.info(scp_cmd)
result = subprocess.call(scp_cmd.split())
self._check_scp_result(result)
def _clean_remote_files(self):
"""
Remove the temporary remote working files
"""
if self._node is not None and self._allocation_process is not None:
clean_process = self._spawn_ssh_node()
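# remove X server lock and socket files (/tmp/.X*) possibly left over on the node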
clean_process.sendline('rm -rf /tmp/.X*')
if self._tmp_dir is not None:
clean_process.sendline('rm -rf {0}'.format(self._tmp_dir))
clean_process.terminate()
@staticmethod
def _check_scp_result(result):
"""
Checks the result of an scp call and raises an exception in case of a problem
:param result: the return code of the scp execution
"""
if result != 0:
if result == 9:
error = "File transfer protocol mismatch"
elif result == 10:
error = "File not found"
elif result == 65:
error = "Host did not allow connection"
elif result == 66:
error = "General ssh protocol error"
else:
error = "Unknown error"
raise Exception("The robot could not be copied to the remote cluster: " + error)
@staticmethod
def _configure_environment(process):
"""
Configures the given process by setting the necessary ENVIRONMENT and version variables
on the remote node, mirroring the values set by Puppet on the backend.
:param process: the process to send commands to
:return: the project resources path for the current environment
"""
# determine deployment environment based on an environment variable defined by Puppet
# (either dev or staging)
environment = os.environ.get('ENVIRONMENT')
# set the ENVIRONMENT variable on the cluster side in order
# to load the appropriate modules there.
process.sendline('export ENVIRONMENT=' + environment)
# load modules versions specified in the nrp_variables script generated by Puppet
# (versions are environment dependent)
if environment == 'staging':
with open(os.environ.get('NRP_VARIABLES_PATH')) as f:
content = f.readlines()
versions = [x.strip() for x in content if '_VERSION=' in x]
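# each such line is expected to look like 'export SOME_MODULE_VERSION=1.2.3'
# (variable name and value purely illustrative)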
for v in versions:
process.sendline(v)
# access the dev or staging project resources
proj_path = '/gpfs/bbp.cscs.ch/project/proj30/neurorobotics/%s/' % environment
return proj_path
def stop(self):
"""
Delete any temporary directory files and deallocate the node.
"""
# cluster node cleanup (this can fail, but make sure we always release the job below)
try:
# delete remote locks and any files
notificator.info('Cleaning up temporary files on the cluster node')
self._clean_remote_files()
# pylint: disable=broad-except
except Exception:
logger.exception('Error cleaning up cluster node.')
finally:
self._tmp_dir = None
# SLURM cleanup (not on the cluster node), always try even if cluster cleanup fails
if self._allocation_process:
notificator.info('Releasing resources on the cluster')
self._deallocate_job()
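# Minimal usage sketch (purely illustrative; argument values, paths and the call
# sequence are assumptions, not part of the original module):
#
#   cluster = LuganoVizCluster(processes=4, gpus=1)
#   cluster._allocate_job(reuse_nodes=False)   # salloc a node via the SLURM frontend
#   try:
#       cluster._copy_to_remote('/path/to/local/files')   # scp into a remote temp dir
#   finally:
#       cluster.stop()   # clean up temporary files and release the allocation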