[dvsim] Scheduler updates - max_parallel, max_poll

The max_parallel setting previously was a Scheduler class variable. It
has now been moved to the Launcher class, since each launcher variant
has different needs.

Likewise, 2 new features are added - `max_poll` and `poll_freq`. By
default, the max_poll 10K, and the polling frequency is 1 second.
The purpose of adding these knobs is to reduce the polling rate and the
number of jobs polled at a given time. With external launchers such as
LSF and cloud, polling for a job completion could involve multiple steps
and some external command invocations that are time-consuming. These
features help limit the time taken by the Scheduler to poll the entire
set of jobs. If we do not provide a way to fine-tune / adjust these
parameters, DVSim will appear to have hung (or worse, it could crash the
system).

The max_poll needs to cycle through the list of running jobs, rather
than just the first N ones. For example, if there are 5 jobs running [a,
b, c, d, e], and max_poll is set to 2, on the first poll window, it
polls for jobs a and b to complete. Then it polls b and c, followed by e
and a and so on. To allow this to happen, the list of running jobs now
needs to be maintained as a circular linked list, that supports
iterating through the list, while allowing jobs to be added or removed
from the list in realtime. This is achieved in the newly added
`CircularList` class.

Signed-off-by: Srikrishna Iyer <sriyer@google.com>
diff --git a/util/dvsim/FlowCfg.py b/util/dvsim/FlowCfg.py
index 49443cc..498fa62 100644
--- a/util/dvsim/FlowCfg.py
+++ b/util/dvsim/FlowCfg.py
@@ -12,6 +12,7 @@
 
 import hjson
 from CfgJson import set_target_attribute
+from LauncherFactory import get_launcher_cls
 from Scheduler import Scheduler
 from utils import (VERBOSE, find_and_substitute_wildcards, md_results_to_html,
                    rm_path, subst_wildcards)
@@ -371,7 +372,7 @@
             log.fatal("Nothing to run!")
             sys.exit(1)
 
