| // Copyright Microsoft and CHERIoT Contributors. |
| // SPDX-License-Identifier: MIT |
| |
| #include <cstdint> |
| #define TEST_NAME "Thread pool" |
| #include "tests.hh" |
| #include <cheri.hh> |
| #include <cheriot-atomic.hh> |
| #include <switcher.h> |
| #include <thread.h> |
| #include <thread_pool.h> |
| |
| int counter; |
| |
| using CHERI::with_interrupts_disabled; |
| using namespace thread_pool; |
| |
| cheriot::atomic<bool> errorHandled = false; |
| cheriot::atomic<bool> interruptStarted = false; |
| cheriot::atomic<int> interruptThreadNumber; |
| |
| extern "C" ErrorRecoveryBehaviour |
| compartment_error_handler(ErrorState *frame, size_t mcause, size_t mtval) |
| { |
| debug_log("Thread {} error handler invoked with mcause {}. PCC: {}", |
| thread_id_get(), |
| mcause, |
| frame->pcc); |
| if (mcause != 25) |
| { |
| return ErrorRecoveryBehaviour::ForceUnwind; |
| } |
| if (thread_id_get() != interruptThreadNumber) |
| { |
| debug_log( |
| "Explicit thread interrupt delivered on the wrong thread (thread {}, " |
| "expected {})", |
| thread_id_get(), |
| interruptThreadNumber.load()); |
| return ErrorRecoveryBehaviour::ForceUnwind; |
| } |
| errorHandled = true; |
| debug_log("Expected software interrupt, installing context"); |
| return ErrorRecoveryBehaviour::InstallContext; |
| } |
| |
| int test_thread_pool() |
| { |
| // We can't share stack variables, so create a heap allocation that we can |
| // capture as an explicit pointer. |
| int *heapInt = new (malloc(sizeof(int))) int(0); |
| TEST(thread_id_get() == 1, |
| "Thread id of main thread should be 1, is {}", |
| thread_id_get()); |
| // Run a simple stateless callback that increments a global in the thread |
| // pool. This demonstrates that we can correctly capture a stateless |
| // function and pass it to the worker thread. |
| async([]() { |
| with_interrupts_disabled([]() { |
| counter++; |
| debug_log("Calling stateless function from thread pool"); |
| }); |
| }); |
| async([=]() { |
| with_interrupts_disabled([=]() { |
| debug_log( |
| "Calling stateful function from thread pool with {} captured", |
| heapInt); |
| counter++; |
| (*heapInt)++; |
| }); |
| }); |
| debug_log("Counter: {}", counter); |
| debug_log("heapInt: {}", *heapInt); |
| int sleeps = 0; |
| while (counter < 2) |
| { |
| Timeout t{1}; |
| thread_sleep(&t); |
| TEST(sleeps < 100, "Gave up after too many sleeps"); |
| } |
| debug_log("Yielded {} times for the thread pool to run our jobs", sleeps); |
| TEST(counter == 2, "Counter is {}, should be 2", counter); |
| TEST(*heapInt == 1, "Heap-allocated integer is {}, should be 1", *heapInt); |
| debug_log("Freeing heap int: {}", heapInt); |
| free(heapInt); |
| |
| async([]() { |
| auto fast = thread_id_get(); |
| auto slow = thread_id_get(); |
| TEST(fast == slow, |
| "Thread ID is different in fast ({}) and slow ({}) accessors", |
| fast, |
| slow); |
| TEST(fast != 1, "Thread ID for thread pool thread should not be 1"); |
| }); |
| |
| CHERI::Capability<void> mainThread{switcher_current_thread()}; |
| TEST(mainThread.is_sealed(), "Thread should be sealed: {}", mainThread); |
| TEST(mainThread.type() == 10, |
| "Thread should be sealed with otype 10: {}", |
| mainThread); |
| TEST(!switcher_interrupt_thread(mainThread), |
| "Interrupting the current thread should fail"); |
| TEST(!switcher_interrupt_thread(nullptr), |
| "Interrupting null thread should fail"); |
| TEST(!switcher_interrupt_thread(&sleeps), |
| "Interrupting invalid thread should fail"); |
| |
| static void *asyncThread; |
| static bool interrupted; |
| async([=]() mutable { |
| interruptThreadNumber = thread_id_get(); |
| asyncThread = switcher_current_thread(); |
| while (!interruptStarted) |
| { |
| yield(); |
| } |
| TEST(errorHandled, |
| "Worker thread was not interrupted from higher-priority one"); |
| interrupted = true; |
| }); |
| |
| for (int i = 0; i < 3; i++) |
| { |
| if (!asyncThread) |
| { |
| Timeout t{1}; |
| thread_sleep(&t); |
| } |
| } |
| TEST(asyncThread, "Worker thread did not provide thread pointer"); |
| debug_log("Interrupting other thread"); |
| bool ret = switcher_interrupt_thread(asyncThread); |
| interruptStarted = true; |
| TEST(ret, "Interrupting worker thread failed: {}", ret); |
| Timeout t{3}; |
| thread_sleep(&t); |
| TEST(interrupted, "Worker thread was not interrupted"); |
| return 0; |
| static cheriot::atomic<uint32_t> barrier{3}; |
| auto barrierWait = []() { |
| uint32_t value = barrier--; |
| if (value == 0) |
| { |
| barrier.notify_all(); |
| } |
| while (value != 0) |
| { |
| barrier.wait(value); |
| value = barrier; |
| } |
| }; |
| // Make sure that the thread pool threads have both finished. |
| async(barrierWait); |
| async(barrierWait); |
| barrierWait(); |
| debug_log("Thread pool quiesced"); |
| return 0; |
| } |