# blob: cefcca490b0aa7bdcb536ec7ed159648da129cd1 [file] [log] [blame]
# Copyright lowRISC contributors.
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
import logging as log
import os
import re
from pathlib import Path
from utils import VERBOSE, clean_odirs, rm_path
class LauncherError(Exception):
    """Error raised for launcher-related failures.

    The message is available both as the `msg` attribute and through the
    standard exception machinery (`str(err)` / `err.args`).
    """

    def __init__(self, msg):
        # Initialise the Exception base explicitly so that `args` and
        # `str()` carry the message even when `msg` is passed by keyword.
        super().__init__(msg)
        self.msg = msg
class Launcher:
    """
    Abstraction for launching and maintaining a job.

    An abstract class that provides methods to prepare a job's environment,
    launch the job, poll for its completion and finally do some cleanup
    activities. This class is not meant to be instantiated directly. Each
    launcher object holds an instance of the deploy object.

    Extended classes are expected to implement _do_launch(), poll() and
    kill(), which raise NotImplementedError here.
    """

    # Points to the python virtual env area.
    python_venv = None

    # If a history of previous invocations is to be maintained, then keep no
    # more than this many directories.
    max_odirs = 5

    # Flag indicating the workspace preparation steps are complete.
    workspace_prepared = False

    # Set of cfg objects for which the cfg-specific workspace preparation has
    # already been done. Shared by all launcher instances.
    workspace_prepared_for_cfg = set()

    @staticmethod
    def set_python_venv(project):
        '''Activate a python virtualenv if available.

        The env variable <PROJECT>_PYTHON_VENV if set, points to the path
        containing the python virtualenv created specifically for this
        project. We can activate it if needed, before launching jobs using
        external compute machines.

        This is not applicable when running jobs locally on the user's machine.

        'project' is the name of the project; it is upper-cased to form the
        name of the env variable that is looked up.
        '''
        if Launcher.python_venv is not None:
            return

        # If project-specific python virtualenv path is set, then activate it
        # before running downstream tools. This is more relevant when not
        # launching locally, but on external machines in a compute farm, which
        # may not have access to the default python installation area on the
        # host machine.
        #
        # Note that only the path is recorded here; it stays None if the env
        # variable is not set.
        Launcher.python_venv = os.environ.get("{}_PYTHON_VENV".format(
            project.upper()))

    @staticmethod
    def prepare_workspace(project, repo_top, args):
        '''Prepare the workspace based on the chosen launcher's needs.

        This is done once for the entire duration for the flow run.
        'project' is the name of the project.
        'repo_top' is the path to the repository.
        'args' are the command line args passed to dvsim.

        The base implementation does nothing.
        '''

    @staticmethod
    def prepare_workspace_for_cfg(cfg):
        '''Prepare the workspace for a cfg.

        This is invoked once for each cfg.
        'cfg' is the flow configuration object.

        The base implementation does nothing.
        '''

    def __str__(self):
        return self.deploy.full_name + ":launcher"

    def __init__(self, deploy):
        """Create a launcher for the given deploy object.

        'deploy' is the deploy object describing the job. Its sim_cfg,
        target, qual_name and get_log_path() are read here.
        """
        cfg = deploy.sim_cfg

        # One-time preparation of the workspace.
        if not Launcher.workspace_prepared:
            self.prepare_workspace(cfg.project, cfg.proj_root, cfg.args)
            Launcher.workspace_prepared = True

        # One-time preparation of the workspace, specific to the cfg.
        if cfg not in Launcher.workspace_prepared_for_cfg:
            self.prepare_workspace_for_cfg(cfg)
            Launcher.workspace_prepared_for_cfg.add(cfg)

        # Store the deploy object handle.
        self.deploy = deploy

        # Return status of the process running the job.
        self.exit_code = None

        # Flag to indicate whether to 'overwrite' if odir already exists,
        # or to backup the existing one and create a new one.
        # For builds, we want to overwrite existing to leverage the tools'
        # incremental / partition compile features. For runs, we may want to
        # create a new one.
        self.renew_odir = False

        # Construct failure message if the test fails.
        self.fail_msg = "\n**{!r}:** {!r}<br>\n".format(
            self.deploy.target.upper(), self.deploy.qual_name)
        self.fail_msg += "**LOG:** {}<br>\n".format(self.deploy.get_log_path())

    def _make_odir(self):
        """Create the output directory."""
        # If renew_odir flag is True - then move it.
        if self.renew_odir:
            clean_odirs(odir=self.deploy.odir, max_odirs=self.max_odirs)
        os.makedirs(self.deploy.odir, exist_ok=True)

    def _dump_env_vars(self, exports):
        """Write env vars to a file for ease of debug.

        Each extended class computes the list of exports and invokes this
        method right before launching the job.

        'exports' is a dict of env variable names to values. They are written
        as sorted NAME=VALUE lines to the 'env_vars' file in the job's output
        directory.
        """
        env_vars_file = Path(self.deploy.odir, "env_vars")
        with open(env_vars_file,
                  "w",
                  encoding="UTF-8",
                  errors="surrogateescape") as f:
            for var in sorted(exports):
                f.write("{}={}\n".format(var, exports[var]))

    def _pre_launch(self):
        """Do pre-launch activities.

        Examples include such as preparing the job's environment, clearing
        old runs, creating the output directory, dumping all env variables
        etc. This method is already invoked by launch() as the first step.
        """
        self.deploy.pre_launch()
        self._make_odir()

    def _do_launch(self):
        """Launch the job."""
        raise NotImplementedError()

    def launch(self):
        """Launch the job."""
        self._pre_launch()
        self._do_launch()

    def _post_finish(self, status):
        """Do post-completion activities, such as preparing the results.

        Must be invoked by poll().

        'status' is one of 'P', 'F' or 'K'. The output directory is only
        soft-linked for the 'P' and 'F' statuses.
        """
        assert status in ['P', 'F', 'K']
        if status in ['P', 'F']:
            self._link_odir(status)
        self.deploy.post_finish(status)
        log.debug("Item %s has completed execution: %s", self, status)

    def poll(self):
        """Poll the launched job for completion.

        Invokes _has_passed() and _post_finish() when the job completes.
        """
        raise NotImplementedError()

    def kill(self):
        """Terminate the job."""
        raise NotImplementedError()

    def _has_passed(self):
        """Determine the outcome of the job (P/F if it ran to completion).

        Return True if the job passed, False otherwise. This is called by
        poll() just after the job finishes.

        The job fails if the log cannot be opened, if any fail pattern is
        found in the log, if the exit code is not 0, or if any of the pass
        patterns is not found in the log. On failure, the reason is appended
        to self.fail_msg. A dry run always passes.
        """

        def log_fail_msg(msg):
            """Logs the fail msg to the final report."""
            self.fail_msg += msg
            log.log(VERBOSE, msg)

        def _find_patterns(patterns, line):
            """Helper function that returns the pattern if any of the given
            patterns is found, else None."""
            assert patterns
            for pattern in patterns:
                if re.search(pattern, line):
                    return pattern
            return None

        def _get_n_lines(pos, num):
            "Helper function that returns next N lines starting at pos index."
            return ''.join(lines[pos:pos + num]).strip()

        if self.deploy.dry_run:
            return True

        # Only one fail pattern needs to be seen.
        failed = False
        chk_failed = bool(self.deploy.fail_patterns)

        # All pass patterns need to be seen, so we replicate the list and remove
        # patterns as we encounter them.
        pass_patterns = list(self.deploy.pass_patterns)
        chk_passed = bool(pass_patterns) and (self.exit_code == 0)

        try:
            # Tool logs may contain bytes that are not valid UTF-8. Decode
            # leniently so that a stray byte does not abort the check with a
            # UnicodeDecodeError (which is not an OSError).
            with open(self.deploy.get_log_path(),
                      "r",
                      encoding="UTF-8",
                      errors="surrogateescape") as f:
                lines = f.readlines()
        except OSError as e:
            log_fail_msg("Error opening file {}:\n{}".format(
                self.deploy.get_log_path(), e))
            return False

        if chk_failed or chk_passed:
            for cnt, line in enumerate(lines):
                if chk_failed:
                    if _find_patterns(self.deploy.fail_patterns,
                                      line) is not None:
                        # Print 4 additional lines to help debug more easily.
                        log_fail_msg("```\n{}\n```\n".format(
                            _get_n_lines(cnt, 5)))
                        failed = True
                        # The outcome is settled; no need to scan further.
                        break

                if chk_passed:
                    # A single line may satisfy more than one pass pattern,
                    # so retire every distinct pattern it matches.
                    seen = {p for p in pass_patterns if re.search(p, line)}
                    for pattern in seen:
                        pass_patterns.remove(pattern)
                    chk_passed = bool(pass_patterns)

        # If failed, then nothing else to do. Just return.
        if failed:
            return False

        # If no fail patterns were seen, but the job returned with non-zero
        # exit code for whatever reason, then show the last 10 lines of the log
        # as the failure message, which might help with the debug.
        if self.exit_code != 0:
            msg = ''.join(lines[-10:]).strip()
            log_fail_msg("Job returned non-zero exit code. "
                         "Last 10 lines:\n```\n{}\n```\n".format(msg))
            return False

        # Ensure all pass patterns were seen.
        if chk_passed:
            msg = ''.join(lines[-10:]).strip()
            log_fail_msg("One or more pass patterns not found:\n{}\n"
                         "Last 10 lines:\n```\n{}\n```\n".format(
                             pass_patterns, msg))
            return False

        return True

    def _link_odir(self, status):
        """Soft-links the job's directory based on job's status.

        The dispatched, passed and failed directories in the scratch area
        provide a quick way to get to the job that was executed.

        'status' is the key used to look up the destination directory in
        the sim_cfg's 'links' dict.
        """
        dest = Path(self.deploy.sim_cfg.links[status], self.deploy.qual_name)

        # If dest exists, then remove it and link the odir again. Retry until
        # the symlink is created.
        while True:
            try:
                os.symlink(self.deploy.odir, dest)
                break
            except FileExistsError:
                rm_path(dest)

        # Delete the symlink from dispatched directory if it exists.
        if status != "D":
            old = Path(self.deploy.sim_cfg.links['D'], self.deploy.qual_name)
            rm_path(old)