-        return Scheduler(deploy).run()
+        return Scheduler(deploy, get_launcher_cls()).run()
 
     def _gen_results(self, results):
         '''
diff --git a/util/dvsim/Launcher.py b/util/dvsim/Launcher.py
index ebd7683..5580ebe 100644
--- a/util/dvsim/Launcher.py
+++ b/util/dvsim/Launcher.py
@@ -6,6 +6,7 @@
 import logging as log
 import os
 import re
+import sys
 from pathlib import Path
 
 from utils import VERBOSE, clean_odirs, rm_path
@@ -16,10 +17,11 @@
         self.msg = msg
 
 
-class ErrorMessage(collections.namedtuple(
-    'ErrorMessage',
-    ['line_number', 'message', 'context'],
-)):
+class ErrorMessage(
+        collections.namedtuple(
+            'ErrorMessage',
+            ['line_number', 'message', 'context'],
+        )):
     """Contains error-related information.
 
     This support classification of failures into buckets. The message field
@@ -42,6 +44,15 @@
     # Type of launcher used as string.
     variant = None
 
+    # Max jobs running at one time
+    max_parallel = sys.maxsize
+
+    # Max jobs polled at one time
+    max_poll = 10000
+
+    # Poll job's completion status every this many seconds
+    poll_freq = 1
+
     # Points to the python virtual env area.
     pyvenv = None
 
@@ -217,7 +228,8 @@
 
         Returns (status, err_msg) extracted from the log, where the status is
         "P" if the it passed, "F" otherwise. This is invoked by poll() just
-        after the job finishes.
+        after the job finishes. err_msg is an instance of the named tuple
+        ErrorMessage.
         """
         def _find_patterns(patterns, line):
             """Helper function that returns the pattern if any of the given
@@ -242,7 +254,9 @@
         chk_passed = bool(pass_patterns) and (self.exit_code == 0)
 
         try:
-            with open(self.deploy.get_log_path(), "r", encoding="UTF-8",
+            with open(self.deploy.get_log_path(),
+                      "r",
+                      encoding="UTF-8",
                       errors="surrogateescape") as f:
                 lines = f.readlines()
         except OSError as e:
@@ -288,6 +302,9 @@
         """Do post-completion activities, such as preparing the results.
 
         Must be invoked by poll(), after the job outcome is determined.
+
+        status is the status of the job, either 'P', 'F' or 'K'.
+        err_msg is an instance of the named tuple ErrorMessage.
         """
 
         assert status in ['P', 'F', 'K']
@@ -296,14 +313,6 @@
         self.deploy.post_finish(status)
         log.debug("Item %s has completed execution: %s", self, status)
         if status != "P":
-            self._log_fail_msg(err_msg)
-
-    def _log_fail_msg(self, msg):
-        """Logs the fail msg for the final report.
-
-        Invoked in _post_finish() only if the job did not pass.
-        """
-
-        assert msg and isinstance(msg, ErrorMessage)
-        self.fail_msg = msg
-        log.log(VERBOSE, msg)
+            assert err_msg and isinstance(err_msg, ErrorMessage)
+            self.fail_msg = err_msg
+            log.log(VERBOSE, err_msg.message)
diff --git a/util/dvsim/LauncherFactory.py b/util/dvsim/LauncherFactory.py
index fdfaaeb..0e7479b 100644
--- a/util/dvsim/LauncherFactory.py
+++ b/util/dvsim/LauncherFactory.py
@@ -4,12 +4,10 @@
 
 import logging as log
 import os
-import sys
 
 from Launcher import Launcher
 from LocalLauncher import LocalLauncher
 from LsfLauncher import LsfLauncher
-from Scheduler import Scheduler
 
 try:
     from EdaCloudLauncher import EdaCloudLauncher
@@ -18,7 +16,7 @@
     EDACLOUD_LAUNCHER_EXISTS = False
 
 # The chosen launcher class.
-launcher_cls = None
+_LAUNCHER_CLS = None
 
 
 def set_launcher_type(is_local=False):
@@ -36,29 +34,29 @@
         launcher = "local"
     Launcher.variant = launcher
 
-    global launcher_cls
+    global _LAUNCHER_CLS
     if launcher == "local":
-        launcher_cls = LocalLauncher
+        _LAUNCHER_CLS = LocalLauncher
 
     elif launcher == "lsf":
-        launcher_cls = LsfLauncher
-
-        # The max_parallel setting is not relevant when dispatching with LSF.
-        Scheduler.max_parallel = sys.maxsize
+        _LAUNCHER_CLS = LsfLauncher
 
     # These custom launchers are site specific. They may not be committed to
     # the open source repo.
     elif launcher == "edacloud" and EDACLOUD_LAUNCHER_EXISTS:
-        launcher_cls = EdaCloudLauncher
-
-        # The max_parallel setting is not relevant when dispatching with
-        # EDACloud.
-        Scheduler.max_parallel = sys.maxsize
+        _LAUNCHER_CLS = EdaCloudLauncher
 
     else:
         log.error("Launcher {} set using DVSIM_LAUNCHER env var does not "
                   "exist. Using local launcher instead.".format(launcher))
-        launcher_cls = LocalLauncher
+        _LAUNCHER_CLS = LocalLauncher
+
+
+def get_launcher_cls():
+    '''Returns the chosen launcher class.'''
+
+    assert _LAUNCHER_CLS is not None
+    return _LAUNCHER_CLS
 
 
 def get_launcher(deploy):
@@ -67,6 +65,5 @@
     'deploy' is an instance of the deploy class to with the launcher is paired.
     '''
 
