Rupert Swarbrick | c0e753d | 2020-06-04 13:27:58 +0100 | [diff] [blame] | 1 | # Copyright lowRISC contributors. |
| 2 | # Licensed under the Apache License, Version 2.0, see LICENSE for details. |
| 3 | # SPDX-License-Identifier: Apache-2.0 |
| 4 | |
Rupert Swarbrick | c0e753d | 2020-06-04 13:27:58 +0100 | [diff] [blame] | 5 | import logging as log |
Rupert Swarbrick | a2892a5 | 2020-10-12 08:50:08 +0100 | [diff] [blame] | 6 | import threading |
Srikrishna Iyer | 4c472c6 | 2021-02-19 12:03:18 -0800 | [diff] [blame] | 7 | from signal import SIGINT, signal |
Rupert Swarbrick | c0e753d | 2020-06-04 13:27:58 +0100 | [diff] [blame] | 8 | |
Srikrishna Iyer | 6578374 | 2021-02-10 21:42:12 -0800 | [diff] [blame] | 9 | from Launcher import LauncherError |
Srikrishna Iyer | df0936b | 2021-02-14 16:36:02 -0800 | [diff] [blame] | 10 | from StatusPrinter import get_status_printer |
Rupert Swarbrick | c0e753d | 2020-06-04 13:27:58 +0100 | [diff] [blame] | 11 | from Timer import Timer |
Srikrishna Iyer | 4c472c6 | 2021-02-19 12:03:18 -0800 | [diff] [blame] | 12 | from utils import VERBOSE |
Rupert Swarbrick | c0e753d | 2020-06-04 13:27:58 +0100 | [diff] [blame] | 13 | |
| 14 | |
def sum_dict_lists(d):
    '''Return the combined length of all the values of a dict.

    'd' is a dict whose values are sized containers (lists here). An empty
    dict yields 0.
    '''
    return sum(len(entries) for entries in d.values())
| 20 | |
| 21 | |
def get_next_item(arr, index):
    '''Step through a list forever, treating it as circular.

    The position following 'index' is tried first. If that position is past
    the end of the list, the walk restarts from the first element.

    arr is a subscriptable list.
    index is the index of the item returned by the previous call.

    Returns the tuple (item, index) of the item picked and its position.
    Raises IndexError if arr is empty.
    '''
    following = index + 1
    try:
        return arr[following], following
    except IndexError:
        # Ran off the end of the list: wrap around to the first element.
        try:
            return arr[0], 0
        except IndexError:
            raise IndexError("List is empty!")
