blob: 0823e0b1c49ac2a15f90cf5353de5b90a3648f17 [file] [log] [blame]
// Copyright 2020 The IREE Authors
//
// Licensed under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#include "iree/task/post_batch.h"
#include <string.h>
#include "iree/base/api.h"
#include "iree/base/internal/math.h"
#include "iree/base/internal/synchronization.h"
#include "iree/base/internal/threading.h"
#include "iree/task/executor_impl.h"
#include "iree/task/queue.h"
#include "iree/task/worker.h"
void iree_task_post_batch_initialize(iree_task_executor_t* executor,
iree_task_worker_t* current_worker,
iree_task_post_batch_t* out_post_batch) {
out_post_batch->executor = executor;
out_post_batch->current_worker = current_worker;
out_post_batch->worker_pending_mask = 0;
memset(&out_post_batch->worker_pending_lifos, 0,
executor->worker_count * sizeof(iree_task_list_t));
}
iree_host_size_t iree_task_post_batch_worker_count(
const iree_task_post_batch_t* post_batch) {
return post_batch->executor->worker_count;
}
static iree_host_size_t iree_task_post_batch_select_random_worker(
iree_task_post_batch_t* post_batch, iree_task_affinity_set_t affinity_set) {
// The masks are accessed with 'relaxed' order because they are just hints.
iree_task_affinity_set_t worker_live_mask =
iree_atomic_task_affinity_set_load(
&post_batch->executor->worker_live_mask, iree_memory_order_relaxed);
iree_task_affinity_set_t valid_worker_mask = affinity_set & worker_live_mask;
if (!valid_worker_mask) {
// No valid workers as desired; for now just bail to worker 0.
return 0;
}
// TODO(benvanik): rotate through workers here. Instead, if the affinity set
// has the current_worker allowed we just use that to avoid needing a
// cross-thread hop.
return iree_task_affinity_set_count_trailing_zeros(valid_worker_mask);
}
iree_host_size_t iree_task_post_batch_select_worker(
iree_task_post_batch_t* post_batch, iree_task_affinity_set_t affinity_set) {
if (post_batch->current_worker) {
// Posting from a worker - prefer sending right back to this worker if we
// haven't already scheduled for it.
if ((affinity_set & post_batch->current_worker->worker_bit) &&
!(post_batch->worker_pending_mask &
post_batch->current_worker->worker_bit)) {
return iree_task_affinity_set_count_trailing_zeros(
post_batch->current_worker->worker_bit);
}
}
// Prefer workers that are idle as though they'll need to wake up it is
// guaranteed that they aren't working on something else and the latency of
// waking should (hopefully) be less than the latency of waiting for a
// worker's queue to finish. Note that we only consider workers idle if we
// ourselves in this batch haven't already queued work for them (as then they
// aren't going to be idle).
// The masks are accessed with 'relaxed' order because they are just hints.
iree_task_affinity_set_t worker_idle_mask =
iree_atomic_task_affinity_set_load(
&post_batch->executor->worker_idle_mask, iree_memory_order_relaxed);
worker_idle_mask &= ~post_batch->worker_pending_mask;
iree_task_affinity_set_t idle_affinity_set = affinity_set & worker_idle_mask;
if (idle_affinity_set) {
return iree_task_post_batch_select_random_worker(post_batch,
idle_affinity_set);
}
// No more workers are idle; farm out at random. In the worst case work
// stealing will help balance things out on the backend.
return iree_task_post_batch_select_random_worker(post_batch, affinity_set);
}
void iree_task_post_batch_enqueue(iree_task_post_batch_t* post_batch,
iree_host_size_t worker_index,
iree_task_t* task) {
iree_task_list_push_front(&post_batch->worker_pending_lifos[worker_index],
task);
post_batch->worker_pending_mask |=
iree_task_affinity_for_worker(worker_index);
}
// Wakes each worker indicated in the |wake_mask|, if needed.
static void iree_task_post_batch_wake_workers(
iree_task_post_batch_t* post_batch, iree_task_affinity_set_t wake_mask) {
IREE_TRACE_ZONE_BEGIN(z0);
IREE_TRACE_ZONE_APPEND_VALUE_I64(z0, iree_math_count_ones_u64(wake_mask));
// TODO(#4016): use a FUTEX_WAKE_BITSET here to wake all of the workers that
// have pending work in a single syscall (vs. popcnt(worker_pending_mask)
// syscalls). This will reduce wake latency for workers later in the set;
// for example today worker[31] will wait until workers[0-30] have had their
// syscalls performed before it's even requested to wake. This also loses
// information the kernel could use to avoid core migration as it knows when N
// threads will be needed simultaneously and can hopefully perform any needed
// migrations prior to beginning execution.
iree_task_executor_t* executor = post_batch->executor;
int wake_count = iree_task_affinity_set_count_ones(wake_mask);
int worker_index = 0;
for (int i = 0; i < wake_count; ++i) {
int offset = iree_task_affinity_set_count_trailing_zeros(wake_mask);
int wake_index = worker_index + offset;
worker_index += offset + 1;
wake_mask = iree_shr(wake_mask, offset + 1);
// Wake workers if they are waiting - workers are the only thing that can
// wait on this notification so this should almost always be either free (an
// atomic load) if a particular worker isn't waiting or it's required to
// actually wake it and we can't avoid it.
iree_task_worker_t* worker = &executor->workers[wake_index];
iree_notification_post(&worker->wake_notification, 1);
}
IREE_TRACE_ZONE_END(z0);
}
bool iree_task_post_batch_submit(iree_task_post_batch_t* post_batch) {
if (!post_batch->worker_pending_mask) return false;
IREE_TRACE_ZONE_BEGIN(z0);
// Run through each worker that has a bit set in the pending mask and post
// the pending tasks.
iree_task_affinity_set_t worker_mask = post_batch->worker_pending_mask;
post_batch->worker_pending_mask = 0;
int worker_index = 0;
int post_count = iree_task_affinity_set_count_ones(worker_mask);
iree_task_affinity_set_t worker_wake_mask = 0;
for (int i = 0; i < post_count; ++i) {
int offset = iree_task_affinity_set_count_trailing_zeros(worker_mask);
int target_index = worker_index + offset;
worker_index += offset + 1;
worker_mask = iree_shr(worker_mask, offset + 1);
iree_task_worker_t* worker = &post_batch->executor->workers[target_index];
iree_task_list_t* target_pending_lifo =
&post_batch->worker_pending_lifos[target_index];
if (worker == post_batch->current_worker) {
// Fast-path for posting to self; this happens when a worker plays the
// role of coordinator and we want to ensure we aren't doing a fully
// block-and-flush loop when we could just be popping the next new task
// off the list.
iree_task_queue_append_from_lifo_list_unsafe(&worker->local_task_queue,
target_pending_lifo);
} else {
iree_task_worker_post_tasks(worker, target_pending_lifo);
worker_wake_mask |= iree_task_affinity_for_worker(target_index);
}
}
// Wake all workers that now have pending work. If a worker is not already
// waiting this will be cheap (no syscall).
if (worker_wake_mask != 0) {
iree_task_post_batch_wake_workers(post_batch, worker_wake_mask);
}
IREE_TRACE_ZONE_END(z0);
return post_count != 0;
}