| # Copyright lowRISC contributors. | 
 | # Licensed under the Apache License, Version 2.0, see LICENSE for details. | 
 | # SPDX-License-Identifier: Apache-2.0 | 
 |  | 
 | import logging as log | 
 | import os | 
 | import re | 
 | import subprocess | 
 | import tarfile | 
 | from pathlib import Path | 
 |  | 
 | from Launcher import ErrorMessage, Launcher, LauncherError | 
 | from utils import VERBOSE, clean_odirs | 
 |  | 
 |  | 
 | class LsfLauncher(Launcher): | 
 |  | 
    # A map from cfg to a hidden directory specific to that cfg, where we put
    # individual 'job' scripts.
 |     jobs_dir = {} | 
 |  | 
    # All launcher instances, keyed by cfg and job name, for lookup.
 |     jobs = {} | 
 |  | 
    # When the job completes, we try to read the job script output to
    # determine the outcome. It may not have been completely written the first
    # time we read it, so we retry on the next poll, no more than 10 times.
 |     max_poll_retries = 10 | 
 |  | 
 |     # TODO: Add support for build/run/cov job specific resource requirements: | 
 |     #       cpu, mem, disk, stack. | 
 |     # TODO: Allow site-specific job resource usage setting using | 
 |     #       `DVSIM_LSF_CFG` environment variable. | 
 |  | 
 |     @staticmethod | 
 |     def prepare_workspace(project, repo_top, args): | 
        # Since we dispatch to remote machines, a project-specific Python
        # virtualenv, if one exists, needs to be activated when launching the
        # job.
 |         Launcher.set_pyvenv(project) | 
 |         if Launcher.pyvenv is None: | 
 |             return | 
 |  | 
        # If it is already a directory, then there is nothing to be done.
 |         if os.path.isdir(Launcher.pyvenv): | 
 |             return | 
 |  | 
        # If not, then it needs to be a valid tarball. Extract it into the
        # scratch area if it has not been extracted already.
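        # For example (hypothetical path): a tarball '/proj/venv.tar.gz' has
        # stem 'venv.tar', which reduces to 'venv', so the virtualenv lands at
        # <scratch_root>/venv.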
        stem = Path(Launcher.pyvenv).stem
        if stem.endswith(".tar"):
            stem = stem[:-4]
 |         path = Path(args.scratch_root, stem) | 
 |         if not path.is_dir(): | 
 |             log.info("[prepare_workspace]: [pyvenv]: Extracting %s", | 
 |                      Launcher.pyvenv) | 
 |             with tarfile.open(Launcher.pyvenv, mode='r') as tar: | 
 |                 tar.extractall(args.scratch_root) | 
 |             log.info("[prepare_workspace]: [pyvenv]: Done: %s", path) | 
 |         Launcher.pyvenv = path | 
 |  | 
 |     @staticmethod | 
 |     def prepare_workspace_for_cfg(cfg): | 
 |         # Create the job dir. | 
 |         LsfLauncher.jobs_dir[cfg] = Path(cfg.scratch_path, "lsf", | 
 |                                          cfg.timestamp) | 
 |         clean_odirs(odir=LsfLauncher.jobs_dir[cfg], max_odirs=2) | 
 |         os.makedirs(Path(LsfLauncher.jobs_dir[cfg]), exist_ok=True) | 
 |  | 
 |     @staticmethod | 
 |     def make_job_script(cfg, job_name): | 
 |         """Creates the job script. | 
 |  | 
        Once all jobs in the array have been dispatched, the job script can be
        created. It is a bash script that takes the job index as its single
        argument. The index is passed in the bsub command as '$LSB_JOBINDEX',
        which LSF substitutes with the actual index when launching each job in
        the array. The script itself is simple - a single case statement that
        switches on the job index to run that specific job. This is preferred
        over creating individual scripts for each job, which incurs additional
        file I/O overhead when the scratch area is on NFS, causing a slowdown.
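
        An illustrative sketch of a generated script (the commands, log paths
        and job count are hypothetical):

            #!/usr/bin/env bash
            set -e
            case $1 in
              1)
                <cmd_1> > <log_1> 2>&1;;
              2)
                <cmd_2> > <log_2> 2>&1;;
              *)
                echo "ERROR: Illegal job index: $1" 1>&2; exit 1;;
            esac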
 |  | 
 |         Returns the path to the job script. | 
 |         """ | 
 |  | 
 |         lines = ["#!/usr/bin/env bash\nset -e\n"] | 
 |  | 
 |         # Activate the python virtualenv if it exists. | 
 |         if Launcher.pyvenv: | 
 |             lines += ["source {}/bin/activate\n".format(Launcher.pyvenv)] | 
 |  | 
 |         lines += ["case $1 in\n"] | 
 |         for job in LsfLauncher.jobs[cfg][job_name]: | 
 |             # Redirect the job's stdout and stderr to its log file. | 
 |             cmd = "{} > {} 2>&1".format(job.deploy.cmd, | 
 |                                         job.deploy.get_log_path()) | 
 |             lines += ["  {})\n".format(job.index), "    {};;\n".format(cmd)] | 
 |  | 
        # Raise an error as a sanity check if the job index is invalid.
 |         lines += [ | 
 |             "  *)\n", | 
 |             "    echo \"ERROR: Illegal job index: $1\" 1>&2; exit 1;;\n", | 
 |             "esac\n" | 
 |         ] | 
 |         if Launcher.pyvenv: | 
 |             lines += ["deactivate\n"] | 
 |  | 
 |         job_script = Path(LsfLauncher.jobs_dir[cfg], job_name) | 
 |         try: | 
 |             with open(job_script, "w", encoding='utf-8') as f: | 
 |                 f.writelines(lines) | 
 |         except IOError as e: | 
 |             err_msg = "ERROR: Failed to write {}:\n{}".format(job_script, e) | 
 |             LsfLauncher._post_finish_job_array(cfg, job_name, err_msg) | 
 |             raise LauncherError(err_msg) | 
 |  | 
 |         log.log(VERBOSE, "[job_script]: %s", job_script) | 
 |         return job_script | 
 |  | 
 |     def __init__(self, deploy): | 
 |         super().__init__(deploy) | 
 |  | 
 |         # Maintain the job script output as an instance variable for polling | 
 |         # and cleanup. | 
 |         self.bsub_out = None | 
 |  | 
        # If we have already opened the job script output file (but have not
        # determined the outcome), then we maintain the file descriptor rather
        # than reopening it and starting all over again on the next poll.
 |         self.bsub_out_fd = None | 
 |         self.bsub_out_err_msg = [] | 
 |         self.bsub_out_err_msg_found = False | 
 |  | 
 |         # Set the job id. | 
 |         self.job_id = None | 
 |  | 
        # Polling retry counter.
 |         self.num_poll_retries = 0 | 
 |  | 
 |         # Add self to the list of jobs. | 
 |         cfg_dict = LsfLauncher.jobs.setdefault(deploy.sim_cfg, {}) | 
 |         job_name_list = cfg_dict.setdefault(deploy.job_name, []) | 
 |         job_name_list.append(self) | 
 |  | 
        # Job's index in the array (1-based, matching $LSB_JOBINDEX).
 |         self.index = len(job_name_list) | 
 |  | 
 |     def _do_launch(self): | 
        # Retrieve the list of jobs in this array.
 |         job_name = self.deploy.job_name | 
 |         cfg = self.deploy.sim_cfg | 
 |         job_total = len(LsfLauncher.jobs[cfg][job_name]) | 
 |  | 
 |         # The actual launching of the bsub command cannot happen until the | 
 |         # Scheduler has dispatched ALL jobs in the array. | 
 |         if self.index < job_total: | 
 |             return | 
 |  | 
 |         job_script = self.make_job_script(cfg, job_name) | 
 |  | 
 |         # Update the shell's env vars with self.exports. Values in exports must | 
 |         # replace the values in the shell's env vars if the keys match. | 
 |         exports = os.environ.copy() | 
 |         if self.deploy.exports: | 
 |             exports.update(self.deploy.exports) | 
 |  | 
 |         # Clear the magic MAKEFLAGS variable from exports if necessary. This | 
 |         # variable is used by recursive Make calls to pass variables from one | 
 |         # level to the next. Here, self.cmd is a call to Make but it's | 
 |         # logically a top-level invocation: we don't want to pollute the flow's | 
 |         # Makefile with Make variables from any wrapper that called dvsim. | 
 |         if 'MAKEFLAGS' in exports: | 
 |             del exports['MAKEFLAGS'] | 
 |  | 
 |         self._dump_env_vars(exports) | 
 |  | 
 |         # TODO: Arbitrarily set the max slot-limit to 100. | 
 |         job_array = "{}[1-{}]".format(job_name, job_total) | 
 |         if job_total > 100: | 
 |             job_array += "%100" | 
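        # E.g., with 250 jobs this yields "<job_name>[1-250]%100", which lets
        # at most 100 jobs from the array run concurrently (counts are
        # illustrative).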
 |  | 
 |         # TODO: This needs to be moved to a HJson. | 
 |         if self.deploy.sim_cfg.tool == "vcs": | 
 |             job_rusage = "\'rusage[vcssim=1,vcssim_dynamic=1:duration=1]\'" | 
 |  | 
 |         elif self.deploy.sim_cfg.tool == "xcelium": | 
 |             job_rusage = "\'rusage[xcelium=1,xcelium_dynamic=1:duration=1]\'" | 
 |  | 
 |         else: | 
 |             job_rusage = None | 
 |  | 
 |         # Launch the job array. | 
 |         cmd = [ | 
 |             "bsub", | 
 |             # TODO: LSF project name could be site specific! | 
 |             "-P", | 
 |             cfg.project, | 
 |             "-J", | 
 |             job_array, | 
 |             "-oo", | 
 |             "{}.%I.out".format(job_script), | 
 |             "-eo", | 
 |             "{}.%I.out".format(job_script) | 
 |         ] | 
        if self.deploy.get_timeout_mins():
            cmd += ["-c", str(self.deploy.get_timeout_mins())]
 |  | 
 |         if job_rusage: | 
 |             cmd += ["-R", job_rusage] | 
 |  | 
 |         cmd += ["/usr/bin/bash {} $LSB_JOBINDEX".format(job_script)] | 
 |  | 
 |         try: | 
 |             p = subprocess.run(cmd, | 
 |                                check=True, | 
 |                                timeout=60, | 
 |                                stdout=subprocess.PIPE, | 
 |                                stderr=subprocess.PIPE, | 
 |                                env=exports) | 
 |         except subprocess.CalledProcessError as e: | 
            # Mark all jobs in this array with this failure message.
 |             err_msg = e.stderr.decode("utf-8").strip() | 
 |             self._post_finish_job_array(cfg, job_name, err_msg) | 
 |             raise LauncherError(err_msg) | 
 |  | 
        # Extract the job ID.
        result = p.stdout.decode("utf-8").strip()
        m = re.search(r"Job <(\d+)>", result)
        if not m:
            err_msg = "Job ID not found in bsub output:\n{}".format(result)
            self._post_finish_job_array(cfg, job_name, err_msg)
            raise LauncherError(err_msg)
        job_id = m.group(1)
 |  | 
 |         for job in LsfLauncher.jobs[cfg][job_name]: | 
 |             job.bsub_out = Path("{}.{}.out".format(job_script, job.index)) | 
 |             job.job_id = "{}[{}]".format(job_id, job.index) | 
 |             job._link_odir("D") | 
 |  | 
 |     def poll(self): | 
        # We may have already determined the status.
 |         if self.status: | 
 |             return self.status | 
 |  | 
 |         if not self.bsub_out_fd: | 
 |             # If job id is not set, the bsub command has not been sent yet. | 
 |             if not self.job_id: | 
 |                 return 'D' | 
 |  | 
            # We redirect the job's output to its log file, so the job script
            # output remains empty until the job finishes. This is a very
            # quick way to check whether the job has completed. If nothing has
            # been written to the job script output yet (or if it has not yet
            # been created), then the job is still running.
 |             try: | 
 |                 if not self.bsub_out.stat().st_size: | 
 |                     return "D" | 
 |             except FileNotFoundError: | 
 |                 return "D" | 
 |  | 
            # If we get to this point, we can now open the job script output
            # file for reading.
 |             try: | 
 |                 self.bsub_out_fd = open(self.bsub_out, "r") | 
 |             except IOError as e: | 
 |                 self._post_finish( | 
 |                     "F", | 
 |                     ErrorMessage( | 
 |                         line_number=None, | 
 |                         message="ERROR: Failed to open {}\n{}.".format( | 
 |                             self.bsub_out, e), | 
 |                         context=[e])) | 
 |                 return "F" | 
 |  | 
        # Now that the job has completed, we need to determine its status.
        #
        # If the job launched successfully but then failed, the failure
        # message will appear in its log file (because of the stderr
        # redirection). But in some cases, if there is something wrong with
        # the command itself, bsub might return immediately with an error
        # message, which will appear in the job script output file. We want to
        # retrieve that so that we can report the status accurately.
 |         # | 
        # At this point, we could run bjobs or bhist to determine the status,
        # but those have been found to be too slow, especially when running
        # 1000s of jobs. Plus, we have to read the job script output anyway to
        # look for those error messages.
        #
        # So we just read this file to determine the status and to extract the
        # error message, rather than running bjobs or bhist. But there is one
        # more complication to deal with - if we read the file now, it may not
        # have been fully updated yet. We try reading it anyway; if we are
        # unable to find what we are looking for, we resume reading it again
        # at the next poll. We do this up to max_poll_retries times before
        # giving up and flagging an error.
        #
        # TODO: Consider using the IBM Platform LSF Python APIs instead
        #       (deferred due to shortage of time / resources).
 |         # TODO: Parse job telemetry data for performance insights. | 
 |  | 
 |         exit_code = self._get_job_exit_code() | 
 |         if exit_code is not None: | 
 |             self.exit_code = exit_code | 
 |             status, err_msg = self._check_status() | 
 |             # Prioritize error messages from bsub over the job's log file. | 
 |             if self.bsub_out_err_msg: | 
 |                 err_msg = ErrorMessage(line_number=None, | 
 |                                        message=self.bsub_out_err_msg, | 
 |                                        context=[self.bsub_out_err_msg]) | 
 |             self._post_finish(status, err_msg) | 
 |             return status | 
 |  | 
 |         else: | 
 |             self.num_poll_retries += 1 | 
 |             # Fail the test if we have reached the max polling retries. | 
            if self.num_poll_retries == LsfLauncher.max_poll_retries:
                self._post_finish(
                    "F",
                    ErrorMessage(line_number=None,
                                 message="ERROR: Reached max retries while "
                                 "reading job script output {} to determine "
                                 "the outcome.".format(self.bsub_out),
                                 context=[]))
                return "F"
 |  | 
 |         return "D" | 
 |  | 
 |     def _get_job_exit_code(self): | 
 |         '''Read the job script output to retrieve the exit code. | 
 |  | 
 |         Also read the error message if any, which will appear at the beginning | 
 |         of the log file followed by bsub's standard 'email' format output. It | 
 |         looks something like this: | 
 |  | 
 |             <stderr messages> | 
 |             ------------------------------------------------------------ | 
 |             Sender: LSF System <...> | 
 |             Subject: ... | 
 |             ... | 
 |  | 
 |             Successfully completed. | 
 |             <OR> | 
 |             Exited with exit code 1. | 
 |  | 
 |             ... | 
 |  | 
        The extracted stderr messages are captured in self.bsub_out_err_msg.
        The line indicating whether the job was successful or failed with an
        exit code is used to return the exit code.
 |  | 
 |         Returns the exit code if found, else None. | 
 |         ''' | 
 |  | 
 |         # Job script output must have been opened already. | 
 |         assert self.bsub_out_fd | 
 |  | 
 |         for line in self.bsub_out_fd: | 
 |             if not self.bsub_out_err_msg_found: | 
 |                 m = re.match("^Sender", line) | 
 |                 if m: | 
                    # Drop the dashed separator line collected just before the
                    # 'Sender' line and join the stderr lines into a single
                    # message.
                    self.bsub_out_err_msg = "\n".join(
                        self.bsub_out_err_msg[:-1])
 |                     self.bsub_out_err_msg_found = True | 
 |                 else: | 
 |                     self.bsub_out_err_msg.append(line.strip()) | 
 |  | 
 |             else: | 
 |                 m = re.match(r"^Exited with exit code (\d+).\n$", line) | 
 |                 if m: | 
 |                     return m.group(1) | 
 |  | 
 |                 if not self.bsub_out_err_msg: | 
 |                     m = re.match(r"^Successfully completed.\n$", line) | 
 |                     if m: | 
 |                         return 0 | 
 |         return None | 
 |  | 
 |     def kill(self): | 
 |         if self.job_id: | 
 |             try: | 
 |                 subprocess.run(["bkill", "-s", "SIGTERM", self.job_id], | 
 |                                check=True, | 
 |                                stdout=subprocess.PIPE, | 
 |                                stderr=subprocess.PIPE) | 
 |             except subprocess.CalledProcessError as e: | 
 |                 log.error("Failed to kill job: {}".format( | 
 |                     e.stderr.decode("utf-8").strip())) | 
 |         else: | 
 |             log.error("Job ID for %s not found", self.deploy.full_name) | 
 |  | 
        self._post_finish(
            'K', ErrorMessage(line_number=None, message="Job killed!",
                              context=[]))
 |  | 
 |     def _post_finish(self, status, err_msg): | 
 |         if self.bsub_out_fd: | 
 |             self.bsub_out_fd.close() | 
 |         if self.exit_code is None: | 
 |             self.exit_code = 0 if status == 'P' else 1 | 
 |         super()._post_finish(status, err_msg) | 
 |  | 
 |     @staticmethod | 
 |     def _post_finish_job_array(cfg, job_name, err_msg): | 
        '''On LSF error, mark all jobs in this array as failed.
 |  | 
 |         err_msg is the error message indicating the cause of failure.''' | 
 |  | 
 |         for job in LsfLauncher.jobs[cfg][job_name]: | 
 |             job._post_finish( | 
 |                 'F', ErrorMessage(line_number=None, | 
 |                                   message=err_msg, | 
 |                                   context=[err_msg])) |