| // Copyright 2026 The IREE Authors |
| // |
| // Licensed under the Apache License v2.0 with LLVM Exceptions. |
| // See https://llvm.org/LICENSE.txt for license information. |
| // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception |
| |
| #include "iree/async/semaphore.h" |
| |
| #include <atomic> |
| #include <thread> |
| #include <vector> |
| |
| #include "iree/async/frontier.h" |
| #include "iree/async/proactor_platform.h" |
| #include "iree/base/api.h" |
| #include "iree/testing/gtest.h" |
| #include "iree/testing/status_matchers.h" |
| |
| namespace iree { |
| namespace { |
| |
| //===----------------------------------------------------------------------===// |
| // Test proactor (shared by all tests in this file) |
| //===----------------------------------------------------------------------===// |
| |
| // Returns a shared proactor for semaphore tests. Created on first use and |
| // released at process exit. Semaphores require a non-NULL proactor for async |
| // I/O integration (import_fence, export_fence). |
| static iree_async_proactor_t* test_proactor() { |
| static iree_async_proactor_t* proactor = nullptr; |
| if (!proactor) { |
| IREE_CHECK_OK(iree_async_proactor_create_platform( |
| iree_async_proactor_options_default(), iree_allocator_system(), |
| &proactor)); |
| atexit([] { |
| iree_async_proactor_release(proactor); |
| proactor = nullptr; |
| }); |
| } |
| return proactor; |
| } |
| |
| //===----------------------------------------------------------------------===// |
| // Test helpers |
| //===----------------------------------------------------------------------===// |
| |
| // Test callback that records invocations. |
| struct TimepointCallback { |
| std::atomic<int> call_count{0}; |
| std::atomic<uint64_t> last_value{0}; |
| iree_status_code_t last_status_code{IREE_STATUS_OK}; |
| |
| static void Invoke(void* user_data, |
| iree_async_semaphore_timepoint_t* timepoint, |
| iree_status_t status) { |
| auto* self = static_cast<TimepointCallback*>(user_data); |
| self->call_count++; |
| self->last_value = timepoint->minimum_value; |
| self->last_status_code = iree_status_code(status); |
| iree_status_free(status); |
| } |
| }; |
| |
| // Creates a frontier with the given entries. |
| class FrontierBuilder { |
| public: |
| FrontierBuilder() = default; |
| |
| FrontierBuilder& Add(iree_async_axis_t axis, uint64_t epoch) { |
| entries_.push_back({axis, epoch}); |
| return *this; |
| } |
| |
| // Returns a pointer to a stack-allocated frontier. |
| // Only valid until the next call or destruction. |
| iree_async_frontier_t* Build() { |
| // Sort by axis. |
| std::sort(entries_.begin(), entries_.end(), |
| [](const auto& a, const auto& b) { return a.axis < b.axis; }); |
| |
| // Allocate space for header + entries. |
| buffer_.resize(sizeof(iree_async_frontier_t) + |
| entries_.size() * sizeof(iree_async_frontier_entry_t)); |
| auto* frontier = reinterpret_cast<iree_async_frontier_t*>(buffer_.data()); |
| iree_async_frontier_initialize(frontier, |
| static_cast<uint8_t>(entries_.size())); |
| for (size_t i = 0; i < entries_.size(); ++i) { |
| frontier->entries[i] = entries_[i]; |
| } |
| return frontier; |
| } |
| |
| private: |
| std::vector<iree_async_frontier_entry_t> entries_; |
| std::vector<uint8_t> buffer_; |
| }; |
| |
| //===----------------------------------------------------------------------===// |
| // Create / Destroy |
| //===----------------------------------------------------------------------===// |
| |
| TEST(SemaphoreTest, CreateWithInitialValue) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 42, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| EXPECT_EQ(iree_async_semaphore_query(sem), 42u); |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, CreateWithZeroInitialValue) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| EXPECT_EQ(iree_async_semaphore_query(sem), 0u); |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, RetainRelease) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| iree_async_semaphore_retain(sem); |
| iree_async_semaphore_release(sem); // First release. |
| iree_async_semaphore_release(sem); // Second release, destroys. |
| } |
| |
| //===----------------------------------------------------------------------===// |
| // Signal / Query |
| //===----------------------------------------------------------------------===// |
| |
| TEST(SemaphoreTest, SignalAdvancesValue) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sem, 5, nullptr)); |
| EXPECT_EQ(iree_async_semaphore_query(sem), 5u); |
| |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sem, 10, nullptr)); |
| EXPECT_EQ(iree_async_semaphore_query(sem), 10u); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, SignalLessThanCurrentFails) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 10, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| IREE_EXPECT_STATUS_IS(IREE_STATUS_INVALID_ARGUMENT, |
| iree_async_semaphore_signal(sem, 5, nullptr)); |
| |
| // Value unchanged. |
| EXPECT_EQ(iree_async_semaphore_query(sem), 10u); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, SignalEqualToCurrentFails) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 10, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| IREE_EXPECT_STATUS_IS(IREE_STATUS_INVALID_ARGUMENT, |
| iree_async_semaphore_signal(sem, 10, nullptr)); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, SignalLargeJump) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sem, UINT64_MAX, nullptr)); |
| EXPECT_EQ(iree_async_semaphore_query(sem), UINT64_MAX); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| //===----------------------------------------------------------------------===// |
| // Timepoints |
| //===----------------------------------------------------------------------===// |
| |
| TEST(SemaphoreTest, TimepointImmediatelySatisfied) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 10, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| TimepointCallback callback; |
| iree_async_semaphore_timepoint_t timepoint; |
| timepoint.callback = TimepointCallback::Invoke; |
| timepoint.user_data = &callback; |
| |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem, 5, &timepoint)); |
| |
| // Callback fired immediately (value 10 >= 5). |
| EXPECT_EQ(callback.call_count, 1); |
| EXPECT_EQ(callback.last_status_code, IREE_STATUS_OK); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, TimepointPendsThenSatisfied) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| TimepointCallback callback; |
| iree_async_semaphore_timepoint_t timepoint; |
| timepoint.callback = TimepointCallback::Invoke; |
| timepoint.user_data = &callback; |
| |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem, 10, &timepoint)); |
| |
| // Not yet satisfied. |
| EXPECT_EQ(callback.call_count, 0); |
| |
| // Signal to 5 — still not satisfied. |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sem, 5, nullptr)); |
| EXPECT_EQ(callback.call_count, 0); |
| |
| // Signal to 10 — now satisfied. |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sem, 10, nullptr)); |
| EXPECT_EQ(callback.call_count, 1); |
| EXPECT_EQ(callback.last_status_code, IREE_STATUS_OK); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, TimepointOvershoot) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| TimepointCallback callback; |
| iree_async_semaphore_timepoint_t timepoint; |
| timepoint.callback = TimepointCallback::Invoke; |
| timepoint.user_data = &callback; |
| |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem, 10, &timepoint)); |
| EXPECT_EQ(callback.call_count, 0); |
| |
| // Signal to 100 — overshoots the target, still fires. |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sem, 100, nullptr)); |
| EXPECT_EQ(callback.call_count, 1); |
| EXPECT_EQ(callback.last_status_code, IREE_STATUS_OK); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, MultipleTimepoints) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| TimepointCallback cb1, cb2, cb3; |
| iree_async_semaphore_timepoint_t tp1, tp2, tp3; |
| tp1.callback = TimepointCallback::Invoke; |
| tp1.user_data = &cb1; |
| tp2.callback = TimepointCallback::Invoke; |
| tp2.user_data = &cb2; |
| tp3.callback = TimepointCallback::Invoke; |
| tp3.user_data = &cb3; |
| |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem, 5, &tp1)); |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem, 10, &tp2)); |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem, 15, &tp3)); |
| |
| EXPECT_EQ(cb1.call_count, 0); |
| EXPECT_EQ(cb2.call_count, 0); |
| EXPECT_EQ(cb3.call_count, 0); |
| |
| // Signal to 7 — only tp1 satisfied. |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sem, 7, nullptr)); |
| EXPECT_EQ(cb1.call_count, 1); |
| EXPECT_EQ(cb2.call_count, 0); |
| EXPECT_EQ(cb3.call_count, 0); |
| |
| // Signal to 15 — tp2 and tp3 satisfied. |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sem, 15, nullptr)); |
| EXPECT_EQ(cb1.call_count, 1); |
| EXPECT_EQ(cb2.call_count, 1); |
| EXPECT_EQ(cb3.call_count, 1); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| //===----------------------------------------------------------------------===// |
| // Cancel |
| //===----------------------------------------------------------------------===// |
| |
| TEST(SemaphoreTest, CancelPendingTimepoint) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| TimepointCallback callback; |
| iree_async_semaphore_timepoint_t timepoint; |
| timepoint.callback = TimepointCallback::Invoke; |
| timepoint.user_data = &callback; |
| |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem, 10, &timepoint)); |
| EXPECT_EQ(callback.call_count, 0); |
| |
| // Cancel before satisfaction — should succeed. |
| EXPECT_TRUE(iree_async_semaphore_cancel_timepoint(sem, &timepoint)); |
| |
| // Signal past the target — callback should NOT fire. |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sem, 100, nullptr)); |
| EXPECT_EQ(callback.call_count, 0); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, CancelAlreadyFiredIsNoOp) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 10, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| TimepointCallback callback; |
| iree_async_semaphore_timepoint_t timepoint; |
| timepoint.callback = TimepointCallback::Invoke; |
| timepoint.user_data = &callback; |
| |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem, 5, &timepoint)); |
| EXPECT_EQ(callback.call_count, 1); // Already fired. |
| |
| // Cancel after firing — returns false (already dispatched), no crash. |
| EXPECT_FALSE(iree_async_semaphore_cancel_timepoint(sem, &timepoint)); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, DoubleCancelIsSafe) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| TimepointCallback callback; |
| iree_async_semaphore_timepoint_t timepoint; |
| timepoint.callback = TimepointCallback::Invoke; |
| timepoint.user_data = &callback; |
| |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem, 10, &timepoint)); |
| EXPECT_TRUE(iree_async_semaphore_cancel_timepoint(sem, &timepoint)); |
| EXPECT_FALSE(iree_async_semaphore_cancel_timepoint(sem, &timepoint)); |
| |
| EXPECT_EQ(callback.call_count, 0); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| //===----------------------------------------------------------------------===// |
| // Failure |
| //===----------------------------------------------------------------------===// |
| |
| TEST(SemaphoreTest, FailDispatchesPendingTimepoints) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| TimepointCallback cb1, cb2; |
| iree_async_semaphore_timepoint_t tp1, tp2; |
| tp1.callback = TimepointCallback::Invoke; |
| tp1.user_data = &cb1; |
| tp2.callback = TimepointCallback::Invoke; |
| tp2.user_data = &cb2; |
| |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem, 10, &tp1)); |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem, 20, &tp2)); |
| |
| EXPECT_EQ(cb1.call_count, 0); |
| EXPECT_EQ(cb2.call_count, 0); |
| |
| // Fail the semaphore. |
| iree_async_semaphore_fail( |
| sem, iree_make_status(IREE_STATUS_ABORTED, "test failure")); |
| |
| // Both timepoints fired with failure. |
| EXPECT_EQ(cb1.call_count, 1); |
| EXPECT_EQ(cb1.last_status_code, IREE_STATUS_ABORTED); |
| EXPECT_EQ(cb2.call_count, 1); |
| EXPECT_EQ(cb2.last_status_code, IREE_STATUS_ABORTED); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| // Verifies that timepoints are fully unlinked before their failure callback |
| // fires. A callback seeing non-NULL next/prev would have stale list pointers. |
| TEST(SemaphoreTest, FailUnlinksTimepointsBeforeCallback) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| // Callback that checks the timepoint is unlinked when it fires. |
| struct UnlinkChecker { |
| std::atomic<int> call_count{0}; |
| std::atomic<bool> was_unlinked{true}; |
| static void Invoke(void* user_data, |
| iree_async_semaphore_timepoint_t* timepoint, |
| iree_status_t status) { |
| auto* self = static_cast<UnlinkChecker*>(user_data); |
| if (timepoint->next != nullptr || timepoint->prev != nullptr) { |
| self->was_unlinked = false; |
| } |
| self->call_count++; |
| iree_status_free(status); |
| } |
| }; |
| |
| UnlinkChecker checker1, checker2; |
| iree_async_semaphore_timepoint_t tp1, tp2; |
| tp1.callback = UnlinkChecker::Invoke; |
| tp1.user_data = &checker1; |
| tp2.callback = UnlinkChecker::Invoke; |
| tp2.user_data = &checker2; |
| |
| // Register two timepoints so the list has actual linkage. |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem, 10, &tp1)); |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem, 20, &tp2)); |
| |
| iree_async_semaphore_fail( |
| sem, iree_make_status(IREE_STATUS_ABORTED, "test failure")); |
| |
| EXPECT_EQ(checker1.call_count, 1); |
| EXPECT_EQ(checker2.call_count, 1); |
| EXPECT_TRUE(checker1.was_unlinked); |
| EXPECT_TRUE(checker2.was_unlinked); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, FailThenAcquireTimepoint) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| // Fail first. |
| iree_async_semaphore_fail(sem, |
| iree_make_status(IREE_STATUS_ABORTED, "failed")); |
| |
| // Then acquire timepoint — should fire immediately with failure. |
| TimepointCallback callback; |
| iree_async_semaphore_timepoint_t timepoint; |
| timepoint.callback = TimepointCallback::Invoke; |
| timepoint.user_data = &callback; |
| |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem, 10, &timepoint)); |
| EXPECT_EQ(callback.call_count, 1); |
| EXPECT_EQ(callback.last_status_code, IREE_STATUS_ABORTED); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, FirstFailureWins) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| // First failure. |
| iree_async_semaphore_fail(sem, |
| iree_make_status(IREE_STATUS_ABORTED, "first")); |
| |
| // Second failure (should be ignored). |
| iree_async_semaphore_fail(sem, |
| iree_make_status(IREE_STATUS_CANCELLED, "second")); |
| |
| // Acquire timepoint — should get the first failure status. |
| TimepointCallback callback; |
| iree_async_semaphore_timepoint_t timepoint; |
| timepoint.callback = TimepointCallback::Invoke; |
| timepoint.user_data = &callback; |
| |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem, 10, &timepoint)); |
| EXPECT_EQ(callback.last_status_code, IREE_STATUS_ABORTED); // First failure. |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| //===----------------------------------------------------------------------===// |
| // Destroy with pending timepoints |
| //===----------------------------------------------------------------------===// |
| |
| TEST(SemaphoreTest, DestroyWithPendingTimepoints) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| TimepointCallback callback; |
| iree_async_semaphore_timepoint_t timepoint; |
| timepoint.callback = TimepointCallback::Invoke; |
| timepoint.user_data = &callback; |
| |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem, 10, &timepoint)); |
| EXPECT_EQ(callback.call_count, 0); |
| |
| // Destroy while timepoint is pending. |
| iree_async_semaphore_release(sem); |
| |
| // Callback fired with CANCELLED. |
| EXPECT_EQ(callback.call_count, 1); |
| EXPECT_EQ(callback.last_status_code, IREE_STATUS_CANCELLED); |
| } |
| |
| //===----------------------------------------------------------------------===// |
| // Frontier tracking |
| //===----------------------------------------------------------------------===// |
| |
| TEST(SemaphoreTest, SignalWithFrontier) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| iree_async_axis_t axis_a = iree_async_axis_make_queue(1, 0, 0, 0); |
| iree_async_axis_t axis_b = iree_async_axis_make_queue(1, 0, 1, 0); |
| |
| FrontierBuilder fb; |
| IREE_ASSERT_OK( |
| iree_async_semaphore_signal(sem, 1, fb.Add(axis_a, 10).Build())); |
| |
| // Query the accumulated frontier. |
| uint8_t storage[sizeof(iree_async_frontier_t) + |
| 2 * sizeof(iree_async_frontier_entry_t)]; |
| auto* out = reinterpret_cast<iree_async_frontier_t*>(storage); |
| uint8_t count = iree_async_semaphore_query_frontier(sem, out, 2); |
| |
| EXPECT_EQ(count, 1u); |
| EXPECT_EQ(out->entry_count, 1u); |
| EXPECT_EQ(out->entries[0].axis, axis_a); |
| EXPECT_EQ(out->entries[0].epoch, 10u); |
| |
| // Signal with another frontier — should merge. |
| IREE_ASSERT_OK( |
| iree_async_semaphore_signal(sem, 2, fb.Add(axis_b, 5).Build())); |
| count = iree_async_semaphore_query_frontier(sem, out, 2); |
| |
| EXPECT_EQ(count, 2u); |
| // Entries are sorted by axis. |
| EXPECT_EQ(out->entries[0].axis, axis_a); |
| EXPECT_EQ(out->entries[0].epoch, 10u); |
| EXPECT_EQ(out->entries[1].axis, axis_b); |
| EXPECT_EQ(out->entries[1].epoch, 5u); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, FrontierMergeMaxEpoch) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| iree_async_axis_t axis = iree_async_axis_make_queue(1, 0, 0, 0); |
| |
| FrontierBuilder fb1; |
| IREE_ASSERT_OK( |
| iree_async_semaphore_signal(sem, 1, fb1.Add(axis, 10).Build())); |
| |
| FrontierBuilder fb2; |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sem, 2, fb2.Add(axis, 5).Build())); |
| |
| // Merged frontier should have max(10, 5) = 10. |
| uint8_t storage[sizeof(iree_async_frontier_t) + |
| sizeof(iree_async_frontier_entry_t)]; |
| auto* out = reinterpret_cast<iree_async_frontier_t*>(storage); |
| iree_async_semaphore_query_frontier(sem, out, 1); |
| |
| EXPECT_EQ(out->entries[0].epoch, 10u); // Max of 10 and 5. |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, SignalWithNullFrontier) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sem, 1, nullptr)); |
| |
| // Frontier should be empty. |
| EXPECT_EQ(iree_async_semaphore_query_frontier(sem, nullptr, 0), 0u); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| //===----------------------------------------------------------------------===// |
| // Tainting |
| //===----------------------------------------------------------------------===// |
| |
| TEST(SemaphoreTest, InitialValueIsUntainted) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 10, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| EXPECT_EQ(iree_async_semaphore_query_untainted_value(sem), 10u); |
| EXPECT_FALSE(iree_async_semaphore_is_value_tainted(sem, 10)); |
| EXPECT_FALSE(iree_async_semaphore_is_value_tainted(sem, 5)); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, RegularSignalDoesNotAdvanceUntainted) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sem, 10, nullptr)); |
| |
| // Regular signal doesn't advance untainted watermark. |
| EXPECT_EQ(iree_async_semaphore_query_untainted_value(sem), 0u); |
| EXPECT_TRUE(iree_async_semaphore_is_value_tainted(sem, 10)); |
| EXPECT_TRUE(iree_async_semaphore_is_value_tainted(sem, 1)); |
| EXPECT_FALSE(iree_async_semaphore_is_value_tainted(sem, 0)); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, SignalUntaintedAdvancesWatermark) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| IREE_ASSERT_OK(iree_async_semaphore_signal_untainted(sem, 10, nullptr)); |
| |
| EXPECT_EQ(iree_async_semaphore_query_untainted_value(sem), 10u); |
| EXPECT_FALSE(iree_async_semaphore_is_value_tainted(sem, 10)); |
| EXPECT_FALSE(iree_async_semaphore_is_value_tainted(sem, 5)); |
| EXPECT_TRUE(iree_async_semaphore_is_value_tainted(sem, 11)); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, MarkTaintedAbove) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| // Advance timeline and untainted together. |
| IREE_ASSERT_OK(iree_async_semaphore_signal_untainted(sem, 10, nullptr)); |
| EXPECT_EQ(iree_async_semaphore_query_untainted_value(sem), 10u); |
| |
| // Mark tainted above 5 (reduces the untainted watermark). |
| iree_async_semaphore_mark_tainted_above(sem, 5); |
| EXPECT_EQ(iree_async_semaphore_query_untainted_value(sem), 5u); |
| |
| EXPECT_FALSE(iree_async_semaphore_is_value_tainted(sem, 5)); |
| EXPECT_TRUE(iree_async_semaphore_is_value_tainted(sem, 6)); |
| EXPECT_TRUE(iree_async_semaphore_is_value_tainted(sem, 10)); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, MarkTaintedAboveOnlyDecreases) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| // Start at untainted 10. |
| IREE_ASSERT_OK(iree_async_semaphore_signal_untainted(sem, 10, nullptr)); |
| |
| // Try to mark tainted above 15 — should have no effect (15 > 10). |
| iree_async_semaphore_mark_tainted_above(sem, 15); |
| EXPECT_EQ(iree_async_semaphore_query_untainted_value(sem), 10u); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| //===----------------------------------------------------------------------===// |
| // Multi-wait |
| //===----------------------------------------------------------------------===// |
| |
| TEST(MultiWaitTest, EmptyListSucceeds) { |
| IREE_ASSERT_OK(iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ALL, nullptr, nullptr, 0, iree_make_timeout_ms(100), |
| IREE_ASYNC_WAIT_FLAG_NONE, iree_allocator_system())); |
| } |
| |
| TEST(MultiWaitTest, SingleSemaphoreAlreadySatisfied) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 10, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| uint64_t value = 5; |
| IREE_ASSERT_OK(iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ALL, &sem, &value, 1, iree_make_timeout_ms(100), |
| IREE_ASYNC_WAIT_FLAG_NONE, iree_allocator_system())); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(MultiWaitTest, SingleSemaphoreExactValue) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 10, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| uint64_t value = 10; |
| IREE_ASSERT_OK(iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ALL, &sem, &value, 1, iree_make_timeout_ms(100), |
| IREE_ASYNC_WAIT_FLAG_NONE, iree_allocator_system())); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(MultiWaitTest, SingleSemaphoreTimesOut) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| uint64_t value = 10; |
| iree_status_t status = iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ALL, &sem, &value, 1, iree_make_timeout_ms(1), |
| IREE_ASYNC_WAIT_FLAG_NONE, iree_allocator_system()); |
| EXPECT_EQ(iree_status_code(status), IREE_STATUS_DEADLINE_EXCEEDED); |
| iree_status_free(status); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(MultiWaitTest, SingleSemaphoreImmediateTimeout) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| uint64_t value = 10; |
| iree_status_t status = iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ALL, &sem, &value, 1, iree_immediate_timeout(), |
| IREE_ASYNC_WAIT_FLAG_NONE, iree_allocator_system()); |
| EXPECT_EQ(iree_status_code(status), IREE_STATUS_DEADLINE_EXCEEDED); |
| iree_status_free(status); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(MultiWaitTest, SingleSemaphoreImmediateTimeoutAlreadySatisfied) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 10, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| uint64_t value = 5; |
| IREE_ASSERT_OK(iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ALL, &sem, &value, 1, iree_immediate_timeout(), |
| IREE_ASYNC_WAIT_FLAG_NONE, iree_allocator_system())); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(MultiWaitTest, AllModeAllAlreadySatisfied) { |
| constexpr int kCount = 4; |
| iree_async_semaphore_t* sems[kCount] = {}; |
| uint64_t values[kCount] = {5, 10, 15, 20}; |
| |
| for (int i = 0; i < kCount; ++i) { |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), values[i], |
| IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, iree_allocator_system(), |
| &sems[i])); |
| } |
| |
| IREE_ASSERT_OK(iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ALL, sems, values, kCount, iree_make_timeout_ms(100), |
| IREE_ASYNC_WAIT_FLAG_NONE, iree_allocator_system())); |
| |
| for (int i = 0; i < kCount; ++i) { |
| iree_async_semaphore_release(sems[i]); |
| } |
| } |
| |
| TEST(MultiWaitTest, AllModeSignaledFromThread) { |
| constexpr int kCount = 3; |
| iree_async_semaphore_t* sems[kCount] = {}; |
| uint64_t values[kCount] = {10, 20, 30}; |
| |
| for (int i = 0; i < kCount; ++i) { |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sems[i])); |
| } |
| |
| // Signal all semaphores from a background thread. |
| std::thread signaler([&]() { |
| for (int i = 0; i < kCount; ++i) { |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sems[i], values[i], nullptr)); |
| } |
| }); |
| |
| IREE_ASSERT_OK(iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ALL, sems, values, kCount, |
| iree_make_timeout_ms(5000), IREE_ASYNC_WAIT_FLAG_NONE, |
| iree_allocator_system())); |
| |
| signaler.join(); |
| |
| for (int i = 0; i < kCount; ++i) { |
| iree_async_semaphore_release(sems[i]); |
| } |
| } |
| |
| TEST(MultiWaitTest, AnyModeFirstSatisfied) { |
| constexpr int kCount = 3; |
| iree_async_semaphore_t* sems[kCount] = {}; |
| uint64_t values[kCount] = {10, 20, 30}; |
| |
| for (int i = 0; i < kCount; ++i) { |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sems[i])); |
| } |
| |
| // Signal only the first semaphore. |
| std::thread signaler([&]() { |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sems[0], 10, nullptr)); |
| }); |
| |
| IREE_ASSERT_OK(iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ANY, sems, values, kCount, |
| iree_make_timeout_ms(5000), IREE_ASYNC_WAIT_FLAG_NONE, |
| iree_allocator_system())); |
| |
| signaler.join(); |
| |
| for (int i = 0; i < kCount; ++i) { |
| iree_async_semaphore_release(sems[i]); |
| } |
| } |
| |
| TEST(MultiWaitTest, AnyModeMiddleSatisfied) { |
| constexpr int kCount = 3; |
| iree_async_semaphore_t* sems[kCount] = {}; |
| uint64_t values[kCount] = {10, 20, 30}; |
| |
| for (int i = 0; i < kCount; ++i) { |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sems[i])); |
| } |
| |
| // Signal only the second semaphore. |
| std::thread signaler([&]() { |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sems[1], 20, nullptr)); |
| }); |
| |
| IREE_ASSERT_OK(iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ANY, sems, values, kCount, |
| iree_make_timeout_ms(5000), IREE_ASYNC_WAIT_FLAG_NONE, |
| iree_allocator_system())); |
| |
| signaler.join(); |
| |
| for (int i = 0; i < kCount; ++i) { |
| iree_async_semaphore_release(sems[i]); |
| } |
| } |
| |
| TEST(MultiWaitTest, AnyModeAlreadySatisfied) { |
| constexpr int kCount = 3; |
| iree_async_semaphore_t* sems[kCount] = {}; |
| uint64_t values[kCount] = {10, 20, 30}; |
| |
| // Only the second semaphore is already satisfied. |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sems[0])); |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 100, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sems[1])); |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sems[2])); |
| |
| IREE_ASSERT_OK(iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ANY, sems, values, kCount, iree_make_timeout_ms(100), |
| IREE_ASYNC_WAIT_FLAG_NONE, iree_allocator_system())); |
| |
| for (int i = 0; i < kCount; ++i) { |
| iree_async_semaphore_release(sems[i]); |
| } |
| } |
| |
| TEST(MultiWaitTest, FailureAbortsWait) { |
| constexpr int kCount = 2; |
| iree_async_semaphore_t* sems[kCount] = {}; |
| uint64_t values[kCount] = {10, 10}; |
| |
| for (int i = 0; i < kCount; ++i) { |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sems[i])); |
| } |
| |
| // Fail the second semaphore from a background thread. |
| std::thread fail_thread([&]() { |
| iree_async_semaphore_fail( |
| sems[1], iree_make_status(IREE_STATUS_INTERNAL, "gpu fault")); |
| }); |
| |
| iree_status_t status = iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ALL, sems, values, kCount, |
| iree_make_timeout_ms(5000), IREE_ASYNC_WAIT_FLAG_NONE, |
| iree_allocator_system()); |
| // multi_wait returns the actual failure code (not ABORTED) so the caller |
| // knows the specific error without needing a follow-up query. |
| EXPECT_EQ(iree_status_code(status), IREE_STATUS_INTERNAL); |
| iree_status_free(status); |
| |
| fail_thread.join(); |
| |
| for (int i = 0; i < kCount; ++i) { |
| iree_async_semaphore_release(sems[i]); |
| } |
| } |
| |
| TEST(MultiWaitTest, AlreadyFailedSemaphoreAbortsImmediately) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| iree_async_semaphore_fail( |
| sem, iree_make_status(IREE_STATUS_INTERNAL, "already failed")); |
| |
| uint64_t value = 10; |
| iree_status_t status = iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ALL, &sem, &value, 1, iree_make_timeout_ms(100), |
| IREE_ASYNC_WAIT_FLAG_NONE, iree_allocator_system()); |
| EXPECT_EQ(iree_status_code(status), IREE_STATUS_INTERNAL); |
| iree_status_free(status); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(MultiWaitTest, ImmediateTimeoutPollAnyOneSatisfied) { |
| constexpr int kCount = 3; |
| iree_async_semaphore_t* sems[kCount] = {}; |
| uint64_t values[kCount] = {10, 10, 10}; |
| |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sems[0])); |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 100, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sems[1])); |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sems[2])); |
| |
| IREE_ASSERT_OK(iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ANY, sems, values, kCount, iree_immediate_timeout(), |
| IREE_ASYNC_WAIT_FLAG_NONE, iree_allocator_system())); |
| |
| for (int i = 0; i < kCount; ++i) { |
| iree_async_semaphore_release(sems[i]); |
| } |
| } |
| |
| TEST(MultiWaitTest, ImmediateTimeoutPollAllNoneSatisfied) { |
| constexpr int kCount = 2; |
| iree_async_semaphore_t* sems[kCount] = {}; |
| uint64_t values[kCount] = {10, 10}; |
| |
| for (int i = 0; i < kCount; ++i) { |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sems[i])); |
| } |
| |
| iree_status_t status = iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ALL, sems, values, kCount, iree_immediate_timeout(), |
| IREE_ASYNC_WAIT_FLAG_NONE, iree_allocator_system()); |
| EXPECT_EQ(iree_status_code(status), IREE_STATUS_DEADLINE_EXCEEDED); |
| iree_status_free(status); |
| |
| for (int i = 0; i < kCount; ++i) { |
| iree_async_semaphore_release(sems[i]); |
| } |
| } |
| |
| TEST(MultiWaitTest, LargeCountUsesHeapAllocation) { |
| // More than IREE_ASYNC_MULTI_WAIT_INLINE_CAPACITY (8) to exercise the |
| // heap allocation path. |
| constexpr int kCount = 16; |
| iree_async_semaphore_t* sems[kCount] = {}; |
| uint64_t values[kCount] = {}; |
| |
| for (int i = 0; i < kCount; ++i) { |
| values[i] = 10; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 10, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sems[i])); |
| } |
| |
| IREE_ASSERT_OK(iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ALL, sems, values, kCount, iree_make_timeout_ms(100), |
| IREE_ASYNC_WAIT_FLAG_NONE, iree_allocator_system())); |
| |
| for (int i = 0; i < kCount; ++i) { |
| iree_async_semaphore_release(sems[i]); |
| } |
| } |
| |
| TEST(MultiWaitTest, AllModeStaggeredSignals) { |
| // Tests that ALL mode correctly waits for the last semaphore. |
| constexpr int kCount = 4; |
| iree_async_semaphore_t* sems[kCount] = {}; |
| uint64_t values[kCount] = {10, 20, 30, 40}; |
| |
| for (int i = 0; i < kCount; ++i) { |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sems[i])); |
| } |
| |
| // Signal semaphores one by one from a background thread. |
| std::thread signaler([&]() { |
| for (int i = 0; i < kCount; ++i) { |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sems[i], values[i], nullptr)); |
| } |
| }); |
| |
| IREE_ASSERT_OK(iree_async_semaphore_multi_wait( |
| IREE_ASYNC_WAIT_MODE_ALL, sems, values, kCount, |
| iree_make_timeout_ms(5000), IREE_ASYNC_WAIT_FLAG_NONE, |
| iree_allocator_system())); |
| |
| // Verify all semaphores reached their values. |
| for (int i = 0; i < kCount; ++i) { |
| EXPECT_GE(iree_async_semaphore_query(sems[i]), values[i]); |
| } |
| |
| signaler.join(); |
| |
| for (int i = 0; i < kCount; ++i) { |
| iree_async_semaphore_release(sems[i]); |
| } |
| } |
| |
| //===----------------------------------------------------------------------===// |
| // Semaphore chaining |
| //===----------------------------------------------------------------------===// |
| |
| // Callback that signals a different semaphore when the source fires. |
| // This exercises the key unlock-before-dispatch capability: callbacks can |
| // signal other semaphores without deadlock. |
| struct ChainingCallback { |
| iree_async_semaphore_t* target_semaphore; |
| uint64_t signal_value; |
| std::atomic<int> call_count{0}; |
| |
| static void Invoke(void* user_data, |
| iree_async_semaphore_timepoint_t* timepoint, |
| iree_status_t status) { |
| auto* self = static_cast<ChainingCallback*>(user_data); |
| self->call_count++; |
| if (iree_status_is_ok(status)) { |
| iree_status_t signal_status = iree_async_semaphore_signal( |
| self->target_semaphore, self->signal_value, nullptr); |
| iree_status_ignore(signal_status); |
| } |
| iree_status_free(status); |
| } |
| }; |
| |
| TEST(SemaphoreTest, ChainSignalFromCallback) { |
| iree_async_semaphore_t* sem_a = nullptr; |
| iree_async_semaphore_t* sem_b = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem_a)); |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem_b)); |
| |
| // When sem_a reaches 10, signal sem_b to 20. |
| ChainingCallback chain; |
| chain.target_semaphore = sem_b; |
| chain.signal_value = 20; |
| |
| iree_async_semaphore_timepoint_t timepoint; |
| timepoint.callback = ChainingCallback::Invoke; |
| timepoint.user_data = &chain; |
| |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem_a, 10, &timepoint)); |
| EXPECT_EQ(chain.call_count, 0); |
| EXPECT_EQ(iree_async_semaphore_query(sem_b), 0u); |
| |
| // Signal sem_a — should trigger the chain to sem_b. |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sem_a, 10, nullptr)); |
| |
| EXPECT_EQ(chain.call_count, 1); |
| EXPECT_EQ(iree_async_semaphore_query(sem_b), 20u); |
| |
| iree_async_semaphore_release(sem_a); |
| iree_async_semaphore_release(sem_b); |
| } |
| |
| TEST(SemaphoreTest, ChainThreeSemaphores) { |
| // A -> B -> C chain: A reaches 5 -> B signaled to 10 -> C signaled to 15. |
| iree_async_semaphore_t* sems[3] = {}; |
| for (int i = 0; i < 3; ++i) { |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sems[i])); |
| } |
| |
| // B -> C chain. |
| ChainingCallback chain_bc; |
| chain_bc.target_semaphore = sems[2]; |
| chain_bc.signal_value = 15; |
| iree_async_semaphore_timepoint_t tp_bc; |
| tp_bc.callback = ChainingCallback::Invoke; |
| tp_bc.user_data = &chain_bc; |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sems[1], 10, &tp_bc)); |
| |
| // A -> B chain. |
| ChainingCallback chain_ab; |
| chain_ab.target_semaphore = sems[1]; |
| chain_ab.signal_value = 10; |
| iree_async_semaphore_timepoint_t tp_ab; |
| tp_ab.callback = ChainingCallback::Invoke; |
| tp_ab.user_data = &chain_ab; |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sems[0], 5, &tp_ab)); |
| |
| // Signal A — the entire chain should propagate synchronously. |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sems[0], 5, nullptr)); |
| |
| EXPECT_EQ(chain_ab.call_count, 1); |
| EXPECT_EQ(chain_bc.call_count, 1); |
| EXPECT_EQ(iree_async_semaphore_query(sems[0]), 5u); |
| EXPECT_EQ(iree_async_semaphore_query(sems[1]), 10u); |
| EXPECT_EQ(iree_async_semaphore_query(sems[2]), 15u); |
| |
| for (int i = 0; i < 3; ++i) { |
| iree_async_semaphore_release(sems[i]); |
| } |
| } |
| |
| // Callback that conditionally signals based on a flag. |
| // Exercises the pipeline pattern: VAD -> maybe encoder. |
| struct ConditionalChainingCallback { |
| iree_async_semaphore_t* target_semaphore; |
| uint64_t signal_value; |
| std::atomic<bool> should_signal{false}; |
| std::atomic<int> call_count{0}; |
| |
| static void Invoke(void* user_data, |
| iree_async_semaphore_timepoint_t* timepoint, |
| iree_status_t status) { |
| auto* self = static_cast<ConditionalChainingCallback*>(user_data); |
| self->call_count++; |
| if (iree_status_is_ok(status) && self->should_signal) { |
| iree_status_t signal_status = iree_async_semaphore_signal( |
| self->target_semaphore, self->signal_value, nullptr); |
| iree_status_ignore(signal_status); |
| } |
| iree_status_free(status); |
| } |
| }; |
| |
| TEST(SemaphoreTest, ConditionalChainSkipsSignal) { |
| iree_async_semaphore_t* sem_a = nullptr; |
| iree_async_semaphore_t* sem_b = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem_a)); |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem_b)); |
| |
| ConditionalChainingCallback chain; |
| chain.target_semaphore = sem_b; |
| chain.signal_value = 20; |
| chain.should_signal = false; // Don't signal. |
| |
| iree_async_semaphore_timepoint_t timepoint; |
| timepoint.callback = ConditionalChainingCallback::Invoke; |
| timepoint.user_data = &chain; |
| |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem_a, 10, &timepoint)); |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sem_a, 10, nullptr)); |
| |
| EXPECT_EQ(chain.call_count, 1); |
| EXPECT_EQ(iree_async_semaphore_query(sem_b), 0u); // Not signaled. |
| |
| iree_async_semaphore_release(sem_a); |
| iree_async_semaphore_release(sem_b); |
| } |
| |
| TEST(SemaphoreTest, ConditionalChainSignals) { |
| iree_async_semaphore_t* sem_a = nullptr; |
| iree_async_semaphore_t* sem_b = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem_a)); |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem_b)); |
| |
| ConditionalChainingCallback chain; |
| chain.target_semaphore = sem_b; |
| chain.signal_value = 20; |
| chain.should_signal = true; // Do signal. |
| |
| iree_async_semaphore_timepoint_t timepoint; |
| timepoint.callback = ConditionalChainingCallback::Invoke; |
| timepoint.user_data = &chain; |
| |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint(sem_a, 10, &timepoint)); |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sem_a, 10, nullptr)); |
| |
| EXPECT_EQ(chain.call_count, 1); |
| EXPECT_EQ(iree_async_semaphore_query(sem_b), 20u); // Signaled. |
| |
| iree_async_semaphore_release(sem_a); |
| iree_async_semaphore_release(sem_b); |
| } |
| |
| //===----------------------------------------------------------------------===// |
| // Semaphore linking (zero-allocation relay) |
| //===----------------------------------------------------------------------===// |
| |
| TEST(LinkTest, BasicRelay) { |
| iree_async_semaphore_t* source = nullptr; |
| iree_async_semaphore_t* target = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &source)); |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &target)); |
| |
| // Link: when source reaches 10, signal target to 20. |
| iree_async_semaphore_link_t link; |
| IREE_ASSERT_OK(iree_async_semaphore_link(source, 10, target, 20, &link)); |
| |
| // Target should still be at 0. |
| EXPECT_EQ(iree_async_semaphore_query(target), 0u); |
| |
| // Signal source — target should be relayed to 20. |
| IREE_ASSERT_OK(iree_async_semaphore_signal(source, 10, nullptr)); |
| EXPECT_EQ(iree_async_semaphore_query(target), 20u); |
| |
| iree_async_semaphore_release(source); |
| iree_async_semaphore_release(target); |
| } |
| |
| TEST(LinkTest, ImmediateRelay) { |
| // Source already past the link value — link fires synchronously. |
| iree_async_semaphore_t* source = nullptr; |
| iree_async_semaphore_t* target = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 100, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &source)); |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &target)); |
| |
| iree_async_semaphore_link_t link; |
| IREE_ASSERT_OK(iree_async_semaphore_link(source, 50, target, 42, &link)); |
| |
| // Target should already be signaled. |
| EXPECT_EQ(iree_async_semaphore_query(target), 42u); |
| |
| iree_async_semaphore_release(source); |
| iree_async_semaphore_release(target); |
| } |
| |
| TEST(LinkTest, FailurePropagation) { |
| iree_async_semaphore_t* source = nullptr; |
| iree_async_semaphore_t* target = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &source)); |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &target)); |
| |
| iree_async_semaphore_link_t link; |
| IREE_ASSERT_OK(iree_async_semaphore_link(source, 10, target, 20, &link)); |
| |
| // Fail source — failure should propagate to target. |
| iree_async_semaphore_fail( |
| source, iree_make_status(IREE_STATUS_INTERNAL, "gpu fault")); |
| |
| // Target should be failed with the same status code. |
| iree_status_t target_failure = (iree_status_t)iree_atomic_load( |
| &target->failure_status, iree_memory_order_acquire); |
| EXPECT_FALSE(iree_status_is_ok(target_failure)); |
| EXPECT_EQ(iree_status_code(target_failure), IREE_STATUS_INTERNAL); |
| |
| iree_async_semaphore_release(source); |
| iree_async_semaphore_release(target); |
| } |
| |
| TEST(LinkTest, UnlinkBeforeFire) { |
| iree_async_semaphore_t* source = nullptr; |
| iree_async_semaphore_t* target = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &source)); |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &target)); |
| |
| iree_async_semaphore_link_t link; |
| IREE_ASSERT_OK(iree_async_semaphore_link(source, 10, target, 20, &link)); |
| |
| // Unlink before source reaches the value. |
| EXPECT_TRUE(iree_async_semaphore_unlink(&link)); |
| |
| // Signal source past the link value — target should NOT be signaled. |
| IREE_ASSERT_OK(iree_async_semaphore_signal(source, 10, nullptr)); |
| EXPECT_EQ(iree_async_semaphore_query(target), 0u); |
| |
| iree_async_semaphore_release(source); |
| iree_async_semaphore_release(target); |
| } |
| |
| TEST(LinkTest, ThreeSemaphoreChain) { |
| // A → B → C: when A reaches 5, B signals to 10; when B reaches 10, C |
| // signals to 15. Same as ChainThreeSemaphores but with zero ceremony. |
| iree_async_semaphore_t* sems[3] = {}; |
| for (int i = 0; i < 3; ++i) { |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sems[i])); |
| } |
| |
| // Set up B→C first, then A→B. |
| iree_async_semaphore_link_t link_bc; |
| IREE_ASSERT_OK(iree_async_semaphore_link(sems[1], 10, sems[2], 15, &link_bc)); |
| |
| iree_async_semaphore_link_t link_ab; |
| IREE_ASSERT_OK(iree_async_semaphore_link(sems[0], 5, sems[1], 10, &link_ab)); |
| |
| // Signal A — entire chain should propagate synchronously. |
| IREE_ASSERT_OK(iree_async_semaphore_signal(sems[0], 5, nullptr)); |
| |
| EXPECT_EQ(iree_async_semaphore_query(sems[0]), 5u); |
| EXPECT_EQ(iree_async_semaphore_query(sems[1]), 10u); |
| EXPECT_EQ(iree_async_semaphore_query(sems[2]), 15u); |
| |
| for (int i = 0; i < 3; ++i) { |
| iree_async_semaphore_release(sems[i]); |
| } |
| } |
| |
| TEST(LinkTest, MultipleFanOut) { |
| // Single source fans out to two targets. |
| iree_async_semaphore_t* source = nullptr; |
| iree_async_semaphore_t* target_a = nullptr; |
| iree_async_semaphore_t* target_b = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &source)); |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &target_a)); |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &target_b)); |
| |
| iree_async_semaphore_link_t link_a; |
| IREE_ASSERT_OK(iree_async_semaphore_link(source, 10, target_a, 100, &link_a)); |
| |
| iree_async_semaphore_link_t link_b; |
| IREE_ASSERT_OK(iree_async_semaphore_link(source, 10, target_b, 200, &link_b)); |
| |
| IREE_ASSERT_OK(iree_async_semaphore_signal(source, 10, nullptr)); |
| |
| EXPECT_EQ(iree_async_semaphore_query(target_a), 100u); |
| EXPECT_EQ(iree_async_semaphore_query(target_b), 200u); |
| |
| iree_async_semaphore_release(source); |
| iree_async_semaphore_release(target_a); |
| iree_async_semaphore_release(target_b); |
| } |
| |
| TEST(LinkTest, SourceDestroyPropagatesCancelled) { |
| // Destroying the source while a link is pending propagates CANCELLED |
| // to the target as a failure. |
| iree_async_semaphore_t* source = nullptr; |
| iree_async_semaphore_t* target = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &source)); |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &target)); |
| |
| iree_async_semaphore_link_t link; |
| IREE_ASSERT_OK(iree_async_semaphore_link(source, 10, target, 20, &link)); |
| |
| // Destroy source without signaling. |
| iree_async_semaphore_release(source); |
| |
| // Target should have been failed with CANCELLED. |
| iree_status_t target_failure = (iree_status_t)iree_atomic_load( |
| &target->failure_status, iree_memory_order_acquire); |
| EXPECT_FALSE(iree_status_is_ok(target_failure)); |
| EXPECT_EQ(iree_status_code(target_failure), IREE_STATUS_CANCELLED); |
| |
| iree_async_semaphore_release(target); |
| } |
| |
| TEST(LinkTest, AlreadyFailedSourcePropagatesImmediately) { |
| iree_async_semaphore_t* source = nullptr; |
| iree_async_semaphore_t* target = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &source)); |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &target)); |
| |
| // Fail source before creating the link. |
| iree_async_semaphore_fail(source, |
| iree_make_status(IREE_STATUS_ABORTED, "aborted")); |
| |
| iree_async_semaphore_link_t link; |
| IREE_ASSERT_OK(iree_async_semaphore_link(source, 10, target, 20, &link)); |
| |
| // Target should be failed immediately. |
| iree_status_t target_failure = (iree_status_t)iree_atomic_load( |
| &target->failure_status, iree_memory_order_acquire); |
| EXPECT_FALSE(iree_status_is_ok(target_failure)); |
| EXPECT_EQ(iree_status_code(target_failure), IREE_STATUS_ABORTED); |
| |
| iree_async_semaphore_release(source); |
| iree_async_semaphore_release(target); |
| } |
| |
| //===----------------------------------------------------------------------===// |
| // Concurrency |
| //===----------------------------------------------------------------------===// |
| |
| TEST(SemaphoreTest, ConcurrentSignals) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| constexpr int kNumThreads = 8; |
| constexpr int kSignalsPerThread = 1000; |
| |
| std::vector<std::thread> threads; |
| for (int t = 0; t < kNumThreads; ++t) { |
| threads.emplace_back([sem, t, kSignalsPerThread]() { |
| for (int i = 0; i < kSignalsPerThread; ++i) { |
| uint64_t value = t * kSignalsPerThread + i + 1; |
| // Try to signal — may fail if another thread signaled higher. |
| iree_status_t status = iree_async_semaphore_signal(sem, value, nullptr); |
| iree_status_free(status); // Ignore errors. |
| } |
| }); |
| } |
| |
| for (auto& thread : threads) { |
| thread.join(); |
| } |
| |
| // Final value should be somewhere in the range, and query should work. |
| uint64_t final_value = iree_async_semaphore_query(sem); |
| EXPECT_GT(final_value, 0u); |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| TEST(SemaphoreTest, ConcurrentTimepointAcquisitionAndSignal) { |
| iree_async_semaphore_t* sem = nullptr; |
| IREE_ASSERT_OK(iree_async_semaphore_create( |
| test_proactor(), 0, IREE_ASYNC_SEMAPHORE_DEFAULT_FRONTIER_CAPACITY, |
| iree_allocator_system(), &sem)); |
| |
| constexpr int kNumTimepoints = 100; |
| |
| struct ThreadData { |
| TimepointCallback callback; |
| iree_async_semaphore_timepoint_t timepoint; |
| }; |
| std::vector<ThreadData> data(kNumTimepoints); |
| |
| // Start threads that acquire timepoints. |
| std::vector<std::thread> acquire_threads; |
| for (int i = 0; i < kNumTimepoints; ++i) { |
| data[i].timepoint.callback = TimepointCallback::Invoke; |
| data[i].timepoint.user_data = &data[i].callback; |
| acquire_threads.emplace_back([sem, &data, i]() { |
| IREE_ASSERT_OK(iree_async_semaphore_acquire_timepoint( |
| sem, i + 1, &data[i].timepoint)); |
| }); |
| } |
| |
| // Signal thread. |
| std::thread signal_thread([sem, kNumTimepoints]() { |
| for (int i = 1; i <= kNumTimepoints; ++i) { |
| iree_status_t status = iree_async_semaphore_signal(sem, i, nullptr); |
| // May fail if we signal out of order, that's OK. |
| iree_status_free(status); |
| } |
| }); |
| |
| for (auto& t : acquire_threads) { |
| t.join(); |
| } |
| signal_thread.join(); |
| |
| // Ensure signal is past all timepoints. |
| iree_status_t status = |
| iree_async_semaphore_signal(sem, kNumTimepoints + 1, nullptr); |
| iree_status_free(status); |
| |
| // All timepoints should have fired. |
| for (int i = 0; i < kNumTimepoints; ++i) { |
| EXPECT_EQ(data[i].callback.call_count, 1); |
| } |
| |
| iree_async_semaphore_release(sem); |
| } |
| |
| } // namespace |
| } // namespace iree |