|  |  | 
|  | #pragma once | 
|  | #include <limits> | 
|  | #include <locks.hh> | 
|  | #include <thread.h> | 
|  | #include <utils.hh> | 
|  |  | 
|  | /** | 
|  | * A simple single-producer, single-consumer ring buffer that can be made | 
|  | * multi-producer and / or multi-consumer by adding locking.  The first two | 
|  | * template parameters are the type of the message and the size of the buffer | 
|  | * (in messages).  The next template arguments are the locks to use for | 
|  | * protecting the producer and consumer ends. | 
|  | * | 
|  | * In the uncontended (or single-producer, single-consumer) case where the | 
|  | * producer and consumer are balanced and the ring is neither full nor empty, | 
|  | * this will not use any cross-compartment calls.  When the queue is full, | 
|  | * producers will block on a futex.  When the queue is empty, consumers will | 
|  | * block on a futex.  On transitions to non-empty and non-full, the queue will | 
|  | * issue a `futex_wake` cross-compartment call. If locks are used, they may | 
|  | * introduce additional cross-compartment calls. | 
|  | * | 
|  | * Note: The size must be a power of two. | 
|  | */ | 
|  | template<typename Message, | 
|  | size_t BufferSize, | 
|  | typename PushLock = NoLock, | 
|  | typename PopLock  = NoLock> | 
|  | class RingBuffer | 
|  | { | 
|  | /// The ring buffer. | 
|  | std::array<Message, BufferSize> ring; | 
|  | /// The lock protecting the push end, if one exists. | 
|  | [[no_unique_address]] PushLock pushLock; | 
|  | /// The lock protecting the pop end, if one exists. | 
|  | [[no_unique_address]] PopLock popLock; | 
|  | /// Free-running producer counter. | 
|  | cheriot::atomic<uint32_t> producer; | 
|  | /// Free-running consumer counter. | 
|  | cheriot::atomic<uint32_t> consumer; | 
|  |  | 
|  | static_assert(BufferSize < std::numeric_limits<uint32_t>::max() / 2, | 
|  | "The buffer size cannot be more than half the range of the " | 
|  | "counter types or overflow will give incorrect values"); | 
|  | static_assert((1 << utils::log2<BufferSize>()) == BufferSize, | 
|  | "Buffer size must be a power of two"); | 
|  |  | 
|  | /** | 
|  | * Helper to check if the ring is full.  The counters are free running and | 
|  | * so the displacement between them (wrapping for overflow) will be the | 
|  | * number of elements in the buffer if the ring is full. | 
|  | */ | 
|  | bool is_full() | 
|  | { | 
|  | return producer - consumer == BufferSize; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Check if the queue is empty.  If the consumer counter has caught up with | 
|  | * the producer then this ring is empty. | 
|  | */ | 
|  | bool is_empty() | 
|  | { | 
|  | return producer == consumer; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Helper to convert the counter value to an array index.  The counters are | 
|  | * never reset and so this is simple modulo arithmetic.  Power of two sizes | 
|  | * are more efficient because they become bit masks. | 
|  | */ | 
|  | size_t counter_to_index(size_t counter) | 
|  | { | 
|  | return counter % BufferSize; | 
|  | } | 
|  |  | 
|  | public: | 
|  | /** | 
|  | * Push an element into the ring.  The caller is responsible for ensuring | 
|  | * that, if this contains pointers, they have the global permission and so | 
|  | * the move operation will not fault. | 
|  | */ | 
|  | void push(Message &&message) | 
|  | { | 
|  | bool wasEmpty; | 
|  | { | 
|  | LockGuard g{pushLock}; | 
|  | // Wait for the queue to not be full | 
|  | while (is_full()) | 
|  | { | 
|  | consumer.wait(producer - BufferSize); | 
|  | } | 
|  | auto i   = counter_to_index(producer); | 
|  | ring[i]  = std::move(message); | 
|  | wasEmpty = is_empty(); | 
|  | producer++; | 
|  | } | 
|  | if (wasEmpty) | 
|  | { | 
|  | producer.notify_all(); | 
|  | } | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Pop the next message from the queue. | 
|  | */ | 
|  | Message pop() | 
|  | { | 
|  | Message result; | 
|  | bool    wasFull; | 
|  | { | 
|  | LockGuard g{popLock}; | 
|  | // Wait for the queue to not be empty | 
|  | while (is_empty()) | 
|  | { | 
|  | producer.wait(consumer); | 
|  | } | 
|  | auto i  = counter_to_index(consumer); | 
|  | result  = std::move(ring[i]); | 
|  | wasFull = is_full(); | 
|  | consumer++; | 
|  | } | 
|  | if (wasFull) | 
|  | { | 
|  | consumer.notify_all(); | 
|  | } | 
|  | return result; | 
|  | } | 
|  | }; | 
|  |  | 
|  | // Make sure that locks consume no space if not used. | 
|  | static_assert(sizeof(RingBuffer<uint32_t, 2>) == 16); |