-    global launcher_cls
-    assert launcher_cls is not None
-    return launcher_cls(deploy)
+    assert _LAUNCHER_CLS is not None
+    return _LAUNCHER_CLS(deploy)
diff --git a/util/dvsim/Scheduler.py b/util/dvsim/Scheduler.py
index f7605bd..eb900ff 100644
--- a/util/dvsim/Scheduler.py
+++ b/util/dvsim/Scheduler.py
@@ -19,13 +19,35 @@
     return sum([len(d[k]) for k in d])
 
 
+def get_next_item(arr, index):
+    '''Perpetually get an item from a list.
+
+    Returns the next item on the list by advancing the index by 1. If the index
+    is already the last item on the list, it loops back to the start, thus
+    implementing a circular list.
+
+    arr is a subscriptable list.
+    index is the index of the last item returned.
+
+    Returns (item, index) if successful.
+    Raises IndexError if arr is empty.
+    '''
+    index += 1
+    try:
+        item = arr[index]
+    except IndexError:
+        index = 0
+        try:
+            item = arr[index]
+        except IndexError:
+            raise IndexError("List is empty!")
+
+    return item, index
+
+
 class Scheduler:
     '''An object that runs one or more Deploy items'''
-
-    # Max jobs running at one time
-    max_parallel = 16
-
-    def __init__(self, items):
+    def __init__(self, items, launcher_cls):
         self.items = items
 
         # 'scheduled[target][cfg]' is a list of Deploy objects for the chosen
@@ -48,19 +70,29 @@
         # _queued is a list so that we dispatch things in order (relevant
         # for things like tests where we have ordered things cleverly to
         # try to see failures early). They are maintained for each target.
+
+        # The list of available targets and the list of running items in each
+        # target are polled in a circular fashion, looping back to the start.
+        # This is done to allow us to poll a smaller subset of jobs rather than
+        # the entire regression. We keep rotating through our list of running
+        # items, picking up where we left off on the last poll.
+        self._targets = list(self._scheduled.keys())
         self._queued = {}
         self._running = {}
         self._passed = {}
         self._failed = {}
         self._killed = {}
         self._total = {}
+        self.last_target_polled_idx = -1
+        self.last_item_polled_idx = {}
         for target in self._scheduled:
             self._queued[target] = []
-            self._running[target] = set()
+            self._running[target] = []
             self._passed[target] = set()
             self._failed[target] = set()
             self._killed[target] = set()
             self._total[target] = sum_dict_lists(self._scheduled[target])
+            self.last_item_polled_idx[target] = -1
 
             # Stuff for printing the status.
             width = len(str(self._total[target]))
@@ -76,6 +108,10 @@
         # per-target.
         self.item_to_status = {}
 
+        # The chosen launcher class. This allows us to access launcher
+        # variant-specific settings such as max parallel jobs & poll rate.
+        self.launcher_cls = launcher_cls
+
     def run(self):
         '''Run all scheduled jobs and return the results.
 
@@ -122,7 +158,8 @@
                 # polling loop. But we do it with a bounded wait on stop_now so
                 # that we jump back to the polling loop immediately on a
                 # signal.
-                stop_now.wait(timeout=1)
+                stop_now.wait(timeout=self.launcher_cls.poll_freq)
+
         finally:
             signal(SIGINT, old_handler)
 
@@ -303,49 +340,54 @@
         '''
 
         changed = False
-        for target in self._scheduled:
-            to_pass = []
-            to_fail = []
-            for item in self._running[target]:
+        max_poll = min(self.launcher_cls.max_poll,
+                       sum_dict_lists(self._running))
+
+        while max_poll:
+            target, self.last_target_polled_idx = get_next_item(
+                self._targets, self.last_target_polled_idx)
+
+            while self._running[target] and max_poll:
+                max_poll -= 1
+                item, self.last_item_polled_idx[target] = get_next_item(
+                    self._running[target], self.last_item_polled_idx[target])
                 status = item.launcher.poll()
-                assert status in ['D', 'P', 'F']
+                level = VERBOSE
+
+                assert status in ['D', 'P', 'F', 'K']
                 if status == 'D':
