blob: d3f345814c279337708bd6db74064b0a4a44bd47 [file] [log] [blame]
Rupert Swarbrickc0e753d2020-06-04 13:27:58 +01001# Copyright lowRISC contributors.
2# Licensed under the Apache License, Version 2.0, see LICENSE for details.
3# SPDX-License-Identifier: Apache-2.0
4
5
6from collections import OrderedDict
7import logging as log
Rupert Swarbricka2892a52020-10-12 08:50:08 +01008from signal import SIGINT, signal
9import threading
Rupert Swarbrickc0e753d2020-06-04 13:27:58 +010010
11from utils import VERBOSE
Rupert Swarbrick69fa1272020-10-12 16:37:35 +010012from Deploy import DeployError
Rupert Swarbrickc0e753d2020-06-04 13:27:58 +010013from Timer import Timer
14
15
class TargetScheduler:
    '''A scheduler for the jobs of a given target'''
    def __init__(self, name):
        self.name = name

        # Containers of items, split up by their current state. They are
        # disjoint and their union equals the keys of self.item_to_status.
        # _queued is a list so that we dispatch things in order (relevant for
        # things like tests where we have ordered things cleverly to try to
        # see failures early)
        self._queued = []
        self._running = set()
        self._passed = set()
        self._failed = set()
        self._killed = set()

        # A map from the Deploy objects tracked by this class to their current
        # status. This status is 'Q', 'D', 'P', 'F' or 'K', corresponding to
        # membership in _queued, _running, _passed, _failed or _killed.
        self.item_to_status = {}

    def add_item(self, item):
        '''Queue an item that is not yet tracked by this scheduler'''
        assert item not in self.item_to_status
        assert item not in self._queued
        self.item_to_status[item] = 'Q'
        self._queued.append(item)

    def _kill_item(self, item):
        '''Kill a running item'''
        self._running.remove(item)
        item.kill()
        self._killed.add(item)
        self.item_to_status[item] = 'K'

    def _poll(self, hms):
        '''Check for running items that have finished

        hms is the timestamp string (from Timer.hms()) used to prefix log
        messages. Items whose poll() reports 'P' or 'F' are moved from the
        running set to the passed or failed set respectively.

        Returns True if something changed.

        '''
        to_pass = []
        to_fail = []

        # Collect finished items first: self._running must not be modified
        # while we are iterating over it.
        for item in self._running:
            status = item.poll()
            assert status in ['D', 'P', 'F']
            if status == 'D':
                # Still running
                continue
            elif status == 'P':
                log.log(VERBOSE, "[%s]: [%s]: [status] [%s: P]",
                        hms, item.target, item.identifier)
                to_pass.append(item)
            else:
                log.error("[%s]: [%s]: [status] [%s: F]",
                          hms, item.target, item.identifier)
                to_fail.append(item)

        for item in to_pass:
            self._running.remove(item)
            self._passed.add(item)
            self.item_to_status[item] = 'P'
        for item in to_fail:
            self._running.remove(item)
            self._failed.add(item)
            self.item_to_status[item] = 'F'

        return bool(to_pass or to_fail)

    @staticmethod
    def _has_failed_dep(item, old_results):
        '''Return True if item's dependencies mean it must not be run.

        Since we dispatch jobs by target, every dependency is assumed to
        appear in old_results with a final status of 'P', 'F' or 'K'.

        If item.needs_all_dependencies_passing, the item is given up on as
        soon as any dependency failed ('F') or was killed ('K'). Otherwise it
        is given up on only if it has at least one dependency and none of
        them passed. An item with no dependencies can always run.

        '''
        saw_dep = False
        for dep in item.dependencies:
            dep_status = old_results[dep]
            assert dep_status in ['P', 'F', 'K']
            saw_dep = True

            if item.needs_all_dependencies_passing:
                if dep_status in ['F', 'K']:
                    return True
            elif dep_status == 'P':
                # A single passing dependency is enough.
                return False

        # No early exit: in the "all must pass" case nothing failed; in the
        # "any may pass" case nothing passed (which only matters if there
        # was at least one dependency).
        return saw_dep and not item.needs_all_dependencies_passing

    def _dispatch(self, hms, old_results):
        '''Dispatch some queued items if possible.

        At most Scheduler.slot_limit items are started per call, and never so
        many that more than Scheduler.max_parallel items would be running.
        Queued items whose dependencies rule them out are marked as killed
        instead of being dispatched. See run() for the format of old_results.

        '''
        num_slots = min(Scheduler.slot_limit,
                        Scheduler.max_parallel - len(self._running),
                        len(self._queued))
        if num_slots <= 0:
            return

        to_dispatch = []

        while len(to_dispatch) < num_slots and self._queued:
            next_item = self._queued.pop(0)

            # If at least one of the dependencies has been cancelled or has
            # run and failed (see _has_failed_dep for the exact rule), give
            # up on this item too.
            if self._has_failed_dep(next_item, old_results):
                self._killed.add(next_item)
                self.item_to_status[next_item] = 'K'
                continue

            to_dispatch.append(next_item)

        if not to_dispatch:
            return

        log.log(VERBOSE, "[%s]: [%s]: [dispatch]:\n%s",
                hms, self.name,
                ", ".join(item.identifier for item in to_dispatch))

        for item in to_dispatch:
            self._running.add(item)
            self.item_to_status[item] = 'D'
            try:
                item.dispatch_cmd()
            except DeployError as err:
                # The item is already in the running set, so _kill_item can
                # move it across to the killed set.
                log.error('{}'.format(err))
                self._kill_item(item)

    def _kill(self):
        '''Kill any running items and cancel any that are waiting'''

        # Cancel any waiting items. We iterate over a copy of the self._queued
        # list because _cancel removes items from it.
        for item in list(self._queued):
            self._cancel(item)

        # Kill any running items. Again, iterate over a copy of the set
        # because _kill_item removes items from it.
        for item in list(self._running):
            self._kill_item(item)

    def _cancel(self, item):
        '''Cancel an item that is currently queued'''
        assert item in self._queued
        self._queued.remove(item)
        self._killed.add(item)
        self.item_to_status[item] = 'K'

    def _check_if_done(self, timer, hms, print_status):
        '''Check whether we are finished.

        If print_status or we've reached a time interval then print current
        status for those jobs that weren't known to be finished already.

        Returns True when nothing is queued or running.

        '''
        if timer.check_time():
            print_status = True

        if print_status:
            total_cnt = len(self.item_to_status)
            width = len(str(total_cnt))

            # Zero-pad every count to the width of the total so that
            # successive status lines line up.
            field_fmt = '{{:0{}d}}'.format(width)
            msg_fmt = ('[Q: {0}, D: {0}, P: {0}, F: {0}, K: {0}, T: {0}]'
                       .format(field_fmt))
            msg = msg_fmt.format(len(self._queued),
                                 len(self._running),
                                 len(self._passed),
                                 len(self._failed),
                                 len(self._killed),
                                 total_cnt)
            log.info("[%s]: [%s]: %s", hms, self.name, msg)

        return not (self._queued or self._running)

    def run(self, timer, old_results):
        '''Run the jobs for this target.

        timer is a Timer that was started at the start of the Runner's run.

        old_results is a dictionary mapping items (from previous targets) to
        statuses. Every job that appears as a dependency will be a key in
        this dictionary (because it ran as part of a previous target).

        Returns the results from this target (in the same format).

        On the first SIGINT, queued jobs are cancelled, running jobs are
        killed and SystemExit(1) is raised. A second SIGINT goes to whatever
        handler was installed before this method was called.

        '''
        # Catch one SIGINT and tell the runner to quit. On a second, die.
        stop_now = threading.Event()
        old_handler = None

        def on_sigint(signal_received, frame):
            log.info('Received SIGINT. Exiting gracefully. '
                     'Send another to force immediate quit '
                     '(but you may need to manually kill child processes)')

            # Restore old handler to catch any second signal
            assert old_handler is not None
            signal(SIGINT, old_handler)

            stop_now.set()

        old_handler = signal(SIGINT, on_sigint)

        try:
            first_time = True
            while True:
                if stop_now.is_set():
                    # We've had an interrupt. Kill any jobs that are running,
                    # then exit. SystemExit is what sys.exit() raises; the
                    # exit() builtin comes from the site module, is meant for
                    # the interactive interpreter and may not exist.
                    self._kill()
                    raise SystemExit(1)

                hms = timer.hms()
                changed = self._poll(hms)
                self._dispatch(hms, old_results)
                if self._check_if_done(timer, hms, changed or first_time):
                    break
                first_time = False

                # This is essentially sleep(1) to wait a second between each
                # polling loop. But we do it with a bounded wait on stop_now so
                # that we jump back to the polling loop immediately on a
                # signal.
                stop_now.wait(timeout=1)
        finally:
            signal(SIGINT, old_handler)

        # We got to the end without anything exploding. Return the results
        # for our jobs.
        return self.item_to_status
