[dvsim] Factor deploy method out of Deploy object
A Deploy object is "something that can be deployed". This is kind of
orthogonal from a scheduler, which is what the deploy() method was.
This patch pulls the scheduling logic into a separate class,
Scheduler. It also defines a class for status tracking, used for
tracking the status of jobs for a given target. This replaces an
ordered dictionary, but does the same job (hopefully with
easier-to-read code).
This change is still sort of half-and-half, in that the number of
running jobs is still tracked in a class variable in Deploy. That's
pretty unusual - you'd normally expect to see that count tracked in
the scheduler - but I thought I'd stop at this for now to keep the
patch manageable.
Signed-off-by: Rupert Swarbrick <rswarbrick@lowrisc.org>
diff --git a/util/dvsim/Deploy.py b/util/dvsim/Deploy.py
index 9ff96f3..1bf07e1 100644
--- a/util/dvsim/Deploy.py
+++ b/util/dvsim/Deploy.py
@@ -10,12 +10,9 @@
import shlex
import subprocess
import sys
-import time
-from collections import OrderedDict
from sim_utils import get_cov_summary_table
from tabulate import tabulate
-from timer import Timer
from utils import VERBOSE, find_and_substitute_wildcards, run_cmd
@@ -24,18 +21,11 @@
Abstraction for deploying builds and runs.
"""
- # Timer in hours, minutes and seconds.
- timer = Timer()
-
# Maintain a list of dispatched items.
dispatch_counter = 0
# Misc common deploy settings.
- print_legend = True
- max_parallel = 16
max_odirs = 5
- # Max jobs dispatched in one go.
- slot_limit = 20
# List of variable names that are to be treated as "list of commands".
# This tells `construct_cmd` that these vars are lists that need to
@@ -463,146 +453,6 @@
except Exception as e:
log.error("%s: Failed to run bkill\n", e)
- @staticmethod
- def deploy(items):
- dispatched_items = []
- queued_items = []
-
- # Print timer val in hh:mm:ss.
- def get_timer_val():
- return Deploy.timer.hms()
-
- def dispatch_items(items):
- item_names = OrderedDict()
- for item in items:
- if item.target not in item_names.keys():
- item_names[item.target] = ""
- if item.status is None:
- item_names[item.target] += item.identifier + ", "
- item.dispatch_cmd()
-
- for target in item_names.keys():
- if item_names[target] != "":
- item_names[target] = " [" + item_names[target][:-2] + "]"
- log.log(VERBOSE, "[%s]: [%s]: [dispatch]:\n%s",
- get_timer_val(), target, item_names[target])
-
- # Initialize status for a target, add '_stats_' for the said target
- # and initialize counters for queued, dispatched, passed, failed,
- # killed and total to 0. Also adds a boolean key to indicate if all
- # items in a given target are done.
- def init_status_target_stats(status, target):
- status[target] = OrderedDict()
- status[target]['_stats_'] = OrderedDict()
- status[target]['_stats_']['Q'] = 0
- status[target]['_stats_']['D'] = 0
- status[target]['_stats_']['P'] = 0
- status[target]['_stats_']['F'] = 0
- status[target]['_stats_']['K'] = 0
- status[target]['_stats_']['T'] = 0
- status[target]['_done_'] = False
-
- # Update status counter for a newly queued item.
- def add_status_target_queued(status, item):
- if item.target not in status.keys():
- init_status_target_stats(status, item.target)
- status[item.target][item] = "Q"
- status[item.target]['_stats_']['Q'] += 1
- status[item.target]['_stats_']['T'] += 1
-
- # Update status counters for a target.
- def update_status_target_stats(status, item):
- old_status = status[item.target][item]
- status[item.target]['_stats_'][old_status] -= 1
- status[item.target]['_stats_'][item.status] += 1
- status[item.target][item] = item.status
-
- def check_if_done_and_print_status(status, print_status_flag):
- all_done = True
- for target in status.keys():
- target_done_prev = status[target]['_done_']
- target_done_curr = ((status[target]['_stats_']["Q"] == 0) and
- (status[target]['_stats_']["D"] == 0))
- status[target]['_done_'] = target_done_curr
- all_done &= target_done_curr
-
- # Print if flag is set and target_done is not True for two
- # consecutive times.
- if not (target_done_prev and
- target_done_curr) and print_status_flag:
- stats = status[target]['_stats_']
- width = "0{}d".format(len(str(stats["T"])))
- msg = "["
- for s in stats.keys():
- msg += s + ": {:{}}, ".format(stats[s], width)
- msg = msg[:-2] + "]"
- log.info("[%s]: [%s]: %s", get_timer_val(), target, msg)
- return all_done
-
- # Print legend once at the start of the run.
- if Deploy.print_legend:
- log.info("[legend]: [Q: queued, D: dispatched, "
- "P: passed, F: failed, K: killed, T: total]")
- Deploy.print_legend = False
-
- status = OrderedDict()
- print_status_flag = True
-
- # Queue all items
- queued_items = items
- for item in queued_items:
- add_status_target_queued(status, item)
-
- all_done = False
- while not all_done:
- # Get status of dispatched items.
- for item in dispatched_items:
- if item.status == "D":
- item.get_status()
- if item.status != status[item.target][item]:
- print_status_flag = True
- if item.status != "D":
- if item.status != "P":
- # Kill its sub items if item did not pass.
- item.set_sub_status("K")
- log.error("[%s]: [%s]: [status] [%s: %s]",
- get_timer_val(), item.target,
- item.identifier, item.status)
- else:
- log.log(VERBOSE, "[%s]: [%s]: [status] [%s: %s]",
- get_timer_val(), item.target,
- item.identifier, item.status)
- # Queue items' sub-items if it is done.
- queued_items.extend(item.sub)
- for sub_item in item.sub:
- add_status_target_queued(status, sub_item)
- update_status_target_stats(status, item)
-
- # Dispatch items from the queue as slots free up.
- all_done = (len(queued_items) == 0)
- if not all_done:
- num_slots = Deploy.max_parallel - Deploy.dispatch_counter
- if num_slots > Deploy.slot_limit:
- num_slots = Deploy.slot_limit
- if num_slots > 0:
- if len(queued_items) > num_slots:
- dispatch_items(queued_items[0:num_slots])
- dispatched_items.extend(queued_items[0:num_slots])
- queued_items = queued_items[num_slots:]
- else:
- dispatch_items(queued_items)
- dispatched_items.extend(queued_items)
- queued_items = []
-
- # Check if we are done and print the status periodically.
- all_done &= check_if_done_and_print_status(status,
- print_status_flag)
-
- # Advance time by 1s if there is more work to do.
- if not all_done:
- time.sleep(1)
- print_status_flag = Deploy.timer.check_time()
-
class CompileSim(Deploy):
"""
diff --git a/util/dvsim/FlowCfg.py b/util/dvsim/FlowCfg.py
index b23da23..d02d3f4 100644
--- a/util/dvsim/FlowCfg.py
+++ b/util/dvsim/FlowCfg.py
@@ -13,7 +13,7 @@
import hjson
from CfgJson import set_target_attribute
-from Deploy import Deploy
+import Scheduler
from utils import (VERBOSE, md_results_to_html,
subst_wildcards, find_and_substitute_wildcards)
@@ -381,7 +381,7 @@
def deploy_objects(self):
'''Public facing API for deploying all available objects.'''
- Deploy.deploy(self.deploy)
+ Scheduler.run(self.deploy)
def _gen_results(self, fmt="md"):
'''
diff --git a/util/dvsim/Scheduler.py b/util/dvsim/Scheduler.py
new file mode 100644
index 0000000..ccbe889
--- /dev/null
+++ b/util/dvsim/Scheduler.py
@@ -0,0 +1,196 @@
+# Copyright lowRISC contributors.
+# Licensed under the Apache License, Version 2.0, see LICENSE for details.
+# SPDX-License-Identifier: Apache-2.0
+
+
+from collections import OrderedDict
+import logging as log
+import time
+
+from utils import VERBOSE
+from Deploy import Deploy
+from Timer import Timer
+
+
+class TargetStatus:
+ '''An object to track the status of a run for a given target'''
+ def __init__(self):
+ self.counters = OrderedDict()
+ self.counters['Q'] = 0
+ self.counters['D'] = 0
+ self.counters['P'] = 0
+ self.counters['F'] = 0
+ self.counters['K'] = 0
+ self.counters['T'] = 0
+
+ self.done = True
+
+ self.by_item = OrderedDict()
+
+ def add_item(self, item):
+ self.by_item[item] = 'Q'
+ self.counters['T'] += 1
+ self.counters['Q'] += 1
+ self.done = False
+
+ def update_item(self, item):
+ '''Update the tracked status of the item. Return true on change.'''
+ old_status = self.by_item[item]
+ if item.status == old_status:
+ return False
+
+ self.by_item[item] = item.status
+
+ self.counters[old_status] -= 1
+ self.counters[item.status] += 1
+ return True
+
+ def check_if_done(self):
+ '''Update done flag to match counters. Returns done flag.'''
+ self.done = (self.counters['Q'] == 0) and (self.counters['D'] == 0)
+ return self.done
+
+
+class Scheduler:
+ '''An object to run one or more Deploy items'''
+ print_legend = True
+
+ # Max jobs running at one time
+ max_parallel = 16
+
+ # Max jobs dispatched in one go.
+ slot_limit = 20
+
+ def __init__(self, items):
+ self.timer = Timer()
+ self.queued_items = []
+ self.dispatched_items = []
+
+ # An ordered dictionary keyed by target ('build', 'run' or similar).
+ # The value for each target is a TargetStatus object.
+ self.status = OrderedDict()
+
+ for item in items:
+ self.add_item(item)
+
+ def add_item(self, item):
+ '''Add a queued item'''
+ self.queued_items.append(item)
+
+ # Like setdefault, but doesn't construct a TargetStatus object
+ # unnecessarily.
+ tgt_status = self.status.get(item.target)
+ if tgt_status is None:
+ tgt_status = TargetStatus()
+ self.status[item.target] = tgt_status
+
+ tgt_status.add_item(item)
+
+ def update_status(self):
+ '''Update status of dispatched items. Returns true on a change'''
+ status_changed = False
+ hms = self.timer.hms()
+
+ # Get status of dispatched items
+ for item in self.dispatched_items:
+ if item.status == 'D':
+ item.get_status()
+
+ tgt_status = self.status[item.target]
+ if not tgt_status.update_item(item):
+ continue
+
+ status_changed = True
+ if item.status == "D":
+ continue
+
+ if item.status != "P":
+ # Kill its sub items if item did not pass.
+ item.set_sub_status("K")
+ log.error("[%s]: [%s]: [status] [%s: %s]",
+ hms, item.target, item.identifier, item.status)
+ else:
+ log.log(VERBOSE, "[%s]: [%s]: [status] [%s: %s]",
+ hms, item.target, item.identifier, item.status)
+
+ # Queue items' sub-items if it is done.
+ for sub_item in item.sub:
+ self.add_item(sub_item)
+
+ return status_changed
+
+ def dispatch(self):
+ '''Dispatch some queued items if possible'''
+ num_slots = min(Scheduler.slot_limit,
+ Scheduler.max_parallel - Deploy.dispatch_counter,
+ len(self.queued_items))
+ if not num_slots:
+ return
+
+ items = self.queued_items[0:num_slots]
+ self.queued_items = self.queued_items[num_slots:]
+ self.dispatched_items.extend(items)
+
+ tgt_names = OrderedDict()
+ for item in items:
+ if item.status is None:
+ tgt_names.setdefault(item.target, []).append(item.identifier)
+ item.dispatch_cmd()
+
+ hms = self.timer.hms()
+ for target, names in tgt_names.items():
+ log.log(VERBOSE, "[%s]: [%s]: [dispatch]:\n%s",
+ hms, target, ", ".join(names))
+
+ def check_if_done(self, print_status):
+ '''Update the "done" flag for each TargetStatus object
+
+ If print_status or we've reached a time interval then print current
+ status for those that weren't known to be finished already.
+
+ '''
+ if self.timer.check_time():
+ print_status = True
+
+ hms = self.timer.hms()
+
+ all_done = True
+ for target, tgt_status in self.status.items():
+ was_done = tgt_status.done
+ tgt_status.check_if_done()
+ is_done = tgt_status.done
+
+ all_done &= is_done
+
+ if print_status and not (was_done and is_done):
+ stats = tgt_status.counters
+ width = "0{}d".format(len(str(stats["T"])))
+ msg = "["
+ for s in stats.keys():
+ msg += s + ": {:{}}, ".format(stats[s], width)
+ msg = msg[:-2] + "]"
+ log.info("[%s]: [%s]: %s", hms, target, msg)
+ return all_done
+
+ def run(self):
+ '''Run all items'''
+
+ # Print the legend just once (at the start of the first run)
+ if Scheduler.print_legend:
+ log.info("[legend]: [Q: queued, D: dispatched, "
+ "P: passed, F: failed, K: killed, T: total]")
+ Scheduler.print_legend = False
+
+ first_time = True
+ while True:
+ changed = self.update_status()
+ self.dispatch()
+ if self.check_if_done(changed or first_time):
+ break
+ first_time = False
+ time.sleep(1)
+
+
+def run(items):
+ '''Run the given items'''
+ Scheduler(items).run()
diff --git a/util/dvsim/SimCfg.py b/util/dvsim/SimCfg.py
index bad908d..2771fba 100644
--- a/util/dvsim/SimCfg.py
+++ b/util/dvsim/SimCfg.py
@@ -12,8 +12,8 @@
import sys
from collections import OrderedDict
-from Deploy import (CompileSim, CovAnalyze, CovMerge, CovReport, CovUnr,
- Deploy, RunTest)
+from Deploy import CompileSim, CovAnalyze, CovMerge, CovReport, CovUnr, RunTest
+import Scheduler
from FlowCfg import FlowCfg
from Modes import BuildModes, Modes, Regressions, RunModes, Tests
from tabulate import tabulate
@@ -533,7 +533,7 @@
# If coverage is enabled, then deploy the coverage tasks.
if self.cov:
- Deploy.deploy(self.cov_deploys)
+ Scheduler.run(self.cov_deploys)
def _cov_analyze(self):
'''Use the last regression coverage data to open up the GUI tool to
diff --git a/util/dvsim/dvsim.py b/util/dvsim/dvsim.py
index dda3ee5..869f23b 100755
--- a/util/dvsim/dvsim.py
+++ b/util/dvsim/dvsim.py
@@ -31,7 +31,8 @@
from signal import SIGINT, signal
import Deploy
-import Timer
+from Scheduler import Scheduler
+from Timer import Timer
import utils
from CfgFactory import make_cfg
@@ -637,8 +638,8 @@
Deploy.RunTest.fixed_seed = args.fixed_seed
# Register the common deploy settings.
- Timer.Timer.print_interval = args.print_interval
- Deploy.Deploy.max_parallel = args.max_parallel
+ Timer.print_interval = args.print_interval
+ Scheduler.max_parallel = args.max_parallel
Deploy.Deploy.max_odirs = args.max_odirs
# Build infrastructure from hjson file and create the list of items to