blob: 51c893cff7f1a6b792836b9234e1115901d2da78 [file] [log] [blame]
# Copyright lowRISC contributors.
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
from collections import OrderedDict
import logging as log
from signal import SIGINT, signal
import sys
import threading

from Deploy import Deploy, DeployError
from Timer import Timer
from utils import VERBOSE
class TargetScheduler:
    '''A scheduler for the jobs of a given target.'''
    def __init__(self):
        # Sets of items, split up by their current state. The sets are
        # disjoint and their union equals the keys of self.item_to_status.
        self._queued = set()
        self._running = set()
        self._passed = set()
        self._failed = set()
        self._killed = set()

        # A map from the Deploy objects tracked by this class to their current
        # status. This status is 'Q', 'D', 'P', 'F' or 'K', corresponding to
        # membership in the sets above.
        self.item_to_status = {}

        # A flag updated by check_status: true when there is nothing queued
        # and nothing running. Starts true because there are no items yet.
        self._done = True

    def check_status(self):
        '''Return (was_done, is_done, has_started) as booleans.

        was_done is the done flag before this call; is_done is its updated
        value (no items queued or running); has_started is true once at least
        one item has been dispatched, finished or been killed.
        '''
        was_done = self._done
        self._done = not (self._queued or self._running)
        has_started = bool(self._running or self._passed or
                           self._failed or self._killed)
        return (was_done, self._done, has_started)

    def print_counters(self, tgt_name, hms):
        '''Log a one-line summary of job counts for this target.

        tgt_name is the target's name; hms is a wall-clock timestamp string.
        '''
        total_cnt = len(self.item_to_status)

        # Zero-pad each counter to the width of the total count so the
        # columns stay aligned across successive messages.
        width = len(str(total_cnt))
        field_fmt = '{{:0{}d}}'.format(width)
        msg_fmt = ('[Q: {0}, D: {0}, P: {0}, F: {0}, K: {0}, T: {0}]'
                   .format(field_fmt))
        msg = msg_fmt.format(len(self._queued),
                             len(self._running),
                             len(self._passed),
                             len(self._failed),
                             len(self._killed),
                             total_cnt)
        log.info("[%s]: [%s]: %s", hms, tgt_name, msg)

    def add_item(self, item):
        '''Add a new item, which starts out queued.'''
        assert item not in self.item_to_status
        self.item_to_status[item] = 'Q'
        self._queued.add(item)
        self._done = False

    def _kill_item(self, item):
        '''Kill a running item and mark it as killed.'''
        self._running.remove(item)
        item.kill()
        self._killed.add(item)
        self.item_to_status[item] = 'K'

    def dispatch(self, items):
        '''Start (dispatch) each item in the list.

        An item whose dispatch fails with a DeployError is killed
        immediately; the others are unaffected.
        '''
        for item in items:
            assert item in self._queued
            self._queued.remove(item)
            self._running.add(item)
            self.item_to_status[item] = 'D'
            try:
                item.dispatch_cmd()
            except DeployError as err:
                # Lazy %-formatting: the message is only rendered if this
                # log level is enabled.
                log.error('%s', err)
                self._kill_item(item)

    def kill(self):
        '''Kill any running items and cancel any that are waiting.'''
        # Cancel any waiting items. Take a copy of self._queued to avoid
        # iterating over the set as we modify it.
        for item in list(self._queued):
            self.cancel(item)

        # Kill any running items. Again, take a copy of the set to avoid
        # modifying it while iterating over it.
        for item in list(self._running):
            self._kill_item(item)

    def poll(self, hms):
        '''Check for running items that have finished.

        hms is a wall-clock timestamp string used in log messages. Returns
        True if something changed.
        '''
        to_pass = []
        to_fail = []
        for item in self._running:
            status = item.poll()
            assert status in ['D', 'P', 'F']
            if status == 'D':
                # Still running
                continue
            elif status == 'P':
                log.log(VERBOSE, "[%s]: [%s]: [status] [%s: P]",
                        hms, item.target, item.identifier)
                to_pass.append(item)
            else:
                log.error("[%s]: [%s]: [status] [%s: F]",
                          hms, item.target, item.identifier)
                to_fail.append(item)

        # Move finished items out of _running after the loop above: mutating
        # the set while iterating over it would be an error.
        for item in to_pass:
            self._running.remove(item)
            self._passed.add(item)
            self.item_to_status[item] = 'P'
        for item in to_fail:
            self._running.remove(item)
            self._failed.add(item)
            self.item_to_status[item] = 'F'
        return bool(to_pass or to_fail)

    def cancel(self, item):
        '''Cancel an item that is currently queued.'''
        assert item in self._queued
        self._queued.remove(item)
        self._killed.add(item)
        self.item_to_status[item] = 'K'
