blob: ad880ec8f189f7c6b0bb4d07cd8aa241b2f9328d [file] [log] [blame]
Rupert Swarbrickc0e753d2020-06-04 13:27:58 +01001# Copyright lowRISC contributors.
2# Licensed under the Apache License, Version 2.0, see LICENSE for details.
3# SPDX-License-Identifier: Apache-2.0
4
Rupert Swarbrickc0e753d2020-06-04 13:27:58 +01005import logging as log
Rupert Swarbricka2892a52020-10-12 08:50:08 +01006import threading
Srikrishna Iyer4c472c62021-02-19 12:03:18 -08007from signal import SIGINT, signal
Rupert Swarbrickc0e753d2020-06-04 13:27:58 +01008
Srikrishna Iyer65783742021-02-10 21:42:12 -08009from Launcher import LauncherError
Srikrishna Iyerdf0936b2021-02-14 16:36:02 -080010from StatusPrinter import get_status_printer
Rupert Swarbrickc0e753d2020-06-04 13:27:58 +010011from Timer import Timer
Srikrishna Iyer4c472c62021-02-19 12:03:18 -080012from utils import VERBOSE
Rupert Swarbrickc0e753d2020-06-04 13:27:58 +010013
14
def sum_dict_lists(d):
    '''Return the combined length of all values held in the dict `d`.

    Every value of `d` is expected to support len() (the callers in this
    file pass dicts whose values are lists). An empty dict yields 0.
    '''
    total = 0
    for entries in d.values():
        total += len(entries)
    return total
20
21
def get_next_item(arr, index):
    '''Fetch the element that follows `index` in `arr`, wrapping around.

    The index is advanced by one. If that position cannot be subscripted
    (we ran off the end), the lookup restarts at position 0, which makes the
    list behave like a circular buffer.

    arr is a subscriptable list.
    index is the index of the item returned by the previous call (pass -1
    to start from the first item).

    Returns the tuple (item, index), where index is the position of the
    returned item.
    Raises IndexError if arr is empty.
    '''
    candidate = index + 1
    try:
        return arr[candidate], candidate
    except IndexError:
        # Past the end of the list: fall through and wrap to the start.
        pass

    try:
        return arr[0], 0
    except IndexError:
        raise IndexError("List is empty!")
