blob: b0dde6ac76ddc90237eefd2984a2836d46458cfc [file] [log] [blame]
# Copyright lowRISC contributors.
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
import logging as log
import os
import pprint
import random
import re
import shlex
import subprocess
import sys
import time
from collections import OrderedDict
from tabulate import tabulate
from sim_utils import get_cov_summary_table
from utils import VERBOSE, find_and_substitute_wildcards, run_cmd
class Deploy():
"""
Abstraction for deploying builds and runs.
"""
# Timer in hours, minutes and seconds.
hh = 0
mm = 0
ss = 0
# Maintain a list of dispatched items.
dispatch_counter = 0
# Misc common deploy settings.
print_legend = True
print_interval = 5
max_parallel = 16
max_odirs = 5
# Max jobs dispatched in one go.
slot_limit = 20
def __self_str__(self):
if log.getLogger().isEnabledFor(VERBOSE):
return pprint.pformat(self.__dict__)
else:
ret = self.cmd
if self.sub != []:
ret += "\nSub:\n" + str(self.sub)
return ret
def __str__(self):
return self.__self_str__()
def __repr__(self):
return self.__self_str__()
def __init__(self, sim_cfg):
# Cross ref the whole cfg object for ease.
self.sim_cfg = sim_cfg
# Common vars
self.identifier = ""
self.cmd = ""
self.odir = ""
self.log = ""
self.fail_msg = ""
# Flag to indicate whether to 'overwrite' if odir already exists,
# or to backup the existing one and create a new one.
# For builds, we want to overwrite existing to leverage the tools'
# incremental / partition compile features. For runs, we may want to
# create a new one.
self.renew_odir = False
# List of vars required to be exported to sub-shell
self.exports = {}
# Deploy sub commands
self.sub = []
# Process
self.process = None
self.log_fd = None
self.status = None
# These are command, output directory and log file
self.mandatory_misc_attrs.update({
"name": False,
"build_mode": False,
"flow_makefile": False,
"exports": False,
"dry_run": False
})
# Function to parse a dict and extract the mandatory cmd and misc attrs.
def parse_dict(self, ddict):
if not hasattr(self, "target"):
log.error(
"Class %s does not have the mandatory attribute \"target\" defined",
self.__class__.__name__)
sys.exit(1)
ddict_keys = ddict.keys()
for key in self.mandatory_cmd_attrs.keys():
if self.mandatory_cmd_attrs[key] is False:
if key in ddict_keys:
setattr(self, key, ddict[key])
self.mandatory_cmd_attrs[key] = True
for key in self.mandatory_misc_attrs.keys():
if self.mandatory_misc_attrs[key] is False:
if key in ddict_keys:
setattr(self, key, ddict[key])
self.mandatory_misc_attrs[key] = True
def __post_init__(self):
# Ensure all mandatory attrs are set
for attr in self.mandatory_cmd_attrs.keys():
if self.mandatory_cmd_attrs[attr] is False:
log.error("Attribute \"%s\" not found for \"%s\".", attr,
self.name)
sys.exit(1)
for attr in self.mandatory_misc_attrs.keys():
if self.mandatory_misc_attrs[attr] is False:
log.error("Attribute \"%s\" not found for \"%s\".", attr,
self.name)
sys.exit(1)
# Recursively search and replace wildcards
self.__dict__ = find_and_substitute_wildcards(self.__dict__,
self.__dict__)
# Set identifier.
self.identifier = self.sim_cfg.name + ":" + self.name
# Set the command, output dir and log
self.odir = getattr(self, self.target + "_dir")
# Set the output dir link name to the basename of odir (by default)
self.odir_ln = os.path.basename(os.path.normpath(self.odir))
self.log = self.odir + "/" + self.target + ".log"
# If using LSF, redirect stdout and err to the log file
self.cmd = self.construct_cmd()
def construct_cmd(self):
cmd = "make -f " + self.flow_makefile + " " + self.target
if self.dry_run is True:
cmd += " -n"
for attr in self.mandatory_cmd_attrs.keys():
value = getattr(self, attr)
if type(value) is list:
pretty_value = []
for item in value:
pretty_value.append(item.strip())
value = " ".join(pretty_value)
if type(value) is bool:
value = int(value)
if type(value) is str:
value = value.strip()
cmd += " " + attr + "=\"" + str(value) + "\""
# TODO: If not running locally, redirect stdout and err to the log file
# self.cmd += " > " + self.log + " 2>&1 &"
return cmd
def dispatch_cmd(self):
self.exports.update(os.environ)
args = shlex.split(self.cmd)
try:
# If renew_odir flag is True - then move it.
if self.renew_odir:
self.odir_limiter(odir=self.odir)
os.system("mkdir -p " + self.odir)
# Dump all env variables for ease of debug.
with open(self.odir + "/env_vars",
"w",
encoding="UTF-8",
errors="surrogateescape") as f:
for var in sorted(self.exports.keys()):
f.write("{}={}\n".format(var, self.exports[var]))
f.close()
os.system("ln -s " + self.odir + " " + self.sim_cfg.links['D'] +
'/' + self.odir_ln)
f = open(self.log, "w", encoding="UTF-8", errors="surrogateescape")
f.write("[Executing]:\n{}\n\n".format(self.cmd))
f.flush()
self.process = subprocess.Popen(args,
bufsize=4096,
universal_newlines=True,
stdout=f,
stderr=f,
env=self.exports)
self.log_fd = f
self.status = "D"
Deploy.dispatch_counter += 1
except IOError:
log.error('IO Error: See %s', self.log)
if self.log_fd:
self.log_fd.close()
self.status = "K"
def odir_limiter(self, odir, max_odirs=-1):
'''Function to backup previously run output directory to maintain a
history of a limited number of output directories. It deletes the output
directory with the oldest timestamps, if the limit is reached. It returns
a list of directories that remain after deletion.
Arguments:
odir: The output directory to backup
max_odirs: Maximum output directories to maintain as history.
Returns:
dirs: Space-separated list of directories that remain after deletion.
'''
try:
# If output directory exists, back it up.
if os.path.exists(odir):
ts = run_cmd("date '+" + self.sim_cfg.ts_format + "' -d \"" +
"$(stat -c '%y' " + odir + ")\"")
os.system('mv ' + odir + " " + odir + "_" + ts)
except IOError:
log.error('Failed to back up existing output directory %s', odir)
dirs = ""
# Delete older directories.
try:
pdir = os.path.realpath(odir + "/..")
# Fatal out if pdir got set to root.
if pdir == "/":
log.fatal(
"Something went wrong while processing \"%s\": odir = \"%s\"",
self.name, odir)
sys.exit(1)
if os.path.exists(pdir):
find_cmd = "find " + pdir + " -mindepth 1 -maxdepth 1 -type d "
dirs = run_cmd(find_cmd)
dirs = dirs.replace('\n', ' ')
list_dirs = dirs.split()
num_dirs = len(list_dirs)
if max_odirs == -1:
max_odirs = self.max_odirs
num_rm_dirs = num_dirs - max_odirs
if num_rm_dirs > -1:
rm_dirs = run_cmd(find_cmd +
"-printf '%T+ %p\n' | sort | head -n " +
str(num_rm_dirs + 1) +
" | awk '{print $2}'")
rm_dirs = rm_dirs.replace('\n', ' ')
dirs = dirs.replace(rm_dirs, "")
os.system("/bin/rm -rf " + rm_dirs)
except IOError:
log.error("Failed to delete old run directories!")
return dirs
def set_status(self):
self.status = 'P'
if self.dry_run is False:
seen_fail_pattern = False
for fail_pattern in self.fail_patterns:
# Return error message with the following 4 lines.
grep_cmd = "grep -m 1 -A 4 -E \'" + fail_pattern + "\' " + self.log
(status, rslt) = subprocess.getstatusoutput(grep_cmd)
if rslt:
msg = "```\n{}\n```\n".format(rslt)
self.fail_msg += msg
log.log(VERBOSE, msg)
self.status = 'F'
seen_fail_pattern = True
break
# If fail patterns were not encountered, but the job returned with non-zero exit code
# for whatever reason, then show the last 10 lines of the log as the failure message,
# which might help with the debug.
if self.process.returncode != 0 and not seen_fail_pattern:
msg = "Last 10 lines of the log:<br>\n"
self.fail_msg += msg
log.log(VERBOSE, msg)
get_fail_msg_cmd = "tail -n 10 " + self.log
msg = run_cmd(get_fail_msg_cmd)
msg = "```\n{}\n```\n".format(msg)
self.fail_msg += msg
log.log(VERBOSE, msg)
self.status = "F"
# Return if status is fail - no need to look for pass patterns.
if self.status == 'F':
return
# If fail patterns were not found, ensure pass patterns indeed were.
for pass_pattern in self.pass_patterns:
grep_cmd = "grep -c -m 1 -E \'" + pass_pattern + "\' " + self.log
(status, rslt) = subprocess.getstatusoutput(grep_cmd)
if rslt == "0":
msg = "Pass pattern \"{}\" not found.<br>\n".format(
pass_pattern)
self.fail_msg += msg
log.log(VERBOSE, msg)
self.status = 'F'
break
# Recursively set sub-item's status if parent item fails
def set_sub_status(self, status):
for sub_item in self.sub:
sub_item.status = status
sub_item.set_sub_status(status)
def link_odir(self):
if self.status == '.':
log.error("Method unexpectedly called!")
else:
old_link = self.sim_cfg.links['D'] + "/" + self.odir_ln
new_link = self.sim_cfg.links[self.status] + "/" + self.odir_ln
cmd = "ln -s " + self.odir + " " + new_link + "; "
cmd += "rm " + old_link
if os.system(cmd):
log.error("Cmd \"%s\" could not be run", cmd)
def get_status(self):
if self.status != "D":
return
if self.process.poll() is not None:
self.log_fd.close()
self.set_status()
log.debug("Item %s has completed execution: %s", self.name,
self.status)
Deploy.dispatch_counter -= 1
self.link_odir()
del self.process
def kill(self):
'''Kill running processes.
'''
if self.status == "D" and self.process.poll() is None:
self.kill_remote_job()
self.process.kill()
if self.log_fd:
self.log_fd.close()
self.status = "K"
# recurisvely kill sub target
elif len(self.sub):
for item in self.sub:
item.kill()
def kill_remote_job(self):
'''
If jobs are run in remote server, need to use another command to kill them.
'''
# TODO: Currently only support lsf, may need to add support for GCP later.
# If use lsf, kill it by job ID.
if re.match("^bsub", self.sim_cfg.job_prefix):
# get job id from below string
# Job <xxxxxx> is submitted to default queue
grep_cmd = "grep -m 1 -E \'" + "^Job <" + "\' " + self.log
(status, rslt) = subprocess.getstatusoutput(grep_cmd)
if rslt != "":
job_id = rslt.split('Job <')[1].split('>')[0]
try:
subprocess.run(["bkill", job_id], check=True)
except Exception as e:
log.error("%s: Failed to run bkill\n", e)
@staticmethod
def increment_timer():
# sub function that increments with overflow = 60
def _incr_ovf_60(val):
if val >= 59:
val = 0
return val, True
else:
val += 1
return val, False
incr_hh = False
Deploy.ss, incr_mm = _incr_ovf_60(Deploy.ss)
if incr_mm:
Deploy.mm, incr_hh = _incr_ovf_60(Deploy.mm)
if incr_hh:
Deploy.hh += 1
@staticmethod
def deploy(items):
dispatched_items = []
queued_items = []
# Print timer val in hh:mm:ss.
def get_timer_val():
return "%02i:%02i:%02i" % (Deploy.hh, Deploy.mm, Deploy.ss)
# Check if elapsed time has reached the next print interval.
def has_print_interval_reached():
# Deploy.print_interval is expected to be < 1 hour.
return (((Deploy.mm * 60 + Deploy.ss) %
Deploy.print_interval) == 0)
def dispatch_items(items):
item_names = OrderedDict()
for item in items:
if item.target not in item_names.keys():
item_names[item.target] = ""
if item.status is None:
item_names[item.target] += item.identifier + ", "
item.dispatch_cmd()
for target in item_names.keys():
if item_names[target] != "":
item_names[target] = " [" + item_names[target][:-2] + "]"
log.log(VERBOSE, "[%s]: [%s]: [dispatch]:\n%s",
get_timer_val(), target, item_names[target])
# Initialize status for a target, add '_stats_' for the said target
# and initialize counters for queued, dispatched, passed, failed,
# killed and total to 0. Also adds a boolean key to indicate if all
# items in a given target are done.
def init_status_target_stats(status, target):
status[target] = OrderedDict()
status[target]['_stats_'] = OrderedDict()
status[target]['_stats_']['Q'] = 0
status[target]['_stats_']['D'] = 0
status[target]['_stats_']['P'] = 0
status[target]['_stats_']['F'] = 0
status[target]['_stats_']['K'] = 0
status[target]['_stats_']['T'] = 0
status[target]['_done_'] = False
# Update status counter for a newly queued item.
def add_status_target_queued(status, item):
if item.target not in status.keys():
init_status_target_stats(status, item.target)
status[item.target][item] = "Q"
status[item.target]['_stats_']['Q'] += 1
status[item.target]['_stats_']['T'] += 1
# Update status counters for a target.
def update_status_target_stats(status, item):
old_status = status[item.target][item]
status[item.target]['_stats_'][old_status] -= 1
status[item.target]['_stats_'][item.status] += 1
status[item.target][item] = item.status
def check_if_done_and_print_status(status, print_status_flag):
all_done = True
for target in status.keys():
target_done_prev = status[target]['_done_']
target_done_curr = ((status[target]['_stats_']["Q"] == 0) and
(status[target]['_stats_']["D"] == 0))
status[target]['_done_'] = target_done_curr
all_done &= target_done_curr
# Print if flag is set and target_done is not True for two
# consecutive times.
if not (target_done_prev and
target_done_curr) and print_status_flag:
stats = status[target]['_stats_']
width = "0{}d".format(len(str(stats["T"])))
msg = "["
for s in stats.keys():
msg += s + ": {:{}}, ".format(stats[s], width)
msg = msg[:-2] + "]"
log.info("[%s]: [%s]: %s", get_timer_val(), target, msg)
return all_done
# Print legend once at the start of the run.
if Deploy.print_legend:
log.info("[legend]: [Q: queued, D: dispatched, "
"P: passed, F: failed, K: killed, T: total]")
Deploy.print_legend = False
status = OrderedDict()
print_status_flag = True
# Queue all items
queued_items = items
for item in queued_items:
add_status_target_queued(status, item)
all_done = False
while not all_done:
# Get status of dispatched items.
for item in dispatched_items:
if item.status == "D":
item.get_status()
if item.status != status[item.target][item]:
print_status_flag = True
if item.status != "D":
if item.status != "P":
# Kill its sub items if item did not pass.
item.set_sub_status("K")
log.error("[%s]: [%s]: [status] [%s: %s]",
get_timer_val(), item.target,
item.identifier, item.status)
else:
log.log(VERBOSE, "[%s]: [%s]: [status] [%s: %s]",
get_timer_val(), item.target,
item.identifier, item.status)
# Queue items' sub-items if it is done.
queued_items.extend(item.sub)
for sub_item in item.sub:
add_status_target_queued(status, sub_item)
update_status_target_stats(status, item)
# Dispatch items from the queue as slots free up.
all_done = (len(queued_items) == 0)
if not all_done:
num_slots = Deploy.max_parallel - Deploy.dispatch_counter
if num_slots > Deploy.slot_limit:
num_slots = Deploy.slot_limit
if num_slots > 0:
if len(queued_items) > num_slots:
dispatch_items(queued_items[0:num_slots])
dispatched_items.extend(queued_items[0:num_slots])
queued_items = queued_items[num_slots:]
else:
dispatch_items(queued_items)
dispatched_items.extend(queued_items)
queued_items = []
# Check if we are done and print the status periodically.
all_done &= check_if_done_and_print_status(status,
print_status_flag)
# Advance time by 1s if there is more work to do.
if not all_done:
time.sleep(1)
Deploy.increment_timer()
print_status_flag = has_print_interval_reached()
class CompileSim(Deploy):
"""
Abstraction for building the simulation executable.
"""
# Register all builds with the class
items = []
def __init__(self, build_mode, sim_cfg):
self.target = "build"
self.pass_patterns = []
self.fail_patterns = []
self.mandatory_cmd_attrs = {
# tool srcs
"tool_srcs": False,
"tool_srcs_dir": False,
# Flist gen
"sv_flist_gen_cmd": False,
"sv_flist_gen_dir": False,
"sv_flist_gen_opts": False,
# Build
"build_dir": False,
"build_cmd": False,
"build_opts": False
}
self.mandatory_misc_attrs = {
"cov_db_dir": False,
"build_pass_patterns": False,
"build_fail_patterns": False
}
# Initialize
super().__init__(sim_cfg)
super().parse_dict(build_mode.__dict__)
# Call this method again with the sim_cfg dict passed as the object,
# since it may contain additional mandatory attrs.
super().parse_dict(sim_cfg.__dict__)
self.build_mode = self.name
self.pass_patterns = self.build_pass_patterns
self.fail_patterns = self.build_fail_patterns
self.__post_init__()
# Start fail message construction
self.fail_msg = "\n**BUILD:** {}<br>\n".format(self.name)
log_sub_path = self.log.replace(self.sim_cfg.scratch_path + '/', '')
self.fail_msg += "**LOG:** $scratch_path/{}<br>\n".format(log_sub_path)
CompileSim.items.append(self)
def dispatch_cmd(self):
# Delete previous cov_db_dir if it exists before dispatching new build.
if os.path.exists(self.cov_db_dir):
os.system("rm -rf " + self.cov_db_dir)
super().dispatch_cmd()
class CompileOneShot(Deploy):
"""
Abstraction for building the simulation executable.
"""
# Register all builds with the class
items = []
def __init__(self, build_mode, sim_cfg):
self.target = "build"
self.pass_patterns = []
self.fail_patterns = []
self.mandatory_cmd_attrs = {
# tool srcs
"tool_srcs": False,
"tool_srcs_dir": False,
# Flist gen
"sv_flist_gen_cmd": False,
"sv_flist_gen_dir": False,
"sv_flist_gen_opts": False,
# Build
"build_dir": False,
"build_cmd": False,
"build_opts": False,
"build_log": False,
# Report processing
"report_cmd": False,
"report_opts": False
}
self.mandatory_misc_attrs = {}
# Initialize
super().__init__(sim_cfg)
super().parse_dict(build_mode.__dict__)
# Call this method again with the sim_cfg dict passed as the object,
# since it may contain additional mandatory attrs.
super().parse_dict(sim_cfg.__dict__)
self.build_mode = self.name
self.__post_init__()
# Start fail message construction
self.fail_msg = "\n**BUILD:** {}<br>\n".format(self.name)
log_sub_path = self.log.replace(self.sim_cfg.scratch_path + '/', '')
self.fail_msg += "**LOG:** $scratch_path/{}<br>\n".format(log_sub_path)
CompileOneShot.items.append(self)
class RunTest(Deploy):
"""
Abstraction for running tests. This is one per seed for each test.
"""
# Initial seed values when running tests (if available).
seeds = []
fixed_seed = None
# Register all runs with the class
items = []
def __init__(self, index, test, sim_cfg):
self.target = "run"
self.pass_patterns = []
self.fail_patterns = []
self.mandatory_cmd_attrs = {
"proj_root": False,
"uvm_test": False,
"uvm_test_seq": False,
"run_opts": False,
"sw_test": False,
"sw_test_is_prebuilt": False,
"sw_build_device": False,
"sw_build_dir": False,
"run_dir": False,
"run_cmd": False,
"run_opts": False
}
self.mandatory_misc_attrs = {
"run_dir_name": False,
"cov_db_test_dir": False,
"run_pass_patterns": False,
"run_fail_patterns": False
}
self.index = index
self.seed = RunTest.get_seed()
# Initialize
super().__init__(sim_cfg)
super().parse_dict(test.__dict__)
# Call this method again with the sim_cfg dict passed as the object,
# since it may contain additional mandatory attrs.
super().parse_dict(sim_cfg.__dict__)
self.test = self.name
self.renew_odir = True
self.build_mode = test.build_mode.name
self.pass_patterns = self.run_pass_patterns
self.fail_patterns = self.run_fail_patterns
self.__post_init__()
# For output dir link, use run_dir_name instead.
self.odir_ln = self.run_dir_name
# Start fail message construction
self.fail_msg = "\n**TEST:** {}, ".format(self.name)
self.fail_msg += "**SEED:** {}<br>\n".format(self.seed)
log_sub_path = self.log.replace(self.sim_cfg.scratch_path + '/', '')
self.fail_msg += "**LOG:** $scratch_path/{}<br>\n".format(log_sub_path)
RunTest.items.append(self)
def __post_init__(self):
super().__post_init__()
# Set identifier.
self.identifier = self.sim_cfg.name + ":" + self.run_dir_name
def get_status(self):
'''Override base class get_status implementation for additional post-status
actions.'''
super().get_status()
if self.status not in ["D", "P"]:
# Delete the coverage data if available.
if os.path.exists(self.cov_db_test_dir):
log.log(VERBOSE, "Deleting coverage data of failing test:\n%s",
self.cov_db_test_dir)
os.system("/bin/rm -rf " + self.cov_db_test_dir)
@staticmethod
def get_seed():
# If --seeds option is passed, then those custom seeds are consumed
# first. If --fixed-seed <val> is also passed, the subsequent tests
# (once the custom seeds are consumed) will be run with the fixed seed.
if not RunTest.seeds:
if RunTest.fixed_seed:
return RunTest.fixed_seed
for i in range(1000):
seed = random.getrandbits(32)
RunTest.seeds.append(seed)
return RunTest.seeds.pop(0)
class CovMerge(Deploy):
"""
Abstraction for merging coverage databases. An item of this class is created AFTER
the regression is completed.
"""
# Register all builds with the class
items = []
def __init__(self, sim_cfg):
self.target = "cov_merge"
self.pass_patterns = []
self.fail_patterns = []
# Construct local 'special' variable from cov directories that need to
# be merged.
self.cov_db_dirs = ""
self.mandatory_cmd_attrs = {
"cov_merge_cmd": False,
"cov_merge_opts": False
}
self.mandatory_misc_attrs = {
"cov_merge_dir": False,
"cov_merge_db_dir": False
}
# Initialize
super().__init__(sim_cfg)
super().parse_dict(sim_cfg.__dict__)
self.__post_init__()
# Override standard output and log patterns.
self.odir = self.cov_merge_db_dir
self.odir_ln = os.path.basename(os.path.normpath(self.odir))
# Start fail message construction
self.fail_msg = "\n**COV_MERGE:** {}<br>\n".format(self.name)
log_sub_path = self.log.replace(self.sim_cfg.scratch_path + '/', '')
self.fail_msg += "**LOG:** $scratch_path/{}<br>\n".format(log_sub_path)
CovMerge.items.append(self)
def __post_init__(self):
# Add cov db dirs from all the builds that were kicked off.
for bld in self.sim_cfg.builds:
self.cov_db_dirs += bld.cov_db_dir + " "
# Recursively search and replace wildcards, ignoring cov_db_dirs for now.
# We need to resolve it later based on cov_db_dirs value set below.
self.__dict__ = find_and_substitute_wildcards(
self.__dict__, self.__dict__, ignored_wildcards=["cov_db_dirs"])
# Prune previous merged cov directories.
prev_cov_db_dirs = self.odir_limiter(odir=self.cov_merge_db_dir)
# If a merged cov data base exists from a previous run, then consider
# that as well for merging, if the --cov-merge-previous command line
# switch is passed.
if self.sim_cfg.cov_merge_previous:
self.cov_db_dirs += prev_cov_db_dirs
# Append cov_db_dirs to the list of exports.
self.exports["cov_db_dirs"] = "\"{}\"".format(self.cov_db_dirs)
# Call base class __post_init__ to do checks and substitutions
super().__post_init__()
class CovReport(Deploy):
"""
Abstraction for coverage report generation. An item of this class is created AFTER
the regression is completed.
"""
# Register all builds with the class
items = []
def __init__(self, sim_cfg):
self.target = "cov_report"
self.pass_patterns = []
self.fail_patterns = []
self.cov_total = ""
self.cov_results = ""
self.mandatory_cmd_attrs = {
"cov_report_cmd": False,
"cov_report_opts": False
}
self.mandatory_misc_attrs = {
"cov_report_dir": False,
"cov_merge_db_dir": False,
"cov_report_txt": False
}
# Initialize
super().__init__(sim_cfg)
super().parse_dict(sim_cfg.__dict__)
self.__post_init__()
# Start fail message construction
self.fail_msg = "\n**COV_REPORT:** {}<br>\n".format(self.name)
log_sub_path = self.log.replace(self.sim_cfg.scratch_path + '/', '')
self.fail_msg += "**LOG:** $scratch_path/{}<br>\n".format(log_sub_path)
CovReport.items.append(self)
def get_status(self):
super().get_status()
# Once passed, extract the cov results summary from the dashboard.
if self.status == "P":
results, self.cov_total, ex_msg = get_cov_summary_table(
self.cov_report_txt, self.sim_cfg.tool)
if not ex_msg:
# Succeeded in obtaining the coverage data.
colalign = (("center", ) * len(results[0]))
self.cov_results = tabulate(results,
headers="firstrow",
tablefmt="pipe",
colalign=colalign)
else:
self.fail_msg += ex_msg
log.error(ex_msg)
self.status = "F"
if self.status == "P":
# Delete the cov report - not needed.
os.system("rm -rf " + self.log)
class CovAnalyze(Deploy):
"""
Abstraction for coverage analysis tool.
"""
# Register all builds with the class
items = []
def __init__(self, sim_cfg):
self.target = "cov_analyze"
self.pass_patterns = []
self.fail_patterns = []
self.mandatory_cmd_attrs = {
# tool srcs
"tool_srcs": False,
"tool_srcs_dir": False,
"cov_analyze_cmd": False,
"cov_analyze_opts": False
}
self.mandatory_misc_attrs = {
"cov_analyze_dir": False,
"cov_merge_db_dir": False
}
# Initialize
super().__init__(sim_cfg)
super().parse_dict(sim_cfg.__dict__)
self.__post_init__()
# Start fail message construction
self.fail_msg = "\n**COV_ANALYZE:** {}<br>\n".format(self.name)
log_sub_path = self.log.replace(self.sim_cfg.scratch_path + '/', '')
self.fail_msg += "**LOG:** $scratch_path/{}<br>\n".format(log_sub_path)
CovAnalyze.items.append(self)