blob: 2086b78e7af915421d8aad7293e2de6fbfddee44 [file] [log] [blame]
# Copyright lowRISC contributors.
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
# ----------------------------------
# SGE.py
# _JobData Class
# ----------------------------------
import logging
import pwd
import os
import re
import subprocess
import time
#
from qsubopts import qsubOptions
sgeJob_t = qsubOptions()
class _JobData:
"""
Internal helper class to manage job data from qstat
"""
def __init__(self, qstat_job_line):
# The format of the line goes like
# job-ID prior name user state submit/start at queue slots ja-task-ID
tokens = qstat_job_line.split()
assert len(tokens) >= 8, 'Not a valid qstat line: ' + qstat_job_line
# Job line must have at least 8 tokens
try:
self.id = int(tokens[0])
except ValueError:
error_msg = "Could not convert data to an integer."
raise ValueError(error_msg)
self.priority = float(tokens[1])
self.name = tokens[2]
self.user = tokens[3]
self.state = tokens[4]
self.time = ' '.join(tokens[5:7])
if '@' in qstat_job_line:
# Has queue assigned, e.g. core2.q@q2.caspian.mit.edu
self.queue = tokens[7]
self.slots = tokens[8]
if len(tokens) == 9:
self.ja_task_id = None
elif len(tokens) == 10:
self.ja_task_id = tokens[-1]
else:
raise ValueError(f"Could not parse line: {qstat_job_line}")
else:
# No queue assigned
self.slots = tokens[7]
if len(tokens) == 8:
self.ja_task_id = None
elif len(tokens) == 9:
self.ja_task_id = tokens[-1]
else:
raise ValueError(f"Could not parse line: {qstat_job_line}")
# Convert array indices ja_task_id into python list format
ja_task_id = []
if self.ja_task_id is not None:
for blob in self.ja_task_id.split(','):
# Parse data of form '193-349:1' or '1934'
x = blob.split(':')
if len(x) == 1:
ja_task_id += x
else:
subjobs, step = x
begin, end = subjobs.split('-')
ja_task_id += range(int(begin), int(end) + 1, int(step))
self._ja_tasklist = ja_task_id
def __repr__(self):
repr = ['{']
for key, value in self.__dict__.items():
if key[0] != '_':
repr.append(key + '=' + str(value))
repr.append('}')
return '\n'.join(repr)
class JobList:
"""
Internal helper class to manage job lists
"""
def __init__(self, qstat_output=None):
self._joblist = []
for line in qstat_output.split('\n')[2:-1]:
self._joblist.append(_JobData(line))
def __iter__(self):
for job in self._joblist:
yield job
def __repr__(self):
return '\n'.join([str(job) for job in self._joblist])
class SGE:
"""External system call handler for Sun Grid Engine environment."""
def __init__(self, q=None, path='', ):
logger = logging.getLogger('SGE.__init__')
if q is None:
# No queue specified. By default, submit to all available queues.
self.cmd_qconf = os.path.join(path, 'qconf')
try:
qliststr = _exec(self.cmd_qconf + ' -sql')
except IOError:
error_msg = 'Error querying queue configuration'
logger.error(error_msg)
raise IOError(error_msg)
self.q = qliststr.replace('\n', ',')[:-1]
logger.info("""Sun Grid Engine handler initialized
Queues detected: %s""", self.q)
else:
self.q = q
self.cmd_qsub = os.path.join(path, 'qsub')
self.cmd_qstat = os.path.join(path, 'qstat')
def wait(self, jobid, interval=10, name=None, pbar=None,
pbar_mode=None):
"""Waits for job running on SGE Grid Engine environment to finish.
If you are just waiting for one job, this becomes a dumb substitute for
the -sync y option which can be specified to qsub.
Inputs:
jobid
interval - Polling interval of SGE queue, in seconds. (Default: 10)
"""
logger = logging.getLogger('SGE.wait')
dowait = True
while dowait:
p = subprocess.Popen(self.cmd_qstat, shell=True,
stdout=subprocess.PIPE)
pout, _ = p.communicate()
if pbar is not None:
logger.error('Progress bar handling not implemented')
dowait = False
for line in pout.split('\n'):
t = line.split()
if len(t) >= 5 and t[0] == str(jobid):
# Find a line with useful info
if re.search(t[4], 'qwrt'):
# Job must be queued, running or being transferred
dowait = True
break
if re.search(t[4], 'acuE'):
# Job or host in error state
logger.warning('Job %d in error state', str(jobid))
if dowait:
time.sleep(interval)
if name is None:
logger.info("Time %s: waiting for jobid %s to finish",
time.ctime(), str(jobid))
else:
logger.info("Time %s: waiting for job '%s' (jobid %s) to \
finish", time.ctime(), name, str(jobid))
def submit(self, job, array=False, useenvironment=True, usecwd=True,
name=None, stdin=None, stdout=None, stderr=None,
joinstdouterr=True, nproc=1, wait=True, lammpi=True):
"""
Submits a job to SGE
Returns jobid as a number
"""
logger = logging.getLogger('SGE.submit')
logger.info("Submitting job: " + str(job) + " stdout: %s \
Stderr: %s", stdout, stderr)
# Parameters to qsub specified as the header of the job specified on
# STDIN
lamstring = lammpi and f" -pe lammpi {nproc}" or ""
qsuboptslist = ['-cwd -V ', lamstring]
if name is not None:
qsuboptslist.append('-N ' + name)
if stdin is not None:
qsuboptslist.append('-i ' + stdin)
if stdout is not None:
qsuboptslist.append('-o ' + stdout)
if stderr is not None:
qsuboptslist.append('-e ' + stderr)
if joinstdouterr:
qsuboptslist.append('-j')
if wait:
qsuboptslist.append('-sync y')
if usecwd:
qsuboptslist.append('-cwd')
if useenvironment:
qsuboptslist.append('-V')
if array is not False:
try:
n = int(array[0])
except IndexError:
n = int(array)
raise IndexError("List is empty!")
except ValueError:
error_msg = "array[0] being an out of bounds access."
logger.error(error_msg)
raise ValueError(error_msg)
try:
m = int(array[1])
except ValueError:
m = None
raise ValueError("Could not convert data to an integer.")
except IndexError:
m = None
raise IndexError("array[1] being an out of bounds access.")
try:
s = int(array[2])
except IndexError:
s = None
raise IndexError("array[2] being an out of bounds access.")
except ValueError:
s = None
raise ValueError("Could not convert data to an integer.")
if m == s is None:
qsuboptslist.append('-t %d' % n)
elif s is None:
qsuboptslist.append('-t %d-%d' % (n, m))
else:
qsuboptslist.append('-t %d-%d:%d' % (n, m, s))
qsubopts = ' '.join(qsuboptslist)
pout = _exec(self.cmd_qsub, stdin=qsubopts + '\n' + job,
print_command=False)
try:
# Next to last line should be
# "Your job 1389 (name) has been submitted"
# parse for job id
jobid = int(pout.split('\n')[-2].split()[2])
return jobid
# except (ValueErrorValueError, IndexError, AttributeError) (e):
except (ValueError, IndexError, AttributeError):
error_msg = """Error submitting SGE job:
%s
%s
Output was:
%s""" % (qsubopts, job, pout)
logger.error(error_msg)
raise IOError(error_msg)
def getuserjobs(self, user=pwd.getpwuid(os.getuid())[0]):
"""Returns a list of SGE jobids run by a specific user
Inputs
user - SGE user to poll (Default = '', i.e. current user)
qstat - path to qstat binary (Default = 'qstat')
"""
p = subprocess.Popen(self.cmd_qstat + " -u " + user, shell=True,
stdout=subprocess.PIPE)
qstat_output, _ = p.communicate()
joblist = JobList(qstat_output)
return [job for job in joblist if job.user == user]
def run_job(self, command, name='default', logfnm='default.log',
wait=True):
"""Run job on SGE with piped logging."""
jobid = self.submit(command, name=name, stdout=logfnm,
stderr=logfnm, wait=wait)
return jobid
def get_queue_instance_status(self):
"""
Get loads for each queue instance
"""
output = _exec(' '.join([self.cmd_qstat, '-f']))
data = []
for line in output.split('\n')[1:]:
t = line.split()
if len(t) != 5:
continue
nodename = t[0].split('@')[1].split('.')[0]
maxslots = int(t[2].split('/')[2])
load = float(t[3])
data.append({'name': nodename, 'maxslots': maxslots, 'load': load})
return data
def _exec(command, print_to_screen=False, logfnm=None, stdin='',
print_command=False):
"""
Runs command line using subprocess, optionally returning stdout
"""
def _call_cmd(command, stdin=''):
p = subprocess.Popen(command, shell=True, stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
output, _ = p.communicate(stdin)
return output
logger = logging.getLogger('_exec')
if print_command:
logger.info("Executing process: \x1b[1;92m%-50s\x1b[0m Logfile: %s",
command, logfnm)
output = ""
if logfnm is not None:
try:
with open(logfnm, 'a') as f:
if print_command:
print(f, "Executing process: %s" % command)
output = _call_cmd(command, stdin)
f.write(output)
except IOError:
error_msg = 'Error: File: ' + str(logfnm) + ' does not appear to exist.'
logger.error(error_msg)
raise IOError(error_msg)
else:
output = _call_cmd(command, stdin)
logger.info('Output of command is:\n%s', output)
if print_to_screen:
print(output)
return output