|  | # Copyright lowRISC contributors. | 
|  | # Licensed under the Apache License, Version 2.0, see LICENSE for details. | 
|  | # SPDX-License-Identifier: Apache-2.0 | 
|  |  | 
|  | import logging as log | 
|  | import threading | 
|  | from signal import SIGINT, signal | 
|  |  | 
|  | from Launcher import LauncherError | 
|  | from StatusPrinter import get_status_printer | 
|  | from Timer import Timer | 
|  | from utils import VERBOSE | 
|  |  | 
|  |  | 
|  | # Sum of lenghts of all lists in the given dict. | 
|  | def sum_dict_lists(d): | 
|  | '''Given a dict whose key values are lists, return sum of lengths of | 
|  | thoese lists.''' | 
|  | return sum([len(d[k]) for k in d]) | 
|  |  | 
|  |  | 
|  | def get_next_item(arr, index): | 
|  | '''Perpetually get an item from a list. | 
|  |  | 
|  | Returns the next item on the list by advancing the index by 1. If the index | 
|  | is already the last item on the list, it loops back to the start, thus | 
|  | implementing a circular list. | 
|  |  | 
|  | arr is a subscriptable list. | 
|  | index is the index of the last item returned. | 
|  |  | 
|  | Returns (item, index) if successful. | 
|  | Raises IndexError if arr is empty. | 
|  | ''' | 
|  | index += 1 | 
|  | try: | 
|  | item = arr[index] | 
|  | except IndexError: | 
|  | index = 0 | 
|  | try: | 
|  | item = arr[index] | 
|  | except IndexError: | 
|  | raise IndexError("List is empty!") | 
|  |  | 
|  | return item, index | 
|  |  | 
|  |  | 
|  | class Scheduler: | 
|  | '''An object that runs one or more Deploy items''' | 
|  |  | 
|  | def __init__(self, items, launcher_cls): | 
|  | self.items = items | 
|  |  | 
|  | # 'scheduled[target][cfg]' is a list of Deploy objects for the chosen | 
|  | # target and cfg. As items in _scheduled are ready to be run (once | 
|  | # their dependencies pass), they are moved to the _queued list, where | 
|  | # they wait until slots are available for them to be dispatched. | 
|  | # When all items (in all cfgs) of a target are done, it is removed from | 
|  | # this dictionary. | 
|  | self._scheduled = {} | 
|  | self.add_to_scheduled(items) | 
|  |  | 
|  | # Print status periodically using an external status printer. | 
|  | self.status_printer = get_status_printer() | 
|  | self.status_printer.print_header( | 
|  | msg="Q: queued, D: dispatched, P: passed, F: failed, K: killed, " | 
|  | "T: total") | 
|  |  | 
|  | # Sets of items, split up by their current state. The sets are | 
|  | # disjoint and their union equals the keys of self.item_to_status. | 
|  | # _queued is a list so that we dispatch things in order (relevant | 
|  | # for things like tests where we have ordered things cleverly to | 
|  | # try to see failures early). They are maintained for each target. | 
|  |  | 
|  | # The list of available targets and the list of running items in each | 
|  | # target are polled in a circular fashion, looping back to the start. | 
|  | # This is done to allow us to poll a smaller subset of jobs rather than | 
|  | # the entire regression. We keep rotating through our list of running | 
|  | # items, picking up where we left off on the last poll. | 
|  | self._targets = list(self._scheduled.keys()) | 
|  | self._queued = {} | 
|  | self._running = {} | 
|  | self._passed = {} | 
|  | self._failed = {} | 
|  | self._killed = {} | 
|  | self._total = {} | 
|  | self.last_target_polled_idx = -1 | 
|  | self.last_item_polled_idx = {} | 
|  | for target in self._scheduled: | 
|  | self._queued[target] = [] | 
|  | self._running[target] = [] | 
|  | self._passed[target] = set() | 
|  | self._failed[target] = set() | 
|  | self._killed[target] = set() | 
|  | self._total[target] = sum_dict_lists(self._scheduled[target]) | 
|  | self.last_item_polled_idx[target] = -1 | 
|  |  | 
|  | # Stuff for printing the status. | 
|  | width = len(str(self._total[target])) | 
|  | field_fmt = '{{:0{}d}}'.format(width) | 
|  | self.msg_fmt = 'Q: {0}, D: {0}, P: {0}, F: {0}, K: {0}, T: {0}'.format( | 
|  | field_fmt) | 
|  | msg = self.msg_fmt.format(0, 0, 0, 0, 0, self._total[target]) | 
|  | self.status_printer.init_target(target=target, msg=msg) | 
|  |  | 
|  | # A map from the Deploy objects tracked by this class to their | 
|  | # current status. This status is 'Q', 'D', 'P', 'F' or 'K', | 
|  | # corresponding to membership in the dicts above. This is not | 
|  | # per-target. | 
|  | self.item_to_status = {} | 
|  |  | 
|  | # Create the launcher instance for all items. | 
|  | for item in self.items: | 
|  | item.create_launcher() | 
|  |  | 
|  | # The chosen launcher class. This allows us to access launcher | 
|  | # variant-specific settings such as max parallel jobs & poll rate. | 
|  | self.launcher_cls = launcher_cls | 
|  |  | 
|  | def run(self): | 
|  | '''Run all scheduled jobs and return the results. | 
|  |  | 
|  | Returns the results (status) of all items dispatched for all | 
|  | targets and cfgs. | 
|  | ''' | 
|  |  | 
|  | timer = Timer() | 
|  |  | 
|  | # Catch one SIGINT and tell the runner to quit. On a second, die. | 
|  | stop_now = threading.Event() | 
|  | old_handler = None | 
|  |  | 
|  | def on_sigint(signal_received, frame): | 
|  | log.info('Received SIGINT. Exiting gracefully. ' | 
|  | 'Send another to force immediate quit ' | 
|  | '(but you may need to manually kill child processes)') | 
|  |  | 
|  | # Restore old handler to catch any second signal | 
|  | assert old_handler is not None | 
|  | signal(SIGINT, old_handler) | 
|  |  | 
|  | stop_now.set() | 
|  |  | 
|  | old_handler = signal(SIGINT, on_sigint) | 
|  |  | 
|  | # Enqueue all items of the first target. | 
|  | self._enqueue_successors(None) | 
|  |  | 
|  | try: | 
|  | while True: | 
|  | if stop_now.is_set(): | 
|  | # We've had an interrupt. Kill any jobs that are running. | 
|  | self._kill() | 
|  |  | 
|  | hms = timer.hms() | 
|  | changed = self._poll(hms) or timer.check_time() | 
|  | self._dispatch(hms) | 
|  | if changed: | 
|  | if self._check_if_done(hms): | 
|  | break | 
|  |  | 
|  | # This is essentially sleep(1) to wait a second between each | 
|  | # polling loop. But we do it with a bounded wait on stop_now so | 
|  | # that we jump back to the polling loop immediately on a | 
|  | # signal. | 
|  | stop_now.wait(timeout=self.launcher_cls.poll_freq) | 
|  |  | 
|  | finally: | 
|  | signal(SIGINT, old_handler) | 
|  |  | 
|  | # Cleanup the status printer. | 
|  | self.status_printer.exit() | 
|  |  | 
|  | # We got to the end without anything exploding. Return the results. | 
|  | return self.item_to_status | 
|  |  | 
|  | def add_to_scheduled(self, items): | 
|  | '''Add items to the list of _scheduled. | 
|  |  | 
|  | 'items' is a list of Deploy objects. | 
|  | ''' | 
|  |  | 
|  | for item in items: | 
|  | target_dict = self._scheduled.setdefault(item.target, {}) | 
|  | cfg_list = target_dict.setdefault(item.sim_cfg, []) | 
|  | if item not in cfg_list: | 
|  | cfg_list.append(item) | 
|  |  | 
|  | def _remove_from_scheduled(self, item): | 
|  | '''Removes the item from _scheduled[target][cfg] list. | 
|  |  | 
|  | When all items in _scheduled[target][cfg] are finally removed, the cfg | 
|  | key is deleted. | 
|  | ''' | 
|  | target_dict = self._scheduled[item.target] | 
|  | cfg_list = target_dict.get(item.sim_cfg) | 
|  | if cfg_list is not None: | 
|  | try: | 
|  | cfg_list.remove(item) | 
|  | except ValueError: | 
|  | pass | 
|  | if not cfg_list: | 
|  | del target_dict[item.sim_cfg] | 
|  |  | 
|  | def _get_next_target(self, curr_target): | 
|  | '''Returns the target that succeeds the current one. | 
|  |  | 
|  | curr_target is the target of the job that just completed (example - | 
|  | build). If it is None, then the first target in _scheduled is returned. | 
|  | ''' | 
|  |  | 
|  | if curr_target is None: | 
|  | return next(iter(self._scheduled)) | 
|  |  | 
|  | assert curr_target in self._scheduled | 
|  | target_iterator = iter(self._scheduled) | 
|  | target = next(target_iterator) | 
|  |  | 
|  | found = False | 
|  | while not found: | 
|  | if target == curr_target: | 
|  | found = True | 
|  | try: | 
|  | target = next(target_iterator) | 
|  | except StopIteration: | 
|  | return None | 
|  |  | 
|  | return target | 
|  |  | 
|  | def _enqueue_successors(self, item=None): | 
|  | '''Move an item's successors from _scheduled to _queued. | 
|  |  | 
|  | 'item' is the recently run job that has completed. If None, then we | 
|  | move all available items in all available cfgs in _scheduled's first | 
|  | target. If 'item' is specified, then we find its successors and move | 
|  | them to _queued. | 
|  | ''' | 
|  |  | 
|  | for next_item in self._get_successors(item): | 
|  | assert next_item not in self.item_to_status | 
|  | assert next_item not in self._queued[next_item.target] | 
|  | self.item_to_status[next_item] = 'Q' | 
|  | self._queued[next_item.target].append(next_item) | 
|  | self._remove_from_scheduled(next_item) | 
|  |  | 
|  | def _cancel_successors(self, item): | 
|  | '''Cancel an item's successors recursively by moving them from | 
|  | _scheduled or _queued to _killed.''' | 
|  |  | 
|  | items = self._get_successors(item) | 
|  | while items: | 
|  | next_item = items.pop() | 
|  | self._cancel_item(next_item, cancel_successors=False) | 
|  | items.extend(self._get_successors(next_item)) | 
|  |  | 
|  | def _get_successors(self, item=None): | 
|  | '''Find immediate successors of an item. | 
|  |  | 
|  | 'item' is a job that has completed. We choose the target that follows | 
|  | the 'item''s current target and find the list of successors whose | 
|  | dependency list contains 'item'. If 'item' is None, we pick successors | 
|  | from all cfgs, else we pick successors only from the cfg to which the | 
|  | item belongs. | 
|  |  | 
|  | Returns the list of item's successors, or an empty list if there are | 
|  | none. | 
|  | ''' | 
|  |  | 
|  | if item is None: | 
|  | target = self._get_next_target(None) | 
|  | cfgs = set(self._scheduled[target]) | 
|  | else: | 
|  | target = self._get_next_target(item.target) | 
|  | cfgs = {item.sim_cfg} | 
|  |  | 
|  | if target is None: | 
|  | return [] | 
|  |  | 
|  | # Find item's successors that can be enqueued. We assume here that | 
|  | # only the immediately succeeding target can be enqueued at this | 
|  | # time. | 
|  | successors = [] | 
|  | for cfg in cfgs: | 
|  | for next_item in self._scheduled[target][cfg]: | 
|  | if item is not None: | 
|  | # Something is terribly wrong if item exists but the | 
|  | # next_item's dependency list is empty. | 
|  | assert next_item.dependencies | 
|  | if item not in next_item.dependencies: | 
|  | continue | 
|  |  | 
|  | if self._ok_to_enqueue(next_item): | 
|  | successors.append(next_item) | 
|  |  | 
|  | return successors | 
|  |  | 
|  | def _ok_to_enqueue(self, item): | 
|  | '''Returns true if ALL dependencies of item are complete.''' | 
|  |  | 
|  | for dep in item.dependencies: | 
|  | # Ignore dependencies that were not scheduled to run. | 
|  | if dep not in self.items: | 
|  | continue | 
|  |  | 
|  | # Has the dep even been enqueued? | 
|  | if dep not in self.item_to_status: | 
|  | return False | 
|  |  | 
|  | # Has the dep completed? | 
|  | if self.item_to_status[dep] not in ["P", "F", "K"]: | 
|  | return False | 
|  |  | 
|  | return True | 
|  |  | 
|  | def _ok_to_run(self, item): | 
|  | '''Returns true if the required dependencies have passed. | 
|  |  | 
|  | The item's needs_all_dependencies_passing setting is used to figure | 
|  | out whether we can run this item or not, based on its dependent jobs' | 
|  | statuses. | 
|  | ''' | 
|  | # 'item' can run only if its dependencies have passed (their results | 
|  | # should already show up in the item to status map). | 
|  | for dep in item.dependencies: | 
|  | # Ignore dependencies that were not scheduled to run. | 
|  | if dep not in self.items: | 
|  | continue | 
|  |  | 
|  | dep_status = self.item_to_status[dep] | 
|  | assert dep_status in ['P', 'F', 'K'] | 
|  |  | 
|  | if item.needs_all_dependencies_passing: | 
|  | if dep_status in ['F', 'K']: | 
|  | return False | 
|  | else: | 
|  | if dep_status in ['P']: | 
|  | return True | 
|  |  | 
|  | return item.needs_all_dependencies_passing | 
|  |  | 
|  | def _poll(self, hms): | 
|  | '''Check for running items that have finished | 
|  |  | 
|  | Returns True if something changed. | 
|  | ''' | 
|  |  | 
|  | max_poll = min(self.launcher_cls.max_poll, | 
|  | sum_dict_lists(self._running)) | 
|  |  | 
|  | # If there are no jobs running, we are likely done (possibly because | 
|  | # of a SIGINT). Since poll() was called anyway, signal that something | 
|  | # has indeed changed. | 
|  | if not max_poll: | 
|  | return True | 
|  |  | 
|  | changed = False | 
|  | while max_poll: | 
|  | target, self.last_target_polled_idx = get_next_item( | 
|  | self._targets, self.last_target_polled_idx) | 
|  |  | 
|  | while self._running[target] and max_poll: | 
|  | max_poll -= 1 | 
|  | item, self.last_item_polled_idx[target] = get_next_item( | 
|  | self._running[target], self.last_item_polled_idx[target]) | 
|  | status = item.launcher.poll() | 
|  | level = VERBOSE | 
|  |  | 
|  | assert status in ['D', 'P', 'F', 'K'] | 
|  | if status == 'D': | 
|  | continue | 
|  | elif status == 'P': | 
|  | self._passed[target].add(item) | 
|  | elif status == 'F': | 
|  | self._failed[target].add(item) | 
|  | level = log.ERROR | 
|  | else: | 
|  | self._killed[target].add(item) | 
|  | level = log.ERROR | 
|  |  | 
|  | self._running[target].pop(self.last_item_polled_idx[target]) | 
|  | self.last_item_polled_idx[target] -= 1 | 
|  | self.item_to_status[item] = status | 
|  | log.log(level, "[%s]: [%s]: [status] [%s: %s]", hms, target, | 
|  | item.full_name, status) | 
|  |  | 
|  | # Enqueue item's successors regardless of its status. | 
|  | # | 
|  | # It may be possible that a failed item's successor may not | 
|  | # need all of its dependents to pass (if it has other dependent | 
|  | # jobs). Hence we enqueue all successors rather than canceling | 
|  | # them right here. We leave it to _dispatch() to figure out | 
|  | # whether an enqueued item can be run or not. | 
|  | self._enqueue_successors(item) | 
|  | changed = True | 
|  |  | 
|  | return changed | 
|  |  | 
|  | def _dispatch(self, hms): | 
|  | '''Dispatch some queued items if possible.''' | 
|  |  | 
|  | slots = self.launcher_cls.max_parallel - sum_dict_lists(self._running) | 
|  | if slots <= 0: | 
|  | return | 
|  |  | 
|  | # Compute how many slots to allocate to each target based on their | 
|  | # weights. | 
|  | sum_weight = 0 | 
|  | slots_filled = 0 | 
|  | total_weight = sum(self._queued[t][0].weight for t in self._queued | 
|  | if self._queued[t]) | 
|  |  | 
|  | for target in self._scheduled: | 
|  | if not self._queued[target]: | 
|  | continue | 
|  |  | 
|  | # N slots are allocated to M targets each with W(m) weights with | 
|  | # the formula: | 
|  | # | 
|  | # N(m) = N * W(m) / T, where, | 
|  | #   T is the sum total of all weights. | 
|  | # | 
|  | # This is however, problematic due to fractions. Even after | 
|  | # rounding off to the nearest digit, slots may not be fully | 
|  | # utilized (one extra left). An alternate approach that avoids this | 
|  | # problem is as follows: | 
|  | # | 
|  | # N(m) = (N * S(W(m)) / T) - F(m), where, | 
|  | #   S(W(m)) is the running sum of weights upto current target m. | 
|  | #   F(m) is the running total of slots filled. | 
|  | # | 
|  | # The computed slots per target is nearly identical to the first | 
|  | # solution, except that it prioritizes the slot allocation to | 
|  | # targets that are earlier in the list such that in the end, all | 
|  | # slots are fully consumed. | 
|  | sum_weight += self._queued[target][0].weight | 
|  | target_slots = round( | 
|  | (slots * sum_weight) / total_weight) - slots_filled | 
|  | if target_slots <= 0: | 
|  | continue | 
|  | slots_filled += target_slots | 
|  |  | 
|  | to_dispatch = [] | 
|  | while self._queued[target] and target_slots > 0: | 
|  | next_item = self._queued[target].pop(0) | 
|  | if not self._ok_to_run(next_item): | 
|  | self._cancel_item(next_item, cancel_successors=False) | 
|  | self._enqueue_successors(next_item) | 
|  | continue | 
|  |  | 
|  | to_dispatch.append(next_item) | 
|  | target_slots -= 1 | 
|  |  | 
|  | if not to_dispatch: | 
|  | continue | 
|  |  | 
|  | log.log(VERBOSE, "[%s]: [%s]: [dispatch]:\n%s", hms, target, | 
|  | ", ".join(item.full_name for item in to_dispatch)) | 
|  |  | 
|  | for item in to_dispatch: | 
|  | self._running[target].append(item) | 
|  | self.item_to_status[item] = 'D' | 
|  | try: | 
|  | item.launcher.launch() | 
|  | except LauncherError as err: | 
|  | log.error('{}'.format(err)) | 
|  | self._kill_item(item) | 
|  |  | 
|  | def _kill(self): | 
|  | '''Kill any running items and cancel any that are waiting''' | 
|  |  | 
|  | # Cancel any waiting items. We take a copy of self._queued to avoid | 
|  | # iterating over the set as we modify it. | 
|  | for target in self._queued: | 
|  | for item in [item for item in self._queued[target]]: | 
|  | self._cancel_item(item) | 
|  |  | 
|  | # Kill any running items. Again, take a copy of the set to avoid | 
|  | # modifying it while iterating over it. | 
|  | for target in self._running: | 
|  | for item in [item for item in self._running[target]]: | 
|  | self._kill_item(item) | 
|  |  | 
|  | def _check_if_done(self, hms): | 
|  | '''Check if we are done executing all jobs. | 
|  |  | 
|  | Also, prints the status of currently running jobs. | 
|  | ''' | 
|  |  | 
|  | done = True | 
|  | for target in self._scheduled: | 
|  | done_cnt = sum([ | 
|  | len(self._passed[target]), | 
|  | len(self._failed[target]), | 
|  | len(self._killed[target]) | 
|  | ]) | 
|  | done = done and (done_cnt == self._total[target]) | 
|  |  | 
|  | # Skip if a target has not even begun executing. | 
|  | if not (self._queued[target] or self._running[target] or | 
|  | done_cnt > 0): | 
|  | continue | 
|  |  | 
|  | perc = done_cnt / self._total[target] * 100 | 
|  |  | 
|  | running = ", ".join( | 
|  | [f"{item.full_name}" for item in self._running[target]]) | 
|  | msg = self.msg_fmt.format(len(self._queued[target]), | 
|  | len(self._running[target]), | 
|  | len(self._passed[target]), | 
|  | len(self._failed[target]), | 
|  | len(self._killed[target]), | 
|  | self._total[target]) | 
|  | self.status_printer.update_target(target=target, | 
|  | msg=msg, | 
|  | hms=hms, | 
|  | perc=perc, | 
|  | running=running) | 
|  | return done | 
|  |  | 
|  | def _cancel_item(self, item, cancel_successors=True): | 
|  | '''Cancel an item and optionally all of its successors. | 
|  |  | 
|  | Supplied item may be in _scheduled list or the _queued list. From | 
|  | either, we move it straight to _killed. | 
|  | ''' | 
|  |  | 
|  | self.item_to_status[item] = 'K' | 
|  | self._killed[item.target].add(item) | 
|  | if item in self._queued[item.target]: | 
|  | self._queued[item.target].remove(item) | 
|  | else: | 
|  | self._remove_from_scheduled(item) | 
|  |  | 
|  | if cancel_successors: | 
|  | self._cancel_successors(item) | 
|  |  | 
|  | def _kill_item(self, item): | 
|  | '''Kill a running item and cancel all of its successors.''' | 
|  |  | 
|  | item.launcher.kill() | 
|  | self.item_to_status[item] = 'K' | 
|  | self._killed[item.target].add(item) | 
|  | self._running[item.target].remove(item) | 
|  | self._cancel_successors(item) |