blob: f7605bd814030bb4738d17f4dbf5be2610d80f80 [file] [log] [blame]
Rupert Swarbrickc0e753d2020-06-04 13:27:58 +01001# Copyright lowRISC contributors.
2# Licensed under the Apache License, Version 2.0, see LICENSE for details.
3# SPDX-License-Identifier: Apache-2.0
4
Rupert Swarbrickc0e753d2020-06-04 13:27:58 +01005import logging as log
Rupert Swarbricka2892a52020-10-12 08:50:08 +01006import threading
Srikrishna Iyer4c472c62021-02-19 12:03:18 -08007from signal import SIGINT, signal
Rupert Swarbrickc0e753d2020-06-04 13:27:58 +01008
Srikrishna Iyer65783742021-02-10 21:42:12 -08009from Launcher import LauncherError
Srikrishna Iyerdf0936b2021-02-14 16:36:02 -080010from StatusPrinter import get_status_printer
Rupert Swarbrickc0e753d2020-06-04 13:27:58 +010011from Timer import Timer
Srikrishna Iyer4c472c62021-02-19 12:03:18 -080012from utils import VERBOSE
Rupert Swarbrickc0e753d2020-06-04 13:27:58 +010013
14
# Sum of lengths of all lists in the given dict.
def sum_dict_lists(d):
    '''Given a dict whose values are lists (or any other sized containers,
    such as sets), return the sum of the lengths of those values.

    Returns 0 for an empty dict.
    '''
    # Iterate the values directly and feed sum() a generator, so no
    # intermediate list is built and no per-key lookup is needed.
    return sum(len(v) for v in d.values())
