Rearranging iree_notification_t code to make it easier to follow. (#8877)
The mix of atomic/futex and pthreads operations made it hard to know
when the pthread mutex should be locked and when atomics were ok. The
disabled synchronization path that no-op'ed also obscured the intended
behavior. Now we have the divergent code split up more cleanly so that
we can more easily inspect it for correctness.
iree_notification_await is left shared across the implementations to
preserve program behavior (that the condition is not checked with
a lock held) though we could probably tweak that if we wanted. Erring
on the side of keeping things the same for now.
Integrates the fix from #8876 for a hang/deadlock when using pthreads.
diff --git a/iree/base/internal/synchronization.c b/iree/base/internal/synchronization.c
index a9c5f73..936238c 100644
--- a/iree/base/internal/synchronization.c
+++ b/iree/base/internal/synchronization.c
@@ -521,6 +521,125 @@
// iree_notification_t
//==============================================================================
+#if IREE_SYNCHRONIZATION_DISABLE_UNSAFE
+
+// No-op implementation that is only used when there is guaranteed to be one
+// thread at a time touching IREE-related code. It is unsafe to use in any
+// situation where either IREE or a user of IREE has multiple threads!
+
+void iree_notification_initialize(iree_notification_t* out_notification) {
+ memset(out_notification, 0, sizeof(*out_notification));
+}
+
+void iree_notification_deinitialize(iree_notification_t* notification) {}
+
+void iree_notification_post(iree_notification_t* notification, int32_t count) {}
+
+iree_wait_token_t iree_notification_prepare_wait(
+ iree_notification_t* notification) {
+ return (iree_wait_token_t)0;
+}
+
+bool iree_notification_commit_wait(iree_notification_t* notification,
+ iree_wait_token_t wait_token,
+ iree_time_t deadline_ns) {
+ return true;
+}
+
+void iree_notification_cancel_wait(iree_notification_t* notification) {}
+
+#elif !defined(IREE_PLATFORM_HAS_FUTEX)
+
+// Emulation of a lock-free futex-backed notification using pthreads.
+// This is a normal cond-var-like usage with support for our prepare/cancel API
+// so that users can still perform their own wait logic.
+
+void iree_notification_initialize(iree_notification_t* out_notification) {
+ memset(out_notification, 0, sizeof(*out_notification));
+ pthread_mutex_init(&out_notification->mutex, NULL);
+ pthread_cond_init(&out_notification->cond, NULL);
+}
+
+void iree_notification_deinitialize(iree_notification_t* notification) {
+ // Assert no more waiters (callers must tear down waiters first).
+ pthread_mutex_lock(¬ification->mutex);
+ SYNC_ASSERT(notification->waiters == 0);
+ pthread_cond_destroy(¬ification->cond);
+ pthread_mutex_unlock(¬ification->mutex);
+ pthread_mutex_destroy(¬ification->mutex);
+}
+
+void iree_notification_post(iree_notification_t* notification, int32_t count) {
+ pthread_mutex_lock(¬ification->mutex);
+ ++notification->epoch;
+ if (notification->waiters > 0) {
+ // NOTE: we only do the signal if we have waiters - this avoids a syscall
+ // in cases where no one is actively listening.
+ if (count == IREE_ALL_WAITERS) {
+ pthread_cond_broadcast(¬ification->cond);
+ } else {
+ for (int32_t i = 0; i < count; ++i) {
+ pthread_cond_signal(¬ification->cond);
+ }
+ }
+ }
+ pthread_mutex_unlock(¬ification->mutex);
+}
+
+iree_wait_token_t iree_notification_prepare_wait(
+ iree_notification_t* notification) {
+ pthread_mutex_lock(¬ification->mutex);
+ iree_wait_token_t epoch = notification->epoch;
+ ++notification->waiters;
+ pthread_mutex_unlock(¬ification->mutex);
+ return epoch;
+}
+
+bool iree_notification_commit_wait(iree_notification_t* notification,
+ iree_wait_token_t wait_token,
+ iree_time_t deadline_ns) {
+ struct timespec abs_ts = {
+ .tv_sec = (time_t)(deadline_ns / 1000000000ull),
+ .tv_nsec = (long)(deadline_ns % 1000000000ull),
+ };
+
+ pthread_mutex_lock(¬ification->mutex);
+
+ // Spin until notified and the epoch increments from what we captured during
+ // iree_notification_prepare_wait.
+ bool result = true;
+ while (notification->epoch == wait_token) {
+ int ret = pthread_cond_timedwait(¬ification->cond, ¬ification->mutex,
+ &abs_ts);
+ if (ret != 0) {
+ // Wait failed (timeout/etc); cancel the wait.
+ // This may happen in spurious wakes but that's fine - the caller is
+ // designed to handle looping again and may want the chance to do some
+ // bookkeeping while it has the thread.
+ result = false;
+ break;
+ }
+ }
+
+ // Remove us from the waiter list - the caller will need to reacquire a wait
+ // token if it wants to wait again.
+ SYNC_ASSERT(notification->waiters > 0);
+ --notification->waiters;
+
+ pthread_mutex_unlock(¬ification->mutex);
+
+ return result;
+}
+
+void iree_notification_cancel_wait(iree_notification_t* notification) {
+ pthread_mutex_lock(¬ification->mutex);
+ SYNC_ASSERT(notification->waiters > 0);
+ --notification->waiters;
+ pthread_mutex_unlock(¬ification->mutex);
+}
+
+#else
+
// The 64-bit value used to atomically read-modify-write (RMW) the state is
// split in two and treated as independent 32-bit ints:
//
@@ -553,12 +672,6 @@
void iree_notification_initialize(iree_notification_t* out_notification) {
memset(out_notification, 0, sizeof(*out_notification));
-#if IREE_SYNCHRONIZATION_DISABLE_UNSAFE
- // No-op.
-#elif !defined(IREE_PLATFORM_HAS_FUTEX)
- pthread_mutex_init(&out_notification->mutex, NULL);
- pthread_cond_init(&out_notification->cond, NULL);
-#endif // IREE_PLATFORM_HAS_FUTEX
}
void iree_notification_deinitialize(iree_notification_t* notification) {
@@ -566,14 +679,6 @@
SYNC_ASSERT(
(iree_atomic_load_int64(¬ification->value, iree_memory_order_seq_cst) &
IREE_NOTIFICATION_WAITER_MASK) == 0);
-#if IREE_SYNCHRONIZATION_DISABLE_UNSAFE
- // No-op.
-#elif !defined(IREE_PLATFORM_HAS_FUTEX)
- pthread_mutex_lock(¬ification->mutex);
- pthread_cond_destroy(¬ification->cond);
- pthread_mutex_unlock(¬ification->mutex);
- pthread_mutex_destroy(¬ification->mutex);
-#endif // IREE_PLATFORM_HAS_FUTEX
}
void iree_notification_post(iree_notification_t* notification, int32_t count) {
@@ -582,21 +687,7 @@
iree_memory_order_acq_rel);
// Ensure we have at least one waiter; wake up to |count| of them.
if (IREE_UNLIKELY(previous_value & IREE_NOTIFICATION_WAITER_MASK)) {
-#if IREE_SYNCHRONIZATION_DISABLE_UNSAFE
- // No-op.
-#elif defined(IREE_PLATFORM_HAS_FUTEX)
iree_futex_wake(iree_notification_epoch_address(notification), count);
-#else
- pthread_mutex_lock(¬ification->mutex);
- if (count == IREE_ALL_WAITERS) {
- pthread_cond_broadcast(¬ification->cond);
- } else {
- for (int32_t i = 0; i < count; ++i) {
- pthread_cond_signal(¬ification->cond);
- }
- }
- pthread_mutex_unlock(¬ification->mutex);
-#endif // IREE_PLATFORM_HAS_FUTEX
}
}
@@ -618,24 +709,12 @@
while ((iree_atomic_load_int64(¬ification->value,
iree_memory_order_acquire) >>
IREE_NOTIFICATION_EPOCH_SHIFT) == wait_token) {
- iree_status_code_t status_code = IREE_STATUS_OK;
-#if IREE_SYNCHRONIZATION_DISABLE_UNSAFE
- // TODO(benvanik): platform sleep? this spins.
-#elif defined(IREE_PLATFORM_HAS_FUTEX)
+ // NOTE: we do an abs->rel conversion within the loop so that we can account
+ // for spurious wakes that may cause us to loop several times with waits of
+ // various time inbetween.
uint32_t timeout_ms = iree_absolute_deadline_to_timeout_ms(deadline_ns);
- status_code = iree_futex_wait(iree_notification_epoch_address(notification),
- wait_token, timeout_ms);
-#else
- struct timespec abs_ts = {
- .tv_sec = (time_t)(deadline_ns / 1000000000ull),
- .tv_nsec = (long)(deadline_ns % 1000000000ull),
- };
- pthread_mutex_lock(¬ification->mutex);
- int ret = pthread_cond_timedwait(¬ification->cond, ¬ification->mutex,
- &abs_ts);
- pthread_mutex_unlock(¬ification->mutex);
- status_code = ret == 0 ? IREE_STATUS_OK : IREE_STATUS_DEADLINE_EXCEEDED;
-#endif // IREE_PLATFORM_HAS_FUTEX
+ iree_status_code_t status_code = iree_futex_wait(
+ iree_notification_epoch_address(notification), wait_token, timeout_ms);
if (status_code != IREE_STATUS_OK) {
result = false;
break;
@@ -663,6 +742,8 @@
SYNC_ASSERT((previous_value & IREE_NOTIFICATION_WAITER_MASK) != 0);
}
+#endif // DISABLED / HAS_FUTEX
+
bool iree_notification_await(iree_notification_t* notification,
iree_condition_fn_t condition_fn,
void* condition_arg, iree_timeout_t timeout) {
diff --git a/iree/base/internal/synchronization.h b/iree/base/internal/synchronization.h
index 43086ad..45f3f59 100644
--- a/iree/base/internal/synchronization.h
+++ b/iree/base/internal/synchronization.h
@@ -286,24 +286,25 @@
// https://github.com/concurrencykit/ck/blob/master/include/ck_ec.h
typedef struct iree_notification_t {
#if IREE_SYNCHRONIZATION_DISABLE_UNSAFE
- // Nothing required.
+ // Nothing required. Unused field to make compilers happy.
+ int reserved;
#elif !defined(IREE_PLATFORM_HAS_FUTEX)
- // No futex on darwin, so use mutex/condvar instead.
+ // No futex on darwin/when using TSAN, so use mutex/condvar instead.
pthread_mutex_t mutex;
pthread_cond_t cond;
-#endif // IREE_PLATFORM_*
+ uint32_t epoch;
+ uint32_t waiters;
+#else
iree_atomic_int64_t value;
+#endif // IREE_PLATFORM_*
} iree_notification_t;
#if IREE_SYNCHRONIZATION_DISABLE_UNSAFE
#define IREE_NOTIFICATION_INIT \
{ IREE_ATOMIC_VAR_INIT(0) }
#elif !defined(IREE_PLATFORM_HAS_FUTEX)
-#define IREE_NOTIFICATION_INIT \
- { \
- PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, \
- IREE_ATOMIC_VAR_INIT(0), \
- }
+#define IREE_NOTIFICATION_INIT \
+ { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0 }
#else
#define IREE_NOTIFICATION_INIT \
{ IREE_ATOMIC_VAR_INIT(0) }