# blob: fe3e592a869ae3ed4863c82b35ca0d9a44f39f49 [file] [log] [blame]
"""Queue Module."""
import collections
import itertools
from typing import Optional, Sequence, TypeVar
import interfaces
# Type variable for the elements stored in a queue; it parameterizes both
# the underlying deque and the ConsumableQueue interface below.
T = TypeVar('T')
# pylint: disable-next=abstract-method
# __len__ and __iter__ are implemented by deque
class BufferedQueue(collections.deque[T], interfaces.ConsumableQueue[T]):
    """Queue model with a private staging buffer.

    For the owner this is a buffered queue; for the consumer this is a
    ConsumableQueue. Elements that should not be visible from outside (e.g.
    during the computation of the next state) can be added to the queue
    using `buffer(e)`. Call `flush` to make them visible.

    Visible elements live in the deque itself (`self`); staged elements are
    kept in a separate deque (`self._buff`) until they are flushed.
    """

    def __init__(self, size: Optional[int]) -> None:
        """Construct a queue.

        Args:
            size: the size of the queue. `None` (or -1) for infinite queue.
        """
        super().__init__([])
        # Normalize the "-1 means unbounded" convention to None so the rest
        # of the class only has to test for None.
        self._size = size if size != -1 else None
        # Staged elements, not yet visible to the consumer.
        self._buff: collections.deque[T] = collections.deque()

    def is_buffer_full(self) -> bool:
        """Tell whether visible plus staged elements reach the capacity.

        Returns:
            True if the queue is bounded and the number of visible and
            staged elements is at least its size; False otherwise (always
            False for an unbounded queue).
        """
        if self._size is None:
            return False
        return len(self) + len(self._buff) >= self._size

    def buffer(self, item: T) -> None:
        """Stage `item`; it stays invisible to consumers until `flush`.

        No capacity check is performed here.

        Args:
            item: the element to stage.
        """
        self._buff.append(item)

    def flush(self) -> None:
        """Move staged elements into the visible queue, oldest first.

        If everything fits (or the queue is unbounded) the whole staging
        buffer is moved. Otherwise only as many elements as there is room
        for are moved and the remainder stays staged.
        """
        if self._size is None or len(self) + len(self._buff) <= self._size:
            self.extend(self._buff)
            self._buff.clear()
            return
        # Not everything fits: transfer only the free capacity. `range` of
        # a non-positive number is empty, so nothing moves when the visible
        # queue is already at (or above) its size.
        for _ in range(self._size - len(self)):
            self.append(self._buff.popleft())

    def chain(self):
        """Return an iterator over visible elements, then staged ones."""
        return itertools.chain(self, self._buff)

    def pp_three_valued(self, vals: Sequence[str]) -> str:
        """Pick a label describing the fill level of the queue.

        Args:
            vals: three labels, indexed as 0 = empty, 1 = partial, 2 = full.

        Returns:
            `vals[2]` if `is_buffer_full()`, else `vals[1]` if at least one
            element is visible or staged, else `vals[0]`.
        """
        if self.is_buffer_full():
            # Full
            return vals[2]
        # Test the containers, not the elements: `any(self.chain())` would
        # report a queue that holds only falsy items (0, None, "", ...) as
        # empty.
        if self or self._buff:
            # Partial
            return vals[1]
        # Empty
        return vals[0]

    # Implements interfaces.ConsumableQueue
    @property
    def size(self) -> Optional[int]:
        """The capacity of the queue, or None if it is unbounded."""
        return self._size

    # Implements interfaces.ConsumableQueue
    def full(self) -> bool:
        """Tell whether the visible elements alone reach the capacity.

        Staged elements are not counted; see `is_buffer_full` for that.
        """
        return self._size is not None and len(self) >= self._size

    # Implements interfaces.ConsumableQueue
    def dequeue(self) -> Optional[T]:
        """Remove and return the oldest visible element.

        Returns:
            The element at the head of the queue, or None if no element is
            visible (matching the Optional return type and `peek`).
        """
        # deque.popleft() raises IndexError on an empty deque, so guard it.
        return self.popleft() if self else None

    # Implements interfaces.ConsumableQueue
    def peek(self) -> Optional[T]:
        """Return the oldest visible element without removing it.

        Returns:
            The element at the head of the queue, or None if no element is
            visible.
        """
        return self[0] if self else None

    # Implements interfaces.ConsumableQueue
    # The following is useless, but without it pylint will issue an error
    # (E0110) for every instantiation of BufferedQueue.
    # pylint: disable-next=useless-parent-delegation
    def __len__(self) -> int:
        """Return the number of visible elements."""
        return super().__len__()

    # Implements interfaces.ConsumableQueue
    # The following is useless, but without it pylint will issue an error
    # (E0110) for every instantiation of BufferedQueue.
    # pylint: disable-next=useless-parent-delegation
    def __iter__(self):
        """Iterate over the visible elements, oldest first."""
        return super().__iter__()