20
21
class Scheduler:
    '''An object that runs one or more Deploy items.

    Items are grouped by their 'target' attribute and then by their
    'sim_cfg' attribute. Items of a target are moved to a queue once their
    dependencies have completed, and queued items are dispatched subject to
    the global max_parallel limit.
    '''

    # Max jobs running at one time
    max_parallel = 16

    def __init__(self, items):
        self.items = items

        # 'scheduled[target][cfg]' is a list of Deploy objects for the chosen
        # target and cfg. As items in _scheduled are ready to be run (once
        # their dependencies pass), they are moved to the _queued list, where
        # they wait until slots are available for them to be dispatched.
        # When all items of a cfg have been moved out, that cfg key is
        # deleted (see _remove_from_scheduled). Target keys are never removed:
        # the per-target bookkeeping below is iterated over these keys.
        self._scheduled = {}
        self.add_to_scheduled(items)

        # Print status periodically using an external status printer.
        self.status_printer = get_status_printer()
        self.status_printer.print_header(
            msg="Q: queued, D: dispatched, P: passed, F: failed, K: killed, "
            "T: total")

        # Sets of items, split up by their current state. The sets are
        # disjoint and their union equals the keys of self.item_to_status.
        # _queued is a list so that we dispatch things in order (relevant
        # for things like tests where we have ordered things cleverly to
        # try to see failures early). They are maintained for each target.
        self._queued = {}
        self._running = {}
        self._passed = {}
        self._failed = {}
        self._killed = {}
        self._total = {}
        for target in self._scheduled:
            self._queued[target] = []
            self._running[target] = set()
            self._passed[target] = set()
            self._failed[target] = set()
            self._killed[target] = set()
            self._total[target] = sum_dict_lists(self._scheduled[target])

        # Stuff for printing the status. One format string is shared by all
        # targets (it is reused by _check_if_done), so size its zero-padded
        # fields for the largest per-target total. default=0 keeps msg_fmt
        # defined even when nothing is scheduled.
        width = len(str(max(self._total.values(), default=0)))
        field_fmt = '{{:0{}d}}'.format(width)
        self.msg_fmt = 'Q: {0}, D: {0}, P: {0}, F: {0}, K: {0}, T: {0}'.format(
            field_fmt)
        for target in self._scheduled:
            msg = self.msg_fmt.format(0, 0, 0, 0, 0, self._total[target])
            self.status_printer.init_target(target=target, msg=msg)

        # A map from the Deploy objects tracked by this class to their
        # current status. This status is 'Q', 'D', 'P', 'F' or 'K',
        # corresponding to membership in the dicts above. This is not
        # per-target.
        self.item_to_status = {}

    def run(self):
        '''Run all scheduled jobs and return the results.

        Returns self.item_to_status: a dict mapping every item that was
        enqueued (for all targets and cfgs) to its last status letter.

        The first SIGINT requests a graceful stop (queued items are cancelled
        and running items killed); the previous SIGINT handler is reinstalled
        so that a second SIGINT is handled by it.
        '''

        timer = Timer()

        # Catch one SIGINT and tell the runner to quit. On a second, die.
        stop_now = threading.Event()
        old_handler = None

        def on_sigint(signal_received, frame):
            log.info('Received SIGINT. Exiting gracefully. '
                     'Send another to force immediate quit '
                     '(but you may need to manually kill child processes)')

            # Restore old handler to catch any second signal
            assert old_handler is not None
            signal(SIGINT, old_handler)

            stop_now.set()

        old_handler = signal(SIGINT, on_sigint)

        try:
            # Enqueue all items of the first target. This is inside the try
            # block so the original SIGINT handler is restored (and the
            # status printer cleaned up) even if it raises.
            self._enqueue_successors(None)

            while True:
                if stop_now.is_set():
                    # We've had an interrupt. Kill any jobs that are running.
                    self._kill()

                hms = timer.hms()
                changed = self._poll(hms) or timer.check_time()
                self._dispatch(hms)
                if changed and self._check_if_done(hms):
                    break

                # This is essentially sleep(1) to wait a second between each
                # polling loop. But we do it with a bounded wait on stop_now
                # so that we jump back to the polling loop immediately on a
                # signal.
                stop_now.wait(timeout=1)
        finally:
            try:
                signal(SIGINT, old_handler)
            finally:
                # Clean up the status printer, also when unwinding from an
                # exception.
                self.status_printer.exit()

        # We got to the end without anything exploding. Return the results.
        return self.item_to_status

    def add_to_scheduled(self, items):
        '''Add items to the list of _scheduled.

        'items' is a list of Deploy objects. Each item is filed under
        _scheduled[item.target][item.sim_cfg]; an item already present in
        that list is not added twice.
        '''

        for item in items:
            target_dict = self._scheduled.setdefault(item.target, {})
            cfg_list = target_dict.setdefault(item.sim_cfg, [])
            if item not in cfg_list:
                cfg_list.append(item)

    def _remove_from_scheduled(self, item):
        '''Removes the item from _scheduled[target][cfg] list.

        When all items in _scheduled[target][cfg] are finally removed, the cfg
        key is deleted. Removing an item that is not present is a no-op.
        '''
        target_dict = self._scheduled[item.target]
        cfg_list = target_dict.get(item.sim_cfg)
        if cfg_list is not None:
            try:
                cfg_list.remove(item)
            except ValueError:
                pass
            if not cfg_list:
                del target_dict[item.sim_cfg]

    def _get_next_target(self, curr_target):
        '''Returns the target that succeeds the current one.

        curr_target is the target of the job that just completed (example -
        build). If it is None, then the first target in _scheduled is
        returned. Returns None when there is no such target (curr_target is
        the last one, or nothing is scheduled at all).
        '''

        targets = list(self._scheduled)
        if curr_target is None:
            return targets[0] if targets else None

        assert curr_target in self._scheduled
        next_idx = targets.index(curr_target) + 1
        return targets[next_idx] if next_idx < len(targets) else None

    def _enqueue_successors(self, item=None):
        '''Move an item's successors from _scheduled to _queued.

        'item' is the recently run job that has completed. If None, then we
        move all available items in all available cfgs in _scheduled's first
        target. If 'item' is specified, then we find its successors and move
        them to _queued.
        '''

        for next_item in self._get_successors(item):
            assert next_item not in self.item_to_status
            assert next_item not in self._queued[next_item.target]
            self.item_to_status[next_item] = 'Q'
            self._queued[next_item.target].append(next_item)
            self._remove_from_scheduled(next_item)

    def _cancel_successors(self, item):
        '''Cancel an item's successors recursively by moving them from
        _scheduled or _queued to _killed.'''

        items = self._get_successors(item)
        while items:
            next_item = items.pop()
            self._cancel_item(next_item, cancel_successors=False)
            items.extend(self._get_successors(next_item))

    def _get_successors(self, item=None):
        '''Find immediate successors of an item.

        'item' is a job that has completed. We choose the target that follows
        the 'item''s current target and find the list of successors whose
        dependency list contains 'item'. If 'item' is None, we pick successors
        from all cfgs of the first target, else we pick successors only from
        the cfg to which the item belongs.

        Returns the list of item's successors, or an empty list if there are
        none (including when there is no next target, or when the next target
        has no remaining items for the relevant cfg).
        '''

        if item is None:
            target = self._get_next_target(None)
        else:
            target = self._get_next_target(item.target)

        # Check before indexing _scheduled: there may be no next target, or
        # nothing scheduled at all.
        if target is None:
            return []

        target_dict = self._scheduled[target]
        # Insertion order keeps the enqueue order deterministic.
        cfgs = list(target_dict) if item is None else [item.sim_cfg]

        # Find item's successors that can be enqueued. We assume here that
        # only the immediately succeeding target can be enqueued at this
        # time.
        successors = []
        for cfg in cfgs:
            # The cfg key may be gone: _remove_from_scheduled deletes it once
            # all of its items were moved out, and the next target may never
            # have had this cfg at all.
            for next_item in target_dict.get(cfg, []):
                if item is not None:
                    # Something is terribly wrong if item exists but the
                    # next_item's dependency list is empty.
                    assert next_item.dependencies
                    if item not in next_item.dependencies:
                        continue

                if self._ok_to_enqueue(next_item):
                    successors.append(next_item)

        return successors

    def _ok_to_enqueue(self, item):
        '''Returns true if ALL dependencies of item are complete.'''

        for dep in item.dependencies:
            # Ignore dependencies that were not scheduled to run.
            if dep not in self.items:
                continue

            # Has the dep even been enqueued?
            if dep not in self.item_to_status:
                return False

            # Has the dep completed?
            if self.item_to_status[dep] not in ["P", "F", "K"]:
                return False

        return True

    def _ok_to_run(self, item):
        '''Returns true if the required dependencies have passed.

        The item's needs_all_dependencies_passing setting is used to figure
        out whether we can run this item or not, based on its dependent jobs'
        statuses: if set, every scheduled dependency must have passed;
        otherwise at least one scheduled dependency must have passed.
        '''
        # 'item' can run only if its dependencies have passed (their results
        # should already show up in the item to status map).
        for dep in item.dependencies:
            # Ignore dependencies that were not scheduled to run.
            if dep not in self.items:
                continue

            dep_status = self.item_to_status[dep]
            assert dep_status in ['P', 'F', 'K']

            if item.needs_all_dependencies_passing:
                if dep_status in ['F', 'K']:
                    return False
            else:
                if dep_status in ['P']:
                    return True

        # NOTE(review): an item with needs_all_dependencies_passing False
        # whose dependencies were all skipped above (none in self.items)
        # falls through to False here and is cancelled -- confirm intended.
        return item.needs_all_dependencies_passing

    def _poll(self, hms):
        '''Check for running items that have finished

        Returns True if something changed, else False.
        '''

        changed = False
        for target in self._scheduled:
            to_pass = []
            to_fail = []
            for item in self._running[target]:
                status = item.launcher.poll()
                assert status in ['D', 'P', 'F']
                if status == 'D':
                    # Still running
                    continue
                elif status == 'P':
                    log.log(VERBOSE, "[%s]: [%s]: [status] [%s: P]", hms,
                            target, item.full_name)
                    to_pass.append(item)
                else:
                    log.error("[%s]: [%s]: [status] [%s: F]", hms, target,
                              item.full_name)
                    to_fail.append(item)

            for item in to_pass:
                self._passed[target].add(item)
                self._running[target].remove(item)
                self.item_to_status[item] = 'P'
                self._enqueue_successors(item)

            for item in to_fail:
                self._failed[target].add(item)
                self._running[target].remove(item)
                self.item_to_status[item] = 'F'
                # It may be possible that a failed item's successor may not
                # need all of its dependents to pass (if it has other
                # dependent jobs). Hence we enqueue all successors rather than
                # canceling them right here. We leave it to `_dispatch()` to
                # figure out whether an enqueued item can be run or not.
                self._enqueue_successors(item)

            changed = changed or bool(to_pass or to_fail)

        return changed

    def _dispatch(self, hms):
        '''Dispatch some queued items if possible.

        At most max_parallel items are running once this returns. Queued
        items whose dependencies do not allow them to run are cancelled (and
        their successors enqueued) instead of being dispatched.
        '''

        slots = Scheduler.max_parallel - sum_dict_lists(self._running)
        if slots <= 0:
            return

        # Compute how many slots to allocate to each target based on their
        # weights.
        sum_weight = 0
        slots_filled = 0
        total_weight = sum(self._queued[t][0].weight for t in self._queued
                           if self._queued[t])

        for target in self._scheduled:
            if not self._queued[target]:
                continue

            # N slots are allocated to M targets each with W(m) weights with
            # the formula:
            #
            # N(m) = N * W(m) / T, where,
            #   T is the sum total of all weights.
            #
            # This is however, problematic due to fractions. Even after
            # rounding off to the nearest digit, slots may not be fully
            # utilized (one extra left). An alternate approach that avoids
            # this problem is as follows:
            #
            # N(m) = (N * S(W(m)) / T) - F(m), where,
            #   S(W(m)) is the running sum of weights upto current target m.
            #   F(m) is the running total of slots filled.
            #
            # The computed slots per target is nearly identical to the first
            # solution, except that it prioritizes the slot allocation to
            # targets that are earlier in the list such that in the end, all
            # slots are fully consumed.
            sum_weight += self._queued[target][0].weight
            target_slots = round(
                (slots * sum_weight) / total_weight) - slots_filled
            # Cancelling items below enqueues their successors, possibly into
            # a later target that was empty when total_weight was computed.
            # Such a target pushes sum_weight past total_weight, so clamp to
            # the slots that actually remain to never exceed max_parallel.
            target_slots = min(target_slots, slots - slots_filled)
            if target_slots <= 0:
                continue
            slots_filled += target_slots

            to_dispatch = []
            while self._queued[target] and target_slots > 0:
                next_item = self._queued[target].pop(0)
                if not self._ok_to_run(next_item):
                    self._cancel_item(next_item, cancel_successors=False)
                    self._enqueue_successors(next_item)
                    continue

                to_dispatch.append(next_item)
                target_slots -= 1

            if not to_dispatch:
                continue

            log.log(VERBOSE, "[%s]: [%s]: [dispatch]:\n%s", hms, target,
                    ", ".join(item.full_name for item in to_dispatch))

            for item in to_dispatch:
                self._running[target].add(item)
                self.item_to_status[item] = 'D'
                try:
                    item.launcher.launch()
                except LauncherError as err:
                    log.error('{}'.format(err))
                    self._kill_item(item)

    def _kill(self):
        '''Kill any running items and cancel any that are waiting'''

        # Cancel any waiting items. We take a copy of each _queued list to
        # avoid iterating over a list as we modify it.
        for target in self._queued:
            for item in list(self._queued[target]):
                self._cancel_item(item)

        # Kill any running items. Again, take a copy of each set to avoid
        # modifying it while iterating over it.
        for target in self._running:
            for item in list(self._running[target]):
                self._kill_item(item)

    def _check_if_done(self, hms):
        '''Check if we are done executing all jobs.

        Also, prints the status of currently running jobs. Returns True once
        every target has as many passed + failed + killed items as it has
        items in total.
        '''

        done = True
        for target in self._scheduled:
            done_cnt = (len(self._passed[target]) +
                        len(self._failed[target]) +
                        len(self._killed[target]))
            done = done and (done_cnt == self._total[target])

            # Skip if a target has not even begun executing.
            if not (self._queued[target] or self._running[target] or
                    done_cnt > 0):
                continue

            perc = done_cnt / self._total[target] * 100

            msg = self.msg_fmt.format(len(self._queued[target]),
                                      len(self._running[target]),
                                      len(self._passed[target]),
                                      len(self._failed[target]),
                                      len(self._killed[target]),
                                      self._total[target])
            self.status_printer.update_target(target=target,
                                              msg=msg,
                                              hms=hms,
                                              perc=perc)
        return done

    def _cancel_item(self, item, cancel_successors=True):
        '''Cancel an item and optionally all of its successors.

        Supplied item may be in _scheduled list or the _queued list. From
        either, we move it straight to _killed.
        '''

        self.item_to_status[item] = 'K'
        self._killed[item.target].add(item)
        if item in self._queued[item.target]:
            self._queued[item.target].remove(item)
        else:
            self._remove_from_scheduled(item)

        if cancel_successors:
            self._cancel_successors(item)

    def _kill_item(self, item):
        '''Kill a running item and cancel all of its successors.'''

        item.launcher.kill()
        self.item_to_status[item] = 'K'
        self._killed[item.target].add(item)
        self._running[item.target].remove(item)
        self._cancel_successors(item)