class Scheduler:
    '''An object to run one or more Deploy items.'''

    # Print the counter legend just once, at the start of the first run.
    print_legend = True

    # Max jobs running at one time
    max_parallel = 16

    # Max jobs dispatched in one go.
    slot_limit = 20

    def __init__(self, items):
        self.timer = Timer()
        self.queued_items = []
        self.dispatched_items = []

        # An ordered dictionary keyed by target ('build', 'run' or similar).
        # The value for each target is a TargetScheduler object.
        self.schedulers = OrderedDict()

        for item in items:
            self.add_item(item)

    def add_item(self, item):
        '''Add a queued item.'''
        self.queued_items.append(item)

        # Like setdefault, but doesn't construct a TargetScheduler object
        # unnecessarily.
        tgt_scheduler = self.schedulers.get(item.target)
        if tgt_scheduler is None:
            tgt_scheduler = TargetScheduler()
            self.schedulers[item.target] = tgt_scheduler

        tgt_scheduler.add_item(item)

    def kill(self):
        '''Kill any running items and cancel any that are waiting.'''
        for scheduler in self.schedulers.values():
            scheduler.kill()

    def poll(self):
        '''Update status of running items. Returns true on a change.'''
        status_changed = False
        hms = self.timer.hms()
        for scheduler in self.schedulers.values():
            # Note: no short-circuiting here; every scheduler must be polled
            # on every pass.
            if scheduler.poll(hms):
                status_changed = True
        return status_changed

    def dispatch(self):
        '''Dispatch some queued items if possible.'''
        num_slots = min(Scheduler.slot_limit,
                        Scheduler.max_parallel - Deploy.dispatch_counter,
                        len(self.queued_items))
        # num_slots may be negative if more jobs are currently dispatched
        # than max_parallel allows, so guard with <= rather than a bare
        # truthiness check.
        if num_slots <= 0:
            return

        # We only dispatch things for one target at once: find the target of
        # any item that is still in flight.
        cur_tgt = None
        for item in self.dispatched_items:
            if item.process is not None:
                cur_tgt = item.target
                break

        to_dispatch = []
        while len(to_dispatch) < num_slots and self.queued_items:
            next_item = self.queued_items[0]

            # Keep track of the current target to make sure we dispatch things
            # in phases.
            if cur_tgt is None:
                cur_tgt = next_item.target
            if next_item.target != cur_tgt:
                break

            self.queued_items.pop(0)

            # Does next_item have any dependencies? Since we dispatch jobs by
            # "target", we can assume that each of those dependencies appears
            # earlier in the list than we do.
            has_failed_dep = False
            for dep in next_item.dependencies:
                dep_status = self.schedulers[dep.target].item_to_status[dep]
                assert dep_status in ['P', 'F', 'K']
                if dep_status in ['F', 'K']:
                    has_failed_dep = True
                    break

            # If has_failed_dep then at least one of the dependencies has been
            # cancelled or has run and failed. Give up on this item too.
            if has_failed_dep:
                self.schedulers[cur_tgt].cancel(next_item)
                continue

            to_dispatch.append(next_item)

        if not to_dispatch:
            return

        assert cur_tgt is not None
        self.dispatched_items.extend(to_dispatch)
        self.schedulers[cur_tgt].dispatch(to_dispatch)

        log.log(VERBOSE, "[%s]: [%s]: [dispatch]:\n%s",
                self.timer.hms(), cur_tgt,
                ", ".join(item.identifier for item in to_dispatch))

    def check_if_done(self, print_status):
        '''Check whether we are finished.

        If print_status or we've reached a time interval then print current
        status for those that weren't known to be finished already.
        '''
        if self.timer.check_time():
            print_status = True

        hms = self.timer.hms()

        all_done = True
        printed_something = False
        for target, scheduler in self.schedulers.items():
            was_done, is_done, has_started = scheduler.check_status()
            all_done &= is_done

            # Skip targets that were already done before this call. Among
            # targets that haven't started, print at most the first one so
            # that a status pass always produces some output.
            should_print = (print_status and
                            not (was_done and is_done) and
                            (has_started or not printed_something))
            if should_print:
                scheduler.print_counters(target, hms)
                printed_something = True

        return all_done

    def run(self):
        '''Run all items.

        Returns a map from item to status.
        '''
        # Print the legend just once (at the start of the first run)
        if Scheduler.print_legend:
            log.info("[legend]: [Q: queued, D: dispatched, "
                     "P: passed, F: failed, K: killed, T: total]")
            Scheduler.print_legend = False

        # Catch one SIGINT and tell the scheduler to quit. On a second, die.
        stop_now = threading.Event()
        old_handler = None

        def on_sigint(signal_received, frame):
            log.info('Received SIGINT. Exiting gracefully. '
                     'Send another to force immediate quit '
                     '(but you may need to manually kill child processes)')

            # Restore old handler to catch any second signal
            assert old_handler is not None
            signal(SIGINT, old_handler)

            stop_now.set()

        old_handler = signal(SIGINT, on_sigint)
        try:
            first_time = True
            while True:
                if stop_now.is_set():
                    # We've had an interrupt. Kill any jobs that are running,
                    # then exit. Use sys.exit rather than the site-injected
                    # exit() builtin, which is intended for interactive use
                    # and may not be available.
                    self.kill()
                    sys.exit(1)

                changed = self.poll()
                self.dispatch()
                if self.check_if_done(changed or first_time):
                    break
                first_time = False

                # This is essentially sleep(1) to wait a second between each
                # polling loop. But we do it with a bounded wait on stop_now
                # so that we jump back to the polling loop immediately on a
                # signal.
                stop_now.wait(timeout=1)
        finally:
            signal(SIGINT, old_handler)

        # We got to the end without anything exploding. Extract and return
        # results from the schedulers.
        results = {}
        for scheduler in self.schedulers.values():
            results.update(scheduler.item_to_status)
        return results
def run(items):
    '''Run the given items.

    Returns a map from item to status.
    '''
    scheduler = Scheduler(items)
    return scheduler.run()