| 46 | |
| 47 | |
class Scheduler:
    '''An object that runs one or more Deploy items.

    Items are grouped by target and then by cfg. Each item moves through the
    states scheduled -> queued ('Q') -> dispatched ('D') -> passed ('P'),
    failed ('F') or killed ('K'). The scheduler uses these attributes of an
    item: target, sim_cfg, dependencies, needs_all_dependencies_passing,
    weight, full_name, launcher and create_launcher(). Items are used as dict
    keys and set members, so they must be hashable.
    '''
    def __init__(self, items, launcher_cls):
        '''Set up the bookkeeping for 'items' and create their launchers.

        'items' is a list of Deploy objects.
        'launcher_cls' is the chosen launcher class; its max_parallel,
        max_poll and poll_freq settings are read by this class.
        '''
        self.items = items

        # 'scheduled[target][cfg]' is a list of Deploy objects for the chosen
        # target and cfg. As items in _scheduled are ready to be run (once
        # their dependencies complete), they are moved to the _queued list,
        # where they wait until slots are available for them to be
        # dispatched. When all items of a cfg have left _scheduled, that cfg
        # key is deleted. The target keys themselves are never removed: their
        # (insertion) order defines which target succeeds which.
        self._scheduled = {}
        self.add_to_scheduled(items)

        # Print status periodically using an external status printer.
        self.status_printer = get_status_printer()
        self.status_printer.print_header(
            msg="Q: queued, D: dispatched, P: passed, F: failed, K: killed, "
            "T: total")

        # Collections of items, split up by their current state. They are
        # disjoint and their union equals the keys of self.item_to_status.
        # _queued is a list so that we dispatch things in order (relevant
        # for things like tests where we have ordered things cleverly to
        # try to see failures early). They are maintained for each target.

        # The list of available targets and the list of running items in each
        # target are polled in a circular fashion, looping back to the start.
        # This is done to allow us to poll a smaller subset of jobs rather than
        # the entire regression. We keep rotating through our list of running
        # items, picking up where we left off on the last poll.
        self._targets = list(self._scheduled.keys())
        self._queued = {}
        self._running = {}
        self._passed = {}
        self._failed = {}
        self._killed = {}
        self._total = {}
        self.last_target_polled_idx = -1
        self.last_item_polled_idx = {}
        for target in self._scheduled:
            self._queued[target] = []
            self._running[target] = []
            self._passed[target] = set()
            self._failed[target] = set()
            self._killed[target] = set()
            self._total[target] = sum_dict_lists(self._scheduled[target])
            self.last_item_polled_idx[target] = -1

            # Stuff for printing the status.
            # NOTE(review): msg_fmt is overwritten on every iteration, so
            # _check_if_done() pads all targets using the field width of the
            # last target, while the initial message below uses each
            # target's own width. Kept as is; confirm whether that is
            # intended.
            width = len(str(self._total[target]))
            field_fmt = '{{:0{}d}}'.format(width)
            self.msg_fmt = 'Q: {0}, D: {0}, P: {0}, F: {0}, K: {0}, T: {0}'.format(
                field_fmt)
            msg = self.msg_fmt.format(0, 0, 0, 0, 0, self._total[target])
            self.status_printer.init_target(target=target, msg=msg)

        # A map from the Deploy objects tracked by this class to their
        # current status. This status is 'Q', 'D', 'P', 'F' or 'K',
        # corresponding to membership in the dicts above. This is not
        # per-target.
        self.item_to_status = {}

        # Create the launcher instance for all items.
        for item in self.items:
            item.create_launcher()

        # The chosen launcher class. This allows us to access launcher
        # variant-specific settings such as max parallel jobs & poll rate.
        self.launcher_cls = launcher_cls

    def run(self):
        '''Run all scheduled jobs and return the results.

        Returns the results (status) of all items dispatched for all
        targets and cfgs, as the item -> status map.
        '''

        timer = Timer()

        # Catch one SIGINT and tell the runner to quit. On a second, die.
        stop_now = threading.Event()
        old_handler = None

        def on_sigint(signal_received, frame):
            log.info('Received SIGINT. Exiting gracefully. '
                     'Send another to force immediate quit '
                     '(but you may need to manually kill child processes)')

            # Restore old handler to catch any second signal
            assert old_handler is not None
            signal(SIGINT, old_handler)

            stop_now.set()

        old_handler = signal(SIGINT, on_sigint)

        try:
            try:
                # Enqueue all items of the first target. This is done inside
                # the try block so that the SIGINT handler is restored even
                # if it fails.
                self._enqueue_successors(None)

                while True:
                    if stop_now.is_set():
                        # We've had an interrupt. Kill any jobs that are
                        # running.
                        self._kill()

                    hms = timer.hms()
                    changed = self._poll(hms) or timer.check_time()
                    self._dispatch(hms)
                    if changed:
                        if self._check_if_done(hms):
                            break

                    # This is essentially a sleep to wait between each
                    # polling loop. But we do it with a bounded wait on
                    # stop_now so that we jump back to the polling loop
                    # immediately on a signal.
                    stop_now.wait(timeout=self.launcher_cls.poll_freq)

            finally:
                signal(SIGINT, old_handler)

        finally:
            # Clean up the status printer, even if the loop above raised.
            self.status_printer.exit()

        # We got to the end without anything exploding. Return the results.
        return self.item_to_status

    def add_to_scheduled(self, items):
        '''Add items to the list of _scheduled.

        'items' is a list of Deploy objects. Each one is filed under
        _scheduled[item.target][item.sim_cfg]; an item that is already
        present there is not added twice.
        '''

        for item in items:
            target_dict = self._scheduled.setdefault(item.target, {})
            cfg_list = target_dict.setdefault(item.sim_cfg, [])
            if item not in cfg_list:
                cfg_list.append(item)

    def _remove_from_scheduled(self, item):
        '''Removes the item from _scheduled[target][cfg] list.

        When all items in _scheduled[target][cfg] are finally removed, the cfg
        key is deleted. It is not an error if the item (or its cfg) is no
        longer present.
        '''
        target_dict = self._scheduled[item.target]
        cfg_list = target_dict.get(item.sim_cfg)
        if cfg_list is None:
            return

        try:
            cfg_list.remove(item)
        except ValueError:
            # The item was not in the list: nothing to remove.
            pass

        if not cfg_list:
            del target_dict[item.sim_cfg]

    def _get_next_target(self, curr_target):
        '''Returns the target that succeeds the current one.

        curr_target is the target of the job that just completed (example -
        build). If it is None, then the first target in _scheduled is returned.

        Returns None if curr_target is the last target, or if nothing has
        been scheduled at all.
        '''

        targets = list(self._scheduled)
        if curr_target is None:
            return targets[0] if targets else None

        assert curr_target in self._scheduled
        next_idx = targets.index(curr_target) + 1
        if next_idx < len(targets):
            return targets[next_idx]
        return None

    def _enqueue_successors(self, item=None):
        '''Move an item's successors from _scheduled to _queued.

        'item' is the recently run job that has completed. If None, then we
        move all available items in all available cfgs in _scheduled's first
        target. If 'item' is specified, then we find its successors and move
        them to _queued.
        '''

        for next_item in self._get_successors(item):
            assert next_item not in self.item_to_status
            assert next_item not in self._queued[next_item.target]
            self.item_to_status[next_item] = 'Q'
            self._queued[next_item.target].append(next_item)
            self._remove_from_scheduled(next_item)

    def _cancel_successors(self, item):
        '''Cancel an item's successors recursively by moving them from
        _scheduled to _killed.

        Only successors returned by _get_successors() are cancelled, i.e.
        those whose scheduled dependencies have all completed.
        '''

        items = self._get_successors(item)
        while items:
            next_item = items.pop()
            self._cancel_item(next_item, cancel_successors=False)
            items.extend(self._get_successors(next_item))

    def _get_successors(self, item=None):
        '''Find immediate successors of an item.

        'item' is a job that has completed. We choose the target that follows
        the 'item''s current target and find the list of successors whose
        dependency list contains 'item'. If 'item' is None, we pick successors
        from all cfgs of the first target, else we pick successors only from
        the cfg to which the item belongs.

        Returns the list of item's successors, or an empty list if there are
        none (including when there is no following target, or when the
        following target holds nothing for the item's cfg).
        '''

        if item is None:
            target = self._get_next_target(None)
        else:
            target = self._get_next_target(item.target)

        if target is None:
            return []

        target_dict = self._scheduled[target]
        if item is None:
            # Keep the cfgs in the order in which they were scheduled so that
            # the order in which items get enqueued is deterministic.
            cfgs = list(target_dict)
        else:
            cfgs = [item.sim_cfg]

        # Find item's successors that can be enqueued. We assume here that
        # only the immediately succeeding target can be enqueued at this
        # time.
        successors = []
        for cfg in cfgs:
            # The cfg key is absent if the target never had items for this
            # cfg, or if they have all been moved out of _scheduled already.
            for next_item in target_dict.get(cfg, ()):
                if item is not None:
                    # Something is terribly wrong if item exists but the
                    # next_item's dependency list is empty.
                    assert next_item.dependencies
                    if item not in next_item.dependencies:
                        continue

                if self._ok_to_enqueue(next_item):
                    successors.append(next_item)

        return successors

    def _ok_to_enqueue(self, item):
        '''Returns true if ALL dependencies of item are complete.

        A dependency is complete when its status is 'P', 'F' or 'K'.
        Dependencies that are not in self.items are ignored.
        '''

        for dep in item.dependencies:
            # Ignore dependencies that were not scheduled to run.
            if dep not in self.items:
                continue

            # Has the dep even been enqueued?
            if dep not in self.item_to_status:
                return False

            # Has the dep completed?
            if self.item_to_status[dep] not in ["P", "F", "K"]:
                return False

        return True

    def _ok_to_run(self, item):
        '''Returns true if the required dependencies have passed.

        The item's needs_all_dependencies_passing setting is used to figure
        out whether we can run this item or not, based on its dependent jobs'
        statuses. If it is set, no scheduled dependency may have failed or
        been killed. If it is not set, at least one scheduled dependency must
        have passed.
        '''
        # 'item' can run only if its dependencies have passed (their results
        # should already show up in the item to status map).
        for dep in item.dependencies:
            # Ignore dependencies that were not scheduled to run.
            if dep not in self.items:
                continue

            dep_status = self.item_to_status[dep]
            assert dep_status in ['P', 'F', 'K']

            if item.needs_all_dependencies_passing:
                if dep_status in ['F', 'K']:
                    return False
            else:
                if dep_status in ['P']:
                    return True

        # Falling through means: no dependency failed (all-pass mode), or no
        # dependency passed (any-pass mode).
        # NOTE(review): in any-pass mode this is also False for an item that
        # has no scheduled dependencies - confirm that this is intended.
        return item.needs_all_dependencies_passing

    def _poll(self, hms):
        '''Check for running items that have finished

        At most launcher_cls.max_poll running items are polled per call. The
        targets, and the running items within each target, are visited in a
        circular fashion, resuming where the previous call left off.

        Returns True if something changed.
        '''

        max_poll = min(self.launcher_cls.max_poll,
                       sum_dict_lists(self._running))

        # If there are no jobs running, we are likely done (possibly because
        # of a SIGINT). Since poll() was called anyway, signal that something
        # has indeed changed. A non-positive poll budget is treated the same
        # way, since the loop below could otherwise never terminate.
        if max_poll <= 0:
            return True

        changed = False
        while max_poll:
            target, self.last_target_polled_idx = get_next_item(
                self._targets, self.last_target_polled_idx)

            while self._running[target] and max_poll:
                max_poll -= 1
                item, self.last_item_polled_idx[target] = get_next_item(
                    self._running[target], self.last_item_polled_idx[target])
                status = item.launcher.poll()
                level = VERBOSE

                assert status in ['D', 'P', 'F', 'K']
                if status == 'D':
                    # Still running: nothing to record.
                    continue
                elif status == 'P':
                    self._passed[target].add(item)
                elif status == 'F':
                    self._failed[target].add(item)
                    level = log.ERROR
                else:
                    self._killed[target].add(item)
                    level = log.ERROR

                # Drop the finished item from the running list and step the
                # index back, so that the item that slides into its place is
                # the next one to be polled.
                self._running[target].pop(self.last_item_polled_idx[target])
                self.last_item_polled_idx[target] -= 1
                self.item_to_status[item] = status
                log.log(level, "[%s]: [%s]: [status] [%s: %s]", hms, target,
                        item.full_name, status)

                # Enqueue item's successors regardless of its status.
                #
                # It may be possible that a failed item's successor may not
                # need all of its dependents to pass (if it has other dependent
                # jobs). Hence we enqueue all successors rather than canceling
                # them right here. We leave it to _dispatch() to figure out
                # whether an enqueued item can be run or not.
                self._enqueue_successors(item)
                changed = True

        return changed

    def _dispatch(self, hms):
        '''Dispatch some queued items if possible.

        The free slots (launcher_cls.max_parallel minus the number of running
        items) are shared between the targets that have queued items, in
        proportion to the weight of the item at the head of each queue.
        Queued items that are not ok to run are cancelled instead and their
        successors are enqueued.
        '''

        slots = self.launcher_cls.max_parallel - sum_dict_lists(self._running)
        if slots <= 0:
            return

        # Compute how many slots to allocate to each target based on their
        # weights.
        sum_weight = 0
        slots_filled = 0
        total_weight = sum(self._queued[t][0].weight for t in self._queued
                           if self._queued[t])

        # The number of items that may still be launched in this call.
        slots_left = slots

        for target in self._scheduled:
            if not self._queued[target]:
                continue

            # N slots are allocated to M targets each with W(m) weights with
            # the formula:
            #
            # N(m) = N * W(m) / T, where,
            #   T is the sum total of all weights.
            #
            # This is however, problematic due to fractions. Even after
            # rounding off to the nearest digit, slots may not be fully
            # utilized (one extra left). An alternate approach that avoids this
            # problem is as follows:
            #
            # N(m) = (N * S(W(m)) / T) - F(m), where,
            #   S(W(m)) is the running sum of weights upto current target m.
            #   F(m) is the running total of slots filled.
            #
            # The computed slots per target is nearly identical to the first
            # solution, except that it prioritizes the slot allocation to
            # targets that are earlier in the list such that in the end, all
            # slots are fully consumed.
            sum_weight += self._queued[target][0].weight
            target_slots = round(
                (slots * sum_weight) / total_weight) - slots_filled
            if target_slots <= 0:
                continue
            slots_filled += target_slots

            # total_weight was computed before this loop started. Cancelling
            # an item below can enqueue successors into a target that was not
            # accounted for, which makes the formula above hand out more than
            # 'slots'. Never go past the slots that are actually left.
            target_slots = min(target_slots, slots_left)

            to_dispatch = []
            while self._queued[target] and target_slots > 0:
                next_item = self._queued[target].pop(0)
                if not self._ok_to_run(next_item):
                    self._cancel_item(next_item, cancel_successors=False)
                    self._enqueue_successors(next_item)
                    continue

                to_dispatch.append(next_item)
                target_slots -= 1

            if not to_dispatch:
                continue

            slots_left -= len(to_dispatch)

            log.log(VERBOSE, "[%s]: [%s]: [dispatch]:\n%s", hms, target,
                    ", ".join(item.full_name for item in to_dispatch))

            for item in to_dispatch:
                self._running[target].append(item)
                self.item_to_status[item] = 'D'
                try:
                    item.launcher.launch()
                except LauncherError as err:
                    log.error('{}'.format(err))
                    self._kill_item(item)

    def _kill(self):
        '''Kill any running items and cancel any that are waiting'''

        # Cancel any waiting items. We take a copy of each queue to avoid
        # iterating over the list as we modify it.
        for target in self._queued:
            for item in list(self._queued[target]):
                self._cancel_item(item)

        # Kill any running items. Again, take a copy of the list to avoid
        # modifying it while iterating over it.
        for target in self._running:
            for item in list(self._running[target]):
                self._kill_item(item)

    def _check_if_done(self, hms):
        '''Check if we are done executing all jobs.

        Also, prints the status of currently running jobs.

        Returns True if, for every target, the number of passed, failed and
        killed items adds up to the total number of items of that target.
        '''

        done = True
        for target in self._scheduled:
            done_cnt = sum([
                len(self._passed[target]),
                len(self._failed[target]),
                len(self._killed[target])
            ])
            done = done and (done_cnt == self._total[target])

            # Skip if a target has not even begun executing.
            if not (self._queued[target] or self._running[target] or
                    done_cnt > 0):
                continue

            perc = done_cnt / self._total[target] * 100

            msg = self.msg_fmt.format(len(self._queued[target]),
                                      len(self._running[target]),
                                      len(self._passed[target]),
                                      len(self._failed[target]),
                                      len(self._killed[target]),
                                      self._total[target])
            self.status_printer.update_target(target=target,
                                              msg=msg,
                                              hms=hms,
                                              perc=perc)
        return done

    def _cancel_item(self, item, cancel_successors=True):
        '''Cancel an item and optionally all of its successors.

        Supplied item may be in _scheduled list or the _queued list. From
        either, we move it straight to _killed.
        '''

        self.item_to_status[item] = 'K'
        self._killed[item.target].add(item)
        if item in self._queued[item.target]:
            self._queued[item.target].remove(item)
        else:
            self._remove_from_scheduled(item)

        if cancel_successors:
            self._cancel_successors(item)

    def _kill_item(self, item):
        '''Kill a running item and cancel all of its successors.'''

        item.launcher.kill()
        self.item_to_status[item] = 'K'
        self._killed[item.target].add(item)
        self._running[item.target].remove(item)
        self._cancel_successors(item)