Rearranging iree_notification_t code to make it easier to follow. (#8877)

The mix of atomic/futex and pthreads operations made it hard to know
when the pthread mutex should be locked and when atomics were ok. The
disabled synchronization path that no-op'ed also obscured the intended
behavior. Now we have the divergent code split up more cleanly so that
we can more easily inspect it for correctness.

iree_notification_await is left shared across the implementations to
preserve program behavior (that the condition is not checked with
a lock held) though we could probably tweak that if we wanted. Erring
on the side of keeping things the same for now.

Integrates the fix from #8876 for a hang/deadlock when using pthreads.
diff --git a/iree/base/internal/synchronization.c b/iree/base/internal/synchronization.c
index a9c5f73..936238c 100644
--- a/iree/base/internal/synchronization.c
+++ b/iree/base/internal/synchronization.c
@@ -521,6 +521,125 @@
 // iree_notification_t
 //==============================================================================
 
+#if IREE_SYNCHRONIZATION_DISABLE_UNSAFE
+
+// No-op implementation that is only used when there is guaranteed to be one
+// thread at a time touching IREE-related code. It is unsafe to use in any
+// situation where either IREE or a user of IREE has multiple threads!
+
+void iree_notification_initialize(iree_notification_t* out_notification) {
+  memset(out_notification, 0, sizeof(*out_notification));
+}
+
+void iree_notification_deinitialize(iree_notification_t* notification) {}
+
+void iree_notification_post(iree_notification_t* notification, int32_t count) {}
+
+iree_wait_token_t iree_notification_prepare_wait(
+    iree_notification_t* notification) {
+  return (iree_wait_token_t)0;
+}
+
+bool iree_notification_commit_wait(iree_notification_t* notification,
+                                   iree_wait_token_t wait_token,
+                                   iree_time_t deadline_ns) {
+  return true;
+}
+
+void iree_notification_cancel_wait(iree_notification_t* notification) {}
+
+#elif !defined(IREE_PLATFORM_HAS_FUTEX)
+
+// Emulation of a lock-free futex-backed notification using pthreads.
+// This is a normal cond-var-like usage with support for our prepare/cancel API
+// so that users can still perform their own wait logic.
+
+void iree_notification_initialize(iree_notification_t* out_notification) {
+  memset(out_notification, 0, sizeof(*out_notification));
+  pthread_mutex_init(&out_notification->mutex, NULL);
+  pthread_cond_init(&out_notification->cond, NULL);
+}
+
+void iree_notification_deinitialize(iree_notification_t* notification) {
+  // Assert no more waiters (callers must tear down waiters first).
+  pthread_mutex_lock(&notification->mutex);
+  SYNC_ASSERT(notification->waiters == 0);
+  pthread_cond_destroy(&notification->cond);
+  pthread_mutex_unlock(&notification->mutex);
+  pthread_mutex_destroy(&notification->mutex);
+}
+
+void iree_notification_post(iree_notification_t* notification, int32_t count) {
+  pthread_mutex_lock(&notification->mutex);
+  ++notification->epoch;
+  if (notification->waiters > 0) {
+    // NOTE: we only do the signal if we have waiters - this avoids a syscall
+    // in cases where no one is actively listening.
+    if (count == IREE_ALL_WAITERS) {
+      pthread_cond_broadcast(&notification->cond);
+    } else {
+      for (int32_t i = 0; i < count; ++i) {
+        pthread_cond_signal(&notification->cond);
+      }
+    }
+  }
+  pthread_mutex_unlock(&notification->mutex);
+}
+
+iree_wait_token_t iree_notification_prepare_wait(
+    iree_notification_t* notification) {
+  pthread_mutex_lock(&notification->mutex);
+  iree_wait_token_t epoch = notification->epoch;
+  ++notification->waiters;
+  pthread_mutex_unlock(&notification->mutex);
+  return epoch;
+}
+
+bool iree_notification_commit_wait(iree_notification_t* notification,
+                                   iree_wait_token_t wait_token,
+                                   iree_time_t deadline_ns) {
+  struct timespec abs_ts = {
+      .tv_sec = (time_t)(deadline_ns / 1000000000ull),
+      .tv_nsec = (long)(deadline_ns % 1000000000ull),
+  };
+
+  pthread_mutex_lock(&notification->mutex);
+
+  // Spin until notified and the epoch increments from what we captured during
+  // iree_notification_prepare_wait.
+  bool result = true;
+  while (notification->epoch == wait_token) {
+    int ret = pthread_cond_timedwait(&notification->cond, &notification->mutex,
+                                     &abs_ts);
+    if (ret != 0) {
+      // Wait failed (timeout/etc); cancel the wait.
+      // This may happen in spurious wakes but that's fine - the caller is
+      // designed to handle looping again and may want the chance to do some
+      // bookkeeping while it has the thread.
+      result = false;
+      break;
+    }
+  }
+
+  // Remove us from the waiter list - the caller will need to reacquire a wait
+  // token if it wants to wait again.
+  SYNC_ASSERT(notification->waiters > 0);
+  --notification->waiters;
+
+  pthread_mutex_unlock(&notification->mutex);
+
+  return result;
+}
+
+void iree_notification_cancel_wait(iree_notification_t* notification) {
+  pthread_mutex_lock(&notification->mutex);
+  SYNC_ASSERT(notification->waiters > 0);
+  --notification->waiters;
+  pthread_mutex_unlock(&notification->mutex);
+}
+
+#else
+
 // The 64-bit value used to atomically read-modify-write (RMW) the state is
 // split in two and treated as independent 32-bit ints:
 //
@@ -553,12 +672,6 @@
 
 void iree_notification_initialize(iree_notification_t* out_notification) {
   memset(out_notification, 0, sizeof(*out_notification));
-#if IREE_SYNCHRONIZATION_DISABLE_UNSAFE
-  // No-op.
-#elif !defined(IREE_PLATFORM_HAS_FUTEX)
-  pthread_mutex_init(&out_notification->mutex, NULL);
-  pthread_cond_init(&out_notification->cond, NULL);
-#endif  // IREE_PLATFORM_HAS_FUTEX
 }
 
 void iree_notification_deinitialize(iree_notification_t* notification) {
@@ -566,14 +679,6 @@
   SYNC_ASSERT(
       (iree_atomic_load_int64(&notification->value, iree_memory_order_seq_cst) &
        IREE_NOTIFICATION_WAITER_MASK) == 0);
-#if IREE_SYNCHRONIZATION_DISABLE_UNSAFE
-  // No-op.
-#elif !defined(IREE_PLATFORM_HAS_FUTEX)
-  pthread_mutex_lock(&notification->mutex);
-  pthread_cond_destroy(&notification->cond);
-  pthread_mutex_unlock(&notification->mutex);
-  pthread_mutex_destroy(&notification->mutex);
-#endif  // IREE_PLATFORM_HAS_FUTEX
 }
 
 void iree_notification_post(iree_notification_t* notification, int32_t count) {
@@ -582,21 +687,7 @@
       iree_memory_order_acq_rel);
   // Ensure we have at least one waiter; wake up to |count| of them.
   if (IREE_UNLIKELY(previous_value & IREE_NOTIFICATION_WAITER_MASK)) {
-#if IREE_SYNCHRONIZATION_DISABLE_UNSAFE
-    // No-op.
-#elif defined(IREE_PLATFORM_HAS_FUTEX)
     iree_futex_wake(iree_notification_epoch_address(notification), count);
-#else
-    pthread_mutex_lock(&notification->mutex);
-    if (count == IREE_ALL_WAITERS) {
-      pthread_cond_broadcast(&notification->cond);
-    } else {
-      for (int32_t i = 0; i < count; ++i) {
-        pthread_cond_signal(&notification->cond);
-      }
-    }
-    pthread_mutex_unlock(&notification->mutex);
-#endif  // IREE_PLATFORM_HAS_FUTEX
   }
 }
 