-                    # Still running
                     continue
                 elif status == 'P':
-                    log.log(VERBOSE, "[%s]: [%s]: [status] [%s: P]", hms,
-                            target, item.full_name)
-                    to_pass.append(item)
+                    self._passed[target].add(item)
+                elif status == 'F':
+                    self._failed[target].add(item)
+                    level = log.ERROR
                 else:
-                    log.error("[%s]: [%s]: [status] [%s: F]", hms, target,
-                              item.full_name)
-                    to_fail.append(item)
+                    self._killed[target].add(item)
+                    level = log.ERROR
 
-            for item in to_pass:
-                self._passed[target].add(item)
-                self._running[target].remove(item)
-                self.item_to_status[item] = 'P'
-                self._enqueue_successors(item)
+                self._running[target].pop(self.last_item_polled_idx[target])
+                self.last_item_polled_idx[target] -= 1
+                self.item_to_status[item] = status
+                log.log(level, "[%s]: [%s]: [status] [%s: %s]", hms, target,
+                        item.full_name, status)
 
-            for item in to_fail:
-                self._failed[target].add(item)
-                self._running[target].remove(item)
-                self.item_to_status[item] = 'F'
+                # Enqueue item's successors regardless of its status.
+                #
                 # It may be possible that a failed item's successor may not
                 # need all of its dependents to pass (if it has other dependent
                 # jobs). Hence we enqueue all successors rather than canceling
-                # them right here. We leave it to `_dispatch()` to figure out
+                # them right here. We leave it to _dispatch() to figure out
                 # whether an enqueued item can be run or not.
                 self._enqueue_successors(item)
-
-            changed = changed or to_pass or to_fail
+                changed = True
 
         return changed
 
     def _dispatch(self, hms):
         '''Dispatch some queued items if possible.'''
 
-        slots = Scheduler.max_parallel - sum_dict_lists(self._running)
+        slots = self.launcher_cls.max_parallel - sum_dict_lists(self._running)
         if slots <= 0:
             return
 
@@ -404,7 +446,7 @@
                     ", ".join(item.full_name for item in to_dispatch))
 
             for item in to_dispatch:
-                self._running[target].add(item)
+                self._running[target].append(item)
                 self.item_to_status[item] = 'D'
                 try:
                     item.launcher.launch()
@@ -423,7 +465,7 @@
 
         # Kill any running items. Again, take a copy of the set to avoid
         # modifying it while iterating over it.
-        for target in self._queued:
+        for target in self._running:
             for item in [item for item in self._running[target]]:
                 self._kill_item(item)
 
diff --git a/util/dvsim/dvsim.py b/util/dvsim/dvsim.py
index add4d21..3a6f5bf 100755
--- a/util/dvsim/dvsim.py
+++ b/util/dvsim/dvsim.py
@@ -31,9 +31,9 @@
 
 import Launcher
 import LauncherFactory
+import LocalLauncher
 from CfgFactory import make_cfg
 from Deploy import RunTest
-from Scheduler import Scheduler
 from Timer import Timer
 from utils import (TS_FORMAT, TS_FORMAT_LONG, VERBOSE, rm_path,
                    run_cmd_with_timeout)
@@ -327,7 +327,8 @@
                       help=('Run only up to N builds/tests at a time. '
                             'Default value 16, unless the DVSIM_MAX_PARALLEL '
                             'environment variable is set, in which case that '
-                            'is used.'))
+                            'is used. Only applicable when launching jobs '
+                            'locally.'))
 
     pathg = parser.add_argument_group('File management')
 
@@ -645,7 +646,7 @@
 
     # Register the common deploy settings.
     Timer.print_interval = args.print_interval
-    Scheduler.max_parallel = args.max_parallel
+    LocalLauncher.LocalLauncher.max_parallel = args.max_parallel
     Launcher.Launcher.max_odirs = args.max_odirs
     LauncherFactory.set_launcher_type(args.local)