[cuda][hip] Fix launch host func and worker thread state update (#16568)
This commits fixes a few issues in pending action queue to
resolve driver deadlock issues:
* In host launch func, which is called from a driver thread,
we cannot invoke any GPU API. Otherwise we might see
deadlock. This includes cleaning up the actions after
execution--it may involve buffer releasing/unregistering
which was the issue causing hip driver hang. Now move
this cleanup into the worker thread. This is done by adding
a state field to each action to indicate whether it's alive
or zombie. We enqueue each action again after done
execution by flipping its state to zombie to let the worker
thread to cleanup.
* The worker thread can have five states--two normal states
(idle waiting or workload pending), three exit states (requested,
committed, error). They have increasing priorities w.r.t.
overwriting. We cannot overwrite state later in the list
without checking. This guarantees that exit requests are
properly respected and not dropping to the floor so to have
clean exit.
* When the worker thread is waken to process ready list, we
need to immediately flip the worker state from workload
pending to idle waiting, before any real processing. This
makes sure we don't drop new workload enqueued while
we are processing, and the worker thread can be waken
up again properly later.
With the above fixes, we can pass all stablehlo/tosa e2e op
tests on hip driver without hang or crashes. The same
change is mirrored to the cuda pending action queue.
Fixes https://github.com/openxla/iree/issues/15790
Progress towards https://github.com/openxla/iree/issues/16504
diff --git a/experimental/hip/pending_queue_actions.c b/experimental/hip/pending_queue_actions.c
index cf63c45..7f346a3 100644
--- a/experimental/hip/pending_queue_actions.c
+++ b/experimental/hip/pending_queue_actions.c
@@ -17,6 +17,7 @@
#include "iree/base/api.h"
#include "iree/base/internal/arena.h"
#include "iree/base/internal/atomic_slist.h"
+#include "iree/base/internal/atomics.h"
#include "iree/base/internal/synchronization.h"
#include "iree/base/internal/threading.h"
#include "iree/hal/api.h"
@@ -35,6 +36,13 @@
// TODO: Add support for queue alloca and dealloca.
} iree_hal_hip_queue_action_kind_t;
+typedef enum iree_hal_hip_queue_action_state_e {
+ // The current action is active as waiting for or under execution.
+ IREE_HAL_HIP_QUEUE_ACTION_STATE_ALIVE,
+ // The current action is done execution and waiting for destruction.
+ IREE_HAL_HIP_QUEUE_ACTION_STATE_ZOMBIE,
+} iree_hal_hip_queue_action_state_t;
+
// A pending queue action.
//
// Note that this struct does not have internal synchronization; it's expected
@@ -49,6 +57,11 @@
// Retained to make sure it outlives the current action.
iree_hal_hip_pending_queue_actions_t* owning_actions;
+ // The current state of this action. When an action is initially created it
+ // will be alive and enqueued to wait for releasing to the GPU. After done
+ // execution, it will be flipped into zombie state and enqueued again for
+ // destruction.
+ iree_hal_hip_queue_action_state_t state;
// The callback to run after completing this action and before freeing
// all resources. Can be NULL.
iree_hal_hip_pending_action_cleanup_callback_t cleanup_callback;
@@ -83,6 +96,7 @@
// Scratch fields for analyzing whether actions are ready to issue.
hipEvent_t events[IREE_HAL_HIP_MAX_WAIT_EVENT_COUNT];
iree_host_size_t event_count;
+ // Whether the current action is still not ready for releasing to the GPU.
bool is_pending;
} iree_hal_hip_queue_action_t;
@@ -139,7 +153,7 @@
static void iree_hal_hip_queue_action_list_take_all(
iree_hal_hip_queue_action_list_t* available_list,
iree_hal_hip_queue_action_list_t* ready_list) {
- IREE_ASSERT(available_list != ready_list);
+ IREE_ASSERT_NE(available_list, ready_list);
ready_list->head = available_list->head;
ready_list->tail = available_list->tail;
available_list->head = NULL;
@@ -173,12 +187,16 @@
slist_next));
// The ready-list processing worker's working/exiting state.
+//
+// States in the list has increasing priorities--meaning normally ones appearing
+// earlier can overwrite ones appearing later without checking; but not the
+// reverse order.
typedef enum iree_hal_hip_worker_state_e {
- IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING = 0,
- IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING = 1,
- IREE_HAL_HIP_WORKER_STATE_EXIT_REQUESTED = -1,
- IREE_HAL_HIP_WORKER_STATE_EXIT_COMMITTED = -2,
- IREE_HAL_HIP_WORKER_STATE_EXIT_ERROR = -3,
+ IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING = 0, // Worker to main thread
+ IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING = 1, // Main to worker thread
+ IREE_HAL_HIP_WORKER_STATE_EXIT_REQUESTED = -1, // Main to worker thread
+ IREE_HAL_HIP_WORKER_STATE_EXIT_COMMITTED = -2, // Worker to main thread
+ IREE_HAL_HIP_WORKER_STATE_EXIT_ERROR = -3, // Worker to main thread
} iree_hal_hip_worker_state_t;
// The data structure needed by a ready-list processing worker thread to issue
@@ -329,6 +347,7 @@
iree_memory_order_acq_rel);
iree_notification_post(&working_area->state_notification, IREE_ALL_WAITERS);
+ // Check potential exit states from the worker.
if (prev_state != IREE_HAL_HIP_WORKER_STATE_EXIT_ERROR) {
// Wait until the worker acknowledged exiting.
iree_notification_await(
@@ -424,12 +443,16 @@
z0, iree_allocator_malloc(actions->host_allocator, sizeof(*action),
(void**)&action));
- action->kind = IREE_HAL_HIP_QUEUE_ACTION_TYPE_EXECUTION;
+ action->owning_actions = actions;
+ action->state = IREE_HAL_HIP_QUEUE_ACTION_STATE_ALIVE;
action->cleanup_callback = cleanup_callback;
action->callback_user_data = callback_user_data;
+ action->kind = IREE_HAL_HIP_QUEUE_ACTION_TYPE_EXECUTION;
action->device = device;
action->dispatch_hip_stream = dispatch_stream;
action->callback_hip_stream = callback_stream;
+
+ // Initialize scratch fields.
action->event_count = 0;
action->is_pending = true;
@@ -451,6 +474,9 @@
iree_hal_resource_set_insert(resource_set, signal_semaphore_list.count,
signal_semaphore_list.semaphores);
}
+ if (IREE_LIKELY(iree_status_is_ok(status))) {
+ action->resource_set = resource_set;
+ }
// Copy the command buffer list for later access.
// TODO: avoid host allocator malloc; use some pool for the allocation.
@@ -475,11 +501,10 @@
}
if (IREE_LIKELY(iree_status_is_ok(status))) {
- action->owning_actions = actions;
+ // Retain the owning queue to make sure the action outlives it.
iree_hal_resource_retain(actions);
- action->resource_set = resource_set;
-
+ // Now everything is okay and we can enqueue the action.
iree_slim_mutex_lock(&actions->action_mutex);
iree_hal_hip_queue_action_list_push_back(&actions->action_list, action);
iree_slim_mutex_unlock(&actions->action_mutex);
@@ -498,34 +523,42 @@
return status;
}
-static void iree_hal_hip_pending_queue_actions_cleanup_execution(
- iree_hal_hip_queue_action_t* action);
-
// Releases resources after action completion on the GPU and advances timeline
// and pending actions queue.
//
-// This is the HIP host function callback to hipLaunchHostFunc, invoked by a
-// HIP driver thread.
+// This is the HIP host function callback to hipLaunchHostFunc(), invoked by a
+// HIP driver thread. Note that code in this function MUST NOT invoke any GPU
+// API under the hood to avoid potential deadlock.
static void iree_hal_hip_execution_device_signal_host_callback(
void* user_data) {
IREE_TRACE_ZONE_BEGIN(z0);
iree_hal_hip_queue_action_t* action = (iree_hal_hip_queue_action_t*)user_data;
+ IREE_ASSERT_EQ(action->kind, IREE_HAL_HIP_QUEUE_ACTION_TYPE_EXECUTION);
+ IREE_ASSERT_EQ(action->state, IREE_HAL_HIP_QUEUE_ACTION_STATE_ALIVE);
iree_hal_hip_pending_queue_actions_t* actions = action->owning_actions;
+
+ // Flip the action state to zombie and enqueue it again so that we can let
+ // the worker thread clean it up. Note that this is necessary because cleanup
+ // may involve GPU API calls like buffer releasing or unregistering, so we can
+ // not inline it here.
+ action->state = IREE_HAL_HIP_QUEUE_ACTION_STATE_ZOMBIE;
+ iree_slim_mutex_lock(&actions->action_mutex);
+ iree_hal_hip_queue_action_list_push_back(&actions->action_list, action);
+ iree_slim_mutex_unlock(&actions->action_mutex);
+
// Advance semaphore timelines by calling into the host signaling function.
+ // This will internally try to release more workload to the GPU.
IREE_IGNORE_ERROR(
iree_hal_semaphore_list_signal(action->signal_semaphore_list));
- // Destroy the current action given its done now--this also frees all retained
- // resources.
- iree_hal_hip_pending_queue_actions_cleanup_execution(action);
- // Try to release more pending actions to the GPU now.
- IREE_IGNORE_ERROR(iree_hal_hip_pending_queue_actions_issue(actions));
+
IREE_TRACE_ZONE_END(z0);
}
// Issues the given kernel dispatch |action| to the GPU.
static iree_status_t iree_hal_hip_pending_queue_actions_issue_execution(
iree_hal_hip_queue_action_t* action) {
- IREE_ASSERT(action->is_pending == false);
+ IREE_ASSERT_EQ(action->kind, IREE_HAL_HIP_QUEUE_ACTION_TYPE_EXECUTION);
+ IREE_ASSERT_EQ(action->is_pending, false);
const iree_hal_hip_dynamic_symbols_t* symbols =
action->owning_actions->symbols;
IREE_TRACE_ZONE_BEGIN(z0);
@@ -605,27 +638,32 @@
return iree_ok_status();
}
-// Releases resources after completing the given kernel dispatch |action|.
-static void iree_hal_hip_pending_queue_actions_cleanup_execution(
+// Performs the given cleanup |action| on the CPU.
+static iree_status_t iree_hal_hip_pending_queue_actions_issue_cleanup(
iree_hal_hip_queue_action_t* action) {
iree_hal_hip_pending_queue_actions_t* actions = action->owning_actions;
iree_allocator_t host_allocator = actions->host_allocator;
IREE_TRACE_ZONE_BEGIN(z0);
+ // Call user provided callback before releasing any resource.
if (action->cleanup_callback) {
action->cleanup_callback(action->callback_user_data);
}
+ // Only release resources after callbacks have been issued.
iree_hal_resource_set_free(action->resource_set);
iree_hal_hip_free_semaphore_list(host_allocator,
&action->wait_semaphore_list);
iree_hal_hip_free_semaphore_list(host_allocator,
&action->signal_semaphore_list);
+
+ // Drop reference to the pending action queue given now we are done.
iree_hal_resource_release(actions);
iree_allocator_free(host_allocator, action);
IREE_TRACE_ZONE_END(z0);
+ return iree_ok_status();
}
iree_status_t iree_hal_hip_pending_queue_actions_issue(
@@ -657,35 +695,39 @@
action->event_count = 0;
action->is_pending = false;
- // Look at all wait semaphores.
- for (iree_host_size_t i = 0; i < semaphore_count; ++i) {
- // If this semaphore has already signaled past the desired value, we can
- // just ignore it.
- uint64_t value = 0;
- status = iree_hal_semaphore_query(semaphores[i], &value);
- if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
- if (value >= values[i]) continue;
+ // Cleanup actions are immediately ready to release. Otherwise, look at all
+ // wait semaphores to make sure that they are either already ready or we can
+ // wait on a device event.
+ if (action->state == IREE_HAL_HIP_QUEUE_ACTION_STATE_ALIVE) {
+ for (iree_host_size_t i = 0; i < semaphore_count; ++i) {
+ // If this semaphore has already signaled past the desired value, we can
+ // just ignore it.
+ uint64_t value = 0;
+ status = iree_hal_semaphore_query(semaphores[i], &value);
+ if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
+ if (value >= values[i]) continue;
- // Try to acquire a hipEvent_t from a device wait timepoint. If so, we can
- // use that hipEvent_t to wait on the device. Otherwise, this action is
- // still not ready.
- hipEvent_t event = NULL;
- status = iree_hal_hip_event_semaphore_acquire_timepoint_device_wait(
- semaphores[i], values[i], &event);
- if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
- if (!event) {
- // Clear the scratch fields.
- action->event_count = 0;
- action->is_pending = true;
- break;
+ // Try to acquire a hipEvent_t from a device wait timepoint. If so, we
+ // can use that hipEvent_t to wait on the device. Otherwise, this action
+ // is still not ready.
+ hipEvent_t event = NULL;
+ status = iree_hal_hip_event_semaphore_acquire_timepoint_device_wait(
+ semaphores[i], values[i], &event);
+ if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
+ if (!event) {
+ // Clear the scratch fields.
+ action->event_count = 0;
+ action->is_pending = true;
+ break;
+ }
+ if (IREE_UNLIKELY(action->event_count >=
+ IREE_HAL_HIP_MAX_WAIT_EVENT_COUNT)) {
+ status = iree_make_status(IREE_STATUS_RESOURCE_EXHAUSTED,
+ "exceeded max wait hipEvent_t limit");
+ break;
+ }
+ action->events[action->event_count++] = event;
}
- if (IREE_UNLIKELY(action->event_count >=
- IREE_HAL_HIP_MAX_WAIT_EVENT_COUNT)) {
- status = iree_make_status(IREE_STATUS_RESOURCE_EXHAUSTED,
- "exceeded max wait hipEvent_t limit");
- break;
- }
- action->events[action->event_count++] = event;
}
if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
@@ -712,6 +754,12 @@
iree_slim_mutex_unlock(&actions->action_mutex);
+ if (ready_list.head == NULL) {
+ // Nothing ready yet. Just return.
+ IREE_TRACE_ZONE_END(z0);
+ return status;
+ }
+
iree_hal_hip_atomic_slist_entry_t* entry = NULL;
// TODO: avoid host allocator malloc; use some pool for the allocation.
if (iree_status_is_ok(status)) {
@@ -723,6 +771,7 @@
// Release all actions in the ready list to avoid leaking.
iree_hal_hip_queue_action_list_free_actions(actions->host_allocator,
&ready_list);
+ iree_allocator_free(actions->host_allocator, entry);
IREE_TRACE_ZONE_END(z0);
return status;
}
@@ -732,11 +781,17 @@
entry->ready_list_head = ready_list.head;
iree_hal_hip_ready_action_slist_push(&actions->working_area.ready_worklist,
entry);
+
+ // We can only overwrite the worker state if the previous state is idle
+ // waiting; we cannot overwrite exit related states. so we need to perform
+ // atomic compare and exchange here.
iree_hal_hip_worker_state_t prev_state =
- (iree_hal_hip_worker_state_t)iree_atomic_exchange_int32(
- &actions->working_area.worker_state,
- IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING,
- iree_memory_order_acq_rel);
+ IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING;
+ iree_atomic_compare_exchange_strong_int32(
+ &actions->working_area.worker_state, /*expected=*/&prev_state,
+ /*desired=*/IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING,
+ /*order_succ=*/iree_memory_order_acq_rel,
+ /*order_fail=*/iree_memory_order_acquire);
iree_notification_post(&actions->working_area.state_notification,
IREE_ALL_WAITERS);
@@ -759,6 +814,8 @@
iree_hal_hip_working_area_t* working_area) {
iree_hal_hip_worker_state_t value = iree_atomic_load_int32(
&working_area->worker_state, iree_memory_order_acquire);
+ // These are the only two possible states that set from the main thread to
+ // the worker thread.
return value == IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING ||
value == IREE_HAL_HIP_WORKER_STATE_EXIT_REQUESTED;
}
@@ -788,7 +845,14 @@
iree_hal_hip_queue_action_t* next_action = action->next;
action->next = NULL;
- status = iree_hal_hip_pending_queue_actions_issue_execution(action);
+ switch (action->state) {
+ case IREE_HAL_HIP_QUEUE_ACTION_STATE_ALIVE:
+ status = iree_hal_hip_pending_queue_actions_issue_execution(action);
+ break;
+ case IREE_HAL_HIP_QUEUE_ACTION_STATE_ZOMBIE:
+ status = iree_hal_hip_pending_queue_actions_issue_cleanup(action);
+ break;
+ }
if (!iree_status_is_ok(status)) break;
action->event_count = 0;
@@ -814,11 +878,26 @@
(iree_condition_fn_t)iree_hal_hip_worker_has_incoming_request,
working_area, iree_infinite_timeout());
+ // Immediately flip the state to idle waiting if and only if the previous
+ // state is workload pending. We do it before processing ready list to make
+ // sure that we don't accidentally ignore new workload pushed after done
+ // ready list processing but before overwriting the state from this worker
+ // thread. Also we don't want to overwrite other exit states. So we need to
+ // perform atomic compare and exchange here.
+ iree_hal_hip_worker_state_t prev_state =
+ IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING;
+ iree_atomic_compare_exchange_strong_int32(
+ &working_area->worker_state, /*expected=*/&prev_state,
+ /*desired=*/IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING,
+ /*order_succ=*/iree_memory_order_acq_rel,
+ /*order_fail=*/iree_memory_order_acquire);
+
// Check if we received request to stop processing and exit this thread.
bool should_exit = iree_atomic_load_int32(&working_area->worker_state,
iree_memory_order_acquire) ==
IREE_HAL_HIP_WORKER_STATE_EXIT_REQUESTED;
+ // Process the ready list. We also want this even requested to exit.
iree_status_t status = iree_hal_hip_worker_process_ready_list(
working_area->host_allocator, worklist);
if (IREE_UNLIKELY(!iree_status_is_ok(status))) {
@@ -826,6 +905,7 @@
iree_atomic_store_int32(&working_area->error_code,
iree_status_code(status),
iree_memory_order_release);
+ // This state has the highest priority so just overwrite.
iree_atomic_store_int32(&working_area->worker_state,
IREE_HAL_HIP_WORKER_STATE_EXIT_ERROR,
iree_memory_order_release);
@@ -835,7 +915,9 @@
}
if (should_exit) {
- // Signal that this thread is committed to exit.
+ // Signal that this thread is committed to exit. This state has a priority
+ // that is only lower than error exit. And we just checked error exit in
+ // the above. So also just overwrite.
iree_atomic_store_int32(&working_area->worker_state,
IREE_HAL_HIP_WORKER_STATE_EXIT_COMMITTED,
iree_memory_order_release);
@@ -843,11 +925,6 @@
IREE_ALL_WAITERS);
return 0;
}
-
- // Signal that this thread is done processing and now waiting for more.
- iree_atomic_store_int32(&working_area->worker_state,
- IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING,
- iree_memory_order_release);
}
return 0;
}
diff --git a/runtime/src/iree/hal/drivers/cuda/pending_queue_actions.c b/runtime/src/iree/hal/drivers/cuda/pending_queue_actions.c
index 61b6576..2110562 100644
--- a/runtime/src/iree/hal/drivers/cuda/pending_queue_actions.c
+++ b/runtime/src/iree/hal/drivers/cuda/pending_queue_actions.c
@@ -12,6 +12,7 @@
#include "iree/base/api.h"
#include "iree/base/internal/arena.h"
#include "iree/base/internal/atomic_slist.h"
+#include "iree/base/internal/atomics.h"
#include "iree/base/internal/synchronization.h"
#include "iree/base/internal/threading.h"
#include "iree/hal/api.h"
@@ -35,6 +36,13 @@
// TODO: Add support for queue alloca and dealloca.
} iree_hal_cuda_queue_action_kind_t;
+typedef enum iree_hal_cuda_queue_action_state_e {
+ // The current action is active as waiting for or under execution.
+ IREE_HAL_cuda_QUEUE_ACTION_STATE_ALIVE,
+ // The current action is done execution and waiting for destruction.
+ IREE_HAL_cuda_QUEUE_ACTION_STATE_ZOMBIE,
+} iree_hal_cuda_queue_action_state_t;
+
// A pending queue action.
//
// Note that this struct does not have internal synchronization; it's expected
@@ -49,6 +57,11 @@
// Retained to make sure it outlives the current action.
iree_hal_cuda_pending_queue_actions_t* owning_actions;
+ // The current state of this action. When an action is initially created it
+ // will be alive and enqueued to wait for releasing to the GPU. After done
+ // execution, it will be flipped into zombie state and enqueued again for
+ // destruction.
+ iree_hal_cuda_queue_action_state_t state;
// The callback to run after completing this action and before freeing
// all resources. Can be NULL.
iree_hal_cuda_pending_action_cleanup_callback_t cleanup_callback;
@@ -84,6 +97,7 @@
CUevent events[IREE_HAL_CUDA_MAX_WAIT_EVENT_COUNT];
iree_host_size_t event_count;
bool is_pending;
+ // Whether the current action is still not ready for releasing to the GPU.
} iree_hal_cuda_queue_action_t;
//===----------------------------------------------------------------------===//
@@ -139,7 +153,7 @@
static void iree_hal_cuda_queue_action_list_take_all(
iree_hal_cuda_queue_action_list_t* available_list,
iree_hal_cuda_queue_action_list_t* ready_list) {
- IREE_ASSERT(available_list != ready_list);
+ IREE_ASSERT_NE(available_list, ready_list);
ready_list->head = available_list->head;
ready_list->tail = available_list->tail;
available_list->head = NULL;
@@ -173,12 +187,16 @@
slist_next));
// The ready-list processing worker's working/exiting state.
+//
+// States in the list has increasing priorities--meaning normally ones appearing
+// earlier can overwrite ones appearing later without checking; but not the
+// reverse order.
typedef enum iree_hal_cuda_worker_state_e {
- IREE_HAL_CUDA_WORKER_STATE_IDLE_WAITING = 0,
- IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING = 1,
- IREE_HAL_CUDA_WORKER_STATE_EXIT_REQUESTED = -1,
- IREE_HAL_CUDA_WORKER_STATE_EXIT_COMMITTED = -2,
- IREE_HAL_CUDA_WORKER_STATE_EXIT_ERROR = -3,
+ IREE_HAL_CUDA_WORKER_STATE_IDLE_WAITING = 0, // Worker to main thread
+ IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING = 1, // Main to worker thread
+ IREE_HAL_CUDA_WORKER_STATE_EXIT_REQUESTED = -1, // Main to worker thread
+ IREE_HAL_CUDA_WORKER_STATE_EXIT_COMMITTED = -2, // Worker to main thread
+ IREE_HAL_CUDA_WORKER_STATE_EXIT_ERROR = -3, // Worker to main thread
} iree_hal_cuda_worker_state_t;
// The data structure needed by a ready-list processing worker thread to issue
@@ -329,6 +347,7 @@
IREE_HAL_CUDA_WORKER_STATE_EXIT_REQUESTED, iree_memory_order_acq_rel);
iree_notification_post(&working_area->state_notification, IREE_ALL_WAITERS);
+ // Check potential exit states from the worker.
if (prev_state != IREE_HAL_CUDA_WORKER_STATE_EXIT_ERROR) {
// Wait until the worker acknowledged exiting.
iree_notification_await(
@@ -424,12 +443,16 @@
z0, iree_allocator_malloc(actions->host_allocator, sizeof(*action),
(void**)&action));
- action->kind = IREE_HAL_CUDA_QUEUE_ACTION_TYPE_EXECUTION;
+ action->owning_actions = actions;
+ action->state = IREE_HAL_cuda_QUEUE_ACTION_STATE_ALIVE;
action->cleanup_callback = cleanup_callback;
action->callback_user_data = callback_user_data;
+ action->kind = IREE_HAL_CUDA_QUEUE_ACTION_TYPE_EXECUTION;
action->device = device;
action->dispatch_cu_stream = dispatch_stream;
action->callback_cu_stream = callback_stream;
+
+ // Initialize scratch fields.
action->event_count = 0;
action->is_pending = true;
@@ -451,6 +474,9 @@
iree_hal_resource_set_insert(resource_set, signal_semaphore_list.count,
signal_semaphore_list.semaphores);
}
+ if (IREE_LIKELY(iree_status_is_ok(status))) {
+ action->resource_set = resource_set;
+ }
// Copy the command buffer list for later access.
// TODO: avoid host allocator malloc; use some pool for the allocation.
@@ -475,11 +501,10 @@
}
if (IREE_LIKELY(iree_status_is_ok(status))) {
- action->owning_actions = actions;
+ // Retain the owning queue to make sure the action outlives it.
iree_hal_resource_retain(actions);
- action->resource_set = resource_set;
-
+ // Now everything is okay and we can enqueue the action.
iree_slim_mutex_lock(&actions->action_mutex);
iree_hal_cuda_queue_action_list_push_back(&actions->action_list, action);
iree_slim_mutex_unlock(&actions->action_mutex);
@@ -498,35 +523,43 @@
return status;
}
-static void iree_hal_cuda_pending_queue_actions_cleanup_execution(
- iree_hal_cuda_queue_action_t* action);
-
// Releases resources after action completion on the GPU and advances timeline
// and pending actions queue.
//
-// This is the CUDA host function callback to cudaLaunchHostFunc, invoked by a
-// CUDA driver thread.
+// This is the CUDA host function callback to cudaLaunchHostFunc(), invoked by a
+// CUDA driver thread. Note that code in this function MUST NOT invoke any GPU
+// API under the hood to avoid potential deadlock.
static void iree_hal_cuda_execution_device_signal_host_callback(
void* user_data) {
IREE_TRACE_ZONE_BEGIN(z0);
iree_hal_cuda_queue_action_t* action =
(iree_hal_cuda_queue_action_t*)user_data;
+ IREE_ASSERT_EQ(action->kind, IREE_HAL_CUDA_QUEUE_ACTION_TYPE_EXECUTION);
+ IREE_ASSERT_EQ(action->state, IREE_HAL_cuda_QUEUE_ACTION_STATE_ALIVE);
iree_hal_cuda_pending_queue_actions_t* actions = action->owning_actions;
+
+ // Flip the action state to zombie and enqueue it again so that we can let
+ // the worker thread clean it up. Note that this is necessary because cleanup
+ // may involve GPU API calls like buffer releasing or unregistering, so we can
+ // not inline it here.
+ action->state = IREE_HAL_cuda_QUEUE_ACTION_STATE_ZOMBIE;
+ iree_slim_mutex_lock(&actions->action_mutex);
+ iree_hal_cuda_queue_action_list_push_back(&actions->action_list, action);
+ iree_slim_mutex_unlock(&actions->action_mutex);
+
// Advance semaphore timelines by calling into the host signaling function.
+ // This will internally try to release more workload to the GPU.
IREE_IGNORE_ERROR(
iree_hal_semaphore_list_signal(action->signal_semaphore_list));
- // Destroy the current action given its done now--this also frees all retained
- // resources.
- iree_hal_cuda_pending_queue_actions_cleanup_execution(action);
- // Try to release more pending actions to the GPU now.
- IREE_IGNORE_ERROR(iree_hal_cuda_pending_queue_actions_issue(actions));
+
IREE_TRACE_ZONE_END(z0);
}
// Issues the given kernel dispatch |action| to the GPU.
static iree_status_t iree_hal_cuda_pending_queue_actions_issue_execution(
iree_hal_cuda_queue_action_t* action) {
- IREE_ASSERT(action->is_pending == false);
+ IREE_ASSERT_EQ(action->kind, IREE_HAL_CUDA_QUEUE_ACTION_TYPE_EXECUTION);
+ IREE_ASSERT_EQ(action->is_pending, false);
const iree_hal_cuda_dynamic_symbols_t* symbols =
action->owning_actions->symbols;
IREE_TRACE_ZONE_BEGIN(z0);
@@ -607,27 +640,32 @@
return iree_ok_status();
}
-// Releases resources after completing the given kernel dispatch |action|.
-static void iree_hal_cuda_pending_queue_actions_cleanup_execution(
+// Performs the given cleanup |action| on the CPU.
+static iree_status_t iree_hal_cuda_pending_queue_actions_issue_cleanup(
iree_hal_cuda_queue_action_t* action) {
iree_hal_cuda_pending_queue_actions_t* actions = action->owning_actions;
iree_allocator_t host_allocator = actions->host_allocator;
IREE_TRACE_ZONE_BEGIN(z0);
+ // Call user provided callback before releasing any resource.
if (action->cleanup_callback) {
action->cleanup_callback(action->callback_user_data);
}
+ // Only release resources after callbacks have been issued.
iree_hal_resource_set_free(action->resource_set);
iree_hal_cuda_free_semaphore_list(host_allocator,
&action->wait_semaphore_list);
iree_hal_cuda_free_semaphore_list(host_allocator,
&action->signal_semaphore_list);
+
+ // Drop reference to the pending action queue given now we are done.
iree_hal_resource_release(actions);
iree_allocator_free(host_allocator, action);
IREE_TRACE_ZONE_END(z0);
+ return iree_ok_status();
}
iree_status_t iree_hal_cuda_pending_queue_actions_issue(
@@ -659,35 +697,39 @@
action->event_count = 0;
action->is_pending = false;
- // Look at all wait semaphores.
- for (iree_host_size_t i = 0; i < semaphore_count; ++i) {
- // If this semaphore has already signaled past the desired value, we can
- // just ignore it.
- uint64_t value = 0;
- status = iree_hal_semaphore_query(semaphores[i], &value);
- if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
- if (value >= values[i]) continue;
+ // Cleanup actions are immediately ready to release. Otherwise, look at all
+ // wait semaphores to make sure that they are either already ready or we can
+ // wait on a device event.
+ if (action->state == IREE_HAL_cuda_QUEUE_ACTION_STATE_ALIVE) {
+ for (iree_host_size_t i = 0; i < semaphore_count; ++i) {
+ // If this semaphore has already signaled past the desired value, we can
+ // just ignore it.
+ uint64_t value = 0;
+ status = iree_hal_semaphore_query(semaphores[i], &value);
+ if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
+ if (value >= values[i]) continue;
- // Try to acquire a CUevent from a device wait timepoint. If so, we can
- // use that CUevent to wait on the device. Otherwise, this action is still
- // not ready.
- CUevent event = NULL;
- status = iree_hal_cuda_event_semaphore_acquire_timepoint_device_wait(
- semaphores[i], values[i], &event);
- if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
- if (!event) {
- // Clear the scratch fields.
- action->event_count = 0;
- action->is_pending = true;
- break;
+ // Try to acquire a CUevent from a device wait timepoint. If so, we can
+ // use that CUevent to wait on the device. Otherwise, this action is
+ // still not ready.
+ CUevent event = NULL;
+ status = iree_hal_cuda_event_semaphore_acquire_timepoint_device_wait(
+ semaphores[i], values[i], &event);
+ if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
+ if (!event) {
+ // Clear the scratch fields.
+ action->event_count = 0;
+ action->is_pending = true;
+ break;
+ }
+ if (IREE_UNLIKELY(action->event_count >=
+ IREE_HAL_CUDA_MAX_WAIT_EVENT_COUNT)) {
+ status = iree_make_status(IREE_STATUS_RESOURCE_EXHAUSTED,
+ "exceeded max wait CUevent limit");
+ break;
+ }
+ action->events[action->event_count++] = event;
}
- if (IREE_UNLIKELY(action->event_count >=
- IREE_HAL_CUDA_MAX_WAIT_EVENT_COUNT)) {
- status = iree_make_status(IREE_STATUS_RESOURCE_EXHAUSTED,
- "exceeded max wait CUevent limit");
- break;
- }
- action->events[action->event_count++] = event;
}
if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
@@ -714,6 +756,12 @@
iree_slim_mutex_unlock(&actions->action_mutex);
+ if (ready_list.head == NULL) {
+ // Nothing ready yet. Just return.
+ IREE_TRACE_ZONE_END(z0);
+ return status;
+ }
+
iree_hal_cuda_atomic_slist_entry_t* entry = NULL;
// TODO: avoid host allocator malloc; use some pool for the allocation.
if (iree_status_is_ok(status)) {
@@ -725,6 +773,7 @@
// Release all actions in the ready list to avoid leaking.
iree_hal_cuda_queue_action_list_free_actions(actions->host_allocator,
&ready_list);
+ iree_allocator_free(actions->host_allocator, entry);
IREE_TRACE_ZONE_END(z0);
return status;
}
@@ -734,11 +783,17 @@
entry->ready_list_head = ready_list.head;
iree_hal_cuda_ready_action_slist_push(&actions->working_area.ready_worklist,
entry);
+
+ // We can only overwrite the worker state if the previous state is idle
+ // waiting; we cannot overwrite exit related states. so we need to perform
+ // atomic compare and exchange here.
iree_hal_cuda_worker_state_t prev_state =
- (iree_hal_cuda_worker_state_t)iree_atomic_exchange_int32(
- &actions->working_area.worker_state,
- IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING,
- iree_memory_order_acq_rel);
+ IREE_HAL_CUDA_WORKER_STATE_IDLE_WAITING;
+ iree_atomic_compare_exchange_strong_int32(
+ &actions->working_area.worker_state, /*expected=*/&prev_state,
+ /*desired=*/IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING,
+ /*order_succ=*/iree_memory_order_acq_rel,
+ /*order_fail=*/iree_memory_order_acquire);
iree_notification_post(&actions->working_area.state_notification,
IREE_ALL_WAITERS);
@@ -761,6 +816,8 @@
iree_hal_cuda_working_area_t* working_area) {
iree_hal_cuda_worker_state_t value = iree_atomic_load_int32(
&working_area->worker_state, iree_memory_order_acquire);
+ // These are the only two possible states that set from the main thread to
+ // the worker thread.
return value == IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING ||
value == IREE_HAL_CUDA_WORKER_STATE_EXIT_REQUESTED;
}
@@ -790,7 +847,14 @@
iree_hal_cuda_queue_action_t* next_action = action->next;
action->next = NULL;
- status = iree_hal_cuda_pending_queue_actions_issue_execution(action);
+ switch (action->state) {
+ case IREE_HAL_cuda_QUEUE_ACTION_STATE_ALIVE:
+ status = iree_hal_cuda_pending_queue_actions_issue_execution(action);
+ break;
+ case IREE_HAL_cuda_QUEUE_ACTION_STATE_ZOMBIE:
+ status = iree_hal_cuda_pending_queue_actions_issue_cleanup(action);
+ break;
+ }
if (!iree_status_is_ok(status)) break;
action->event_count = 0;
@@ -816,11 +880,26 @@
(iree_condition_fn_t)iree_hal_cuda_worker_has_incoming_request,
working_area, iree_infinite_timeout());
+ // Immediately flip the state to idle waiting if and only if the previous
+ // state is workload pending. We do it before processing ready list to make
+ // sure that we don't accidentally ignore new workload pushed after done
+ // ready list processing but before overwriting the state from this worker
+ // thread. Also we don't want to overwrite other exit states. So we need to
+ // perform atomic compare and exchange here.
+ iree_hal_cuda_worker_state_t prev_state =
+ IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING;
+ iree_atomic_compare_exchange_strong_int32(
+ &working_area->worker_state, /*expected=*/&prev_state,
+ /*desired=*/IREE_HAL_CUDA_WORKER_STATE_IDLE_WAITING,
+ /*order_succ=*/iree_memory_order_acq_rel,
+ /*order_fail=*/iree_memory_order_acquire);
+
// Check if we received request to stop processing and exit this thread.
bool should_exit = iree_atomic_load_int32(&working_area->worker_state,
iree_memory_order_acquire) ==
IREE_HAL_CUDA_WORKER_STATE_EXIT_REQUESTED;
+ // Process the ready list. We also want this even requested to exit.
iree_status_t status = iree_hal_cuda_worker_process_ready_list(
working_area->host_allocator, worklist);
if (IREE_UNLIKELY(!iree_status_is_ok(status))) {
@@ -828,6 +907,7 @@
iree_atomic_store_int32(&working_area->error_code,
iree_status_code(status),
iree_memory_order_release);
+ // This state has the highest priority so just overwrite.
iree_atomic_store_int32(&working_area->worker_state,
IREE_HAL_CUDA_WORKER_STATE_EXIT_ERROR,
iree_memory_order_release);
@@ -837,7 +917,9 @@
}
if (should_exit) {
- // Signal that this thread is committed to exit.
+ // Signal that this thread is committed to exit. This state has a priority
+ // that is only lower than error exit. And we just checked error exit in
+ // the above. So also just overwrite.
iree_atomic_store_int32(&working_area->worker_state,
IREE_HAL_CUDA_WORKER_STATE_EXIT_COMMITTED,
iree_memory_order_release);
@@ -845,11 +927,6 @@
IREE_ALL_WAITERS);
return 0;
}
-
- // Signal that this thread is done processing and now waiting for more.
- iree_atomic_store_int32(&working_area->worker_state,
- IREE_HAL_CUDA_WORKER_STATE_IDLE_WAITING,
- iree_memory_order_release);
}
return 0;
}