// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "iree/task/post_batch.h"
#include "iree/base/internal/math.h"
#include "iree/base/internal/synchronization.h"
#include "iree/base/internal/threading.h"
#include "iree/base/tracing.h"
#include "iree/task/executor_impl.h"
void iree_task_post_batch_initialize(iree_task_executor_t* executor,
iree_task_worker_t* current_worker,
iree_task_post_batch_t* out_post_batch) {
out_post_batch->executor = executor;
out_post_batch->current_worker = current_worker;
out_post_batch->worker_pending_mask = 0;
memset(&out_post_batch->worker_pending_lifos, 0,
executor->worker_count * sizeof(iree_task_list_t));
}
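
// Illustrative usage sketch (hypothetical caller, not part of this file): the
// typical post-batch lifecycle during coordination. `executor`,
// `current_worker`, and `task` are assumed to be provided by the caller, and
// `task->affinity_set` is assumed to carry the task's desired worker set.
//
//   iree_task_post_batch_t post_batch;
//   iree_task_post_batch_initialize(executor, current_worker, &post_batch);
//   iree_host_size_t worker_index =
//       iree_task_post_batch_select_worker(&post_batch, task->affinity_set);
//   iree_task_post_batch_enqueue(&post_batch, worker_index, task);
//   // ... enqueue any other ready tasks ...
//   iree_task_post_batch_submit(&post_batch);
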
iree_host_size_t iree_task_post_batch_worker_count(
const iree_task_post_batch_t* post_batch) {
return post_batch->executor->worker_count;
}

static iree_host_size_t iree_task_post_batch_select_random_worker(
iree_task_post_batch_t* post_batch, iree_task_affinity_set_t affinity_set) {
iree_task_affinity_set_t worker_live_mask =
iree_atomic_task_affinity_set_load(
&post_batch->executor->worker_live_mask, iree_memory_order_relaxed);
iree_task_affinity_set_t valid_worker_mask = affinity_set & worker_live_mask;
if (!valid_worker_mask) {
// No valid workers as desired; for now just bail to worker 0.
return 0;
}

// TODO(benvanik): rotate through workers here. Instead, if the affinity set
// allows the current_worker we just use that to avoid needing a
// cross-thread hop.
return iree_task_affinity_set_count_trailing_zeros(valid_worker_mask);
}
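
// Worked example (illustrative, assuming a hypothetical 8-worker executor and
// a one-bit-per-worker affinity encoding): with affinity_set = 0b00001100
// (workers 2 and 3 requested) and worker_live_mask = 0b11110100 (workers 2 and
// 4-7 live), valid_worker_mask = 0b00000100 and the function returns
// count_trailing_zeros(0b00000100) == 2. If no requested worker is live it
// falls back to worker 0.
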
iree_host_size_t iree_task_post_batch_select_worker(
iree_task_post_batch_t* post_batch, iree_task_affinity_set_t affinity_set) {
if (post_batch->current_worker) {
// Posting from a worker - prefer sending right back to this worker if we
// haven't already scheduled for it.
if ((affinity_set & post_batch->current_worker->worker_bit) &&
!(post_batch->worker_pending_mask &
post_batch->current_worker->worker_bit)) {
return iree_task_affinity_set_count_trailing_zeros(
post_batch->current_worker->worker_bit);
}
}

// Prefer workers that are idle: though they'll need to wake up, it is
// guaranteed that they aren't working on something else, and the latency of
// waking should (hopefully) be less than the latency of waiting for a busy
// worker's queue to drain. Note that we only consider a worker idle if this
// batch hasn't already queued work for it (as then it isn't going to stay
// idle).
iree_task_affinity_set_t worker_idle_mask =
iree_atomic_task_affinity_set_load(
&post_batch->executor->worker_idle_mask, iree_memory_order_relaxed);
worker_idle_mask &= ~post_batch->worker_pending_mask;
iree_task_affinity_set_t idle_affinity_set = affinity_set & worker_idle_mask;
if (idle_affinity_set) {
return iree_task_post_batch_select_random_worker(post_batch,
idle_affinity_set);
}
// No more workers are idle; farm out at random. In the worst case work
// stealing will help balance things out on the backend.
return iree_task_post_batch_select_random_worker(post_batch, affinity_set);
}
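
// Illustrative selection order (hypothetical values): when posting from
// worker 1 with affinity_set = 0b1111, worker 1 is returned if this batch has
// not already queued work for it. Otherwise, if worker_idle_mask = 0b0100 and
// worker 2 has nothing pending from this batch, worker 2 is chosen. Only when
// no allowed worker is idle does selection fall through to
// iree_task_post_batch_select_random_worker over the full affinity set.
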
void iree_task_post_batch_enqueue(iree_task_post_batch_t* post_batch,
iree_host_size_t worker_index,
iree_task_t* task) {
iree_task_list_push_front(&post_batch->worker_pending_lifos[worker_index],
task);
post_batch->worker_pending_mask |=
iree_task_affinity_for_worker(worker_index);
}
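
// Illustrative sketch (hypothetical values, assuming a one-bit-per-worker
// affinity encoding): enqueueing one task for worker 3 and one for worker 0
// pushes each onto that worker's per-batch LIFO and accumulates
// worker_pending_mask = iree_task_affinity_for_worker(3) |
// iree_task_affinity_for_worker(0) (== 0b1001), which submit later walks so
// that only the targeted workers are posted to and woken.
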
// Wakes each worker indicated in the |wake_mask|, if needed.
static void iree_task_post_batch_wake_workers(
iree_task_post_batch_t* post_batch, iree_task_affinity_set_t wake_mask) {
IREE_TRACE_ZONE_BEGIN(z0);
IREE_TRACE_ZONE_APPEND_VALUE(z0, iree_math_count_ones_u64(wake_mask));
iree_task_executor_t* executor = post_batch->executor;
// Wake workers that may be suspended. We fetch the set of workers we need to
// wake (hopefully none in the common case) and mark that we've woken them so
// that we don't double-resume.
iree_task_affinity_set_t resume_mask =
iree_atomic_task_affinity_set_fetch_and(&executor->worker_suspend_mask,
~wake_mask,
iree_memory_order_acquire);
resume_mask &= wake_mask;
if (IREE_UNLIKELY(resume_mask)) {
int resume_count = iree_task_affinity_set_count_ones(resume_mask);
int worker_index = 0;
for (int i = 0; i < resume_count; ++i) {
int offset = iree_task_affinity_set_count_trailing_zeros(resume_mask);
int resume_index = worker_index + offset;
worker_index += offset + 1;
resume_mask = iree_shr(resume_mask, offset + 1);
iree_thread_resume(executor->workers[resume_index].thread);
}
}

// TODO(#4016): use a FUTEX_WAKE_BITSET here to wake all of the workers that
// have pending work in a single syscall (vs. popcnt(worker_pending_mask)
// syscalls). This will reduce wake latency for workers later in the set;
// for example, today worker[31] will wait until workers[0-30] have had their
// syscalls performed before it's even requested to wake. Waking one-by-one
// also hides information from the kernel that it could use to avoid core
// migration: with a single wake it knows N threads will be needed
// simultaneously and can hopefully perform any needed migrations prior to
// beginning execution.
int wake_count = iree_task_affinity_set_count_ones(wake_mask);
int worker_index = 0;
for (int i = 0; i < wake_count; ++i) {
int offset = iree_task_affinity_set_count_trailing_zeros(wake_mask);
int wake_index = worker_index + offset;
worker_index += offset + 1;
wake_mask = iree_shr(wake_mask, offset + 1);
// Wake the worker if it is waiting. Workers are the only thing that can wait
// on this notification, so this is almost always either free (an atomic load)
// when the worker isn't waiting, or an actual wake that we can't avoid.
iree_task_worker_t* worker = &executor->workers[wake_index];
iree_notification_post(&worker->wake_notification, 1);
}
IREE_TRACE_ZONE_END(z0);
}
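
// Worked example of the set-bit walk used in the wake loop above (and, in the
// same form, in the resume loop): with an illustrative wake_mask = 0b10110
// (workers 1, 2, and 4), starting from worker_index = 0:
//   iter 0: offset = ctz(0b10110) = 1 -> wake_index 1; worker_index = 2;
//           mask >>= 2 -> 0b101
//   iter 1: offset = ctz(0b101) = 0   -> wake_index 2; worker_index = 3;
//           mask >>= 1 -> 0b10
//   iter 2: offset = ctz(0b10) = 1    -> wake_index 4; worker_index = 5;
//           mask >>= 2 -> 0b0
// Each set bit is visited exactly once without scanning the unset bits.
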
bool iree_task_post_batch_submit(iree_task_post_batch_t* post_batch) {
if (!post_batch->worker_pending_mask) return false;
IREE_TRACE_ZONE_BEGIN(z0);
// Run through each worker that has a bit set in the pending mask and post
// the pending tasks.
iree_task_affinity_set_t worker_mask = post_batch->worker_pending_mask;
post_batch->worker_pending_mask = 0;
int worker_index = 0;
int post_count = iree_task_affinity_set_count_ones(worker_mask);
iree_task_affinity_set_t worker_wake_mask = 0;
for (int i = 0; i < post_count; ++i) {
int offset = iree_task_affinity_set_count_trailing_zeros(worker_mask);
int target_index = worker_index + offset;
worker_index += offset + 1;
worker_mask = iree_shr(worker_mask, offset + 1);
iree_task_worker_t* worker = &post_batch->executor->workers[target_index];
iree_task_list_t* target_pending_lifo =
&post_batch->worker_pending_lifos[target_index];
if (worker == post_batch->current_worker) {
// Fast-path for posting to self; this happens when a worker plays the
// role of coordinator and we want to ensure we aren't doing a full
// block-and-flush loop when we could just be popping the next new task
// off the list.
iree_task_queue_append_from_lifo_list_unsafe(&worker->local_task_queue,
target_pending_lifo);
} else {
iree_task_worker_post_tasks(worker, target_pending_lifo);
worker_wake_mask |= iree_task_affinity_for_worker(target_index);
}
}
// Wake all workers that now have pending work. If a worker is not already
// waiting this will be cheap (no syscall).
if (worker_wake_mask != 0) {
iree_task_post_batch_wake_workers(post_batch, worker_wake_mask);
}
IREE_TRACE_ZONE_END(z0);
return post_count != 0;
}
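
// Illustrative use of the return value (hypothetical coordinator loop, not
// part of this file): the caller can use the result to tell whether this
// coordination round handed any work to other workers.
//
//   bool did_post = iree_task_post_batch_submit(&post_batch);
//   if (!did_post) {
//     // No tasks were pending for any worker; the coordinator may go idle.
//   }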