46
47
class Scheduler:
    '''An object that runs one or more Deploy items.

    Items are grouped by target and, within a target, by sim_cfg. Targets
    are processed in the insertion order of self._scheduled: the items of the
    first target are queued up front, and the items of each following target
    are queued as the items they depend on complete.

    Each item is expected to provide: target, sim_cfg, dependencies,
    needs_all_dependencies_passing, weight, full_name, create_launcher() and
    a launcher with launch() / poll() / kill().
    '''
    def __init__(self, items, launcher_cls):
        '''Set up the bookkeeping for 'items'.

        'items' is a list of Deploy objects.
        'launcher_cls' is the chosen launcher class; its max_parallel,
        max_poll and poll_freq attributes are read while running.
        '''
        self.items = items

        # 'scheduled[target][cfg]' is a list of Deploy objects for the chosen
        # target and cfg. As items in _scheduled are ready to be run (once
        # their dependencies complete), they are moved to the _queued list,
        # where they wait until slots are available for them to be
        # dispatched. When the last item of a cfg leaves, the cfg key is
        # deleted from the target's dict; the target key itself is kept.
        self._scheduled = {}
        self.add_to_scheduled(items)

        # Print status periodically using an external status printer.
        self.status_printer = get_status_printer()
        self.status_printer.print_header(
            msg="Q: queued, D: dispatched, P: passed, F: failed, K: killed, "
            "T: total")

        # Collections of items, split up by their current state. They are
        # disjoint and their union equals the keys of self.item_to_status.
        # _queued is a list so that we dispatch things in order (relevant
        # for things like tests where we have ordered things cleverly to
        # try to see failures early). They are maintained for each target.

        # The list of available targets and the list of running items in each
        # target are polled in a circular fashion, looping back to the start.
        # This is done to allow us to poll a smaller subset of jobs rather than
        # the entire regression. We keep rotating through our list of running
        # items, picking up where we left off on the last poll.
        self._targets = list(self._scheduled.keys())
        self._queued = {}
        self._running = {}
        self._passed = {}
        self._failed = {}
        self._killed = {}
        self._total = {}
        self.last_target_polled_idx = -1
        self.last_item_polled_idx = {}

        # Status message format for each target. The zero-padding width is
        # derived from the target's own total, so each target needs its own
        # format string.
        self._msg_fmts = {}

        for target in self._scheduled:
            self._queued[target] = []
            self._running[target] = []
            self._passed[target] = set()
            self._failed[target] = set()
            self._killed[target] = set()
            self._total[target] = sum_dict_lists(self._scheduled[target])
            self.last_item_polled_idx[target] = -1

            # Stuff for printing the status.
            width = len(str(self._total[target]))
            field_fmt = '{{:0{}d}}'.format(width)
            target_fmt = ('Q: {0}, D: {0}, P: {0}, F: {0}, K: {0}, '
                          'T: {0}').format(field_fmt)
            self._msg_fmts[target] = target_fmt
            # Kept for backward compatibility: holds the format of the last
            # target processed, as before.
            self.msg_fmt = target_fmt
            msg = target_fmt.format(0, 0, 0, 0, 0, self._total[target])
            self.status_printer.init_target(target=target, msg=msg)

        # A map from the Deploy objects tracked by this class to their
        # current status. This status is 'Q', 'D', 'P', 'F' or 'K',
        # corresponding to membership in the dicts above. This is not
        # per-target.
        self.item_to_status = {}

        # Create the launcher instance for all items.
        for item in self.items:
            item.create_launcher()

        # The chosen launcher class. This allows us to access launcher
        # variant-specific settings such as max parallel jobs & poll rate.
        self.launcher_cls = launcher_cls

    def run(self):
        '''Run all scheduled jobs and return the results.

        Returns the results (status) of all items dispatched for all
        targets and cfgs, as the item -> status map self.item_to_status.
        '''

        timer = Timer()

        # Catch one SIGINT and tell the runner to quit. On a second, die.
        stop_now = threading.Event()
        old_handler = None

        def on_sigint(signal_received, frame):
            log.info('Received SIGINT. Exiting gracefully. '
                     'Send another to force immediate quit '
                     '(but you may need to manually kill child processes)')

            # Restore old handler to catch any second signal
            assert old_handler is not None
            signal(SIGINT, old_handler)

            stop_now.set()

        old_handler = signal(SIGINT, on_sigint)

        # Enqueue all items of the first target.
        self._enqueue_successors(None)

        try:
            while True:
                if stop_now.is_set():
                    # We've had an interrupt. Kill any jobs that are running.
                    self._kill()

                hms = timer.hms()
                changed = self._poll(hms) or timer.check_time()
                self._dispatch(hms)
                if changed:
                    if self._check_if_done(hms):
                        break

                # Pause between polling loops for the launcher's poll_freq.
                # We do it with a bounded wait on stop_now (rather than a
                # plain sleep) so that we jump back to the polling loop
                # immediately on a signal.
                stop_now.wait(timeout=self.launcher_cls.poll_freq)

        finally:
            signal(SIGINT, old_handler)

        # Cleanup the status printer.
        self.status_printer.exit()

        # We got to the end without anything exploding. Return the results.
        return self.item_to_status

    def add_to_scheduled(self, items):
        '''Add items to the list of _scheduled.

        'items' is a list of Deploy objects. An item already present in its
        _scheduled[target][cfg] list is not added a second time.
        '''

        for item in items:
            target_dict = self._scheduled.setdefault(item.target, {})
            cfg_list = target_dict.setdefault(item.sim_cfg, [])
            if item not in cfg_list:
                cfg_list.append(item)

    def _remove_from_scheduled(self, item):
        '''Removes the item from _scheduled[target][cfg] list.

        When all items in _scheduled[target][cfg] are finally removed, the cfg
        key is deleted. It is not an error if the cfg key or the item is
        already gone.
        '''
        target_dict = self._scheduled[item.target]
        cfg_list = target_dict.get(item.sim_cfg)
        if cfg_list is None:
            return

        try:
            cfg_list.remove(item)
        except ValueError:
            # The item has already left the list (e.g. it was queued).
            pass

        if not cfg_list:
            del target_dict[item.sim_cfg]

    def _get_next_target(self, curr_target):
        '''Returns the target that succeeds the current one.

        curr_target is the target of the job that just completed (example -
        build). If it is None, then the first target in _scheduled is returned.

        Returns None if curr_target is the last target, or if curr_target is
        None and nothing is scheduled at all.
        '''

        if curr_target is None:
            return next(iter(self._scheduled), None)

        assert curr_target in self._scheduled

        # Walk the targets in order; the one right after curr_target is the
        # answer (None if curr_target is the last one).
        targets = iter(self._scheduled)
        for target in targets:
            if target == curr_target:
                return next(targets, None)

        return None

    def _enqueue_successors(self, item=None):
        '''Move an item's successors from _scheduled to _queued.

        'item' is the recently run job that has completed. If None, then we
        move all available items in all available cfgs in _scheduled's first
        target. If 'item' is specified, then we find its successors and move
        them to _queued.
        '''

        for next_item in self._get_successors(item):
            assert next_item not in self.item_to_status
            assert next_item not in self._queued[next_item.target]
            self.item_to_status[next_item] = 'Q'
            self._queued[next_item.target].append(next_item)
            self._remove_from_scheduled(next_item)

    def _cancel_successors(self, item):
        '''Cancel an item's successors recursively by moving them to
        _killed.'''

        # Iterative worklist: each cancelled item may make further items
        # (its own successors) eligible for cancellation.
        pending = self._get_successors(item)
        while pending:
            next_item = pending.pop()
            self._cancel_item(next_item, cancel_successors=False)
            pending.extend(self._get_successors(next_item))

    def _get_successors(self, item=None):
        '''Find immediate successors of an item.

        'item' is a job that has completed. We choose the target that follows
        the 'item''s current target and find the list of successors whose
        dependency list contains 'item'. If 'item' is None, we pick successors
        from all cfgs of the first target, else we pick successors only from
        the cfg to which the item belongs.

        Only items for which _ok_to_enqueue() holds are returned.

        Returns the list of item's successors, or an empty list if there are
        none.
        '''

        target = self._get_next_target(None if item is None else item.target)
        if target is None:
            return []

        if item is None:
            cfgs = set(self._scheduled[target])
        else:
            cfgs = {item.sim_cfg}

        # Find item's successors that can be enqueued. We assume here that
        # only the immediately succeeding target can be enqueued at this
        # time.
        successors = []
        for cfg in cfgs:
            # The cfg key is deleted from _scheduled[target] once all of its
            # items have been moved out (see _remove_from_scheduled), so it
            # may legitimately be absent here.
            for next_item in self._scheduled[target].get(cfg, ()):
                if item is not None:
                    # Something is terribly wrong if item exists but the
                    # next_item's dependency list is empty.
                    assert next_item.dependencies
                    if item not in next_item.dependencies:
                        continue

                if self._ok_to_enqueue(next_item):
                    successors.append(next_item)

        return successors

    def _ok_to_enqueue(self, item):
        '''Returns true if ALL dependencies of item are complete.'''

        for dep in item.dependencies:
            # Ignore dependencies that were not scheduled to run.
            if dep not in self.items:
                continue

            # Has the dep even been enqueued?
            if dep not in self.item_to_status:
                return False

            # Has the dep completed?
            if self.item_to_status[dep] not in ["P", "F", "K"]:
                return False

        return True

    def _ok_to_run(self, item):
        '''Returns true if the required dependencies have passed.

        The item's needs_all_dependencies_passing setting is used to figure
        out whether we can run this item or not, based on its dependent jobs'
        statuses: if set, every scheduled dependency must have passed; if
        not, one passing dependency is enough.
        '''
        # 'item' can run only if its dependencies have passed (their results
        # should already show up in the item to status map).
        for dep in item.dependencies:
            # Ignore dependencies that were not scheduled to run.
            if dep not in self.items:
                continue

            dep_status = self.item_to_status[dep]
            assert dep_status in ['P', 'F', 'K']

            if item.needs_all_dependencies_passing:
                if dep_status in ['F', 'K']:
                    return False
            else:
                if dep_status in ['P']:
                    return True

        # No early verdict: 'all must pass' is satisfied (True), while
        # 'any must pass' found no passing dependency (False).
        return item.needs_all_dependencies_passing

    def _poll(self, hms):
        '''Check for running items that have finished.

        At most launcher_cls.max_poll items are polled per call, continuing
        from where the previous call left off.

        'hms' is the elapsed-time value used in log messages.

        Returns True if something changed.
        '''

        max_poll = min(self.launcher_cls.max_poll,
                       sum_dict_lists(self._running))

        # If there are no jobs running, we are likely done (possibly because
        # of a SIGINT). Since poll() was called anyway, signal that something
        # has indeed changed.
        if not max_poll:
            return True

        changed = False
        while max_poll:
            target, self.last_target_polled_idx = get_next_item(
                self._targets, self.last_target_polled_idx)

            while self._running[target] and max_poll:
                max_poll -= 1
                item, self.last_item_polled_idx[target] = get_next_item(
                    self._running[target], self.last_item_polled_idx[target])
                status = item.launcher.poll()
                level = VERBOSE

                assert status in ['D', 'P', 'F', 'K']
                if status == 'D':
                    # Still running; nothing to record.
                    continue
                elif status == 'P':
                    self._passed[target].add(item)
                elif status == 'F':
                    self._failed[target].add(item)
                    level = log.ERROR
                else:
                    self._killed[target].add(item)
                    level = log.ERROR

                # Drop the finished item and step the index back so that the
                # item which slides into its place is polled next.
                self._running[target].pop(self.last_item_polled_idx[target])
                self.last_item_polled_idx[target] -= 1
                self.item_to_status[item] = status
                log.log(level, "[%s]: [%s]: [status] [%s: %s]", hms, target,
                        item.full_name, status)

                # Enqueue item's successors regardless of its status.
                #
                # It may be possible that a failed item's successor may not
                # need all of its dependents to pass (if it has other dependent
                # jobs). Hence we enqueue all successors rather than canceling
                # them right here. We leave it to _dispatch() to figure out
                # whether an enqueued item can be run or not.
                self._enqueue_successors(item)
                changed = True

        return changed

    def _dispatch(self, hms):
        '''Dispatch some queued items if possible.

        Free slots (launcher_cls.max_parallel minus the number of running
        items) are shared between the targets that have queued items, in
        proportion to the weight of the item at the head of each queue.

        'hms' is the elapsed-time value used in log messages.
        '''

        slots = self.launcher_cls.max_parallel - sum_dict_lists(self._running)
        if slots <= 0:
            return

        # Compute how many slots to allocate to each target based on their
        # weights.
        sum_weight = 0
        slots_filled = 0
        total_weight = sum(self._queued[t][0].weight for t in self._queued
                           if self._queued[t])

        for target in self._scheduled:
            if not self._queued[target]:
                continue

            # N slots are allocated to M targets each with W(m) weights with
            # the formula:
            #
            # N(m) = N * W(m) / T, where,
            #   T is the sum total of all weights.
            #
            # This is however, problematic due to fractions. Even after
            # rounding off to the nearest digit, slots may not be fully
            # utilized (one extra left). An alternate approach that avoids this
            # problem is as follows:
            #
            # N(m) = (N * S(W(m)) / T) - F(m), where,
            #   S(W(m)) is the running sum of weights upto current target m.
            #   F(m) is the running total of slots filled.
            #
            # The computed slots per target is nearly identical to the first
            # solution, except that it prioritizes the slot allocation to
            # targets that are earlier in the list such that in the end, all
            # slots are fully consumed.
            sum_weight += self._queued[target][0].weight
            target_slots = round(
                (slots * sum_weight) / total_weight) - slots_filled
            if target_slots <= 0:
                continue
            slots_filled += target_slots

            to_dispatch = []
            while self._queued[target] and target_slots > 0:
                next_item = self._queued[target].pop(0)
                if not self._ok_to_run(next_item):
                    # Its dependencies did not pass: mark it killed, but
                    # still enqueue its successors so that they get the
                    # same check when their turn comes.
                    self._cancel_item(next_item, cancel_successors=False)
                    self._enqueue_successors(next_item)
                    continue

                to_dispatch.append(next_item)
                target_slots -= 1

            if not to_dispatch:
                continue

            log.log(VERBOSE, "[%s]: [%s]: [dispatch]:\n%s", hms, target,
                    ", ".join(item.full_name for item in to_dispatch))

            for item in to_dispatch:
                self._running[target].append(item)
                self.item_to_status[item] = 'D'
                try:
                    item.launcher.launch()
                except LauncherError as err:
                    log.error('%s', err)
                    self._kill_item(item)

    def _kill(self):
        '''Kill any running items and cancel any that are waiting'''

        # Cancel any waiting items. We iterate over a copy of each queue
        # because _cancel_item() removes the item from the queue.
        for target in self._queued:
            for item in list(self._queued[target]):
                self._cancel_item(item)

        # Kill any running items. Again, iterate over a copy because
        # _kill_item() removes the item from the running list.
        for target in self._running:
            for item in list(self._running[target]):
                self._kill_item(item)

    def _check_if_done(self, hms):
        '''Check if we are done executing all jobs.

        Also, prints the status of currently running jobs.

        'hms' is the elapsed-time value passed on to the status printer.

        Returns True if every item of every target has passed, failed or
        been killed.
        '''

        done = True
        for target in self._scheduled:
            done_cnt = sum([
                len(self._passed[target]),
                len(self._failed[target]),
                len(self._killed[target])
            ])
            done = done and (done_cnt == self._total[target])

            # Skip if a target has not even begun executing.
            if not (self._queued[target] or self._running[target] or
                    done_cnt > 0):
                continue

            perc = done_cnt / self._total[target] * 100

            # Use this target's own format so that the field width matches
            # the one used for its initial status message.
            msg = self._msg_fmts[target].format(len(self._queued[target]),
                                                len(self._running[target]),
                                                len(self._passed[target]),
                                                len(self._failed[target]),
                                                len(self._killed[target]),
                                                self._total[target])
            self.status_printer.update_target(target=target,
                                              msg=msg,
                                              hms=hms,
                                              perc=perc)
        return done

    def _cancel_item(self, item, cancel_successors=True):
        '''Cancel an item and optionally all of its successors.

        Supplied item may be in _scheduled list or the _queued list. From
        either, we move it straight to _killed.
        '''

        self.item_to_status[item] = 'K'
        self._killed[item.target].add(item)
        if item in self._queued[item.target]:
            self._queued[item.target].remove(item)
        else:
            self._remove_from_scheduled(item)

        if cancel_successors:
            self._cancel_successors(item)

    def _kill_item(self, item):
        '''Kill a running item and cancel all of its successors.'''

        item.launcher.kill()
        self.item_to_status[item] = 'K'
        self._killed[item.target].add(item)
        self._running[item.target].remove(item)
        self._cancel_successors(item)