[dvsim] Scheduler updates - max_parallel, max_poll
The max_parallel setting previously was a Scheduler class variable. It
has now been moved to the Launcher class, since each launcher variant
has different needs.
Likewise, two new features are added: `max_poll` and `poll_freq`. By
default, max_poll is 10,000 jobs and the polling frequency is 1 second.
The purpose of adding these knobs is to reduce the polling rate and the
number of jobs polled at a given time. With external launchers such as
LSF and cloud, polling for a job completion could involve multiple steps
and some external command invocations that are time-consuming. These
features help limit the time taken by the Scheduler to poll the entire
set of jobs. If we do not provide a way to fine-tune / adjust these
parameters, DVSim will appear to have hung (or worse, it could crash the
system).
The max_poll feature needs to cycle through the list of running jobs,
rather than just polling the first N ones every time. For example, if
there are 5 jobs running [a, b, c, d, e], and max_poll is set to 2, on
the first poll window it polls jobs a and b for completion. On the next
window it polls c and d, followed by e and a, and so on. To allow this
to happen, the list of running jobs now needs to be maintained as a
circular list that supports iterating through the list while allowing
jobs to be added or removed from it in real time. This is achieved with
the newly added `get_next_item` helper in Scheduler.py.
Signed-off-by: Srikrishna Iyer <sriyer@google.com>
diff --git a/util/dvsim/FlowCfg.py b/util/dvsim/FlowCfg.py
index 49443cc..498fa62 100644
--- a/util/dvsim/FlowCfg.py
+++ b/util/dvsim/FlowCfg.py
@@ -12,6 +12,7 @@
import hjson
from CfgJson import set_target_attribute
+from LauncherFactory import get_launcher_cls
from Scheduler import Scheduler
from utils import (VERBOSE, find_and_substitute_wildcards, md_results_to_html,
rm_path, subst_wildcards)
@@ -371,7 +372,7 @@
log.fatal("Nothing to run!")
sys.exit(1)
- return Scheduler(deploy).run()
+ return Scheduler(deploy, get_launcher_cls()).run()
def _gen_results(self, results):
'''
diff --git a/util/dvsim/Launcher.py b/util/dvsim/Launcher.py
index ebd7683..5580ebe 100644
--- a/util/dvsim/Launcher.py
+++ b/util/dvsim/Launcher.py
@@ -6,6 +6,7 @@
import logging as log
import os
import re
+import sys
from pathlib import Path
from utils import VERBOSE, clean_odirs, rm_path
@@ -16,10 +17,11 @@
self.msg = msg
-class ErrorMessage(collections.namedtuple(
- 'ErrorMessage',
- ['line_number', 'message', 'context'],
-)):
+class ErrorMessage(
+ collections.namedtuple(
+ 'ErrorMessage',
+ ['line_number', 'message', 'context'],
+ )):
"""Contains error-related information.
This support classification of failures into buckets. The message field
@@ -42,6 +44,15 @@
# Type of launcher used as string.
variant = None
+ # Max jobs running at one time
+ max_parallel = sys.maxsize
+
+ # Max jobs polled at one time
+ max_poll = 10000
+
+ # Poll job's completion status every this many seconds
+ poll_freq = 1
+
# Points to the python virtual env area.
pyvenv = None
@@ -217,7 +228,8 @@
Returns (status, err_msg) extracted from the log, where the status is
"P" if the it passed, "F" otherwise. This is invoked by poll() just
- after the job finishes.
+ after the job finishes. err_msg is an instance of the named tuple
+ ErrorMessage.
"""
def _find_patterns(patterns, line):
"""Helper function that returns the pattern if any of the given
@@ -242,7 +254,9 @@
chk_passed = bool(pass_patterns) and (self.exit_code == 0)
try:
- with open(self.deploy.get_log_path(), "r", encoding="UTF-8",
+ with open(self.deploy.get_log_path(),
+ "r",
+ encoding="UTF-8",
errors="surrogateescape") as f:
lines = f.readlines()
except OSError as e:
@@ -288,6 +302,9 @@
"""Do post-completion activities, such as preparing the results.
Must be invoked by poll(), after the job outcome is determined.
+
+ status is the status of the job, either 'P', 'F' or 'K'.
+ err_msg is an instance of the named tuple ErrorMessage.
"""
assert status in ['P', 'F', 'K']
@@ -296,14 +313,6 @@
self.deploy.post_finish(status)
log.debug("Item %s has completed execution: %s", self, status)
if status != "P":
- self._log_fail_msg(err_msg)
-
- def _log_fail_msg(self, msg):
- """Logs the fail msg for the final report.
-
- Invoked in _post_finish() only if the job did not pass.
- """
-
- assert msg and isinstance(msg, ErrorMessage)
- self.fail_msg = msg
- log.log(VERBOSE, msg)
+ assert err_msg and isinstance(err_msg, ErrorMessage)
+ self.fail_msg = err_msg
+ log.log(VERBOSE, err_msg.message)
diff --git a/util/dvsim/LauncherFactory.py b/util/dvsim/LauncherFactory.py
index fdfaaeb..0e7479b 100644
--- a/util/dvsim/LauncherFactory.py
+++ b/util/dvsim/LauncherFactory.py
@@ -4,12 +4,10 @@
import logging as log
import os
-import sys
from Launcher import Launcher
from LocalLauncher import LocalLauncher
from LsfLauncher import LsfLauncher
-from Scheduler import Scheduler
try:
from EdaCloudLauncher import EdaCloudLauncher
@@ -18,7 +16,7 @@
EDACLOUD_LAUNCHER_EXISTS = False
# The chosen launcher class.
-launcher_cls = None
+_LAUNCHER_CLS = None
def set_launcher_type(is_local=False):
@@ -36,29 +34,29 @@
launcher = "local"
Launcher.variant = launcher
- global launcher_cls
+ global _LAUNCHER_CLS
if launcher == "local":
- launcher_cls = LocalLauncher
+ _LAUNCHER_CLS = LocalLauncher
elif launcher == "lsf":
- launcher_cls = LsfLauncher
-
- # The max_parallel setting is not relevant when dispatching with LSF.
- Scheduler.max_parallel = sys.maxsize
+ _LAUNCHER_CLS = LsfLauncher
# These custom launchers are site specific. They may not be committed to
# the open source repo.
elif launcher == "edacloud" and EDACLOUD_LAUNCHER_EXISTS:
- launcher_cls = EdaCloudLauncher
-
- # The max_parallel setting is not relevant when dispatching with
- # EDACloud.
- Scheduler.max_parallel = sys.maxsize
+ _LAUNCHER_CLS = EdaCloudLauncher
else:
log.error("Launcher {} set using DVSIM_LAUNCHER env var does not "
"exist. Using local launcher instead.".format(launcher))
- launcher_cls = LocalLauncher
+ _LAUNCHER_CLS = LocalLauncher
+
+
+def get_launcher_cls():
+ '''Returns the chosen launcher class.'''
+
+ assert _LAUNCHER_CLS is not None
+ return _LAUNCHER_CLS
def get_launcher(deploy):
@@ -67,6 +65,5 @@
'deploy' is an instance of the deploy class to with the launcher is paired.
'''
- global launcher_cls
- assert launcher_cls is not None
- return launcher_cls(deploy)
+ assert _LAUNCHER_CLS is not None
+ return _LAUNCHER_CLS(deploy)
diff --git a/util/dvsim/Scheduler.py b/util/dvsim/Scheduler.py
index f7605bd..eb900ff 100644
--- a/util/dvsim/Scheduler.py
+++ b/util/dvsim/Scheduler.py
@@ -19,13 +19,35 @@
return sum([len(d[k]) for k in d])
+def get_next_item(arr, index):
+ '''Perpetually get an item from a list.
+
+ Returns the next item on the list by advancing the index by 1. If the index
+ is already the last item on the list, it loops back to the start, thus
+ implementing a circular list.
+
+ arr is a subscriptable list.
+ index is the index of the last item returned.
+
+ Returns (item, index) if successful.
+ Raises IndexError if arr is empty.
+ '''
+ index += 1
+ try:
+ item = arr[index]
+ except IndexError:
+ index = 0
+ try:
+ item = arr[index]
+ except IndexError:
+ raise IndexError("List is empty!")
+
+ return item, index
+
+
class Scheduler:
'''An object that runs one or more Deploy items'''
-
- # Max jobs running at one time
- max_parallel = 16
-
- def __init__(self, items):
+ def __init__(self, items, launcher_cls):
self.items = items
# 'scheduled[target][cfg]' is a list of Deploy objects for the chosen
@@ -48,19 +70,29 @@
# _queued is a list so that we dispatch things in order (relevant
# for things like tests where we have ordered things cleverly to
# try to see failures early). They are maintained for each target.
+
+ # The list of available targets and the list of running items in each
+ # target are polled in a circular fashion, looping back to the start.
+ # This is done to allow us to poll a smaller subset of jobs rather than
+ # the entire regression. We keep rotating through our list of running
+ # items, picking up where we left off on the last poll.
+ self._targets = list(self._scheduled.keys())
self._queued = {}
self._running = {}
self._passed = {}
self._failed = {}
self._killed = {}
self._total = {}
+ self.last_target_polled_idx = -1
+ self.last_item_polled_idx = {}
for target in self._scheduled:
self._queued[target] = []
- self._running[target] = set()
+ self._running[target] = []
self._passed[target] = set()
self._failed[target] = set()
self._killed[target] = set()
self._total[target] = sum_dict_lists(self._scheduled[target])
+ self.last_item_polled_idx[target] = -1
# Stuff for printing the status.
width = len(str(self._total[target]))
@@ -76,6 +108,10 @@
# per-target.
self.item_to_status = {}
+ # The chosen launcher class. This allows us to access launcher
+ # variant-specific settings such as max parallel jobs & poll rate.
+ self.launcher_cls = launcher_cls
+
def run(self):
'''Run all scheduled jobs and return the results.
@@ -122,7 +158,8 @@
# polling loop. But we do it with a bounded wait on stop_now so
# that we jump back to the polling loop immediately on a
# signal.
- stop_now.wait(timeout=1)
+ stop_now.wait(timeout=self.launcher_cls.poll_freq)
+
finally:
signal(SIGINT, old_handler)
@@ -303,49 +340,54 @@
'''
changed = False
- for target in self._scheduled:
- to_pass = []
- to_fail = []
- for item in self._running[target]:
+ max_poll = min(self.launcher_cls.max_poll,
+ sum_dict_lists(self._running))
+
+ while max_poll:
+ target, self.last_target_polled_idx = get_next_item(
+ self._targets, self.last_target_polled_idx)
+
+ while self._running[target] and max_poll:
+ max_poll -= 1
+ item, self.last_item_polled_idx[target] = get_next_item(
+ self._running[target], self.last_item_polled_idx[target])
status = item.launcher.poll()
- assert status in ['D', 'P', 'F']
+ level = VERBOSE
+
+ assert status in ['D', 'P', 'F', 'K']
if status == 'D':
- # Still running
continue
elif status == 'P':
- log.log(VERBOSE, "[%s]: [%s]: [status] [%s: P]", hms,
- target, item.full_name)
- to_pass.append(item)
+ self._passed[target].add(item)
+ elif status == 'F':
+ self._failed[target].add(item)
+ level = log.ERROR
else:
- log.error("[%s]: [%s]: [status] [%s: F]", hms, target,
- item.full_name)
- to_fail.append(item)
+ self._killed[target].add(item)
+ level = log.ERROR
- for item in to_pass:
- self._passed[target].add(item)
- self._running[target].remove(item)
- self.item_to_status[item] = 'P'
- self._enqueue_successors(item)
+ self._running[target].pop(self.last_item_polled_idx[target])
+ self.last_item_polled_idx[target] -= 1
+ self.item_to_status[item] = status
+ log.log(level, "[%s]: [%s]: [status] [%s: %s]", hms, target,
+ item.full_name, status)
- for item in to_fail:
- self._failed[target].add(item)
- self._running[target].remove(item)
- self.item_to_status[item] = 'F'
+ # Enqueue item's successors regardless of its status.
+ #
# It may be possible that a failed item's successor may not
# need all of its dependents to pass (if it has other dependent
# jobs). Hence we enqueue all successors rather than canceling
- # them right here. We leave it to `_dispatch()` to figure out
+ # them right here. We leave it to _dispatch() to figure out
# whether an enqueued item can be run or not.
self._enqueue_successors(item)
-
- changed = changed or to_pass or to_fail
+ changed = True
return changed
def _dispatch(self, hms):
'''Dispatch some queued items if possible.'''
- slots = Scheduler.max_parallel - sum_dict_lists(self._running)
+ slots = self.launcher_cls.max_parallel - sum_dict_lists(self._running)
if slots <= 0:
return
@@ -404,7 +446,7 @@
", ".join(item.full_name for item in to_dispatch))
for item in to_dispatch:
- self._running[target].add(item)
+ self._running[target].append(item)
self.item_to_status[item] = 'D'
try:
item.launcher.launch()
@@ -423,7 +465,7 @@
# Kill any running items. Again, take a copy of the set to avoid
# modifying it while iterating over it.
- for target in self._queued:
+ for target in self._running:
for item in [item for item in self._running[target]]:
self._kill_item(item)
diff --git a/util/dvsim/dvsim.py b/util/dvsim/dvsim.py
index add4d21..3a6f5bf 100755
--- a/util/dvsim/dvsim.py
+++ b/util/dvsim/dvsim.py
@@ -31,9 +31,9 @@
import Launcher
import LauncherFactory
+import LocalLauncher
from CfgFactory import make_cfg
from Deploy import RunTest
-from Scheduler import Scheduler
from Timer import Timer
from utils import (TS_FORMAT, TS_FORMAT_LONG, VERBOSE, rm_path,
run_cmd_with_timeout)
@@ -327,7 +327,8 @@
help=('Run only up to N builds/tests at a time. '
'Default value 16, unless the DVSIM_MAX_PARALLEL '
'environment variable is set, in which case that '
- 'is used.'))
+ 'is used. Only applicable when launching jobs '
+ 'locally.'))
pathg = parser.add_argument_group('File management')
@@ -645,7 +646,7 @@
# Register the common deploy settings.
Timer.print_interval = args.print_interval
- Scheduler.max_parallel = args.max_parallel
+ LocalLauncher.LocalLauncher.max_parallel = args.max_parallel
Launcher.Launcher.max_odirs = args.max_odirs
LauncherFactory.set_launcher_type(args.local)