Adding support for multiple queues on the local-task device. (#10817)
Queues may share executors or have their own dedicated executors.
Progress on #10765.
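Example of the new call shape (a sketch only, not part of this change; `executor_a`, `executor_b`, `loader`, and `device_allocator` are placeholder handles assumed to be created elsewhere):

```c
// Sketch: two queues, each with a dedicated executor. To share one executor
// across both queues, pass the same pointer for both entries.
// executor_a/executor_b/loader/device_allocator are illustrative placeholders.
iree_hal_task_device_params_t params;
iree_hal_task_device_params_initialize(&params);
iree_task_executor_t* executors[2] = {executor_a, executor_b};
iree_hal_device_t* device = NULL;
IREE_RETURN_IF_ERROR(iree_hal_task_device_create(
    iree_make_cstring_view("local-task"), &params,
    /*queue_count=*/2, executors,
    /*loader_count=*/1, &loader,
    device_allocator, iree_allocator_system(), &device));
```
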
diff --git a/experimental/web/sample_static/device_multithreaded.c b/experimental/web/sample_static/device_multithreaded.c
index 63854ee..c70924b 100644
--- a/experimental/web/sample_static/device_multithreaded.c
+++ b/experimental/web/sample_static/device_multithreaded.c
@@ -55,8 +55,8 @@
if (iree_status_is_ok(status)) {
status = iree_hal_task_device_create(
- identifier, &params, executor, /*loader_count=*/1, &library_loader,
- device_allocator, host_allocator, out_device);
+ identifier, &params, /*queue_count=*/1, &executor, /*loader_count=*/1,
+ &library_loader, device_allocator, host_allocator, out_device);
}
iree_hal_allocator_release(device_allocator);
diff --git a/runtime/src/iree/hal/drivers/local_task/registration/driver_module.c b/runtime/src/iree/hal/drivers/local_task/registration/driver_module.c
index dd0eab8..0f736d4 100644
--- a/runtime/src/iree/hal/drivers/local_task/registration/driver_module.c
+++ b/runtime/src/iree/hal/drivers/local_task/registration/driver_module.c
@@ -61,8 +61,8 @@
if (iree_status_is_ok(status)) {
status = iree_hal_task_driver_create(
- driver_name, &default_params, executor, loader_count, loaders,
- device_allocator, host_allocator, out_driver);
+ driver_name, &default_params, /*queue_count=*/1, &executor,
+ loader_count, loaders, device_allocator, host_allocator, out_driver);
}
iree_hal_allocator_release(device_allocator);
diff --git a/runtime/src/iree/hal/drivers/local_task/task_device.c b/runtime/src/iree/hal/drivers/local_task/task_device.c
index 104f694..ead0e47 100644
--- a/runtime/src/iree/hal/drivers/local_task/task_device.c
+++ b/runtime/src/iree/hal/drivers/local_task/task_device.c
@@ -33,8 +33,6 @@
// buffers can contain inlined data uploads).
iree_arena_block_pool_t large_block_pool;
- iree_task_executor_t* executor;
-
iree_host_size_t loader_count;
iree_hal_executable_loader_t** loaders;
@@ -56,41 +54,49 @@
void iree_hal_task_device_params_initialize(
iree_hal_task_device_params_t* out_params) {
out_params->arena_block_size = 32 * 1024;
- out_params->queue_count = 8;
}
static iree_status_t iree_hal_task_device_check_params(
- const iree_hal_task_device_params_t* params) {
+ const iree_hal_task_device_params_t* params, iree_host_size_t queue_count) {
if (params->arena_block_size < 4096) {
return iree_make_status(IREE_STATUS_INVALID_ARGUMENT,
"arena block size too small (< 4096 bytes)");
}
- if (params->queue_count == 0) {
+ if (queue_count == 0) {
return iree_make_status(IREE_STATUS_INVALID_ARGUMENT,
- "at least one queue is required");
+ "must have at least one queue");
}
return iree_ok_status();
}
+// Returns an event pool used for device-wide system event handles.
+// Each queue executor will have its own (potentially shared) pool and prefer
+// that, but generic resource requests (creating semaphores, etc.) will use this.
+iree_event_pool_t* iree_hal_task_device_shared_event_pool(
+ iree_hal_task_device_t* device) {
+ return iree_task_executor_event_pool(device->queues[0].executor);
+}
+
iree_status_t iree_hal_task_device_create(
iree_string_view_t identifier, const iree_hal_task_device_params_t* params,
- iree_task_executor_t* executor, iree_host_size_t loader_count,
- iree_hal_executable_loader_t** loaders,
+ iree_host_size_t queue_count, iree_task_executor_t* const* queue_executors,
+ iree_host_size_t loader_count, iree_hal_executable_loader_t** loaders,
iree_hal_allocator_t* device_allocator, iree_allocator_t host_allocator,
iree_hal_device_t** out_device) {
IREE_ASSERT_ARGUMENT(params);
+ IREE_ASSERT_ARGUMENT(!queue_count || queue_executors);
IREE_ASSERT_ARGUMENT(!loader_count || loaders);
IREE_ASSERT_ARGUMENT(device_allocator);
IREE_ASSERT_ARGUMENT(out_device);
*out_device = NULL;
IREE_TRACE_ZONE_BEGIN(z0);
- IREE_RETURN_AND_END_ZONE_IF_ERROR(z0,
- iree_hal_task_device_check_params(params));
+ IREE_RETURN_AND_END_ZONE_IF_ERROR(
+ z0, iree_hal_task_device_check_params(params, queue_count));
iree_hal_task_device_t* device = NULL;
iree_host_size_t struct_size = sizeof(*device) +
- params->queue_count * sizeof(*device->queues) +
+ queue_count * sizeof(*device->queues) +
loader_count * sizeof(*device->loaders);
iree_host_size_t total_size = struct_size + identifier.size;
iree_status_t status =
@@ -110,23 +116,19 @@
iree_arena_block_pool_initialize(params->arena_block_size, host_allocator,
&device->large_block_pool);
- device->executor = executor;
- iree_task_executor_retain(device->executor);
-
device->loader_count = loader_count;
device->loaders =
(iree_hal_executable_loader_t**)((uint8_t*)device + sizeof(*device) +
- params->queue_count *
- sizeof(*device->queues));
+ queue_count * sizeof(*device->queues));
for (iree_host_size_t i = 0; i < device->loader_count; ++i) {
device->loaders[i] = loaders[i];
iree_hal_executable_loader_retain(device->loaders[i]);
}
- device->queue_count = params->queue_count;
+ device->queue_count = queue_count;
for (iree_host_size_t i = 0; i < device->queue_count; ++i) {
// TODO(benvanik): add a number to each queue ID.
- iree_hal_task_queue_initialize(device->identifier, device->executor,
+ iree_hal_task_queue_initialize(device->identifier, queue_executors[i],
&device->small_block_pool,
&device->queues[i]);
}
@@ -152,10 +154,9 @@
for (iree_host_size_t i = 0; i < device->loader_count; ++i) {
iree_hal_executable_loader_release(device->loaders[i]);
}
- iree_task_executor_release(device->executor);
+ iree_hal_allocator_release(device->device_allocator);
iree_arena_block_pool_deinitialize(&device->large_block_pool);
iree_arena_block_pool_deinitialize(&device->small_block_pool);
- iree_hal_allocator_release(device->device_allocator);
iree_allocator_free(host_allocator, device);
IREE_TRACE_ZONE_END(z0);
@@ -181,10 +182,18 @@
static iree_status_t iree_hal_task_device_trim(iree_hal_device_t* base_device) {
iree_hal_task_device_t* device = iree_hal_task_device_cast(base_device);
+
+ // Before trimming the block pools try to trim subsystems that may be holding
+ // on to blocks.
+ for (iree_host_size_t i = 0; i < device->queue_count; ++i) {
+ iree_hal_task_queue_trim(&device->queues[i]);
+ }
+ IREE_RETURN_IF_ERROR(iree_hal_allocator_trim(device->device_allocator));
+
iree_arena_block_pool_trim(&device->small_block_pool);
iree_arena_block_pool_trim(&device->large_block_pool);
- iree_task_executor_trim(device->executor);
- return iree_hal_allocator_trim(device->device_allocator);
+
+ return iree_ok_status();
}
static iree_status_t iree_hal_task_device_query_i64(
@@ -207,7 +216,10 @@
}
} else if (iree_string_view_equal(category, IREE_SV("hal.dispatch"))) {
if (iree_string_view_equal(key, IREE_SV("concurrency"))) {
- *out_value = (int64_t)iree_task_executor_worker_count(device->executor);
+ // NOTE: we always return the queue 0 worker count. This will be incorrect
+ // if there are multiple executors with differing worker counts, but that's ok.
+ *out_value =
+ (int64_t)iree_task_executor_worker_count(device->queues[0].executor);
return iree_ok_status();
}
} else if (iree_string_view_equal(category, IREE_SV("hal.cpu"))) {
@@ -269,9 +281,17 @@
iree_hal_device_t* base_device, iree_string_view_t identifier,
iree_loop_t loop, iree_hal_executable_cache_t** out_executable_cache) {
iree_hal_task_device_t* device = iree_hal_task_device_cast(base_device);
+
+ // Sum up the total worker count across all queues so that the loaders can
+ // preallocate worker-specific storage.
+ iree_host_size_t total_worker_count = 0;
+ for (iree_host_size_t i = 0; i < device->queue_count; ++i) {
+ total_worker_count +=
+ iree_task_executor_worker_count(device->queues[i].executor);
+ }
+
return iree_hal_local_executable_cache_create(
- identifier, iree_task_executor_worker_count(device->executor),
- device->loader_count, device->loaders,
+ identifier, total_worker_count, device->loader_count, device->loaders,
iree_hal_device_host_allocator(base_device), out_executable_cache);
}
@@ -290,7 +310,7 @@
iree_hal_semaphore_t** out_semaphore) {
iree_hal_task_device_t* device = iree_hal_task_device_cast(base_device);
return iree_hal_task_semaphore_create(
- iree_task_executor_event_pool(device->executor), initial_value,
+ iree_hal_task_device_shared_event_pool(device), initial_value,
device->host_allocator, out_semaphore);
}
@@ -368,7 +388,7 @@
iree_hal_task_device_t* device = iree_hal_task_device_cast(base_device);
return iree_hal_task_semaphore_multi_wait(
wait_mode, semaphore_list, timeout,
- iree_task_executor_event_pool(device->executor),
+ iree_hal_task_device_shared_event_pool(device),
&device->large_block_pool);
}
diff --git a/runtime/src/iree/hal/drivers/local_task/task_device.h b/runtime/src/iree/hal/drivers/local_task/task_device.h
index 32ebbae..f1479dd 100644
--- a/runtime/src/iree/hal/drivers/local_task/task_device.h
+++ b/runtime/src/iree/hal/drivers/local_task/task_device.h
@@ -19,11 +19,6 @@
// Parameters configuring an iree_hal_task_device_t.
// Must be initialized with iree_hal_task_device_params_initialize prior to use.
typedef struct iree_hal_task_device_params_t {
- // Number of queues exposed on the device.
- // Each queue acts as a separate synchronization scope where all work executes
- // concurrently unless prohibited by semaphores.
- iree_host_size_t queue_count;
-
// Total size of each block in the device shared block pool.
// Larger sizes will lower overhead and ensure the heap isn't hit for
// transient allocations while also increasing memory consumption.
@@ -34,13 +29,20 @@
void iree_hal_task_device_params_initialize(
iree_hal_task_device_params_t* out_params);
-// Creates a new iree/task/-based local CPU device that uses |executor| for
-// scheduling tasks. |loaders| is the set of executable loaders that are
-// available for loading in the device context.
+// Creates a new iree/task/-based local CPU device that uses task executors for
+// scheduling tasks.
+//
+// |queue_count| specifies the number of logical device queues exposed to
+// programs, with one entry in |queue_executors| providing each queue's
+// scheduling scope. Multiple queues may share the same executor. When multiple
+// executors are used, device capability queries always report from the first.
+//
+// |loaders| is the set of executable loaders that are available for loading in
+// the device context. The loaders are retained for the lifetime of the device.
iree_status_t iree_hal_task_device_create(
iree_string_view_t identifier, const iree_hal_task_device_params_t* params,
- iree_task_executor_t* executor, iree_host_size_t loader_count,
- iree_hal_executable_loader_t** loaders,
+ iree_host_size_t queue_count, iree_task_executor_t* const* queue_executors,
+ iree_host_size_t loader_count, iree_hal_executable_loader_t** loaders,
iree_hal_allocator_t* device_allocator, iree_allocator_t host_allocator,
iree_hal_device_t** out_device);
diff --git a/runtime/src/iree/hal/drivers/local_task/task_driver.c b/runtime/src/iree/hal/drivers/local_task/task_driver.c
index 67be9b1..eb4c685 100644
--- a/runtime/src/iree/hal/drivers/local_task/task_driver.c
+++ b/runtime/src/iree/hal/drivers/local_task/task_driver.c
@@ -21,7 +21,8 @@
iree_string_view_t identifier;
iree_hal_task_device_params_t default_params;
- iree_task_executor_t* executor;
+ iree_host_size_t queue_count;
+ iree_task_executor_t** queue_executors;
iree_host_size_t loader_count;
iree_hal_executable_loader_t* loaders[];
@@ -38,23 +39,32 @@
iree_status_t iree_hal_task_driver_create(
iree_string_view_t identifier,
const iree_hal_task_device_params_t* default_params,
- iree_task_executor_t* executor, iree_host_size_t loader_count,
- iree_hal_executable_loader_t** loaders,
+ iree_host_size_t queue_count, iree_task_executor_t* const* queue_executors,
+ iree_host_size_t loader_count, iree_hal_executable_loader_t** loaders,
iree_hal_allocator_t* device_allocator, iree_allocator_t host_allocator,
iree_hal_driver_t** out_driver) {
IREE_ASSERT_ARGUMENT(default_params);
+ IREE_ASSERT_ARGUMENT(!queue_count || queue_executors);
IREE_ASSERT_ARGUMENT(!loader_count || loaders);
IREE_ASSERT_ARGUMENT(device_allocator);
IREE_ASSERT_ARGUMENT(out_driver);
*out_driver = NULL;
IREE_TRACE_ZONE_BEGIN(z0);
+ // Allocation is for:
+ // - iree_hal_task_driver_t
+ // + loaders[] VLA
+ // - queue_executors[]
+ // - identifier string
iree_hal_task_driver_t* driver = NULL;
iree_host_size_t struct_size =
sizeof(*driver) + loader_count * sizeof(*driver->loaders);
- iree_host_size_t total_size = struct_size + identifier.size;
+ iree_host_size_t queue_executors_offset = struct_size;
+ struct_size += queue_count * sizeof(driver->queue_executors[0]);
+ iree_host_size_t identifier_offset = struct_size;
+ struct_size += identifier.size;
iree_status_t status =
- iree_allocator_malloc(host_allocator, total_size, (void**)&driver);
+ iree_allocator_malloc(host_allocator, struct_size, (void**)&driver);
if (iree_status_is_ok(status)) {
iree_hal_resource_initialize(&iree_hal_task_driver_vtable,
&driver->resource);
@@ -63,12 +73,17 @@
iree_hal_allocator_retain(device_allocator);
iree_string_view_append_to_buffer(identifier, &driver->identifier,
- (char*)driver + struct_size);
+ (char*)driver + identifier_offset);
memcpy(&driver->default_params, default_params,
sizeof(driver->default_params));
- driver->executor = executor;
- iree_task_executor_retain(driver->executor);
+ driver->queue_count = queue_count;
+ driver->queue_executors =
+ (iree_task_executor_t**)((uint8_t*)driver + queue_executors_offset);
+ for (iree_host_size_t i = 0; i < driver->queue_count; ++i) {
+ driver->queue_executors[i] = queue_executors[i];
+ iree_task_executor_retain(driver->queue_executors[i]);
+ }
driver->loader_count = loader_count;
for (iree_host_size_t i = 0; i < driver->loader_count; ++i) {
@@ -95,7 +110,9 @@
for (iree_host_size_t i = 0; i < driver->loader_count; ++i) {
iree_hal_executable_loader_release(driver->loaders[i]);
}
- iree_task_executor_release(driver->executor);
+ for (iree_host_size_t i = 0; i < driver->queue_count; ++i) {
+ iree_task_executor_release(driver->queue_executors[i]);
+ }
iree_allocator_free(host_allocator, driver);
IREE_TRACE_ZONE_END(z0);
@@ -135,9 +152,9 @@
iree_allocator_t host_allocator, iree_hal_device_t** out_device) {
iree_hal_task_driver_t* driver = iree_hal_task_driver_cast(base_driver);
return iree_hal_task_device_create(
- driver->identifier, &driver->default_params, driver->executor,
- driver->loader_count, driver->loaders, driver->device_allocator,
- host_allocator, out_device);
+ driver->identifier, &driver->default_params, driver->queue_count,
+ driver->queue_executors, driver->loader_count, driver->loaders,
+ driver->device_allocator, host_allocator, out_device);
}
static iree_status_t iree_hal_task_driver_create_device_by_path(
diff --git a/runtime/src/iree/hal/drivers/local_task/task_driver.h b/runtime/src/iree/hal/drivers/local_task/task_driver.h
index 47788f2..a04fc0a 100644
--- a/runtime/src/iree/hal/drivers/local_task/task_driver.h
+++ b/runtime/src/iree/hal/drivers/local_task/task_driver.h
@@ -18,13 +18,20 @@
#endif // __cplusplus
// Creates a new iree/task/-based local CPU driver that creates devices sharing
-// the same |executor| for scheduling tasks. |loaders| is the set of executable
-// loaders that are available for loading in each device context.
+// the provided executors for scheduling tasks.
+//
+// |queue_count| specifies the number of logical device queues exposed to
+// programs, with one entry in |queue_executors| providing each queue's
+// scheduling scope. Multiple queues may share the same executor. When multiple
+// executors are used, device capability queries always report from the first.
+//
+// |loaders| is the set of executable loaders that are available for loading in
+// each device context. The loaders are retained for the lifetime of the driver.
iree_status_t iree_hal_task_driver_create(
iree_string_view_t identifier,
const iree_hal_task_device_params_t* default_params,
- iree_task_executor_t* executor, iree_host_size_t loader_count,
- iree_hal_executable_loader_t** loaders,
+ iree_host_size_t queue_count, iree_task_executor_t* const* queue_executors,
+ iree_host_size_t loader_count, iree_hal_executable_loader_t** loaders,
iree_hal_allocator_t* device_allocator, iree_allocator_t host_allocator,
iree_hal_driver_t** out_driver);
diff --git a/runtime/src/iree/hal/drivers/local_task/task_queue.c b/runtime/src/iree/hal/drivers/local_task/task_queue.c
index c956c0d..18e8cab 100644
--- a/runtime/src/iree/hal/drivers/local_task/task_queue.c
+++ b/runtime/src/iree/hal/drivers/local_task/task_queue.c
@@ -446,6 +446,11 @@
IREE_TRACE_ZONE_END(z0);
}
+void iree_hal_task_queue_trim(iree_hal_task_queue_t* queue) {
+ IREE_ASSERT_ARGUMENT(queue);
+ iree_task_executor_trim(queue->executor);
+}
+
static iree_status_t iree_hal_task_queue_submit_batch(
iree_hal_task_queue_t* queue, const iree_hal_submission_batch_t* batch) {
// Task to retire the submission and free the transient memory allocated for
diff --git a/runtime/src/iree/hal/drivers/local_task/task_queue.h b/runtime/src/iree/hal/drivers/local_task/task_queue.h
index b46586f..7f30206 100644
--- a/runtime/src/iree/hal/drivers/local_task/task_queue.h
+++ b/runtime/src/iree/hal/drivers/local_task/task_queue.h
@@ -46,6 +46,8 @@
void iree_hal_task_queue_deinitialize(iree_hal_task_queue_t* queue);
+void iree_hal_task_queue_trim(iree_hal_task_queue_t* queue);
+
iree_status_t iree_hal_task_queue_submit(
iree_hal_task_queue_t* queue, iree_host_size_t batch_count,
const iree_hal_submission_batch_t* batches);
diff --git a/runtime/src/iree/task/executor.c b/runtime/src/iree/task/executor.c
index 2f2a6c7..13421dd 100644
--- a/runtime/src/iree/task/executor.c
+++ b/runtime/src/iree/task/executor.c
@@ -128,6 +128,7 @@
// Bring up the workers; the threads will be created here but be suspended
// (if the platform supports it) awaiting the first tasks getting scheduled.
if (iree_status_is_ok(status)) {
+ executor->worker_base_index = options.worker_base_index;
executor->worker_count = worker_count;
executor->workers =
(iree_task_worker_t*)((uint8_t*)executor + executor_base_size);
diff --git a/runtime/src/iree/task/executor.h b/runtime/src/iree/task/executor.h
index 7ddd46f..dc0eeb7 100644
--- a/runtime/src/iree/task/executor.h
+++ b/runtime/src/iree/task/executor.h
@@ -273,6 +273,11 @@
// Specifies the schedule mode used for worker and workload balancing.
iree_task_scheduling_mode_t scheduling_mode;
+ // Base value added to each executor-local worker index.
+ // This allows workers to uniquely identify themselves in multi-executor
+ // configurations.
+ iree_host_size_t worker_base_index;
+
// TODO(benvanik): add a scope_spin_ns to control wait-idle and other
// scope-related waits coming from outside of the task system.
diff --git a/runtime/src/iree/task/executor_impl.h b/runtime/src/iree/task/executor_impl.h
index 98b410c..db00add 100644
--- a/runtime/src/iree/task/executor_impl.h
+++ b/runtime/src/iree/task/executor_impl.h
@@ -112,6 +112,11 @@
// comment on worker_live_mask.
iree_atomic_task_affinity_set_t worker_idle_mask;
+ // Base value added to each executor-local worker index.
+ // This allows workers to uniquely identify themselves in multi-executor
+ // configurations.
+ iree_host_size_t worker_base_index;
+
// Specifies how many workers threads there are.
// For now this number is fixed per executor however if we wanted to enable
// live join/leave behavior we could change this to a registration mechanism.
diff --git a/runtime/src/iree/task/worker.c b/runtime/src/iree/task/worker.c
index 72e3377..a540de7 100644
--- a/runtime/src/iree/task/worker.c
+++ b/runtime/src/iree/task/worker.c
@@ -28,6 +28,7 @@
IREE_TRACE_ZONE_BEGIN(z0);
out_worker->executor = executor;
+ out_worker->worker_index = executor->worker_base_index + worker_index;
out_worker->worker_bit = iree_task_affinity_for_worker(worker_index);
out_worker->ideal_thread_affinity = topology_group->ideal_thread_affinity;
out_worker->constructive_sharing_mask =
@@ -182,8 +183,7 @@
case IREE_TASK_TYPE_DISPATCH_SHARD: {
iree_task_dispatch_shard_execute(
(iree_task_dispatch_shard_t*)task, worker->processor_id,
- iree_task_affinity_set_count_trailing_zeros(worker->worker_bit),
- worker->local_memory, pending_submission);
+ worker->worker_index, worker->local_memory, pending_submission);
break;
}
default:
diff --git a/runtime/src/iree/task/worker.h b/runtime/src/iree/task/worker.h
index 85675c3..72111cd 100644
--- a/runtime/src/iree/task/worker.h
+++ b/runtime/src/iree/task/worker.h
@@ -82,7 +82,11 @@
// pool. Executors always outlive the workers they own.
iree_task_executor_t* executor;
+ // Globally unique worker index (worker_base_index + local worker_index).
+ iree_host_size_t worker_index;
+
// Bit the worker represents in the various worker bitsets.
+ // Local to the executor owning the worker.
iree_task_affinity_set_t worker_bit;
// Ideal thread affinity for the worker thread.
diff --git a/samples/simple_embedding/README.md b/samples/simple_embedding/README.md
index a34a5f8..1e6641f 100644
--- a/samples/simple_embedding/README.md
+++ b/samples/simple_embedding/README.md
@@ -91,7 +91,8 @@
iree_string_view_t identifier = iree_make_cstring_view("local-task");
if (iree_status_is_ok(status)) {
// Create the device.
- status = iree_hal_task_device_create(identifier, &params, executor,
+ status = iree_hal_task_device_create(identifier, &params,
+ /*queue_count=*/1, &executor,
/*loader_count=*/1, &loader,
iree_allocator_system(), device);
```
diff --git a/samples/simple_embedding/device_embedded.c b/samples/simple_embedding/device_embedded.c
index 351ebad..e29f55f 100644
--- a/samples/simple_embedding/device_embedded.c
+++ b/samples/simple_embedding/device_embedded.c
@@ -45,10 +45,10 @@
// Create the device.
if (iree_status_is_ok(status)) {
- status = iree_hal_task_device_create(identifier, &params, executor,
- /*loader_count=*/1, &loader,
- device_allocator, host_allocator,
- out_device);
+ status = iree_hal_task_device_create(
+ identifier, &params, /*queue_count=*/1, &executor,
+ /*loader_count=*/1, &loader, device_allocator, host_allocator,
+ out_device);
}
iree_hal_allocator_release(device_allocator);