@@ -618,24 +709,12 @@
   while ((iree_atomic_load_int64(&notification->value,
                                  iree_memory_order_acquire) >>
           IREE_NOTIFICATION_EPOCH_SHIFT) == wait_token) {
-    iree_status_code_t status_code = IREE_STATUS_OK;
-#if IREE_SYNCHRONIZATION_DISABLE_UNSAFE
-    // TODO(benvanik): platform sleep? this spins.
-#elif defined(IREE_PLATFORM_HAS_FUTEX)
+    // NOTE: we do an abs->rel conversion within the loop so that we can account
+    // for spurious wakes that may cause us to loop several times with waits of
+    // various time inbetween.
     uint32_t timeout_ms = iree_absolute_deadline_to_timeout_ms(deadline_ns);
-    status_code = iree_futex_wait(iree_notification_epoch_address(notification),
-                                  wait_token, timeout_ms);
-#else
-    struct timespec abs_ts = {
-        .tv_sec = (time_t)(deadline_ns / 1000000000ull),
-        .tv_nsec = (long)(deadline_ns % 1000000000ull),
-    };
-    pthread_mutex_lock(&notification->mutex);
-    int ret = pthread_cond_timedwait(&notification->cond, &notification->mutex,
-                                     &abs_ts);
-    pthread_mutex_unlock(&notification->mutex);
-    status_code = ret == 0 ? IREE_STATUS_OK : IREE_STATUS_DEADLINE_EXCEEDED;
-#endif  // IREE_PLATFORM_HAS_FUTEX
+    iree_status_code_t status_code = iree_futex_wait(
+        iree_notification_epoch_address(notification), wait_token, timeout_ms);
     if (status_code != IREE_STATUS_OK) {
       result = false;
       break;
@@ -663,6 +742,8 @@
   SYNC_ASSERT((previous_value & IREE_NOTIFICATION_WAITER_MASK) != 0);
 }
 
+#endif  // DISABLED / HAS_FUTEX
+
 bool iree_notification_await(iree_notification_t* notification,
                              iree_condition_fn_t condition_fn,
                              void* condition_arg, iree_timeout_t timeout) {
diff --git a/iree/base/internal/synchronization.h b/iree/base/internal/synchronization.h
index 43086ad..45f3f59 100644
--- a/iree/base/internal/synchronization.h
+++ b/iree/base/internal/synchronization.h
@@ -286,24 +286,25 @@
 // https://github.com/concurrencykit/ck/blob/master/include/ck_ec.h
 typedef struct iree_notification_t {
 #if IREE_SYNCHRONIZATION_DISABLE_UNSAFE
-  // Nothing required.
+  // Nothing required. Unused field to make compilers happy.
+  int reserved;
 #elif !defined(IREE_PLATFORM_HAS_FUTEX)
-  // No futex on darwin, so use mutex/condvar instead.
+  // No futex on darwin/when using TSAN, so use mutex/condvar instead.
   pthread_mutex_t mutex;
   pthread_cond_t cond;
-#endif  // IREE_PLATFORM_*
+  uint32_t epoch;
+  uint32_t waiters;
+#else
   iree_atomic_int64_t value;
+#endif  // IREE_PLATFORM_*
 } iree_notification_t;
 
 #if IREE_SYNCHRONIZATION_DISABLE_UNSAFE
 #define IREE_NOTIFICATION_INIT \
   { IREE_ATOMIC_VAR_INIT(0) }
 #elif !defined(IREE_PLATFORM_HAS_FUTEX)
-#define IREE_NOTIFICATION_INIT                           \
-  {                                                      \
-    PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, \
-        IREE_ATOMIC_VAR_INIT(0),                         \
-  }
+#define IREE_NOTIFICATION_INIT \
+  { PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0, 0 }
 #else
 #define IREE_NOTIFICATION_INIT \
   { IREE_ATOMIC_VAR_INIT(0) }