| # Copyright lowRISC contributors. | 
 | # Licensed under the Apache License, Version 2.0, see LICENSE for details. | 
 | # SPDX-License-Identifier: Apache-2.0 | 
 | # ---------------------------------- | 
 | # SGE.py | 
#       Sun Grid Engine helpers: _JobData, JobList and SGE classes, _exec
 | # ---------------------------------- | 
 | import logging | 
 | import pwd | 
 | import os | 
 | import re | 
 | import subprocess | 
 | import time | 
 |  | 
 | # | 
 | from qsubopts import qsubOptions | 
# Module-level qsubOptions instance, created once at import time.
# NOTE(review): sgeJob_t is not referenced anywhere else in this module;
# confirm whether importers of this module rely on it before removing.
sgeJob_t = qsubOptions()
 |  | 
 |  | 
 | class _JobData: | 
 |     """ | 
 |     Internal helper class to manage job data from qstat | 
 |     """ | 
 |     def __init__(self, qstat_job_line): | 
 |         # The format of the line goes like | 
 |         # job-ID prior name  user  state submit/start at queue slots ja-task-ID | 
 |  | 
 |         tokens = qstat_job_line.split() | 
 |  | 
 |         assert len(tokens) >= 8, 'Not a valid qstat line: ' + qstat_job_line | 
 |         # Job line must have at least 8 tokens | 
 |         try: | 
 |             self.id = int(tokens[0]) | 
 |         except ValueError: | 
 |             error_msg = "Could not convert data to an integer." | 
 |             raise ValueError(error_msg) | 
 |         self.priority = float(tokens[1]) | 
 |         self.name = tokens[2] | 
 |         self.user = tokens[3] | 
 |         self.state = tokens[4] | 
 |         self.time = ' '.join(tokens[5:7]) | 
 |  | 
 |         if '@' in qstat_job_line: | 
 |             # Has queue assigned, e.g. core2.q@q2.caspian.mit.edu | 
 |             self.queue = tokens[7] | 
 |             self.slots = tokens[8] | 
 |             if len(tokens) == 9: | 
 |                 self.ja_task_id = None | 
 |             elif len(tokens) == 10: | 
 |                 self.ja_task_id = tokens[-1] | 
 |             else: | 
 |                 raise ValueError(f"Could not parse line: {qstat_job_line}") | 
 |         else: | 
 |             # No queue assigned | 
 |             self.slots = tokens[7] | 
 |             if len(tokens) == 8: | 
 |                 self.ja_task_id = None | 
 |             elif len(tokens) == 9: | 
 |                 self.ja_task_id = tokens[-1] | 
 |             else: | 
 |                 raise ValueError(f"Could not parse line: {qstat_job_line}") | 
 |  | 
 |         # Convert array indices ja_task_id into python list format | 
 |         ja_task_id = [] | 
 |         if self.ja_task_id is not None: | 
 |             for blob in self.ja_task_id.split(','): | 
 |                 # Parse data of form '193-349:1' or '1934' | 
 |                 x = blob.split(':') | 
 |                 if len(x) == 1: | 
 |                     ja_task_id += x | 
 |                 else: | 
 |                     subjobs, step = x | 
 |                     begin, end = subjobs.split('-') | 
 |                     ja_task_id += range(int(begin), int(end) + 1, int(step)) | 
 |  | 
 |         self._ja_tasklist = ja_task_id | 
 |  | 
 |     def __repr__(self): | 
 |         repr = ['{'] | 
 |         for key, value in self.__dict__.items(): | 
 |             if key[0] != '_': | 
 |                 repr.append(key + '=' + str(value)) | 
 |         repr.append('}') | 
 |         return '\n'.join(repr) | 
 |  | 
 |  | 
 | class JobList: | 
 |     """ | 
 |     Internal helper class to manage job lists | 
 |     """ | 
 |  | 
 |     def __init__(self, qstat_output=None): | 
 |         self._joblist = [] | 
 |         for line in qstat_output.split('\n')[2:-1]: | 
 |             self._joblist.append(_JobData(line)) | 
 |  | 
 |     def __iter__(self): | 
 |         for job in self._joblist: | 
 |             yield job | 
 |  | 
 |     def __repr__(self): | 
 |         return '\n'.join([str(job) for job in self._joblist]) | 
 |  | 
 |  | 
 | class SGE: | 
 |     """External system call handler for Sun Grid Engine environment.""" | 
 |  | 
 |     def __init__(self, q=None, path='', ): | 
 |  | 
 |         logger = logging.getLogger('SGE.__init__') | 
 |  | 
 |         if q is None: | 
 |             # No queue specified. By default, submit to all available queues. | 
 |             self.cmd_qconf = os.path.join(path, 'qconf') | 
 |  | 
 |             try: | 
 |                 qliststr = _exec(self.cmd_qconf + ' -sql') | 
 |             except IOError: | 
 |                 error_msg = 'Error querying queue configuration' | 
 |                 logger.error(error_msg) | 
 |                 raise IOError(error_msg) | 
 |  | 
 |             self.q = qliststr.replace('\n', ',')[:-1] | 
 |  | 
 |             logger.info("""Sun Grid Engine handler initialized | 
 | Queues detected: %s""", self.q) | 
 |  | 
 |         else: | 
 |             self.q = q | 
 |  | 
 |         self.cmd_qsub = os.path.join(path, 'qsub') | 
 |         self.cmd_qstat = os.path.join(path, 'qstat') | 
 |  | 
 |     def wait(self, jobid, interval=10, name=None, pbar=None, | 
 |              pbar_mode=None): | 
 |         """Waits for job running on SGE Grid Engine environment to finish. | 
 |  | 
 |         If you are just waiting for one job, this becomes a dumb substitute for | 
 |         the -sync y option which can be specified to qsub. | 
 |  | 
 |         Inputs: | 
 |  | 
 |         jobid | 
 |         interval - Polling interval of SGE queue, in seconds. (Default: 10) | 
 |         """ | 
 |  | 
 |         logger = logging.getLogger('SGE.wait') | 
 |  | 
 |         dowait = True | 
 |         while dowait: | 
 |             p = subprocess.Popen(self.cmd_qstat, shell=True, | 
 |                                  stdout=subprocess.PIPE) | 
 |             pout, _ = p.communicate() | 
 |  | 
 |             if pbar is not None: | 
 |                 logger.error('Progress bar handling not implemented') | 
 |  | 
 |             dowait = False | 
 |             for line in pout.split('\n'): | 
 |                 t = line.split() | 
 |                 if len(t) >= 5 and t[0] == str(jobid): | 
 |                     # Find a line with useful info | 
 |                     if re.search(t[4], 'qwrt'): | 
 |                         # Job must be queued, running or being transferred | 
 |                         dowait = True | 
 |                         break | 
 |                     if re.search(t[4], 'acuE'): | 
 |                         # Job or host in error state | 
 |                         logger.warning('Job %d in error state', str(jobid)) | 
 |  | 
 |             if dowait: | 
 |                 time.sleep(interval) | 
 |                 if name is None: | 
 |                     logger.info("Time %s: waiting for jobid %s to finish", | 
 |                                 time.ctime(), str(jobid)) | 
 |                 else: | 
 |                     logger.info("Time %s: waiting for job '%s' (jobid %s) to \ | 
 | finish", time.ctime(), name, str(jobid)) | 
 |  | 
 |     def submit(self, job, array=False, useenvironment=True, usecwd=True, | 
 |                name=None, stdin=None, stdout=None, stderr=None, | 
 |                joinstdouterr=True, nproc=1, wait=True, lammpi=True): | 
 |         """ | 
 |         Submits a job to SGE | 
 |         Returns jobid as a number | 
 |         """ | 
 |  | 
 |         logger = logging.getLogger('SGE.submit') | 
 |  | 
 |         logger.info("Submitting job:   " + str(job) + " stdout: %s \ | 
 | Stderr: %s", stdout, stderr) | 
 |  | 
 |         # Parameters to qsub specified as the header of the job specified on | 
 |         # STDIN | 
 |         lamstring = lammpi and f" -pe lammpi {nproc}" or "" | 
 |         qsuboptslist = ['-cwd -V ', lamstring] | 
 |  | 
 |         if name is not None: | 
 |             qsuboptslist.append('-N ' + name) | 
 |         if stdin is not None: | 
 |             qsuboptslist.append('-i ' + stdin) | 
 |         if stdout is not None: | 
 |             qsuboptslist.append('-o ' + stdout) | 
 |         if stderr is not None: | 
 |             qsuboptslist.append('-e ' + stderr) | 
 |         if joinstdouterr: | 
 |             qsuboptslist.append('-j') | 
 |         if wait: | 
 |             qsuboptslist.append('-sync y') | 
 |         if usecwd: | 
 |             qsuboptslist.append('-cwd') | 
 |         if useenvironment: | 
 |             qsuboptslist.append('-V') | 
 |         if array is not False: | 
 |             try: | 
 |                 n = int(array[0]) | 
 |             except IndexError: | 
 |                 n = int(array) | 
 |                 raise IndexError("List is empty!") | 
 |             except ValueError: | 
 |                 error_msg = "array[0] being an out of bounds access." | 
 |                 logger.error(error_msg) | 
 |                 raise ValueError(error_msg) | 
 |             try: | 
 |                 m = int(array[1]) | 
 |             except ValueError: | 
 |                 m = None | 
 |                 raise ValueError("Could not convert data to an integer.") | 
 |             except IndexError: | 
 |                 m = None | 
 |                 raise IndexError("array[1] being an out of bounds access.") | 
 |             try: | 
 |                 s = int(array[2]) | 
 |             except IndexError: | 
 |                 s = None | 
 |                 raise IndexError("array[2] being an out of bounds access.") | 
 |             except ValueError: | 
 |                 s = None | 
 |                 raise ValueError("Could not convert data to an integer.") | 
 |             if m == s is None: | 
 |                 qsuboptslist.append('-t %d' % n) | 
 |             elif s is None: | 
 |                 qsuboptslist.append('-t %d-%d' % (n, m)) | 
 |             else: | 
 |                 qsuboptslist.append('-t %d-%d:%d' % (n, m, s)) | 
 |  | 
 |         qsubopts = ' '.join(qsuboptslist) | 
 |  | 
 |         pout = _exec(self.cmd_qsub, stdin=qsubopts + '\n' + job, | 
 |                      print_command=False) | 
 |  | 
 |         try: | 
 |             # Next to last line should be | 
 |             # "Your job 1389 (name) has been submitted" | 
 |             # parse for job id | 
 |             jobid = int(pout.split('\n')[-2].split()[2]) | 
 |             return jobid | 
 |         # except (ValueErrorValueError, IndexError, AttributeError) (e): | 
 |         except (ValueError, IndexError, AttributeError): | 
 |             error_msg = """Error submitting SGE job: | 
 | %s | 
 | %s | 
 |  | 
 | Output was: | 
 | %s""" % (qsubopts, job, pout) | 
 |             logger.error(error_msg) | 
 |             raise IOError(error_msg) | 
 |  | 
 |     def getuserjobs(self, user=pwd.getpwuid(os.getuid())[0]): | 
 |         """Returns a list of SGE jobids run by a specific user | 
 |  | 
 |         Inputs | 
 |             user - SGE user to poll (Default = '', i.e. current user) | 
 |             qstat - path to qstat binary (Default = 'qstat') | 
 |         """ | 
 |  | 
 |         p = subprocess.Popen(self.cmd_qstat + " -u " + user, shell=True, | 
 |                              stdout=subprocess.PIPE) | 
 |         qstat_output, _ = p.communicate() | 
 |         joblist = JobList(qstat_output) | 
 |         return [job for job in joblist if job.user == user] | 
 |  | 
 |     def run_job(self, command, name='default', logfnm='default.log', | 
 |                 wait=True): | 
 |         """Run job on SGE with piped logging.""" | 
 |         jobid = self.submit(command, name=name, stdout=logfnm, | 
 |                             stderr=logfnm, wait=wait) | 
 |         return jobid | 
 |  | 
 |     def get_queue_instance_status(self): | 
 |         """ | 
 |         Get loads for each queue instance | 
 |         """ | 
 |         output = _exec(' '.join([self.cmd_qstat, '-f'])) | 
 |  | 
 |         data = [] | 
 |         for line in output.split('\n')[1:]: | 
 |             t = line.split() | 
 |             if len(t) != 5: | 
 |                 continue | 
 |  | 
 |             nodename = t[0].split('@')[1].split('.')[0] | 
 |             maxslots = int(t[2].split('/')[2]) | 
 |             load = float(t[3]) | 
 |  | 
 |             data.append({'name': nodename, 'maxslots': maxslots, 'load': load}) | 
 |  | 
 |         return data | 
 |  | 
 |  | 
 | def _exec(command, print_to_screen=False, logfnm=None, stdin='', | 
 |           print_command=False): | 
 |     """ | 
 |     Runs command line using subprocess, optionally returning stdout | 
 |     """ | 
 |     def _call_cmd(command, stdin=''): | 
 |         p = subprocess.Popen(command, shell=True, stdin=subprocess.PIPE, | 
 |                              stdout=subprocess.PIPE, | 
 |                              stderr=subprocess.STDOUT) | 
 |         output, _ = p.communicate(stdin) | 
 |         return output | 
 |  | 
 |     logger = logging.getLogger('_exec') | 
 |  | 
 |     if print_command: | 
 |         logger.info("Executing process: \x1b[1;92m%-50s\x1b[0m Logfile: %s", | 
 |                     command, logfnm) | 
 |  | 
 |     output = "" | 
 |     if logfnm is not None: | 
 |         try: | 
 |             with open(logfnm, 'a') as f: | 
 |                 if print_command: | 
 |                     print(f, "Executing process: %s" % command) | 
 |                 output = _call_cmd(command, stdin) | 
 |                 f.write(output) | 
 |         except IOError: | 
 |             error_msg = 'Error: File: ' + str(logfnm) + ' does not appear to exist.' | 
 |             logger.error(error_msg) | 
 |             raise IOError(error_msg) | 
 |     else: | 
 |         output = _call_cmd(command, stdin) | 
 |  | 
 |     logger.info('Output of command is:\n%s', output) | 
 |  | 
 |     if print_to_screen: | 
 |         print(output) | 
 |  | 
 |     return output |