blob: 122577909c0b7a55dba9c8e902dca2687ed7a284 [file] [log] [blame]
#pragma once
#include <limits>
#include <locks.hh>
#include <thread.h>
#include <utils.hh>
/**
* A simple single-producer, single-consumer ring buffer that can be made
* multi-producer and / or multi-consumer by adding locking. The first two
* template parameters are the type of the message and the size of the buffer
* (in messages). The next template arguments are the locks to use for
* protecting the producer and consumer ends.
*
* In the uncontended (or single-producer, single-consumer) case where the
* producer and consumer are balanced and the ring is neither full nor empty,
* this will not use any cross-compartment calls. When the queue is full,
* producers will block on a futex. When the queue is empty, consumers will
* block on a futex. On transitions to non-empty and non-full, the queue will
* issue a `futex_wake` cross-compartment call. If locks are used, they may
* introduce additional cross-compartment calls.
*
* Note: The size must be a power of two.
*/
template<typename Message,
size_t BufferSize,
typename PushLock = NoLock,
typename PopLock = NoLock>
class RingBuffer
{
/// The ring buffer.
std::array<Message, BufferSize> ring;
/// The lock protecting the push end, if one exists.
[[no_unique_address]] PushLock pushLock;
/// The lock protecting the pop end, if one exists.
[[no_unique_address]] PopLock popLock;
/// Free-running producer counter.
cheriot::atomic<uint32_t> producer;
/// Free-running consumer counter.
cheriot::atomic<uint32_t> consumer;
static_assert(BufferSize < std::numeric_limits<uint32_t>::max() / 2,
"The buffer size cannot be more than half the range of the "
"counter types or overflow will give incorrect values");
static_assert((1 << utils::log2<BufferSize>()) == BufferSize,
"Buffer size must be a power of two");
/**
* Helper to check if the ring is full. The counters are free running and
* so the displacement between them (wrapping for overflow) will be the
* number of elements in the buffer if the ring is full.
*/
bool is_full()
{
return producer - consumer == BufferSize;
}
/**
* Check if the queue is empty. If the consumer counter has caught up with
* the producer then this ring is empty.
*/
bool is_empty()
{
return producer == consumer;
}
/**
* Helper to convert the counter value to an array index. The counters are
* never reset and so this is simple modulo arithmetic. Power of two sizes
* are more efficient because they become bit masks.
*/
size_t counter_to_index(size_t counter)
{
return counter % BufferSize;
}
public:
/**
* Push an element into the ring. The caller is responsible for ensuring
* that, if this contains pointers, they have the global permission and so
* the move operation will not fault.
*/
void push(Message &&message)
{
bool wasEmpty;
{
LockGuard g{pushLock};
// Wait for the queue to not be full
while (is_full())
{
consumer.wait(producer - BufferSize);
}
auto i = counter_to_index(producer);
ring[i] = std::move(message);
wasEmpty = is_empty();
producer++;
}
if (wasEmpty)
{
producer.notify_all();
}
}
/**
* Pop the next message from the queue.
*/
Message pop()
{
Message result;
bool wasFull;
{
LockGuard g{popLock};
// Wait for the queue to not be empty
while (is_empty())
{
producer.wait(consumer);
}
auto i = counter_to_index(consumer);
result = std::move(ring[i]);
wasFull = is_full();
consumer++;
}
if (wasFull)
{
consumer.notify_all();
}
return result;
}
};
// Make sure that locks consume no space if not used.
static_assert(sizeof(RingBuffer<uint32_t, 2>) == 16);