
#pragma once
#include <limits>
#include <locks.hh>
#include <thread.h>
#include <utils.hh>

/**
 * A simple single-producer, single-consumer ring buffer that can be made
 * multi-producer and / or multi-consumer by adding locking.  The first two
 * template parameters are the type of the message and the size of the buffer
 * (in messages).  The next template arguments are the locks to use for
 * protecting the producer and consumer ends.
 *
 * In the uncontended (or single-producer, single-consumer) case where the
 * producer and consumer are balanced and the ring is neither full nor empty,
 * this will not use any cross-compartment calls.  When the queue is full,
 * producers will block on a futex.  When the queue is empty, consumers will
 * block on a futex.  On transitions to non-empty and non-full, the queue will
 * issue a `futex_wake` cross-compartment call. If locks are used, they may
 * introduce additional cross-compartment calls.
 *
 * Note: The size must be a power of two.
 */
template<typename Message,
         size_t BufferSize,
         typename PushLock = NoLock,
         typename PopLock  = NoLock>
class RingBuffer
{
	/// The ring buffer.
	std::array<Message, BufferSize> ring;
	/// The lock protecting the push end, if one exists.
	[[no_unique_address]] PushLock pushLock;
	/// The lock protecting the pop end, if one exists.
	[[no_unique_address]] PopLock popLock;
	/// Free-running producer counter.
	cheriot::atomic<uint32_t> producer;
	/// Free-running consumer counter.
	cheriot::atomic<uint32_t> consumer;

	static_assert(BufferSize < std::numeric_limits<uint32_t>::max() / 2,
	              "The buffer size cannot be more than half the range of the "
	              "counter types or overflow will give incorrect values");
	static_assert((1 << utils::log2<BufferSize>()) == BufferSize,
	              "Buffer size must be a power of two");

	/**
	 * Helper to check if the ring is full.  The counters are free running and
	 * so the displacement between them (wrapping for overflow) will be the
	 * number of elements in the buffer if the ring is full.
	 */
	bool is_full()
	{
		return producer - consumer == BufferSize;
	}

	/**
	 * Check if the queue is empty.  If the consumer counter has caught up with
	 * the producer then this ring is empty.
	 */
	bool is_empty()
	{
		return producer == consumer;
	}

	/**
	 * Helper to convert the counter value to an array index.  The counters are
	 * never reset and so this is simple modulo arithmetic.  Power of two sizes
	 * are more efficient because they become bit masks.
	 */
	size_t counter_to_index(size_t counter)
	{
		return counter % BufferSize;
	}

	public:
	/**
	 * Push an element into the ring.  The caller is responsible for ensuring
	 * that, if this contains pointers, they have the global permission and so
	 * the move operation will not fault.
	 */
	void push(Message &&message)
	{
		bool wasEmpty;
		{
			LockGuard g{pushLock};
			// Wait for the queue to not be full
			while (is_full())
			{
				consumer.wait(producer - BufferSize);
			}
			auto i   = counter_to_index(producer);
			ring[i]  = std::move(message);
			wasEmpty = is_empty();
			producer++;
		}
		if (wasEmpty)
		{
			producer.notify_all();
		}
	}

	/**
	 * Pop the next message from the queue.
	 */
	Message pop()
	{
		Message result;
		bool    wasFull;
		{
			LockGuard g{popLock};
			// Wait for the queue to not be empty
			while (is_empty())
			{
				producer.wait(consumer);
			}
			auto i  = counter_to_index(consumer);
			result  = std::move(ring[i]);
			wasFull = is_full();
			consumer++;
		}
		if (wasFull)
		{
			consumer.notify_all();
		}
		return result;
	}
};

// Make sure that locks consume no space if not used.
static_assert(sizeof(RingBuffer<uint32_t, 2>) == 16);
