[dvsim] Cosmetic updates to launcher methods
- Rearranged APIs to make them uniform
- Updated `_has_passed()`:
  - Renamed to `_check_status()`
  - Returns a tuple as opposed to a bool
  - The tuple returned is (status, err_msg), where status is "P" or "F"
  - Makes it easy to pick the 'right' error message to report,
    especially when there are multiple points of failure in more complex
    launcher systems such as LSF (see the sketch below)
- `_post_finish()` now takes err_msg as an additional arg to enable more
  code reuse
- LsfLauncher:
  - Renamed some vars / methods
  - Removed the bsub output file existence check, since it adds runtime
    overhead and is not needed
  - Some more cosmetic changes
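For illustration, a minimal sketch of the new contract. The ToyLauncher
class and its canned values are hypothetical; only the method names, the
(status, err_msg) tuple shape, and the P/F/K status codes come from this
change:

    # Hypothetical, simplified model of the new status-reporting contract.
    class ToyLauncher:
        def __init__(self):
            self.fail_msg = ""

        def _check_status(self):
            # Returns (status, err_msg): ("P", None) on success, or
            # ("F", <most relevant message>) on failure.
            fail_pattern_hit = True  # canned outcome for illustration
            if fail_pattern_hit:
                return "F", "```\nUVM_ERROR: foo failed\n```\n"
            return "P", None

        def _post_finish(self, status, err_msg):
            # Single code path shared by all launcher types.
            assert status in ["P", "F", "K"]
            if status != "P":
                self.fail_msg += err_msg

    launcher = ToyLauncher()
    status, err_msg = launcher._check_status()
    launcher._post_finish(status, err_msg)

A launcher with more failure sources (such as LSF) can swap in a more
relevant err_msg (e.g. from the bsub output) before calling
_post_finish(), which is the "pick the 'right' error message" benefit
noted above.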
Signed-off-by: Srikrishna Iyer <sriyer@google.com>
diff --git a/util/dvsim/Launcher.py b/util/dvsim/Launcher.py
index cefcca4..063cc27 100644
--- a/util/dvsim/Launcher.py
+++ b/util/dvsim/Launcher.py
@@ -108,7 +108,7 @@
# create a new one.
self.renew_odir = False
- # Construct failure message if the test fails.
+ # Error message if the job fails.
self.fail_msg = "\n**{!r}:** {!r}<br>\n".format(
self.deploy.target.upper(), self.deploy.qual_name)
self.fail_msg += "**LOG:** {}<br>\n".format(self.deploy.get_log_path())
@@ -121,6 +121,28 @@
clean_odirs(odir=self.deploy.odir, max_odirs=self.max_odirs)
os.makedirs(self.deploy.odir, exist_ok=True)
+ def _link_odir(self, status):
+ """Soft-links the job's directory based on job's status.
+
+ The dispatched, passed and failed directories in the scratch area
+ provide a quick way to get to the job that was executed.
+ """
+
+ dest = Path(self.deploy.sim_cfg.links[status], self.deploy.qual_name)
+
+ # If dest exists, then atomically remove it and link the odir again.
+ while True:
+ try:
+ os.symlink(self.deploy.odir, dest)
+ break
+ except FileExistsError:
+ rm_path(dest)
+
+ # Delete the symlink from dispatched directory if it exists.
+ if status != "D":
+ old = Path(self.deploy.sim_cfg.links['D'], self.deploy.qual_name)
+ rm_path(old)
+
def _dump_env_vars(self, exports):
"""Write env vars to a file for ease of debug.
@@ -157,22 +179,10 @@
self._pre_launch()
self._do_launch()
- def _post_finish(self, status):
- """Do post-completion activities, such as preparing the results.
-
- Must be invoked by poll().
- """
-
- assert status in ['P', 'F', 'K']
- if status in ['P', 'F']:
- self._link_odir(status)
- self.deploy.post_finish(status)
- log.debug("Item %s has completed execution: %s", self, status)
-
def poll(self):
"""Poll the launched job for completion.
- Invokes _has_passed() and _post_finish() when the job completes.
+ Invokes _check_status() and _post_finish() when the job completes.
"""
raise NotImplementedError()
@@ -182,18 +192,13 @@
raise NotImplementedError()
- def _has_passed(self):
+ def _check_status(self):
"""Determine the outcome of the job (P/F if it ran to completion).
- Return True if the job passed, False otherwise. This is called by
- poll() just after the job finishes.
+ Returns (status, err_msg) extracted from the log, where the status is
+ "P" if the it passed, "F" otherwise. This is invoked by poll() just
+ after the job finishes.
"""
- def log_fail_msg(msg):
- """Logs the fail msg to the final report."""
-
- self.fail_msg += msg
- log.log(VERBOSE, msg)
-
def _find_patterns(patterns, line):
"""Helper function that returns the pattern if any of the given
patterns is found, else None."""
@@ -211,11 +216,12 @@
return ''.join(lines[pos:pos + num - 1]).strip()
if self.deploy.dry_run:
- return True
+ return "P", None
# Only one fail pattern needs to be seen.
failed = False
chk_failed = bool(self.deploy.fail_patterns)
+ err_msg = None
# All pass patterns need to be seen, so we replicate the list and remove
# patterns as we encounter them.
@@ -226,9 +232,9 @@
with open(self.deploy.get_log_path(), "r", encoding="UTF-8") as f:
lines = f.readlines()
except OSError as e:
- log_fail_msg("Error opening file {}:\n{}".format(
- self.deploy.get_log_path(), e))
- return False
+ err_msg = "Error opening file {}:\n{}".format(
+ self.deploy.get_log_path(), e)
+ return "F", err_msg
if chk_failed or chk_passed:
for cnt, line in enumerate(lines):
@@ -236,8 +242,7 @@
if _find_patterns(self.deploy.fail_patterns,
line) is not None:
# Print 4 additional lines to help debug more easily.
- log_fail_msg("```\n{}\n```\n".format(
- _get_n_lines(cnt, 5)))
+ err_msg = "```\n{}\n```\n".format(_get_n_lines(cnt, 5))
failed = True
chk_failed = False
chk_passed = False
@@ -250,45 +255,49 @@
# If failed, then nothing else to do. Just return.
if failed:
- return False
+ assert err_msg is not None
+ return "F", err_msg
# If no fail patterns were seen, but the job returned with non-zero
# exit code for whatever reason, then show the last 10 lines of the log
# as the failure message, which might help with the debug.
if self.exit_code != 0:
- msg = ''.join(lines[-10:]).strip()
- log_fail_msg("Job returned non-zero exit code. "
- "Last 10 lines:\n```\n{}\n```\n".format(msg))
- return False
+ err_msg = "Job returned non-zero exit code:\nLast 10 lines:\n"
+ "```\n{}\n```\n"
+ err_msg = err_msg.format("".join(lines[-10:]).strip())
+ return "F", err_msg
# Ensure all pass patterns were seen.
if chk_passed:
- msg = ''.join(lines[-10:]).strip()
- log_fail_msg("One or more pass patterns not found:\n{}\n"
- "Last 10 lines:\n```\n{}\n```\n".format(
- pass_patterns, msg))
- return False
+ err_msg = "Some pass patterns missing:\n{}\nLast 10 lines:\n"
+ "```\n{}\n```\n"
+ err_msg = err_msg.format(pass_patterns,
+ "".join(lines[-10:]).strip())
+ return "F", err_msg
- return True
+ assert err_msg is None
+ return "P", None
- def _link_odir(self, status):
- """Soft-links the job's directory based on job's status.
+ def _post_finish(self, status, err_msg):
+ """Do post-completion activities, such as preparing the results.
- The dispatched, passed and failed directories in the scratch area
- provide a quick way to get to the job that was executed.
+ Must be invoked by poll(), after the job outcome is determined.
"""
- dest = Path(self.deploy.sim_cfg.links[status], self.deploy.qual_name)
+ assert status in ['P', 'F', 'K']
+ if status in ['P', 'F']:
+ self._link_odir(status)
+ self.deploy.post_finish(status)
+ log.debug("Item %s has completed execution: %s", self, status)
+ if status != "P":
+ self._log_fail_msg(err_msg)
- # If dest exists, then atomically remove it and link the odir again.
- while True:
- try:
- os.symlink(self.deploy.odir, dest)
- break
- except FileExistsError:
- rm_path(dest)
+ def _log_fail_msg(self, msg):
+ """Logs the fail msg for the final report.
- # Delete the symlink from dispatched directory if it exists.
- if status != "D":
- old = Path(self.deploy.sim_cfg.links['D'], self.deploy.qual_name)
- rm_path(old)
+ Invoked in _post_finish() only if the job did not pass.
+ """
+
+ assert msg is not None
+ self.fail_msg += msg
+ log.log(VERBOSE, msg)
diff --git a/util/dvsim/LocalLauncher.py b/util/dvsim/LocalLauncher.py
index c93e966..bf6c8f5 100644
--- a/util/dvsim/LocalLauncher.py
+++ b/util/dvsim/LocalLauncher.py
@@ -79,16 +79,10 @@
return 'D'
self.exit_code = self.process.returncode
- status = 'P' if self._has_passed() else 'F'
-
- self._post_finish(status)
+ status, err_msg = self._check_status()
+ self._post_finish(status, err_msg)
return status
- def _post_finish(self, status):
- super()._post_finish(status)
- self._close_process()
- self.process = None
-
def kill(self):
'''Kill the running process.
@@ -106,7 +100,12 @@
except subprocess.TimeoutExpired:
self.process.kill()
- self._post_finish('K')
+ self._post_finish('K', 'Job killed!')
+
+ def _post_finish(self, status, err_msg):
+ super()._post_finish(status, err_msg)
+ self._close_process()
+ self.process = None
def _close_process(self):
'''Close the file descriptors associated with the process.'''
diff --git a/util/dvsim/LsfLauncher.py b/util/dvsim/LsfLauncher.py
index 1a65fa1..51c1f64 100644
--- a/util/dvsim/LsfLauncher.py
+++ b/util/dvsim/LsfLauncher.py
@@ -5,7 +5,6 @@
import logging as log
import os
import re
-import shlex
import subprocess
import tarfile
from pathlib import Path
@@ -33,42 +32,8 @@
# TODO: Allow site-specific job resource usage setting using
# `DVSIM_LSF_CFG` environment variable.
- def __init__(self, deploy):
- super().__init__(deploy)
-
- # Set the status. Only update after the job is done - i.e. status will
- # transition from None to P/F/K.
- self.status = None
-
- # Maintain the job script output as an instance variables for polling
- # and cleanup.
- self.job_script_out = None
-
- # If we already opened the job script output file (but have not
- # determined the outcome), then we maintain the file descriptor rather
- # then reopening it and starting all over again on the next poll.
- self.job_script_out_fd = None
- self.job_script_out_err_msg = []
- self.job_script_out_err_msg_found = False
-
- # Set the job id.
- self.job_id = None
-
- # Polling retry counter..
- self.num_poll_retries = 0
-
- # Add self to the list of jobs.
- cfg_dict = LsfLauncher.jobs.setdefault(deploy.sim_cfg, {})
- job_name_list = cfg_dict.setdefault(deploy.job_name, [])
- job_name_list.append(self)
-
- # Job's index in the array.
- self.index = len(job_name_list)
-
@staticmethod
def prepare_workspace(project, repo_top, args):
- '''Overrides Launcher.prepare_workspace.'''
-
# Since we dispatch to remote machines, a project-specific Python
# virtualenv, if it exists, needs to be activated when launching the job.
Launcher.set_python_venv(project)
@@ -78,21 +43,15 @@
# Python_venv needs to be a valid tarfile. Extract it in the scratch
# area if it does not exist. It is up to the user to delete it if it is
# stale.
- if tarfile.is_tarfile(Launcher.python_venv):
- path = Path(args.scratch_root, Path(Launcher.python_venv).stem)
- if not path.is_dir():
- with tarfile.open(Launcher.python_venv, mode='r') as tar:
- tar.extractall(path=args.scratch_root)
- Launcher.python_venv = path
-
- else:
- raise LauncherError("{} is not a valid tar file".format(
- Launcher.python_venv))
+ stem = Path(Launcher.python_venv).stem.split('.')[0]
+ path = Path(args.scratch_root, stem)
+ if not path.is_dir():
+ with tarfile.open(Launcher.python_venv, mode='r') as tar:
+ tar.extractall(path=args.scratch_root)
+ Launcher.python_venv = path
@staticmethod
def prepare_workspace_for_cfg(cfg):
- '''Overrides Launcher.prepare_workspace_for_cfg.'''
-
# Create the job dir.
LsfLauncher.jobs_dir[cfg] = Path(cfg.scratch_path, "lsf",
cfg.timestamp)
@@ -100,19 +59,19 @@
os.makedirs(Path(LsfLauncher.jobs_dir[cfg]), exist_ok=True)
@staticmethod
- def make_job_array_script_text(cfg, job_name):
- """Creates the job array script text.
+ def make_job_script(cfg, job_name):
+ """Creates the job script.
- Once all jobs in the array are dispatched, the job array script is
- constructed. It is a bash script that takes the job index as a single
- argument. This index is set in the bsub command as '$LSB_JOBINDEX',
- which bsub sets as the actual index when launching that job in the
- array. This script is super simple - it is just a giant case statement
- that switches on the job index to run that specific job. This preferred
- over creating individual scripts for each job which incurs additional
- file I/O overhead when the scratch area is on NFS, causing a slowdown.
+ Once all jobs in the array are launched, the job script can be created.
+ It is a bash script that takes the job index as a single argument.
+ This index is set in the bsub command as '$LSB_JOBINDEX', which bsub
+ sets as the actual index when launching that job in the array. This
+ script is super simple - it is just a giant case statement that
+ switches on the job index to run that specific job. This is preferred
+ over creating individual scripts for each job, which incurs additional file
+ I/O overhead when the scratch area is on NFS, causing a slowdown.
- Returns an iterable representing the lines of the script.
+ Returns the path to the job script.
"""
lines = ["#!/usr/bin/env bash\nset -e\n"]
@@ -136,11 +95,52 @@
]
if Launcher.python_venv:
lines += ["deactivate\n"]
- return lines
- def launch(self):
- self._pre_launch()
+ job_script = Path(LsfLauncher.jobs_dir[cfg], job_name)
+ try:
+ with open(job_script, "w", encoding='utf-8') as f:
+ f.writelines(lines)
+ except IOError as e:
+ err_msg = "ERROR: Failed to write {}:\n{}".format(job_script, e)
+ LsfLauncher._post_finish_job_array(cfg, job_name, err_msg)
+ raise LauncherError(err_msg)
+ log.log(VERBOSE, "[job_script]: %s", job_script)
+ return job_script
+
+ def __init__(self, deploy):
+ super().__init__(deploy)
+
+ # Set the status. Only update after the job is done - i.e. status will
+ # transition from None to P/F/K.
+ self.status = None
+
+ # Maintain the job script output as an instance variable for polling
+ # and cleanup.
+ self.bsub_out = None
+
+ # If we already opened the job script output file (but have not
+ # determined the outcome), then we maintain the file descriptor rather
+ # than reopening it and starting all over again on the next poll.
+ self.bsub_out_fd = None
+ self.bsub_out_err_msg = []
+ self.bsub_out_err_msg_found = False
+
+ # Set the job id.
+ self.job_id = None
+
+ # Polling retry counter.
+ self.num_poll_retries = 0
+
+ # Add self to the list of jobs.
+ cfg_dict = LsfLauncher.jobs.setdefault(deploy.sim_cfg, {})
+ job_name_list = cfg_dict.setdefault(deploy.job_name, [])
+ job_name_list.append(self)
+
+ # Job's index in the array.
+ self.index = len(job_name_list)
+
+ def _do_launch(self):
# Add self to the list of jobs.
job_name = self.deploy.job_name
cfg = self.deploy.sim_cfg
@@ -151,16 +151,7 @@
if self.index < job_total:
return
- # Write the job array script.
- job_script_wo_idx = Path(LsfLauncher.jobs_dir[cfg], job_name)
- try:
- with open(job_script_wo_idx, "w", encoding='utf-8') as f:
- f.writelines(self.make_job_array_script_text(cfg, job_name))
- except IOError as e:
- err_msg = "ERROR: Failed to write job script {}:\n{}".format(
- job_script_wo_idx, e)
- self._kill_job_array(err_msg)
- raise LauncherError(err_msg)
+ job_script = self.make_job_script(cfg, job_name)
# Update the shell's env vars with self.exports. Values in exports must
# replace the values in the shell's env vars if the keys match.
@@ -202,22 +193,19 @@
"-J",
job_array,
"-oo",
- "{}.%I.out".format(job_script_wo_idx),
+ "{}.%I.out".format(job_script),
"-eo",
- "{}.%I.out".format(job_script_wo_idx)
+ "{}.%I.out".format(job_script)
]
if job_rusage:
cmd += ["-R", job_rusage]
- cmd.append(
- shlex.quote(
- "/usr/bin/bash {} $LSB_JOBINDEX".format(job_script_wo_idx)))
+ cmd += ["/usr/bin/bash {} $LSB_JOBINDEX".format(job_script)]
try:
- p = subprocess.run(' '.join(cmd),
+ p = subprocess.run(cmd,
check=True,
- shell=True,
timeout=60,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
@@ -225,69 +213,51 @@
except subprocess.CalledProcessError as e:
# Need to mark all jobs in this range with this fail pattern.
err_msg = e.stderr.decode("utf-8").strip()
- self._kill_job_array(err_msg)
+ self._post_finish_job_array(cfg, job_name, err_msg)
raise LauncherError(err_msg)
# Extract the job ID.
result = p.stdout.decode("utf-8").strip()
job_id = result.split('Job <')[1].split('>')[0]
if not job_id:
- self._kill_job_array("Job ID not found!")
+ self._post_finish_job_array(cfg, job_name, "Job ID not found!")
raise LauncherError("Job ID not found!")
for job in LsfLauncher.jobs[cfg][job_name]:
- job.job_script_out = Path("{}.{}.out".format(
- job_script_wo_idx, job.index))
+ job.bsub_out = Path("{}.{}.out".format(job_script, job.index))
job.job_id = "{}[{}]".format(job_id, job.index)
job._link_odir("D")
- def _finish(self, status, err_msg=None):
- '''Helper function that sets the status, exit code and err msg.'''
-
- assert status in ['P', 'F', 'K']
- if self.job_script_out_fd:
- self.job_script_out_fd.close()
- self.status = status
- if self.exit_code is None:
- self.exit_code = 0 if status == 'P' else 1
- if err_msg:
- self.fail_msg += err_msg
- log.log(VERBOSE, err_msg)
- self._post_finish(status)
- return status
-
def poll(self):
# It is possible we may have determined the status already.
if self.status:
return self.status
- if not self.job_script_out_fd:
+ if not self.bsub_out_fd:
# If job id is not set, the bsub command has not been sent yet.
if not self.job_id:
return 'D'
- # If the bsub output file is not created, we are still in
- # dispatched state.
- if not self.job_script_out.is_file():
- return "D"
-
# We redirect the job's output to the log file, so the job script
# output remains empty until the point it finishes. This is a very
# quick way to check if the job has completed. If nothing has been
- # written to the job script output yet, then the job is still
- # running.
- if not self.job_script_out.stat().st_size:
+ # written to the job script output yet (or if it is not yet
+ # created), then the job is still running.
+ try:
+ if not self.bsub_out.stat().st_size:
+ return "D"
+ except FileNotFoundError:
return "D"
# If we got to this point, we can now open the job script output
# file for reading.
try:
- self.job_script_out_fd = open(self.job_script_out, "r")
+ self.bsub_out_fd = open(self.bsub_out, "r")
except IOError as e:
- return self._finish(
- status="F",
- err_msg="ERROR: Failed to open {}\n{}.".format(
- self.job_script_out, e))
+ self._post_finish(
+ "F",
+ "ERROR: Failed to open {}\n{}.".format(self.bsub_out, e))
+ return "F"
# Now that the job has completed, we need to determine its status.
#
@@ -318,18 +288,23 @@
exit_code = self._get_job_exit_code()
if exit_code is not None:
self.exit_code = exit_code
- status = "F" if exit_code else "P" if self._has_passed() else "F"
- return self._finish(status=status)
+ status, err_msg = self._check_status()
+ # Prioritize error messages from bsub over the job's log file.
+ if self.bsub_out_err_msg:
+ err_msg = self.bsub_out_err_msg
+ self._post_finish(status, err_msg)
+ return status
else:
self.num_poll_retries += 1
# Fail the test if we have reached the max polling retries.
if self.num_poll_retries == LsfLauncher.max_poll_retries:
- return self._finish(status="F",
- err_msg="ERROR: Reached max retries while "
- "reading job script output {} to determine"
- " the outcome.".format(
- self.job_script_out))
+ self._post_finish(
+ "F", "ERROR: Reached max retries while "
+ "reading job script output {} to determine"
+ " the outcome.".format(self.bsub_out))
+ return "F"
+
return "D"
def _get_job_exit_code(self):
@@ -359,37 +334,30 @@
'''
# Job script output must have been opened already.
- assert self.job_script_out_fd
+ assert self.bsub_out_fd
- for line in self.job_script_out_fd:
- if not self.job_script_out_err_msg_found:
+ for line in self.bsub_out_fd:
+ if not self.bsub_out_err_msg_found:
m = re.match("^Sender", line)
if m:
- self.job_script_out_err_msg = "".join(
- self.job_script_out_err_msg[:-1]).strip()
- self.job_script_out_err_msg_found = True
+ # Pop the line before the sender line.
+ self.bsub_out_err_msg = "\n".join(
+ self.bsub_out_err_msg[:-1])
+ self.bsub_out_err_msg_found = True
else:
- self.job_script_out_err_msg.append(line)
+ self.bsub_out_err_msg.append(line.strip())
else:
m = re.match(r"^Exited with exit code (\d+).\n$", line)
if m:
- self.fail_msg += self.job_script_out_err_msg
return m.group(1)
- if not self.job_script_out_err_msg:
+ if not self.bsub_out_err_msg:
m = re.match(r"^Successfully completed.\n$", line)
if m:
return 0
return None
- def _kill_job_array(self, err_msg):
- '''If there is an LSF error, then kill all jobs in the array this job
- belongs to.'''
-
- for job in LsfLauncher.jobs[self.deploy.sim_cfg][self.deploy.job_name]:
- job._finish("K", err_msg)
-
def kill(self):
if self.job_id:
try:
@@ -401,6 +369,23 @@
log.error("Failed to kill job: {}".format(
e.stderr.decode("utf-8").strip()))
else:
- log.error("Job ID for %s not found", self.name)
+ log.error("Job ID for %s not found", self.deploy.full_name)
- self._post_finish('K')
+ self._post_finish('K', "Job killed!")
+
+ def _post_finish(self, status, err_msg):
+ if self.bsub_out_fd:
+ self.bsub_out_fd.close()
+ self.status = status
+ if self.exit_code is None:
+ self.exit_code = 0 if status == 'P' else 1
+ super()._post_finish(status, err_msg)
+
+ @staticmethod
+ def _post_finish_job_array(cfg, job_name, err_msg):
+ '''On LSF error, mark all jobs in this array as killed.
+
+ err_msg is the error message indicating the cause of failure.'''
+
+ for job in LsfLauncher.jobs[cfg][job_name]:
+ job._post_finish("K", err_msg)