[dvsim] Schedule jobs by dependency

The current Scheduler implementation splits jobs by target (builds,
runs, cov, etc.). All jobs added to a target are scheduled ONLY after
all jobs in the previous target are complete. This means, for example,
that AES tests cannot run as soon as the AES simulation executable is
built; they have to wait until ALL other builds are complete. As a
result, the dispatch queue is starved even when `max_parallel` slots
are free, which makes the process less efficient than it could be.

The changes in this PR revert to the original approach, where jobs
are enqueued in dependency order, independently for each DUT
(HJson config). The dispatch queue size is still limited by the
`max_parallel` setting, but at least it is no longer starved. The
benefit may not be visible if `max_parallel` is much smaller than the
number of jobs in a given target. At Google, we set `max_parallel` to
200, which far exceeds the number of builds (~25), so regressions now
finish a lot quicker (~40% speedup).
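
As a rough illustration of enqueueing in dependency order, here is a
minimal standalone sketch. It is not the dvsim code: `DEPS`, `DONE` and
`ready_jobs()` are hypothetical names, and jobs are plain strings rather
than Deploy objects. It only shows that a test becomes eligible as soon
as its own build completes, regardless of other builds.

```python
# Hypothetical job graph (assumption): job name -> names it depends on.
DEPS = {
    "aes_build": [],
    "uart_build": [],
    "aes_test": ["aes_build"],
    "uart_test": ["uart_build"],
}

# Statuses that count as "complete": passed, failed, killed.
DONE = {"P", "F", "K"}


def ready_jobs(deps, status):
    """Return jobs not yet enqueued whose dependencies have all completed."""
    return [
        job for job, job_deps in deps.items()
        if job not in status and all(status.get(d) in DONE for d in job_deps)
    ]


# aes_build has passed while uart_build is still dispatched ('D').
status = {"aes_build": "P", "uart_build": "D"}
print(ready_jobs(DEPS, status))  # ['aes_test']
```

With the per-target approach, `aes_test` would stay blocked until
`uart_build` also finished.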

Most of the changes revolve around segregating the items and maintaining
target-specific status queues (since jobs in several targets may be
running at a given time). Apart from that, I moved the code around a bit
to make things more prominently visible. Other updates are as follows:

- TargetScheduler is removed
- Helper methods added to find a finished item's successors
- A failed item recursively cancels its successors (with exceptions)
- `_cancel_item()` recursively cancels all successors of a canceled item
- Likewise for `_kill_item()`
- `_kill()` recursively kills / cancels each item and all of its successors
- On SIGINT (with the above bullet), `Scheduler::run()` returns the final
  results rather than exiting immediately. This allows the full result
  table to be printed, which might be useful to look at.
- Linter fixes
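
The recursive cancellation described in the bullets above can be
sketched with a simple worklist. This is an illustration under stated
assumptions, not the code in this PR: `cancel_with_successors()` and
`SUCCESSORS` are hypothetical, jobs are plain strings, and the
exceptions mentioned above (items that do not need all dependencies to
pass) are ignored.

```python
def cancel_with_successors(job, successors, status):
    """Mark 'job' and everything downstream of it as killed ('K')."""
    pending = [job]
    while pending:
        current = pending.pop()
        if status.get(current) == "K":
            # Already cancelled via another path; do not revisit.
            continue
        status[current] = "K"
        pending.extend(successors.get(current, []))
    return status


# Hypothetical successor map (assumption): job name -> jobs depending on it.
SUCCESSORS = {
    "aes_build": ["aes_test_0", "aes_test_1"],
    "aes_test_0": ["aes_cov"],
    "aes_test_1": ["aes_cov"],
}

status = cancel_with_successors("aes_build", SUCCESSORS, {"uart_build": "P"})
print(status["aes_cov"], status["uart_build"])  # K P
```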

Signed-off-by: Srikrishna Iyer <sriyer@google.com>
diff --git a/util/dvsim/Scheduler.py b/util/dvsim/Scheduler.py
index 5c4ab64..17b400c 100644
--- a/util/dvsim/Scheduler.py
+++ b/util/dvsim/Scheduler.py
@@ -4,7 +4,6 @@
 
 import logging as log
 import threading
-from collections import OrderedDict
 from signal import SIGINT, signal
 
 from Launcher import LauncherError
@@ -12,188 +11,58 @@
 from utils import VERBOSE
 
 
-class TargetScheduler:
-    '''A scheduler for the jobs of a given target'''
-    def __init__(self, name):
-        self.name = name
+class Scheduler:
+    '''An object that runs one or more Deploy items'''
 
-        # Sets of items, split up by their current state. The sets are disjoint
-        # and their union equals the keys of self.item_to_status. _queued is a
-        # list so that we dispatch things in order (relevant for things like
-        # tests where we have ordered things cleverly to try to see failures
-        # early)
-        self._queued = []
-        self._running = set()
-        self._passed = set()
-        self._failed = set()
-        self._killed = set()
+    # Max jobs running at one time
+    max_parallel = 16
 
-        # A map from the Deploy objects tracked by this class to their current
-        # status. This status is 'Q', 'D', 'P', 'F' or 'K', corresponding to
-        # membership in the dicts above.
+    def __init__(self, items):
+        # '_scheduled[target][cfg]' is a list of Deploy objects for the chosen
+        # target and cfg. As items in _scheduled are ready to be run (once
+        # their dependencies pass), they are moved to the _queued list, where
+        # they wait until slots are available for them to be dispatched.
+        # When all items (in all cfgs) of a target are done, it is removed from
+        # this dictionary.
+        self._scheduled = {}
+        for item in items:
+            self.add_to_scheduled(item)
+
+        # Sets of items, split up by their current state. The sets are
+        # disjoint and their union equals the keys of self.item_to_status.
+        # _queued is a list so that we dispatch things in order (relevant
+        # for things like tests where we have ordered things cleverly to
+        # try to see failures early). They are maintained for each target.
+        self._queued = {}
+        self._running = {}
+        self._passed = {}
+        self._failed = {}
+        self._killed = {}
+        for target in self._scheduled:
+            self._queued[target] = []
+            self._running[target] = set()
+            self._passed[target] = set()
+            self._failed[target] = set()
+            self._killed[target] = set()
+
+        # A map from the Deploy objects tracked by this class to their
+        # current status. This status is 'Q', 'D', 'P', 'F' or 'K',
+        # corresponding to membership in the dicts above. This is not
+        # per-target.
         self.item_to_status = {}
 
-    def add_item(self, item):
-        assert item not in self.item_to_status
-        assert item not in self._queued
-        self.item_to_status[item] = 'Q'
-        self._queued.append(item)
+    def run(self):
+        '''Run all scheduled jobs and return the results.
 
-    def _kill_item(self, item):
-        '''Kill a running item'''
-        self._running.remove(item)
-        item.launcher.kill()
-        self._killed.add(item)
-        self.item_to_status[item] = 'K'
-
-    def _poll(self, hms):
-        '''Check for running items that have finished.
-
-        Returns True if something changed.
+        Returns the results (status) of all items dispatched for all
+        targets and cfgs.
         '''
-        to_pass = []
-        to_fail = []
 
-        for item in self._running:
-            status = item.launcher.poll()
-            assert status in ['D', 'P', 'F']
-            if status == 'D':
-                # Still running
-                continue
-            elif status == 'P':
-                log.log(VERBOSE, "[%s]: [%s]: [status] [%s: P]", hms,
-                        item.target, item.full_name)
-                to_pass.append(item)
-            else:
-                log.error("[%s]: [%s]: [status] [%s: F]", hms, item.target,
-                          item.full_name)
-                to_fail.append(item)
+        timer = Timer()
 
-        for item in to_pass:
-            self._running.remove(item)
-            self._passed.add(item)
-            self.item_to_status[item] = 'P'
-        for item in to_fail:
-            self._running.remove(item)
-            self._failed.add(item)
-            self.item_to_status[item] = 'F'
+        log.info("[legend]: [Q: queued, D: dispatched, "
+                 "P: passed, F: failed, K: killed, T: total]")
 
-        return to_pass or to_fail
-
-    def _dispatch(self, hms, old_results):
-        '''Dispatch some queued items if possible.
-
-        See run() for the format of old_results.
-        '''
-        num_slots = min(Scheduler.slot_limit,
-                        Scheduler.max_parallel - len(self._running),
-                        len(self._queued))
-        if num_slots <= 0:
-            return
-
-        to_dispatch = []
-
-        while len(to_dispatch) < num_slots and self._queued:
-            next_item = self._queued.pop(0)
-            # Does next_item have any dependencies? Since we dispatch jobs by
-            # target, we can assume that each of those dependencies appears
-            # in old_results.
-            has_failed_dep = False
-            for dep in next_item.dependencies:
-                dep_status = old_results[dep]
-                assert dep_status in ['P', 'F', 'K']
-
-                if next_item.needs_all_dependencies_passing:
-                    if dep_status in ['F', 'K']:
-                        has_failed_dep = True
-                        break
-                else:
-                    # Set has_failed_dep default value to True only if the
-                    # next_item has dependencies, and next_item does not require
-                    # all dependencies to pass
-                    has_failed_dep = True
-                    if dep_status in ['P']:
-                        has_failed_dep = False
-                        break
-
-            # If has_failed_dep then at least one of the dependencies has been
-            # cancelled or has run and failed. Give up on this item too.
-            if has_failed_dep:
-                self._killed.add(next_item)
-                self.item_to_status[next_item] = 'K'
-                continue
-
-            to_dispatch.append(next_item)
-
-        if not to_dispatch:
-            return
-
-        log.log(VERBOSE, "[%s]: [%s]: [dispatch]:\n%s", hms, self.name,
-                ", ".join(item.full_name for item in to_dispatch))
-
-        for item in to_dispatch:
-            self._running.add(item)
-            self.item_to_status[item] = 'D'
-            try:
-                item.launcher.launch()
-            except LauncherError as err:
-                log.error('{}'.format(err))
-                self._kill_item(item)
-
-    def _kill(self):
-        '''Kill any running items and cancel any that are waiting'''
-
-        # Cancel any waiting items. We take a copy of self._queued to avoid
-        # iterating over the set as we modify it.
-        for item in [item for item in self._queued]:
-            self._cancel(item)
-
-        # Kill any running items. Again, take a copy of the set to avoid
-        # modifying it while iterating over it.
-        for item in [item for item in self._running]:
-            self._kill_item(item)
-
-    def _cancel(self, item):
-        '''Cancel an item that is currently queued'''
-        assert item in self._queued
-        self._queued.remove(item)
-        self._killed.add(item)
-        self.item_to_status[item] = 'K'
-
-    def _check_if_done(self, timer, hms, print_status):
-        '''Check whether we are finished.
-
-        If print_status or we've reached a time interval then print current
-        status for those jobs that weren't known to be finished already.
-        '''
-        if timer.check_time():
-            print_status = True
-
-        if print_status:
-            total_cnt = len(self.item_to_status)
-            width = len(str(total_cnt))
-
-            field_fmt = '{{:0{}d}}'.format(width)
-            msg_fmt = ('[Q: {0}, D: {0}, P: {0}, F: {0}, K: {0}, T: {0}]'.
-                       format(field_fmt))
-            msg = msg_fmt.format(len(self._queued), len(self._running),
-                                 len(self._passed), len(self._failed),
-                                 len(self._killed), total_cnt)
-            log.info("[%s]: [%s]: %s", hms, self.name, msg)
-
-        return not (self._queued or self._running)
-
-    def run(self, timer, old_results):
-        '''Run the jobs for this target.
-
-        timer is a Timer that was started at the start of the Runner's run.
-
-        old_results is a dictionary mapping items (from previous targets) to
-        statuses. Every job that appears as a dependency will be in this list
-        (because it ran as part of a previous target).
-
-        Returns the results from this target (in the same format).
-        '''
         # Catch one SIGINT and tell the runner to quit. On a second, die.
         stop_now = threading.Event()
         old_handler = None
@@ -211,18 +80,19 @@
 
         old_handler = signal(SIGINT, on_sigint)
 
+        # Enqueue all items of the first target.
+        self._enqueue_successors(None)
+
         try:
             while True:
                 if stop_now.is_set():
-                    # We've had an interrupt. Kill any jobs that are running,
-                    # then exit.
+                    # We've had an interrupt. Kill any jobs that are running.
                     self._kill()
-                    exit(1)
 
                 hms = timer.hms()
-                changed = self._poll(hms)
-                self._dispatch(hms, old_results)
-                if self._check_if_done(timer, hms, changed):
+                changed = self._poll(hms) or timer.check_time()
+                self._dispatch(hms)
+                if self._check_if_done(hms, changed):
                     break
 
                 # This is essentially sleep(1) to wait a second between each
@@ -233,44 +103,349 @@
         finally:
             signal(SIGINT, old_handler)
 
-        # We got to the end without anything exploding. Return the results for our jobs.
+        # We got to the end without anything exploding. Return the results.
         return self.item_to_status
 
+    def add_to_scheduled(self, item):
+        '''Recursively add item and all of its dependencies to _scheduled.
 
-class Scheduler:
-    '''An object to run one or more Deploy items'''
-
-    # Max jobs running at one time
-    max_parallel = 16
-
-    # Max jobs dispatched in one go.
-    slot_limit = 20
-
-    def __init__(self, items):
-        # An ordered dictionary keyed by target ('build', 'run' or similar).
-        # The value for each target is a TargetScheduler object.
-        self.schedulers = OrderedDict()
-
-        for item in items:
-            # This works like setdefault, but doesn't construct a TargetScheduler
-            # object unnecessarily.
-            tgt_scheduler = self.schedulers.get(item.target)
-            if tgt_scheduler is None:
-                tgt_scheduler = TargetScheduler(item.target)
-                self.schedulers[item.target] = tgt_scheduler
-
-            tgt_scheduler.add_item(item)
-
-    def run(self):
-        '''Run all items
-
-        Returns a map from item to status.
+        'item' is a Deploy object.
         '''
-        timer = Timer()
 
-        log.info("[legend]: [Q: queued, D: dispatched, "
-                 "P: passed, F: failed, K: killed, T: total]")
-        results = {}
-        for scheduler in self.schedulers.values():
-            results.update(scheduler.run(timer, results))
-        return results
+        for dep in item.dependencies:
+            self.add_to_scheduled(dep)
+
+        target_dict = self._scheduled.setdefault(item.target, {})
+        cfg_list = target_dict.setdefault(item.sim_cfg, [])
+        if item not in cfg_list:
+            cfg_list.append(item)
+
+    def _remove_from_scheduled(self, item):
+        '''Removes the item from _scheduled[target][cfg] list.
+
+        When all items in _scheduled[target][cfg] are finally removed, the cfg
+        key is deleted.
+        '''
+        target_dict = self._scheduled[item.target]
+        cfg_list = target_dict.get(item.sim_cfg)
+        if cfg_list is not None:
+            try:
+                cfg_list.remove(item)
+            except ValueError:
+                pass
+            if not cfg_list:
+                del target_dict[item.sim_cfg]
+
+    def _get_next_target(self, curr_target):
+        '''Returns the target that succeeds the current one.
+
+        curr_target is the target of the job that just completed (example -
+        build). If it is None, then the first target in _scheduled is returned.
+        '''
+
+        if curr_target is None:
+            return next(iter(self._scheduled))
+
+        assert curr_target in self._scheduled
+        target_iterator = iter(self._scheduled)
+        target = next(target_iterator)
+
+        found = False
+        while not found:
+            if target == curr_target:
+                found = True
+            try:
+                target = next(target_iterator)
+            except StopIteration:
+                return None
+
+        return target
+
+    def _enqueue_successors(self, item=None):
+        '''Move an item's successors from _scheduled to _queued.
+
+        'item' is the recently run job that has completed. If None, then we
+        move all available items in all available cfgs in _scheduled's first
+        target. If 'item' is specified, then we find its successors and move
+        them to _queued.
+        '''
+
+        for next_item in self._get_successors(item):
+            assert next_item not in self.item_to_status
+            assert next_item not in self._queued[next_item.target]
+            self.item_to_status[next_item] = 'Q'
+            self._queued[next_item.target].append(next_item)
+            self._remove_from_scheduled(next_item)
+
+    def _cancel_successors(self, item):
+        '''Cancel an item's successors recursively by moving them from
+        _scheduled or _queued to _killed.'''
+
+        items = self._get_successors(item)
+        while items:
+            next_item = items.pop()
+            self._cancel_item(next_item, cancel_successors=False)
+            items.extend(self._get_successors(next_item))
+
+    def _get_successors(self, item=None):
+        '''Find immediate successors of an item.
+
+        'item' is a job that has completed. We choose the target that follows
+        the 'item''s current target and find the list of successors whose
+        dependency list contains 'item'. If 'item' is None, we pick successors
+        from all cfgs, else we pick successors only from the cfg to which the
+        item belongs.
+
+        Returns the list of item's successors, or an empty list if there are
+        none.
+        '''
+
+        if item is None:
+            target = self._get_next_target(None)
+            cfgs = set(self._scheduled[target])
+        else:
+            target = self._get_next_target(item.target)
+            cfgs = {item.sim_cfg}
+
+        if target is None:
+            return []
+
+        # Find item's successors that can be enqueued. We assume here that
+        # only the immediately succeeding target can be enqueued at this
+        # time.
+        successors = []
+        for cfg in cfgs:
+            for next_item in self._scheduled[target][cfg]:
+                if item is not None:
+                    # Something is terribly wrong if item exists but the
+                    # next_item's dependency list is empty.
+                    assert next_item.dependencies
+                    if item not in next_item.dependencies:
+                        continue
+
+                if self._ok_to_enqueue(next_item):
+                    successors.append(next_item)
+
+        return successors
+
+    def _ok_to_enqueue(self, item):
+        '''Returns true if ALL dependencies of item are complete.'''
+
+        for dep in item.dependencies:
+            # Has the dep even been enqueued?
+            if dep not in self.item_to_status:
+                return False
+
+            # Has the dep completed?
+            if self.item_to_status[dep] not in ["P", "F", "K"]:
+                return False
+
+        return True
+
+    def _ok_to_run(self, item):
+        '''Returns true if the required dependencies have passed.
+
+        The item's needs_all_dependencies_passing setting is used to figure
+        out whether we can run this item or not, based on its dependencies'
+        statuses.
+        '''
+        # 'item' can run only if its dependencies have passed (their results
+        # should already show up in the item to status map).
+        for dep in item.dependencies:
+            dep_status = self.item_to_status[dep]
+            assert dep_status in ['P', 'F', 'K']
+
+            if item.needs_all_dependencies_passing:
+                if dep_status in ['F', 'K']:
+                    return False
+            else:
+                if dep_status in ['P']:
+                    return True
+
+        return item.needs_all_dependencies_passing
+
+    def _poll(self, hms):
+        '''Check for running items that have finished
+
+        Returns True if something changed.
+
+        '''
+
+        changed = False
+
+        for target in self._scheduled:
+            to_pass = []
+            to_fail = []
+            for item in self._running[target]:
+                status = item.launcher.poll()
+                assert status in ['D', 'P', 'F']
+                if status == 'D':
+                    # Still running
+                    continue
+                elif status == 'P':
+                    log.log(VERBOSE, "[%s]: [%s]: [status] [%s: P]", hms,
+                            target, item.full_name)
+                    to_pass.append(item)
+                else:
+                    log.error("[%s]: [%s]: [status] [%s: F]", hms, target,
+                              item.full_name)
+                    to_fail.append(item)
+
+            for item in to_pass:
+                self._passed[target].add(item)
+                self._running[target].remove(item)
+                self.item_to_status[item] = 'P'
+                self._enqueue_successors(item)
+
+            for item in to_fail:
+                self._failed[target].add(item)
+                self._running[target].remove(item)
+                self.item_to_status[item] = 'F'
+                # It is possible that a failed item's successor does not
+                # need all of its dependencies to pass (if it has other
+                # dependencies). Hence we enqueue all successors rather than
+                # canceling them right here. We leave it to `_dispatch()` to
+                # figure out whether an enqueued item can be run or not.
+                self._enqueue_successors(item)
+
+            changed = changed or to_pass or to_fail
+
+        return changed
+
+    def _dispatch(self, hms):
+        '''Dispatch some queued items if possible.'''
+
+        # Compute sum of lengths of all lists in the given dict.
+        def __sum(d):
+            return sum([len(d[k]) for k in d])
+
+        num_slots = min(Scheduler.max_parallel - __sum(self._running),
+                        __sum(self._queued))
+        if num_slots <= 0:
+            return
+
+        for target in self._scheduled:
+            num_slots_per_target = min(
+                num_slots, Scheduler.max_parallel - len(self._running[target]))
+            if num_slots_per_target <= 0:
+                continue
+
+            to_dispatch = []
+            while self._queued[target] and num_slots_per_target > 0:
+                next_item = self._queued[target].pop(0)
+                if not self._ok_to_run(next_item):
+                    self._cancel_item(next_item, cancel_successors=False)
+                    self._enqueue_successors(next_item)
+                    continue
+
+                to_dispatch.append(next_item)
+                num_slots_per_target -= 1
+                num_slots -= 1
+
+            if not to_dispatch:
+                continue
+
+            log.log(VERBOSE, "[%s]: [%s]: [dispatch]:\n%s", hms, target,
+                    ", ".join(item.full_name for item in to_dispatch))
+
+            for item in to_dispatch:
+                self._running[target].add(item)
+                self.item_to_status[item] = 'D'
+                try:
+                    item.launcher.launch()
+                except LauncherError as err:
+                    log.error('{}'.format(err))
+                    self._kill_item(item)
+
+    def _kill(self):
+        '''Kill any running items and cancel any that are waiting'''
+
+        # Cancel any waiting items. We take a copy of each queue to avoid
+        # iterating over the list as we modify it.
+        for target in self._queued:
+            for item in [item for item in self._queued[target]]:
+                self._cancel_item(item)
+
+        # Kill any running items. Again, take a copy of the set to avoid
+        # modifying it while iterating over it.
+        for target in self._running:
+            for item in [item for item in self._running[target]]:
+                self._kill_item(item)
+
+    def _check_if_done(self, hms, changed):
+        '''Check whether we are finished.
+
+        If 'changed' is set or a target has just completed, print the current
+        status of the targets that weren't known to be finished already.
+
+        '''
+        # 'just_completed' is a 'precursor' to updating the _scheduled list.
+        # This is done in two separate for loops so that we can print the
+        # status of the targets that just completed one final time.
+        just_completed = {}
+        for target in self._scheduled:
+            # Target is done if scheduled, queued and running lists are empty.
+            just_completed[target] = not self._scheduled[target] and \
+                not self._queued[target] and not self._running[target]
+
+        changed = changed or any(just_completed.values())
+        if changed:
+            self._print_status(hms)
+
+        for target in just_completed:
+            if just_completed[target]:
+                self._scheduled.pop(target)
+
+        return all(just_completed.values())
+
+    def _print_status(self, hms):
+        '''Print the status of currently running jobs.'''
+
+        for target in self._scheduled:
+            total_cnt = sum([
+                len(self._queued[target]),
+                len(self._running[target]),
+                len(self._passed[target]),
+                len(self._failed[target]),
+                len(self._killed[target])
+            ])
+
+            if total_cnt > 0:
+                width = len(str(total_cnt))
+
+                field_fmt = '{{:0{}d}}'.format(width)
+                msg_fmt = ('[Q: {0}, D: {0}, P: {0}, F: {0}, K: {0}, T: {0}]'.
+                           format(field_fmt))
+
+                msg = msg_fmt.format(len(self._queued[target]),
+                                     len(self._running[target]),
+                                     len(self._passed[target]),
+                                     len(self._failed[target]),
+                                     len(self._killed[target]), total_cnt)
+                log.info("[%s]: [%s]: %s", hms, target, msg)
+
+    def _cancel_item(self, item, cancel_successors=True):
+        '''Cancel an item and optionally all of its successors.
+
+        Supplied item may be in _scheduled list or the _queued list. From
+        either, we move it straight to _killed.
+        '''
+
+        self.item_to_status[item] = 'K'
+        self._killed[item.target].add(item)
+        if item in self._queued[item.target]:
+            self._queued[item.target].remove(item)
+        else:
+            self._remove_from_scheduled(item)
+
+        if cancel_successors:
+            self._cancel_successors(item)
+
+    def _kill_item(self, item):
+        '''Kill a running item and cancel all of its successors.'''
+
+        item.launcher.kill()
+        self.item_to_status[item] = 'K'
+        self._killed[item.target].add(item)
+        self._running[item.target].remove(item)
+        self._cancel_successors(item)