[dvsim] Use builtins wherever possible
This change gets rid of `os.system` calls, `grep`s and other things that
are better achieved by using python built-ins.
Apart from that, there are lint fixes and other very minor changes.
Signed-off-by: Srikrishna Iyer <sriyer@google.com>
diff --git a/util/dvsim/Deploy.py b/util/dvsim/Deploy.py
index 4ff7a1b..7ed1e01 100644
--- a/util/dvsim/Deploy.py
+++ b/util/dvsim/Deploy.py
@@ -8,12 +8,15 @@
import random
import re
import shlex
+import shutil
import subprocess
import sys
+from datetime import datetime
+from pathlib import Path
from sim_utils import get_cov_summary_table
from tabulate import tabulate
-from utils import VERBOSE, find_and_substitute_wildcards, run_cmd
+from utils import VERBOSE, find_and_substitute_wildcards, rm_path
class DeployError(Exception):
@@ -36,8 +39,7 @@
def __str__(self):
return (pprint.pformat(self.__dict__)
- if log.getLogger().isEnabledFor(VERBOSE)
- else self.cmd)
+ if log.getLogger().isEnabledFor(VERBOSE) else self.cmd)
def __init__(self, sim_cfg):
'''Initialize common class members.'''
@@ -195,9 +197,6 @@
if type(value) is str:
value = value.strip()
cmd += " " + attr + "=\"" + str(value) + "\""
-
- # TODO: If not running locally, redirect stdout and err to the log file
- # self.cmd += " > " + self.log + " 2>&1 &"
return cmd
def is_equivalent_job(self, item):
@@ -253,7 +252,7 @@
# If renew_odir flag is True - then move it.
if self.renew_odir:
self.odir_limiter(odir=self.odir)
- os.system("mkdir -p " + self.odir)
+ os.makedirs(self.odir, exist_ok=True)
# Dump all env variables for ease of debug.
with open(self.odir + "/env_vars",
"w",
@@ -262,8 +261,7 @@
for var in sorted(exports.keys()):
f.write("{}={}\n".format(var, exports[var]))
f.close()
- os.system("ln -s " + self.odir + " " + self.sim_cfg.links['D'] +
- '/' + self.odir_ln)
+ self._link_odir("D")
f = open(self.log, "w", encoding="UTF-8", errors="surrogateescape")
f.write("[Executing]:\n{}\n\n".format(self.cmd))
f.flush()
@@ -279,118 +277,141 @@
self.log_fd.close()
raise DeployError('IO Error: See {}'.format(self.log))
- def odir_limiter(self, odir, max_odirs=-1):
- '''Function to backup previously run output directory to maintain a
- history of a limited number of output directories. It deletes the output
- directory with the oldest timestamps, if the limit is reached. It returns
- a list of directories that remain after deletion.
- Arguments:
- odir: The output directory to backup
- max_odirs: Maximum output directories to maintain as history.
+ def odir_limiter(self, odir):
+ """Clean previous output directories.
- Returns:
- dirs: Space-separated list of directories that remain after deletion.
- '''
- try:
- # If output directory exists, back it up.
- if os.path.exists(odir):
- ts = run_cmd("date '+" + self.sim_cfg.ts_format + "' -d \"" +
- "$(stat -c '%y' " + odir + ")\"")
- os.system('mv ' + odir + " " + odir + "_" + ts)
- except IOError:
- log.error('Failed to back up existing output directory %s', odir)
+ When running jobs, we may want to maintain a limited history of
+ previous invocations. This method finds and deletes the output
+ directories at the base of input arg 'odir' with the oldest timestamps,
+ if that limit is reached. It returns a list of directories that
+ remain after deletion.
+ """
- dirs = ""
- # Delete older directories.
- try:
- pdir = os.path.realpath(odir + "/..")
- # Fatal out if pdir got set to root.
- if pdir == "/":
- log.fatal(
- "Something went wrong while processing \"%s\": odir = \"%s\"",
- self.name, odir)
- sys.exit(1)
+ if not os.path.exists(odir):
+ return []
- if os.path.exists(pdir):
- find_cmd = "find " + pdir + " -mindepth 1 -maxdepth 1 -type d "
- dirs = run_cmd(find_cmd)
- dirs = dirs.replace('\n', ' ')
- list_dirs = dirs.split()
- num_dirs = len(list_dirs)
- if max_odirs == -1:
- max_odirs = self.max_odirs
- num_rm_dirs = num_dirs - max_odirs
- if num_rm_dirs > -1:
- rm_dirs = run_cmd(find_cmd +
- "-printf '%T+ %p\n' | sort | head -n " +
- str(num_rm_dirs + 1) +
- " | awk '{print $2}'")
- rm_dirs = rm_dirs.replace('\n', ' ')
- dirs = dirs.replace(rm_dirs, "")
- os.system("/bin/rm -rf " + rm_dirs)
- except IOError:
- log.error("Failed to delete old run directories!")
- return dirs
+ # If output directory exists, back it up.
+ ts = datetime.fromtimestamp(os.stat(odir).st_ctime)
+ ts = ts.strftime(self.sim_cfg.ts_format)
+ shutil.move(odir, odir + "_" + ts)
+
+ # Get list of past output directories sorted by creation time.
+ pdir = Path(odir).resolve().parent
+ dirs = sorted([old for old in pdir.iterdir() if old.is_dir()],
+ key=os.path.getctime,
+ reverse=True)
+
+ for old in dirs[self.max_odirs - 1:]:
+ rm_path(old)
+
+ return dirs[0:self.max_odirs - 2]
def _test_passed(self):
- '''Return True if the job passed, False otherwise
+ """Determine the outcome of the job (P/F if it ran to completion).
- This is called by poll() just after the job finishes.
+ Return True if the job passed, False otherwise. This is called by
+ poll() just after the job finishes.
- '''
+ """
+ def log_fail_msg(msg):
+ '''Logs the fail msg to the final report.'''
+ self.fail_msg += msg
+ log.log(VERBOSE, msg)
+
+ def _find_patterns(patterns, line):
+ '''Helper function that returns true if all or any of the given
+ patterns is found, else False.'''
+
+ assert patterns
+ for pattern in patterns:
+ match = re.search(r"{}".format(pattern), line)
+ if match:
+ return pattern
+ return None
+
+ def _get_n_lines(pos, num):
+ "Helper function that returns next N lines starting at pos index."
+
+ return ''.join(lines[pos:pos + num - 1]).strip()
+
if self.dry_run:
return True
- seen_fail_pattern = False
- for fail_pattern in self.fail_patterns:
- # Return error message with the following 4 lines.
- grep_cmd = "grep -m 1 -A 4 -E \'" + fail_pattern + "\' " + self.log
- (status, rslt) = subprocess.getstatusoutput(grep_cmd)
- if rslt:
- msg = "```\n{}\n```\n".format(rslt)
- self.fail_msg += msg
- log.log(VERBOSE, msg)
- seen_fail_pattern = True
- break
+ # Only one fail pattern needs to be seen.
+ failed = False
+ chk_failed = bool(self.fail_patterns)
- if seen_fail_pattern:
+ # All pass patterns need to be seen, so we replicate the list and remove
+ # patterns as we encounter them.
+ pass_patterns = self.pass_patterns.copy()
+ chk_passed = bool(pass_patterns) and (self.process.returncode == 0)
+
+ try:
+ with open(self.log, "r", encoding="UTF-8") as f:
+ lines = f.readlines()
+ except OSError as e:
+ log_fail_msg("Error opening file {!r}:\n{}".format(self.log, e))
+ return False
+
+ if chk_failed or chk_passed:
+ for cnt, line in enumerate(lines):
+ if chk_failed:
+ if _find_patterns(self.fail_patterns, line) is not None:
+ # Print 4 additional lines to help debug more easily.
+ log_fail_msg("```\n{}\n```\n".format(
+ _get_n_lines(cnt, 5)))
+ failed = True
+ chk_failed = False
+ chk_passed = False
+
+ if chk_passed:
+ pattern = _find_patterns(pass_patterns, line)
+ if pattern is not None:
+ pass_patterns.remove(pattern)
+ chk_passed = bool(pass_patterns)
+
+ # If failed, then nothing else to do. Just return.
+ if failed:
return False
# If no fail patterns were seen, but the job returned with non-zero
# exit code for whatever reason, then show the last 10 lines of the log
# as the failure message, which might help with the debug.
if self.process.returncode != 0:
- msg = "Last 10 lines of the log:<br>\n"
- self.fail_msg += msg
- log.log(VERBOSE, msg)
- get_fail_msg_cmd = "tail -n 10 " + self.log
- msg = run_cmd(get_fail_msg_cmd)
- msg = "```\n{}\n```\n".format(msg)
- self.fail_msg += msg
- log.log(VERBOSE, msg)
+ msg = ''.join(lines[-10:]).strip()
+ log_fail_msg("Process returned non-zero exit code. "
+ "Last 10 lines:\n```\n{}\n```\n".format(msg))
return False
- # If we get here, we've not seen anything explicitly wrong, but we
- # might have "pass patterns": patterns that must occur in the log for
- # the run to be considered successful.
- for pass_pattern in self.pass_patterns:
- grep_cmd = "grep -c -m 1 -E \'" + pass_pattern + "\' " + self.log
- (status, rslt) = subprocess.getstatusoutput(grep_cmd)
- if rslt == "0":
- msg = "Pass pattern {!r} not found.<br>\n".format(pass_pattern)
- self.fail_msg += msg
- log.log(VERBOSE, msg)
- return False
+ # Ensure all pass patterns were seen.
+ if chk_passed:
+ msg = ''.join(lines[-10:]).strip()
+ log_fail_msg("One or more pass patterns not found:\n{}\n"
+ "Last 10 lines:\n```\n{}\n```\n".format(
+ pass_patterns, msg))
+ return False
return True
def _link_odir(self, status):
- old_link = self.sim_cfg.links['D'] + "/" + self.odir_ln
- new_link = self.sim_cfg.links[status] + "/" + self.odir_ln
- cmd = "ln -s " + self.odir + " " + new_link + "; "
- cmd += "rm " + old_link
- if os.system(cmd):
- log.error("Cmd \"%s\" could not be run", cmd)
+ '''Soft-links the job's directory based on job's status, into
+ dispatched, running, passed, failed or killed directories in the
+ scratch area.'''
+
+ dest = Path(self.sim_cfg.links[status], self.odir_ln)
+
+ # If dest exists, then atomically remove it and link the odir again.
+ while True:
+ try:
+ os.symlink(self.odir, dest)
+ break
+ except FileExistsError:
+ rm_path(dest)
+
+ # Delete the symlink from dispatched directory if it exists.
+ if status != "D":
+ old = Path(self.sim_cfg.links['D'], self.odir_ln)
+ rm_path(old)
def _on_finish(self, status):
'''Called when the process finishes or is killed'''
@@ -524,9 +545,9 @@
CompileSim.items.append(self)
def dispatch_cmd(self):
- # Delete previous cov_db_dir if it exists before dispatching new build.
- if os.path.exists(self.cov_db_dir):
- os.system("rm -rf " + self.cov_db_dir)
+ # Delete old coverage database directories before building again. We
+ # need to do this becuase build directory is not 'renewed'.
+ rm_path(self.cov_db_dir)
super().dispatch_cmd()
@@ -663,10 +684,7 @@
super()._on_finish(status)
if status != 'P':
# Delete the coverage data if available.
- if os.path.exists(self.cov_db_test_dir):
- log.log(VERBOSE, "Deleting coverage data of failing test:\n%s",
- self.cov_db_test_dir)
- os.system("/bin/rm -rf " + self.cov_db_test_dir)
+ rm_path(self.cov_db_test_dir)
@staticmethod
def get_seed():
@@ -882,7 +900,7 @@
colalign=colalign)
# Delete the cov report - not needed.
- os.system("rm -rf " + self.log)
+ rm_path(self.log)
return True
diff --git a/util/dvsim/FlowCfg.py b/util/dvsim/FlowCfg.py
index 1f7e442..199db9b 100644
--- a/util/dvsim/FlowCfg.py
+++ b/util/dvsim/FlowCfg.py
@@ -14,7 +14,7 @@
from CfgJson import set_target_attribute
from Scheduler import Scheduler
from utils import (VERBOSE, find_and_substitute_wildcards, md_results_to_html,
- subst_wildcards)
+ rm_path, subst_wildcards)
# Interface class for extensions.
@@ -209,11 +209,8 @@
self.cfgs.append(self.create_instance(mk_config, temp_cfg_file))
# Delete the temp_cfg_file once the instance is created
- try:
- log.log(VERBOSE, "Deleting temp cfg file:\n%s", temp_cfg_file)
- os.system("/bin/rm -rf " + temp_cfg_file)
- except IOError:
- log.error("Failed to remove temp cfg file:\n%s", temp_cfg_file)
+ log.log(VERBOSE, "Deleting temp cfg file:\n%s", temp_cfg_file)
+ rm_path(temp_cfg_file, ignore_error=True)
else:
log.error(
@@ -549,11 +546,10 @@
md_results_to_html(self.results_title, self.css_file,
publish_results_md))
f.close()
- rm_cmd += "/bin/rm -rf " + results_html_file + "; "
log.info("Publishing results to %s", results_page_url)
cmd = (self.results_server_cmd + " cp " + results_html_file + " " +
- self.results_server_page + "; " + rm_cmd)
+ self.results_server_page)
log.log(VERBOSE, cmd)
try:
cmd_output = subprocess.run(args=cmd,
@@ -563,6 +559,7 @@
log.log(VERBOSE, cmd_output.stdout.decode("utf-8"))
except Exception as e:
log.error("%s: Failed to publish results:\n\"%s\"", e, str(cmd))
+ rm_path(results_html_file)
def publish_results(self):
'''Public facing API for publishing results to the opentitan web
@@ -589,11 +586,10 @@
md_results_to_html(self.results_title, self.css_file,
self.results_summary_md))
f.close()
- rm_cmd = "/bin/rm -rf " + results_html_file + "; "
log.info("Publishing results summary to %s", results_page_url)
cmd = (self.results_server_cmd + " cp " + results_html_file + " " +
- self.results_summary_server_page + "; " + rm_cmd)
+ self.results_summary_server_page)
log.log(VERBOSE, cmd)
try:
cmd_output = subprocess.run(args=cmd,
@@ -603,6 +599,7 @@
log.log(VERBOSE, cmd_output.stdout.decode("utf-8"))
except Exception as e:
log.error("%s: Failed to publish results:\n\"%s\"", e, str(cmd))
+ rm_path(results_html_file)
def has_errors(self):
return self.errors_seen
diff --git a/util/dvsim/OneShotCfg.py b/util/dvsim/OneShotCfg.py
index 1612734..2dc3d5a 100644
--- a/util/dvsim/OneShotCfg.py
+++ b/util/dvsim/OneShotCfg.py
@@ -7,12 +7,12 @@
import logging as log
import os
-import sys
from collections import OrderedDict
from Deploy import CompileOneShot
from FlowCfg import FlowCfg
from Modes import BuildModes, Modes
+from utils import rm_path
class OneShotCfg(FlowCfg):
@@ -110,13 +110,9 @@
# Purge the output directories. This operates on self.
def _purge(self):
- if self.scratch_path:
- try:
- log.info("Purging scratch path %s", self.scratch_path)
- os.system("/bin/rm -rf " + self.scratch_path)
- except IOError:
- log.error('Failed to purge scratch directory %s',
- self.scratch_path)
+ assert self.scratch_path
+ log.info("Purging scratch path %s", self.scratch_path)
+ rm_path(self.scratch_path)
def _create_objects(self):
# Create build and run modes objects
@@ -142,21 +138,9 @@
def _create_dirs(self):
'''Create initial set of directories
'''
- # Invoking system calls has a performance penalty.
- # Construct a single command line chained with '&&' to invoke
- # the system call only once, rather than multiple times.
- create_link_dirs_cmd = ""
for link in self.links.keys():
- create_link_dirs_cmd += "/bin/rm -rf " + self.links[link] + " && "
- create_link_dirs_cmd += "mkdir -p " + self.links[link] + " && "
- create_link_dirs_cmd += " true"
-
- try:
- os.system(create_link_dirs_cmd)
- except IOError:
- log.error("Error running when running the cmd \"%s\"",
- create_link_dirs_cmd)
- sys.exit(1)
+ rm_path(self.links[link])
+ os.makedirs(self.links[link])
def _create_deploy_objects(self):
'''Create deploy objects from build modes
diff --git a/util/dvsim/SimCfg.py b/util/dvsim/SimCfg.py
index 15cb631..c9f9074 100644
--- a/util/dvsim/SimCfg.py
+++ b/util/dvsim/SimCfg.py
@@ -16,10 +16,9 @@
from FlowCfg import FlowCfg
from Modes import BuildModes, Modes, Regressions, RunModes, Tests
from tabulate import tabulate
-from utils import VERBOSE
-
-from testplanner.class_defs import TestResult, Testplan
+from testplanner.class_defs import Testplan, TestResult
from testplanner.testplan_utils import parse_testplan
+from utils import VERBOSE, rm_path
def pick_wave_format(fmts):
@@ -282,13 +281,9 @@
# Purge the output directories. This operates on self.
def _purge(self):
- if self.scratch_path:
- try:
- log.info("Purging scratch path %s", self.scratch_path)
- os.system("/bin/rm -rf " + self.scratch_path)
- except IOError:
- log.error('Failed to purge scratch directory %s',
- self.scratch_path)
+ assert self.scratch_path
+ log.info("Purging scratch path %s", self.scratch_path)
+ rm_path(self.scratch_path)
def _create_objects(self):
# Create build and run modes objects
@@ -444,21 +439,9 @@
def _create_dirs(self):
'''Create initial set of directories
'''
- # Invoking system calls has a performance penalty.
- # Construct a single command line chained with '&&' to invoke
- # the system call only once, rather than multiple times.
- create_link_dirs_cmd = ""
for link in self.links.keys():
- create_link_dirs_cmd += "/bin/rm -rf " + self.links[link] + " && "
- create_link_dirs_cmd += "mkdir -p " + self.links[link] + " && "
- create_link_dirs_cmd += " true"
-
- try:
- os.system(create_link_dirs_cmd)
- except IOError:
- log.error("Error running when running the cmd \"%s\"",
- create_link_dirs_cmd)
- sys.exit(1)
+ rm_path(self.links[link])
+ os.makedirs(self.links[link])
def _expand_run_list(self, build_map):
'''Generate a list of tests to be run
@@ -532,8 +515,8 @@
test.build_mode = Modes.find_mode(
build_map[test.build_mode].name, self.build_modes)
- self.runs = ([] if self.build_only
- else self._expand_run_list(build_map))
+ self.runs = ([] if self.build_only else
+ self._expand_run_list(build_map))
self.deploy = self.builds + self.runs
diff --git a/util/dvsim/dvsim.py b/util/dvsim/dvsim.py
index 64e6e3e..8385aed 100755
--- a/util/dvsim/dvsim.py
+++ b/util/dvsim/dvsim.py
@@ -23,18 +23,17 @@
import datetime
import logging as log
import os
-from pathlib import Path
-import shutil
import shlex
import subprocess
import sys
import textwrap
+from pathlib import Path
-import Deploy
+from CfgFactory import make_cfg
+from Deploy import Deploy, RunTest
from Scheduler import Scheduler
from Timer import Timer
-import utils
-from CfgFactory import make_cfg
+from utils import VERBOSE, rm_path, run_cmd_with_timeout
# TODO: add dvsim_cfg.hjson to retrieve this info
version = 0.1
@@ -60,10 +59,9 @@
# Scratch space could be mounted in a filesystem (such as NFS) on a network drive.
# If the network is down, it could cause the access access check to hang. So run a
# simple ls command with a timeout to prevent the hang.
- (out,
- status) = utils.run_cmd_with_timeout(cmd="ls -d " + scratch_root,
- timeout=1,
- exit_on_failure=0)
+ (out, status) = run_cmd_with_timeout(cmd="ls -d " + scratch_root,
+ timeout=1,
+ exit_on_failure=0)
if status == 0 and out != "":
arg_scratch_root = scratch_root
else:
@@ -76,13 +74,17 @@
arg_scratch_root = os.path.realpath(arg_scratch_root)
try:
- os.system("mkdir -p " + arg_scratch_root)
- except OSError:
- log.fatal(
- "Invalid --scratch-root=\"%s\" switch - failed to create directory!",
- arg_scratch_root)
+ os.makedirs(arg_scratch_root, exist_ok=True)
+ except PermissionError as e:
+ log.fatal("Failed to create scratch root {}:\n{}.".format(
+ arg_scratch_root, e))
sys.exit(1)
- return (arg_scratch_root)
+
+ if not os.access(arg_scratch_root, os.W_OK):
+ log.fatal("Scratch root {} is not writable!".format(arg_scratch_root))
+ sys.exit(1)
+
+ return arg_scratch_root
def read_max_parallel(arg):
@@ -178,7 +180,7 @@
proj_root_dest = os.path.join(args.scratch_root, args.branch,
"repo_top")
if args.purge:
- shutil.rmtree(proj_root_dest, ignore_errors=True)
+ rm_path(proj_root_dest)
copy_repo(proj_root_src, proj_root_dest, args.dry_run)
else:
proj_root_dest = proj_root_src
@@ -194,17 +196,19 @@
exclude patterns to skip certain things from being copied over. With GitHub
repos, an existing `.gitignore` serves this purpose pretty well.
'''
- rsync_cmd = ["rsync",
- "--recursive", "--links", "--checksum", "--update",
- "--inplace", "--no-group"]
+ rsync_cmd = [
+ "rsync", "--recursive", "--links", "--checksum", "--update",
+ "--inplace", "--no-group"
+ ]
# Supply `.gitignore` from the src area to skip temp files.
ignore_patterns_file = os.path.join(src, ".gitignore")
if os.path.exists(ignore_patterns_file):
# TODO: hack - include hw/foundry since it is excluded in .gitignore.
- rsync_cmd += ["--include=hw/foundry",
- "--exclude-from={}".format(ignore_patterns_file),
- "--exclude=.*"]
+ rsync_cmd += [
+ "--include=hw/foundry",
+ "--exclude-from={}".format(ignore_patterns_file), "--exclude=.*"
+ ]
rsync_cmd += [src + "/.", dest]
rsync_str = ' '.join([shlex.quote(w) for w in rsync_cmd])
@@ -212,7 +216,7 @@
cmd = ["flock", "--timeout", "600", dest, "--command", rsync_str]
log.info("[copy_repo] [dest]: %s", dest)
- log.log(utils.VERBOSE, "[copy_repo] [cmd]: \n%s", ' '.join(cmd))
+ log.log(VERBOSE, "[copy_repo] [cmd]: \n%s", ' '.join(cmd))
if not dry_run:
# Make sure the dest exists first.
os.makedirs(dest, exist_ok=True)
@@ -582,12 +586,12 @@
args = parse_args()
# Add log level 'VERBOSE' between INFO and DEBUG
- log.addLevelName(utils.VERBOSE, 'VERBOSE')
+ log.addLevelName(VERBOSE, 'VERBOSE')
log_format = '%(levelname)s: [%(module)s] %(message)s'
log_level = log.INFO
if args.verbose == "default":
- log_level = utils.VERBOSE
+ log_level = VERBOSE
elif args.verbose == "debug":
log_level = log.DEBUG
log.basicConfig(format=log_format, level=log_level)
@@ -628,16 +632,16 @@
setattr(args, "timestamp", timestamp)
# Register the seeds from command line with RunTest class.
- Deploy.RunTest.seeds = args.seeds
+ RunTest.seeds = args.seeds
# If we are fixing a seed value, no point in tests having multiple reseeds.
if args.fixed_seed:
args.reseed = 1
- Deploy.RunTest.fixed_seed = args.fixed_seed
+ RunTest.fixed_seed = args.fixed_seed
# Register the common deploy settings.
Timer.print_interval = args.print_interval
Scheduler.max_parallel = args.max_parallel
- Deploy.Deploy.max_odirs = args.max_odirs
+ Deploy.max_odirs = args.max_odirs
# Build infrastructure from hjson file and create the list of items to
# be deployed.
diff --git a/util/dvsim/utils.py b/util/dvsim/utils.py
index d18dd53..241b322 100644
--- a/util/dvsim/utils.py
+++ b/util/dvsim/utils.py
@@ -9,6 +9,7 @@
import os
import re
import shlex
+import shutil
import subprocess
import sys
import time
@@ -103,7 +104,8 @@
try:
return ' '.join(_stringify_wildcard_value(x) for x in value)
except TypeError:
- raise ValueError('Wildcard had value {!r} which is not of a supported type.')
+ raise ValueError('Wildcard had value {!r} which is not of a supported '
+ 'type.'.format(value))
def _subst_wildcards(var, mdict, ignored, ignore_error, seen):
@@ -144,13 +146,12 @@
# That's not allowed!
if name in seen:
raise ValueError('String contains circular expansion of '
- 'wildcard {!r}.'
- .format(match.group(0)))
+ 'wildcard {!r}.'.format(match.group(0)))
# Treat eval_cmd specially
if name == 'eval_cmd':
- cmd = _subst_wildcards(right_str[match.end():],
- mdict, ignored, ignore_error, seen)[0]
+ cmd = _subst_wildcards(right_str[match.end():], mdict, ignored,
+ ignore_error, seen)[0]
# Are there any wildcards left in cmd? If not, we can run the
# command and we're done.
@@ -170,8 +171,7 @@
if bad_names:
raise ValueError('Cannot run eval_cmd because the command '
'expands to {!r}, which still contains a '
- 'wildcard.'
- .format(cmd))
+ 'wildcard.'.format(cmd))
# We can't run the command (because it still has wildcards), but we
# don't want to report an error either because ignore_error is true
@@ -193,20 +193,19 @@
continue
raise ValueError('String to be expanded contains '
- 'unknown wildcard, {!r}.'
- .format(match.group(0)))
+ 'unknown wildcard, {!r}.'.format(match.group(0)))
value = _stringify_wildcard_value(value)
# Do any recursive expansion of value, adding name to seen (to avoid
# circular recursion).
- value, saw_err = _subst_wildcards(value, mdict,
- ignored, ignore_error, seen + [name])
+ value, saw_err = _subst_wildcards(value, mdict, ignored, ignore_error,
+ seen + [name])
# Replace the original match with the result and go around again. If
# saw_err, increment idx past what we just inserted.
- var = (var[:idx] +
- right_str[:match.start()] + value + right_str[match.end():])
+ var = (var[:idx] + right_str[:match.start()] + value +
+ right_str[match.end():])
if saw_err:
any_err = True
idx += match.start() + len(value)
@@ -279,7 +278,8 @@
'''
try:
- return _subst_wildcards(var, mdict, ignored_wildcards, ignore_error, [])[0]
+ return _subst_wildcards(var, mdict, ignored_wildcards, ignore_error,
+ [])[0]
except ValueError as err:
log.error(str(err))
sys.exit(1)
@@ -518,3 +518,26 @@
break
md_results += "```\n"
return md_results
+
+
+def rm_path(path, ignore_error=False):
+ '''Removes the specified path if it exists.
+
+ 'path' is a Path-like object. If it does not exist, the function simply
+ returns. If 'ignore_error' is set, then exception caught by the remove
+ operation is raised, else it is ignored.
+ '''
+
+ try:
+ if os.path.islink(path):
+ os.remove(path)
+ elif os.path.isdir(path):
+ shutil.rmtree(path)
+ else:
+ os.remove(path)
+ except FileNotFoundError:
+ pass
+ except OSError as e:
+ log.error("Failed to remove {}:\n{}.".format(path, e))
+ if not ignore_error:
+ raise e