| # Copyright lowRISC contributors. |
| # Licensed under the Apache License, Version 2.0, see LICENSE for details. |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| import logging as log |
| import os |
| import re |
| import subprocess |
| import tarfile |
| from pathlib import Path |
| |
| from Launcher import ErrorMessage, Launcher, LauncherError |
| from utils import VERBOSE, clean_odirs |
| |
| |
| class LsfLauncher(Launcher): |
| |
| # A directory specific to each cfg (this dict is keyed by cfg), where we |
| # put the individual 'job' scripts. |
| jobs_dir = {} |
| |
| # All launcher instances, keyed by cfg and then by job name, available |
| # for lookup. |
| jobs = {} |
| |
| # When the job completes, we try to read the job script output to determine |
| # the outcome. It may not have been completely written the first time we |
| # read it so we retry on the next poll, no more than 10 times. |
| max_poll_retries = 10 |
| |
| # TODO: Add support for build/run/cov job specific resource requirements: |
| # cpu, mem, disk, stack. |
| # TODO: Allow site-specific job resource usage setting using |
| # `DVSIM_LSF_CFG` environment variable. |
| |
| @staticmethod |
| def prepare_workspace(project, repo_top, args): |
| # Since we dispatch to remote machines, a project-specific python |
| # virtualenv, if one exists, needs to be activated when launching the job. |
| Launcher.set_pyvenv(project) |
| if Launcher.pyvenv is None: |
| return |
| |
| # If it is already a directory, then nothing needs to be done. |
| if os.path.isdir(Launcher.pyvenv): |
| return |
| |
| # If not, then it needs to be a valid tarball. Extract it into the |
| # scratch area if it has not been extracted already. |
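| # For illustration (hypothetical name): "pyvenv.tar.gz" has the stem |
| # "pyvenv.tar", which the check below further strips to "pyvenv". |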
| stem = Path(Launcher.pyvenv).stem |
| if stem.endswith(".tar"): |
| stem = stem[:-4] |
| path = Path(args.scratch_root, stem) |
| if not path.is_dir(): |
| log.info("[prepare_workspace]: [pyvenv]: Extracting %s", |
| Launcher.pyvenv) |
| with tarfile.open(Launcher.pyvenv, mode='r') as tar: |
| tar.extractall(args.scratch_root) |
| log.info("[prepare_workspace]: [pyvenv]: Done: %s", path) |
| Launcher.pyvenv = path |
| |
| @staticmethod |
| def prepare_workspace_for_cfg(cfg): |
| # Create the job dir. |
| LsfLauncher.jobs_dir[cfg] = Path(cfg.scratch_path, "lsf", |
| cfg.timestamp) |
| clean_odirs(odir=LsfLauncher.jobs_dir[cfg], max_odirs=2) |
| os.makedirs(Path(LsfLauncher.jobs_dir[cfg]), exist_ok=True) |
| |
| @staticmethod |
| def make_job_script(cfg, job_name): |
| """Creates the job script. |
| |
| Once all jobs in the array have been dispatched, the job script can be |
| created. It is a bash script that takes the job index as its single |
| argument. This index is passed in the bsub command as '$LSB_JOBINDEX', |
| which bsub expands to the actual index when launching each job in the |
| array. This script is super simple - it is just a giant case statement |
| that switches on the job index to run that specific job. This is |
| preferred over creating individual scripts for each job, which incurs |
| additional file I/O overhead when the scratch area is on NFS, causing a |
| slowdown. |
| |
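| For illustration only, a generated script for a hypothetical two-job |
| array might look like this (commands and log paths are placeholders): |
| |
|     #!/usr/bin/env bash |
|     set -e |
|     case $1 in |
|       1) |
|         <job 1 cmd> > <job 1 log path> 2>&1;; |
|       2) |
|         <job 2 cmd> > <job 2 log path> 2>&1;; |
|       *) |
|         echo "ERROR: Illegal job index: $1" 1>&2; exit 1;; |
|     esac |
| |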
| Returns the path to the job script. |
| """ |
| |
| lines = ["#!/usr/bin/env bash\nset -e\n"] |
| |
| # Activate the python virtualenv if it exists. |
| if Launcher.pyvenv: |
| lines += ["source {}/bin/activate\n".format(Launcher.pyvenv)] |
| |
| lines += ["case $1 in\n"] |
| for job in LsfLauncher.jobs[cfg][job_name]: |
| # Redirect the job's stdout and stderr to its log file. |
| cmd = "{} > {} 2>&1".format(job.deploy.cmd, |
| job.deploy.get_log_path()) |
| lines += [" {})\n".format(job.index), " {};;\n".format(cmd)] |
| |
| # Throw error as a sanity check if the job index is invalid. |
| lines += [ |
| " *)\n", |
| " echo \"ERROR: Illegal job index: $1\" 1>&2; exit 1;;\n", |
| "esac\n" |
| ] |
| if Launcher.pyvenv: |
| lines += ["deactivate\n"] |
| |
| job_script = Path(LsfLauncher.jobs_dir[cfg], job_name) |
| try: |
| with open(job_script, "w", encoding='utf-8') as f: |
| f.writelines(lines) |
| except IOError as e: |
| err_msg = "ERROR: Failed to write {}:\n{}".format(job_script, e) |
| LsfLauncher._post_finish_job_array(cfg, job_name, err_msg) |
| raise LauncherError(err_msg) |
| |
| log.log(VERBOSE, "[job_script]: %s", job_script) |
| return job_script |
| |
| def __init__(self, deploy): |
| super().__init__(deploy) |
| |
| # Maintain the job script output as an instance variable for polling |
| # and cleanup. |
| self.bsub_out = None |
| |
| # If we already opened the job script output file (but have not |
| # determined the outcome), then we maintain the file descriptor rather |
| # than reopening it and starting all over again on the next poll. |
| self.bsub_out_fd = None |
| self.bsub_out_err_msg = [] |
| self.bsub_out_err_msg_found = False |
| |
| # Set the job id. |
| self.job_id = None |
| |
| # Polling retry counter. |
| self.num_poll_retries = 0 |
| |
| # Add self to the list of jobs. |
| cfg_dict = LsfLauncher.jobs.setdefault(deploy.sim_cfg, {}) |
| job_name_list = cfg_dict.setdefault(deploy.job_name, []) |
| job_name_list.append(self) |
| |
| # Job's index in the array. |
| self.index = len(job_name_list) |
| |
| def _do_launch(self): |
| # Retrieve the cfg and the name under which this job was registered. |
| job_name = self.deploy.job_name |
| cfg = self.deploy.sim_cfg |
| job_total = len(LsfLauncher.jobs[cfg][job_name]) |
| |
| # The actual launching of the bsub command cannot happen until the |
| # Scheduler has dispatched ALL jobs in the array. |
| if self.index < job_total: |
| return |
| |
| job_script = self.make_job_script(cfg, job_name) |
| |
| # Update a copy of the shell's env vars with the deploy's exports. Values |
| # in exports take precedence over the shell's env vars when keys match. |
| exports = os.environ.copy() |
| if self.deploy.exports: |
| exports.update(self.deploy.exports) |
| |
| # Clear the magic MAKEFLAGS variable from exports if necessary. This |
| # variable is used by recursive Make calls to pass variables from one |
| # level to the next. Here, self.cmd is a call to Make but it's |
| # logically a top-level invocation: we don't want to pollute the flow's |
| # Makefile with Make variables from any wrapper that called dvsim. |
| if 'MAKEFLAGS' in exports: |
| del exports['MAKEFLAGS'] |
| |
| self._dump_env_vars(exports) |
| |
| # TODO: Arbitrarily set the max slot-limit to 100. |
| job_array = "{}[1-{}]".format(job_name, job_total) |
| if job_total > 100: |
| job_array += "%100" |
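| # For example, a (hypothetical) array name "run[1-250]%100" limits the |
| # array to 100 concurrently running jobs (the "%100" is the slot limit). |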
| |
| # TODO: This needs to be moved to a HJson. |
| if self.deploy.sim_cfg.tool == "vcs": |
| job_rusage = "\'rusage[vcssim=1,vcssim_dynamic=1:duration=1]\'" |
| |
| elif self.deploy.sim_cfg.tool == "xcelium": |
| job_rusage = "\'rusage[xcelium=1,xcelium_dynamic=1:duration=1]\'" |
| |
| else: |
| job_rusage = None |
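| # The rusage[] string asks LSF to reserve the named (site-defined) |
| # simulator license resources; 'duration=1' lets the dynamic reservation |
| # lapse shortly after dispatch. |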
| |
| # Launch the job array. |
| cmd = [ |
| "bsub", |
| # TODO: LSF project name could be site specific! |
| "-P", |
| cfg.project, |
| "-J", |
| job_array, |
| "-oo", |
| "{}.%I.out".format(job_script), |
| "-eo", |
| "{}.%I.out".format(job_script) |
| ] |
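| # Note: bsub expands '%I' in the -oo/-eo paths to the job's array index, |
| # yielding one output file per array element (e.g. <job_script>.1.out). |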
| if self.deploy.get_timeout_mins(): |
| cmd += ["-c", str(self.deploy.get_timeout_mins())] |
| |
| if job_rusage: |
| cmd += ["-R", job_rusage] |
| |
| cmd += ["/usr/bin/bash {} $LSB_JOBINDEX".format(job_script)] |
| |
| try: |
| p = subprocess.run(cmd, |
| check=True, |
| timeout=60, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, |
| env=exports) |
| except subprocess.CalledProcessError as e: |
| # Need to mark all jobs in this array with this failure message. |
| err_msg = e.stderr.decode("utf-8").strip() |
| self._post_finish_job_array(cfg, job_name, err_msg) |
| raise LauncherError(err_msg) |
| |
| # Extract the job ID. |
| result = p.stdout.decode("utf-8").strip() |
| job_id = result.split('Job <')[1].split('>')[0] |
| if not job_id: |
| err_msg = "Job ID not found!" |
| self._post_finish_job_array(cfg, job_name, err_msg) |
| raise LauncherError(err_msg) |
| |
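| # Record each job's expected bsub output file and its LSF array element |
| # ID (of the form "<job_id>[<index>]"), and mark it as dispatched. |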
| for job in LsfLauncher.jobs[cfg][job_name]: |
| job.bsub_out = Path("{}.{}.out".format(job_script, job.index)) |
| job.job_id = "{}[{}]".format(job_id, job.index) |
| job._link_odir("D") |
| |
| def poll(self): |
| # It is possible that we have already determined the status. |
| if self.status: |
| return self.status |
| |
| if not self.bsub_out_fd: |
| # If job id is not set, the bsub command has not been sent yet. |
| if not self.job_id: |
| return 'D' |
| |
| # We redirect the job's stdout and stderr to its log file, so the job |
| # script output remains empty until the job finishes. This is a very |
| # quick way to check if the job has completed. If nothing has been |
| # written to the job script output yet (or if it has not yet been |
| # created), then the job is still running. |
| try: |
| if not self.bsub_out.stat().st_size: |
| return "D" |
| except FileNotFoundError: |
| return "D" |
| |
| # If we got to this point, we can now open the job script output |
| # file for reading. |
| try: |
| self.bsub_out_fd = open(self.bsub_out, "r") |
| except IOError as e: |
| self._post_finish( |
| "F", |
| ErrorMessage( |
| line_number=None, |
| message="ERROR: Failed to open {}\n{}.".format( |
| self.bsub_out, e), |
| context=[e])) |
| return "F" |
| |
| # Now that the job has completed, we need to determine its status. |
| # |
| # If the job successfully launched and it failed, the failure message |
| # will appear in its log file (because of the stderr redirection). |
| # But, in some cases, if there is something wrong with the command |
| # itself, bsub might return immediately with an error message, which |
| # will appear in the job script output file. We want to retrieve that |
| # so that we can report the status accurately. |
| # |
| # At this point, we could run bjobs or bhist to determine the status, |
| # but that has been found to be too slow, especially when running 1000s |
| # of jobs. Plus, we have to read the job script output anyway to look |
| # for those error messages. |
| # |
| # So we just read this file to determine the status and extract the |
| # error message, rather than running bjobs or bhist. But there is one |
| # more complication to deal with - if we read the file now, it may not |
| # have been fully updated. We try reading it anyway; if we are unable to |
| # find what we are looking for, then we resume reading it again at the |
| # next poll. We do this up to max_poll_retries times before giving up |
| # and flagging an error. |
| # |
| # TODO: Consider using the IBM Platform LSF Python APIs instead. |
| # (deferred due to shortage of time / resources). |
| # TODO: Parse job telemetry data for performance insights. |
| |
| exit_code = self._get_job_exit_code() |
| if exit_code is not None: |
| self.exit_code = exit_code |
| status, err_msg = self._check_status() |
| # Prioritize error messages from bsub over the job's log file. |
| if self.bsub_out_err_msg: |
| err_msg = ErrorMessage(line_number=None, |
| message=self.bsub_out_err_msg, |
| context=[self.bsub_out_err_msg]) |
| self._post_finish(status, err_msg) |
| return status |
| |
| else: |
| self.num_poll_retries += 1 |
| # Fail the test if we have reached the max polling retries. |
| if self.num_poll_retries == LsfLauncher.max_poll_retries: |
| err_msg = ("ERROR: Reached max retries while reading job script " |
| "output {} to determine the outcome.".format(self.bsub_out)) |
| self._post_finish( |
| "F", |
| ErrorMessage(line_number=None, |
| message=err_msg, |
| context=[err_msg])) |
| return "F" |
| |
| return "D" |
| |
| def _get_job_exit_code(self): |
| '''Read the job script output to retrieve the exit code. |
| |
| Also read the error message, if any, which will appear at the beginning |
| of the job script output, followed by bsub's standard 'email' format |
| output. It looks something like this: |
| |
| <stderr messages> |
| ------------------------------------------------------------ |
| Sender: LSF System <...> |
| Subject: ... |
| ... |
| |
| Successfully completed. |
| <OR> |
| Exited with exit code 1. |
| |
| ... |
| |
| The extracted stderr messages are stored in self.bsub_out_err_msg. The |
| line indicating whether the job completed successfully or exited with |
| an error code is used to return the exit code. |
| |
| Returns the exit code if found, else None. |
| ''' |
| |
| # Job script output must have been opened already. |
| assert self.bsub_out_fd |
| |
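| # Parse in two phases: first accumulate any stderr lines that appear |
| # before LSF's "Sender:" email header (dropping the separator line just |
| # before it), then scan the remainder for the exit status line. |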
| for line in self.bsub_out_fd: |
| if not self.bsub_out_err_msg_found: |
| m = re.match("^Sender", line) |
| if m: |
| # Drop the separator line preceding the 'Sender' line and join |
| # the collected stderr lines into a single message. |
| self.bsub_out_err_msg = "\n".join( |
| self.bsub_out_err_msg[:-1]) |
| self.bsub_out_err_msg_found = True |
| else: |
| self.bsub_out_err_msg.append(line.strip()) |
| |
| else: |
| m = re.match(r"^Exited with exit code (\d+)\.\n$", line) |
| if m: |
| return int(m.group(1)) |
| |
| if not self.bsub_out_err_msg: |
| m = re.match(r"^Successfully completed\.\n$", line) |
| if m: |
| return 0 |
| return None |
| |
| def kill(self): |
| if self.job_id: |
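| # self.job_id is of the form "<array_id>[<index>]", so bkill targets |
| # only this array element rather than the whole array. |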
| try: |
| subprocess.run(["bkill", "-s", "SIGTERM", self.job_id], |
| check=True, |
| stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE) |
| except subprocess.CalledProcessError as e: |
| log.error("Failed to kill job: {}".format( |
| e.stderr.decode("utf-8").strip())) |
| else: |
| log.error("Job ID for %s not found", self.deploy.full_name) |
| |
| self._post_finish( |
| 'K', |
| ErrorMessage(line_number=None, |
| message="Job killed!", |
| context=["Job killed!"])) |
| |
| def _post_finish(self, status, err_msg): |
| if self.bsub_out_fd: |
| self.bsub_out_fd.close() |
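| # If the job never produced an exit code (e.g. bsub itself failed), infer |
| # one from the status: 0 for passed ('P'), 1 otherwise. |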
| if self.exit_code is None: |
| self.exit_code = 0 if status == 'P' else 1 |
| super()._post_finish(status, err_msg) |
| |
| @staticmethod |
| def _post_finish_job_array(cfg, job_name, err_msg): |
| '''On LSF error, mark all jobs in this array as failed. |
| |
| err_msg is the error message indicating the cause of failure.''' |
| |
| for job in LsfLauncher.jobs[cfg][job_name]: |
| job._post_finish( |
| 'F', ErrorMessage(line_number=None, |
| message=err_msg, |
| context=[err_msg])) |