251
252
class Scheduler:
    '''Run one or more Deploy items, grouped by target.

    Items are handed to one TargetScheduler per distinct target. Targets
    are run one after another, in the order in which each target first
    appears among the items passed to the constructor.

    '''

    # Upper bound on the number of jobs running at the same time
    max_parallel = 16

    # Upper bound on the number of jobs started in a single dispatch step
    slot_limit = 20

    def __init__(self, items):
        # Maps each target name ('build', 'run' or similar) to the
        # TargetScheduler that owns that target's items. A TargetScheduler is
        # only constructed the first time its target is seen.
        self.schedulers = OrderedDict()

        for item in items:
            target = item.target
            if target not in self.schedulers:
                self.schedulers[target] = TargetScheduler(target)
            self.schedulers[target].add_item(item)

    def run(self):
        '''Run every target in turn and gather the outcome.

        Each target sees the results of all targets that ran before it.
        Returns a dict mapping every item to its final status.

        '''
        timer = Timer()

        log.info("[legend]: [Q: queued, D: dispatched, "
                 "P: passed, F: failed, K: killed, T: total]")

        all_results = {}
        for tgt_scheduler in self.schedulers.values():
            tgt_results = tgt_scheduler.run(timer, all_results)
            all_results.update(tgt_results)
        return all_results
Rupert Swarbrickc0e753d2020-06-04 13:27:58 +0100291
292
def run(items):
    '''Build a Scheduler for the given items and run it to completion.

    Returns a map from item to status.

    '''
    scheduler = Scheduler(items)
    return scheduler.run()