| // Copyright 2020 The IREE Authors |
| // |
| // Licensed under the Apache License v2.0 with LLVM Exceptions. |
| // See https://llvm.org/LICENSE.txt for license information. |
| // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception |
| |
| #include "iree/task/worker.h" |
| |
| #include <stdbool.h> |
| #include <string.h> |
| |
| #include "iree/base/internal/fpu_state.h" |
| #include "iree/base/internal/math.h" |
| #include "iree/task/executor_impl.h" |
| #include "iree/task/post_batch.h" |
| #include "iree/task/submission.h" |
| #include "iree/task/task_impl.h" |
| #include "iree/task/tuning.h" |
| |
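// Minimum stack size to request for worker threads; tasks may call out to
// arbitrary user code and platform defaults can be too small, so requested
// stack sizes are clamped to at least this value below.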
| #define IREE_TASK_WORKER_MIN_STACK_SIZE (32 * 1024) |
| |
| static int iree_task_worker_main(iree_task_worker_t* worker); |
| |
| iree_status_t iree_task_worker_initialize( |
| iree_task_executor_t* executor, iree_host_size_t worker_index, |
| const iree_task_topology_group_t* topology_group, |
| iree_host_size_t stack_size, iree_byte_span_t local_memory, |
| iree_prng_splitmix64_state_t* seed_prng, iree_task_worker_t* out_worker) { |
| IREE_TRACE_ZONE_BEGIN(z0); |
| |
| out_worker->executor = executor; |
| out_worker->worker_index = executor->worker_base_index + worker_index; |
| out_worker->worker_bit = iree_task_affinity_for_worker(worker_index); |
| out_worker->ideal_thread_affinity = topology_group->ideal_thread_affinity; |
| out_worker->constructive_sharing_mask = |
| topology_group->constructive_sharing_mask; |
| out_worker->max_theft_attempts = |
| executor->worker_count / IREE_TASK_EXECUTOR_MAX_THEFT_ATTEMPTS_DIVISOR; |
| iree_prng_minilcg128_initialize(iree_prng_splitmix64_next(seed_prng), |
| &out_worker->theft_prng); |
| out_worker->local_memory = local_memory; |
| out_worker->processor_id = 0; |
| out_worker->processor_tag = 0; |
| |
| iree_notification_initialize(&out_worker->wake_notification); |
| iree_notification_initialize(&out_worker->state_notification); |
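  // The mailbox is a multi-producer LIFO slist that other threads post tasks
  // into while the local task queue is primarily consumed by this worker (and
  // occasionally stolen from by other workers).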
| iree_atomic_task_slist_initialize(&out_worker->mailbox_slist); |
| iree_task_queue_initialize(&out_worker->local_task_queue); |
| |
| iree_task_worker_state_t initial_state = IREE_TASK_WORKER_STATE_RUNNING; |
| iree_atomic_store_int32(&out_worker->state, initial_state, |
| iree_memory_order_release); |
| |
| iree_thread_create_params_t thread_params; |
| memset(&thread_params, 0, sizeof(thread_params)); |
| thread_params.name = iree_make_cstring_view(topology_group->name); |
| thread_params.create_suspended = false; |
| thread_params.priority_class = IREE_THREAD_PRIORITY_CLASS_NORMAL; |
| thread_params.initial_affinity = out_worker->ideal_thread_affinity; |
| thread_params.stack_size = |
| iree_max(IREE_TASK_WORKER_MIN_STACK_SIZE, stack_size); |
| |
| // NOTE: if the thread creation fails we'll bail here and let the caller |
| // cleanup by calling deinitialize (which is safe because we zero init |
| // everything). |
| iree_status_t status = iree_thread_create( |
| (iree_thread_entry_t)iree_task_worker_main, out_worker, thread_params, |
| executor->allocator, &out_worker->thread); |
| |
| IREE_TRACE_ZONE_END(z0); |
| return status; |
| } |
| |
| void iree_task_worker_request_exit(iree_task_worker_t* worker) { |
| if (!worker->thread) return; |
| IREE_TRACE_ZONE_BEGIN(z0); |
| |
| // If the thread is already in the exiting/zombie state we don't need to do |
| // anything. |
| iree_task_worker_state_t prev_state = |
| (iree_task_worker_state_t)iree_atomic_exchange_int32( |
| &worker->state, IREE_TASK_WORKER_STATE_EXITING, |
| iree_memory_order_acq_rel); |
| switch (prev_state) { |
| case IREE_TASK_WORKER_STATE_ZOMBIE: |
      // Worker already exited; our exchange above clobbered the ZOMBIE state
      // with EXITING so restore it.
| iree_atomic_store_int32(&worker->state, IREE_TASK_WORKER_STATE_ZOMBIE, |
| iree_memory_order_release); |
| break; |
| default: |
| // Worker now set to EXITING and should exit soon. |
| break; |
| } |
| |
| // Kick the worker in case it is waiting for work. |
| iree_notification_post(&worker->wake_notification, 1); |
| |
| IREE_TRACE_ZONE_END(z0); |
| } |
| |
| // Returns true if the worker is in the zombie state (exited and awaiting |
| // teardown). |
| static bool iree_task_worker_is_zombie(iree_task_worker_t* worker) { |
| return iree_atomic_load_int32(&worker->state, iree_memory_order_acquire) == |
| IREE_TASK_WORKER_STATE_ZOMBIE; |
| } |
| |
| void iree_task_worker_await_exit(iree_task_worker_t* worker) { |
| if (!worker->thread) return; |
| IREE_TRACE_ZONE_BEGIN(z0); |
| |
| iree_task_worker_request_exit(worker); |
| iree_notification_await(&worker->state_notification, |
| (iree_condition_fn_t)iree_task_worker_is_zombie, |
| worker, iree_infinite_timeout()); |
| |
| IREE_TRACE_ZONE_END(z0); |
| } |
| |
| void iree_task_worker_deinitialize(iree_task_worker_t* worker) { |
| IREE_TRACE_ZONE_BEGIN(z0); |
| |
  // Must have called request_exit/await_exit (when a thread was created; if
  // thread creation failed during initialization there's nothing to wait on).
  IREE_ASSERT_TRUE(!worker->thread || iree_task_worker_is_zombie(worker));
| |
| iree_thread_release(worker->thread); |
| worker->thread = NULL; |
| |
  // Release unfinished tasks by discarding the mailbox (nothing more can be
  // posted to it once we're here) and then discarding everything we still
  // hold a reference to in the local queue.
| iree_atomic_task_slist_discard(&worker->mailbox_slist); |
| iree_task_list_discard(&worker->local_task_queue.list); |
| |
| iree_notification_deinitialize(&worker->wake_notification); |
| iree_notification_deinitialize(&worker->state_notification); |
| iree_atomic_task_slist_deinitialize(&worker->mailbox_slist); |
| iree_task_queue_deinitialize(&worker->local_task_queue); |
| |
| IREE_TRACE_ZONE_END(z0); |
| } |
| |
| // Marks the worker as "active" (scheduling work or executing it). |
| // The idle mask is accessed with 'relaxed' order because it's just a hint. |
| static void iree_task_worker_mark_active(iree_task_worker_t* worker) { |
| iree_task_affinity_set_t old_idle_mask = |
| iree_atomic_task_affinity_set_fetch_and( |
| &worker->executor->worker_idle_mask, ~worker->worker_bit, |
| iree_memory_order_relaxed); |
| (void)old_idle_mask; |
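  // NOTE: old_idle_mask is only consumed when tracing is enabled; the (void)
  // above keeps it from tripping unused-variable warnings otherwise.
  // Plot the percentage of workers now active: our bit was just cleared from
  // the idle mask so the prior idle count (if any) drops by one.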
| IREE_TRACE_PLOT_VALUE_F32( |
| worker->executor->trace_name, |
| old_idle_mask |
| ? (100.0f - |
| 100.0f * (iree_task_affinity_set_count_ones(old_idle_mask) - 1) / |
| (float)worker->executor->worker_count) |
| : 100.0f); |
| } |
| |
| // Marks the worker as "idle" (sleeping/spinning waiting to wake). |
| // The idle mask is accessed with 'relaxed' order because it's just a hint. |
| static void iree_task_worker_mark_idle(iree_task_worker_t* worker) { |
| iree_task_affinity_set_t old_idle_mask = |
| iree_atomic_task_affinity_set_fetch_or( |
| &worker->executor->worker_idle_mask, worker->worker_bit, |
| iree_memory_order_relaxed); |
| (void)old_idle_mask; |
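  // Plot the percentage of workers still active now that our bit has been set
  // in the idle mask.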
| IREE_TRACE_PLOT_VALUE_F32( |
| worker->executor->trace_name, |
| 100.0f - 100.0f * (iree_task_affinity_set_count_ones(old_idle_mask) + 1) / |
| (float)worker->executor->worker_count); |
| } |
| |
| void iree_task_worker_post_tasks(iree_task_worker_t* worker, |
| iree_task_list_t* list) { |
| // Move the list into the mailbox. Note that the mailbox is LIFO and this list |
| // is concatenated with its current order preserved (which should be LIFO). |
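  // NOTE: posting alone does not wake the worker; callers that may be handing
  // work to an idle worker (e.g. the executor's post batch) are expected to
  // signal wake_notification afterward.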
| iree_atomic_task_slist_concat(&worker->mailbox_slist, list->head, list->tail); |
| memset(list, 0, sizeof(*list)); |
| } |
| |
| iree_task_t* iree_task_worker_try_steal_task(iree_task_worker_t* worker, |
| iree_task_queue_t* target_queue, |
| iree_host_size_t max_tasks) { |
  // Try to grab tasks from the worker's local queue; if more than one task is
  // stolen then the first will be returned and the remainder will be added to
  // the target queue.
| iree_task_t* task = iree_task_queue_try_steal(&worker->local_task_queue, |
| target_queue, max_tasks); |
| if (task) return task; |
| |
  // If we still didn't steal any tasks then try popping a single task from the
  // incoming mailbox instead.
  return iree_atomic_task_slist_pop(&worker->mailbox_slist);
| } |
| |
| // Executes a task on a worker. |
| // Only task types that are scheduled to workers are handled; all others must be |
| // handled by the coordinator during scheduling. |
| static void iree_task_worker_execute( |
| iree_task_worker_t* worker, iree_task_t* task, |
| iree_task_submission_t* pending_submission) { |
  // Execute and resolve the task, gathering any tasks that become ready for
  // submission to the executor. They'll be scheduled the next time the
  // coordinator runs.
| // |
| // TODO(benvanik): think a bit more about this timing; this ensures we have |
| // BFS behavior at the cost of the additional merge overhead - it's probably |
| // worth it? |
| // TODO(benvanik): handle partial tasks and re-queuing. |
| switch (task->type) { |
| case IREE_TASK_TYPE_CALL: { |
| iree_task_call_execute((iree_task_call_t*)task, pending_submission); |
| break; |
| } |
| case IREE_TASK_TYPE_DISPATCH_SHARD: { |
| iree_task_dispatch_shard_execute( |
| (iree_task_dispatch_shard_t*)task, worker->processor_id, |
| worker->worker_index, worker->local_memory, pending_submission); |
| break; |
| } |
| default: |
| IREE_ASSERT_UNREACHABLE("incorrect task type for worker execution"); |
| break; |
| } |
| |
| // NOTE: task is invalidated above and must not be used! |
| task = NULL; |
| } |
| |
| // Pumps the worker thread once, processing a single task. |
| // Returns true if pumping should continue as there are more tasks remaining or |
| // false if the caller should wait for more tasks to be posted. |
| static bool iree_task_worker_pump_once( |
| iree_task_worker_t* worker, iree_task_submission_t* pending_submission) { |
| IREE_TRACE_ZONE_BEGIN(z0); |
| |
| // Check the local work queue for any work we know we should start |
| // processing immediately. Other workers may try to steal some of this work |
| // if we take too long. |
| iree_task_t* task = iree_task_queue_pop_front(&worker->local_task_queue); |
| |
| // Check the mailbox to see if we have incoming work that has been posted. |
| // We try to greedily move it to our local work list so that we can work |
| // with the full thread-local pending task list. |
| if (!task) { |
    // NOTE: there's a potential for theft pessimization if the queue runs too
    // low and there's nothing left when a thief goes to grab some tasks.
    // Consistently coming up empty there would indicate that we weren't
    // scheduling very well in the first place (large uneven workloads across
    // workers, bad distribution in the face of heterogeneous multi-core
    // architectures where some workers complete tasks faster than others,
    // etc).
| task = iree_task_queue_flush_from_lifo_slist(&worker->local_task_queue, |
| &worker->mailbox_slist); |
| } |
| |
| #if IREE_TASK_EXECUTOR_MAX_THEFT_ATTEMPTS_DIVISOR > 0 |
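  // NOTE: compiling IREE_TASK_EXECUTOR_MAX_THEFT_ATTEMPTS_DIVISOR to 0 drops
  // this block entirely and disables work stealing.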
  // If we ran out of work assigned to this specific worker try to steal some
  // from other workers that we hopefully share some of the cache hierarchy
  // with. Their tasks will be moved from their local queue into ours and the
  // first task in the queue is popped off and returned.
| if (!task) { |
| task = iree_task_executor_try_steal_task( |
| worker->executor, worker->constructive_sharing_mask, |
| worker->max_theft_attempts, &worker->theft_prng, |
| &worker->local_task_queue); |
| } |
| #endif // IREE_TASK_EXECUTOR_MAX_THEFT_ATTEMPTS_DIVISOR > 0 |
| |
| // No tasks to run; let the caller know we want to wait for more. |
| if (!task) { |
| IREE_TRACE_ZONE_END(z0); |
| return false; |
| } |
| |
| // Execute the task (may call out to arbitrary user code and may submit more |
| // tasks for execution). |
| iree_task_worker_execute(worker, task, pending_submission); |
| |
| IREE_TRACE_ZONE_END(z0); |
| return true; // try again |
| } |
| |
| // Updates the cached processor ID field in the worker. |
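// The tag is used by iree_cpu_requery_processor_id to amortize the underlying
// system query, making this cheap enough to call on each wake.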
| static void iree_task_worker_update_processor_id(iree_task_worker_t* worker) { |
| iree_cpu_requery_processor_id(&worker->processor_tag, &worker->processor_id); |
| } |
| |
| // Alternates between pumping ready tasks in the worker queue and waiting |
| // for more tasks to arrive. Only returns when the worker has been asked by |
| // the executor to exit. |
| static void iree_task_worker_pump_until_exit(iree_task_worker_t* worker) { |
| // Initial processor ID assignment. We normally refresh this upon waking from |
| // a wait but it's possible that there's already work pending and we want to |
| // be able to process it with the proper processor ID immediately. |
| iree_task_worker_update_processor_id(worker); |
| |
| // Pump the thread loop to process more tasks. |
| while (true) { |
    // If we fail to find any work to do we'll wait at the end of this loop.
    // In order to not miss any work that is enqueued after we've already
    // checked a particular source we use an interruptible wait token that
    // will prevent the wait from happening if anyone touches the data
    // structures we use.
| iree_wait_token_t wait_token = |
| iree_notification_prepare_wait(&worker->wake_notification); |
| |
| // Now active until we decide to go back to sleep until the next pump. |
| iree_task_worker_mark_active(worker); |
| |
| // Check state to see if we've been asked to exit. |
| if (iree_atomic_load_int32(&worker->state, iree_memory_order_acquire) == |
| IREE_TASK_WORKER_STATE_EXITING) { |
| // Thread exit requested - cancel pumping. |
| iree_notification_cancel_wait(&worker->wake_notification); |
| // TODO(benvanik): complete tasks before exiting? |
| break; |
| } |
| |
| // TODO(benvanik): we could try to update the processor ID here before we |
| // begin a new batch of work - assuming it's not too expensive. |
| |
| iree_task_submission_t pending_submission; |
| iree_task_submission_initialize(&pending_submission); |
| |
    while (iree_task_worker_pump_once(worker, &pending_submission)) {
      // Keep pumping; pump_once returns false when we should wait for more
      // tasks to be posted.
    }
| |
| bool schedule_dirty = false; |
| if (!iree_task_submission_is_empty(&pending_submission)) { |
| iree_task_executor_merge_submission(worker->executor, |
| &pending_submission); |
| schedule_dirty = true; |
| } |
| |
| // We've finished all the work we have scheduled so set our idle flag. |
| // This ensures that if any other thread comes in and wants to give us |
| // work we will properly coordinate/wake below. |
| iree_task_worker_mark_idle(worker); |
| |
| // When we encounter a complete lack of work we can self-nominate to check |
| // the global work queue and distribute work to other threads. Only one |
| // coordinator can be running at a time so we also ensure that if another |
| // is doing its work we gracefully wait for it. It's fine to block in here |
| // as the next thing we'd have done is go idle anyway. |
| |
| // First self-nominate; this *may* do something or just be ignored (if |
| // another worker is already coordinating). |
| iree_task_executor_coordinate(worker->executor, worker); |
| |
| // If nothing has been enqueued since we started this loop (so even |
| // coordination didn't find anything) we go idle. Otherwise we fall |
| // through and try the loop again. |
| if (schedule_dirty || |
| !iree_task_queue_is_empty(&worker->local_task_queue)) { |
| // Have more work to do; loop around to try another pump. |
| iree_notification_cancel_wait(&worker->wake_notification); |
| } else { |
      // Spin briefly and then wait in the kernel. We don't care if the
      // condition check fails as we're just using the notification as a pulse.
| IREE_TRACE_ZONE_BEGIN_NAMED(z_wait, |
| "iree_task_worker_main_pump_wake_wait"); |
| iree_notification_commit_wait( |
| &worker->wake_notification, wait_token, |
| /*spin_ns=*/worker->executor->worker_spin_ns, |
| /*deadline_ns=*/IREE_TIME_INFINITE_FUTURE); |
| IREE_TRACE_ZONE_END(z_wait); |
| |
| // Woke from a wait - query the processor ID in case we migrated during |
| // the sleep. |
| iree_task_worker_update_processor_id(worker); |
| } |
| |
    // Wait completed (or was cancelled); loop back around and pump any tasks
    // that arrived in the interim.
| } |
| } |
| |
| // Thread entry point for each worker. |
| static int iree_task_worker_main(iree_task_worker_t* worker) { |
| IREE_TRACE_ZONE_BEGIN(thread_zone); |
| |
| // We cannot rely on the global process settings for FPU state. |
| // Be explicit here on what we need. |
| iree_fpu_state_push(IREE_FPU_STATE_FLAG_FLUSH_DENORMALS_TO_ZERO); |
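  // NOTE: the pushed FPU state is intentionally never popped: it should remain
  // in effect for the entire lifetime of the worker thread.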
| |
| // Reset affinity (as it can change over time). |
| // TODO(benvanik): call this after waking in case CPU hotplugging happens. |
| iree_thread_request_affinity(worker->thread, worker->ideal_thread_affinity); |
| |
| // Enter the running state immediately. Note that we could have been requested |
| // to exit while suspended/still starting up, so check that here before we |
| // mess with any data structures. |
| const bool should_run = |
| iree_atomic_exchange_int32(&worker->state, IREE_TASK_WORKER_STATE_RUNNING, |
| iree_memory_order_acq_rel) != |
| IREE_TASK_WORKER_STATE_EXITING; |
| if (IREE_LIKELY(should_run)) { |
| // << work happens here >> |
| iree_task_worker_pump_until_exit(worker); |
| } |
| |
| // Indicate idle immediately before exit. |
| iree_task_worker_mark_idle(worker); |
| |
| IREE_TRACE_ZONE_END(thread_zone); |
| iree_atomic_store_int32(&worker->state, IREE_TASK_WORKER_STATE_ZOMBIE, |
| iree_memory_order_release); |
| iree_notification_post(&worker->state_notification, IREE_ALL_WAITERS); |
| return 0; |
| } |