fetch and sched: models for fetch and dispatch queues. Also includes the TBM model of a queue, and a class for reading traces. Change-Id: I9dbcb5fe94611935e1f0b61d3f2da38d522c71a5
diff --git a/buffered_queue.py b/buffered_queue.py new file mode 100644 index 0000000..fe3e592 --- /dev/null +++ b/buffered_queue.py
"""Queue Module."""

import collections
import itertools
from typing import Optional, Sequence, TypeVar

import interfaces


# Declare type variable
T = TypeVar('T')


# pylint: disable-next=abstract-method
# __len__ and __iter__ are implemented by deque
class BufferedQueue(collections.deque[T], interfaces.ConsumableQueue[T]):
    """Queue model.

    For the owner this is a buffered queue; for the consumer this is
    ConsumableQueue. Elements that should not be visible from outside (e.g.
    during the computation of next state) can be added to the queue using
    `buffer(e)`. Call `flush` to make them visible.
    """

    def __init__(self, size: Optional[int]) -> None:
        """Construct a queue.

        Args:
            size: the size of the queue. `None` (or -1) for infinite queue.
        """
        super().__init__([])
        # Normalize -1 to None so there is a single "infinite" encoding.
        self._size = size if size != -1 else None
        # Staging area for elements buffered but not yet flushed.
        self._buff: collections.deque = collections.deque()

    @property
    def buff(self) -> collections.deque:
        """The staging buffer of not-yet-visible elements.

        Exposed (read-only) so the owner can inspect and drop staged elements
        before they are flushed; `FetchUnit.branch_resolved` relies on this
        to evict placeholder entries.
        """
        return self._buff

    def is_buffer_full(self) -> bool:
        """Return True if visible + staged elements reach the size limit."""
        if self._size is not None:
            return len(self) + len(self._buff) >= self._size

        return False

    def buffer(self, item: T) -> None:
        """Stage `item`; it becomes visible only after `flush`."""
        self._buff.append(item)

    def flush(self) -> None:
        """Move staged elements into the visible queue, up to capacity.

        Staged elements that do not fit remain buffered for a later flush.
        """
        if self._size is None or len(self) + len(self._buff) <= self._size:
            self.extend(self._buff)
            self._buff.clear()
        else:
            for _ in range(self._size - len(self)):
                self.append(self._buff.popleft())

    def chain(self):
        """Iterate over visible elements followed by staged ones."""
        return itertools.chain(self, self._buff)

    def pp_three_valued(self, vals: Sequence[str]) -> str:
        """Render occupancy as one of `vals` = (empty, partial, full).

        NOTE(review): `any` treats falsy elements (e.g. `None` placeholders)
        as absent, so a queue holding only placeholders renders as empty --
        presumably intended; confirm.
        """
        if self.is_buffer_full():
            # Full
            return vals[2]

        if any(self.chain()):
            # Partial
            return vals[1]

        # Empty
        return vals[0]

    # Implements interfaces.ConsumableQueue
    @property
    def size(self) -> Optional[int]:
        return self._size

    # Implements interfaces.ConsumableQueue
    def full(self) -> bool:
        return self._size is not None and len(self) >= self._size

    # Implements interfaces.ConsumableQueue
    def dequeue(self) -> Optional[T]:
        # Return None on an empty queue instead of raising IndexError, to
        # match the declared Optional return type and `peek`'s behavior.
        return self.popleft() if self else None

    # Implements interfaces.ConsumableQueue
    def peek(self) -> Optional[T]:
        return self[0] if self else None

    # Implements interfaces.ConsumableQueue
    # The following is useless, but without it pylint will issue an error
    # (E0110) for every instantiation of BufferedQueue.
    # pylint: disable-next=useless-parent-delegation
    def __len__(self) -> int:
        return super().__len__()

    # Implements interfaces.ConsumableQueue
    # The following is useless, but without it pylint will issue an error
    # (E0110) for every instantiation of BufferedQueue.
    # pylint: disable-next=useless-parent-delegation
    def __iter__(self):
        return super().__iter__()
diff --git a/fetch_unit.py b/fetch_unit.py new file mode 100644 index 0000000..383e5ae --- /dev/null +++ b/fetch_unit.py
"""Fetch Unit module."""

from typing import Any, Dict, Optional, Sequence

from buffered_queue import BufferedQueue
import counter
from counter import Counter
from functional_trace import FunctionalTrace
import interfaces

class NextFetch:
    """Hold the state of next-addr fetching.

    `addr` is the memory location from which the next batch of instructions
    should be fetched from. This can be None if there are no more instructions
    in the trace, or when the next instruction (after a branch) is not the
    normal +4 bytes successor.

    Setting `addr` clears the stall flag and setting `stall` clears `addr`,
    so the two states are mutually exclusive by construction.
    """

    def __init__(self) -> None:
        # Address of the next batch, or None (EOF / unresolved branch).
        self._addr: Optional[int] = None
        # True while waiting for a branch target to be computed.
        self._stall = False

    @property
    def addr(self) -> Optional[int]:
        return self._addr

    @addr.setter
    def addr(self, val: int) -> None:
        # A known next address implies we are not stalling.
        self._addr = val
        self._stall = False

    @property
    def stall(self) -> bool:
        return self._stall

    @stall.setter
    def stall(self, val: bool) -> None:
        # While stalling, the next address is unknown.
        self._addr = None
        self._stall = val


class FetchUnit(interfaces.FetchUnit):
    """Fetch unit model: reads batches of instructions from the trace."""

    def __init__(self, config: Dict[str, Any], trace: FunctionalTrace):
        """Construct a fetch unit.

        Args:
            config: uarch configuration; uses "branch_prediction",
                "fetch_rate" and (optionally) "fetch_queue_size".
            trace: the functional-simulator trace to fetch from.
        """
        super().__init__("FE")

        self._trace = trace
        self._branch_prediction = config["branch_prediction"]
        self._fetch_rate = config["fetch_rate"]

        ## Current state
        # The queue from which `SchedUnit` reads.
        self._queue = BufferedQueue(config.get("fetch_queue_size"))

        # The next address to fetch a batch from, or indicate a stall (waiting
        # for branch target to be computed).
        self._next_fetch_addr = NextFetch()

        ## Next state
        # Stall value to apply at the next `tock` (set by `branch_resolved`
        # when it is called during the TICK phase).
        self._next_fetch_stall: Optional[bool] = None

    # Implements interfaces.FetchUnit
    @property
    def queue(self) -> interfaces.ConsumableQueue:
        return self._queue

    # Implements interfaces.FetchUnit
    def eof(self) -> bool:
        """Return True when the trace has no more instructions."""
        return self._trace.eof()

    # Implements interfaces.FetchUnit
    def pending(self) -> int:
        """Return the number of instructions waiting in the fetch queue."""
        return len(self._queue)

    # Implements interfaces.FetchUnit
    def reset(self, cntr: Counter) -> None:
        super().reset(cntr)
        # TODO(sflur): implement proper reset
        cntr.stalls[self.name] = 0
        cntr.utilizations[self.name] = counter.Utilization(self.queue.size)

    # Implements interfaces.FetchUnit
    def tick(self, cntr: Counter) -> None:
        """Fetch (buffer) one batch of `fetch_rate` instructions, if possible.

        Stalls (without fetching) on: trace EOF, insufficient queue space, or
        an unresolved branch (when branch prediction is "none").
        """
        super().tick(cntr)

        if self._trace.eof():
            self.log("can't fetch new instructions:"
                     " no more instructions in trace.")
            return

        if (self._queue.size is not None and
                len(self._queue) + self._fetch_rate >
                self._queue.size):
            self.log("can't fetch new instructions:"
                     " not enough room in the fetch queue.")
            cntr.stalls[self.name] += 1
            return

        # TODO(sflur): make `inst_size` configurable.
        inst_size = 4  # bytes

        if self._next_fetch_addr.addr is not None:
            if self._trace.next_addr() != self._next_fetch_addr.addr:
                # The trace diverges from the predicted straight-line address,
                # i.e. a branch was taken.
                if self._branch_prediction == "none":
                    self.log(
                        "generating memory accesses for"
                        f" {self._next_fetch_addr.addr} (but next trace"
                        f" instruction is at {self._trace.next_addr()})")

                    # TODO(sflur): generate memory accesses for the whole batch.

                    # Wait for `branch_resolved` before fetching again.
                    self._next_fetch_addr.stall = True
                    return

                assert self._branch_prediction == "perfect", (
                    # pylint: disable-next=consider-using-f-string
                    "Error: Unknown branch prediction option %s" %
                    self._branch_prediction)

        elif self._next_fetch_addr.stall:
            self.log("stalling")
            cntr.stalls[self.name] += 1
            return

        # The first address of the current batch. After a branch this might not
        # be properly aligned. We should still generate memory accesses for the
        # missing lower bytes!
        fetch_addr = self._trace.next_addr()
        # TODO(sflur): generate memory accesses for the whole batch.

        # TODO(sflur): handle compressed instructions, and misaligned
        # instructions?

        # Set the address for the next batch, and force it to be aligned.
        next_addr = fetch_addr + (inst_size * self._fetch_rate)
        next_addr -= next_addr % (inst_size * self._fetch_rate)
        self._next_fetch_addr.addr = next_addr

        # Buffer the current batch of instructions.
        for fetch_addr in range(fetch_addr, next_addr, inst_size):
            if fetch_addr != self._trace.next_addr():
                # This instruction was not executed in the functional trace,
                # hence it's not in the trace. But, a uarch would fetch this
                # instruction from memory, and it would occupy a place in the
                # queue, so we simulate that (with a None).
                self._queue.buffer(None)
                continue

            inst = self._trace.dequeue()
            if inst is None:
                self.log("no more instructions in trace")
                break

            self.log(inst.mnemonic + " from mem/trace")
            self._queue.buffer(inst)

            if (not inst.is_branch and
                    inst.addr + inst_size != self._trace.next_addr()):
                # This could happen when an exception is taken
                # TODO(sflur): what do we need to do to handle an exception?
                self.log("next fetch is an exception handler?")
                self._next_fetch_addr.addr = self._trace.next_addr()

        # We count all the instructions a uarch would actually fetch.
        cntr.utilizations[self.name].count += self._fetch_rate

    # Implements interfaces.FetchUnit
    def tock(self, cntr: Counter) -> None:
        """Commit next state: publish buffered instructions, apply stalls."""
        super().tock(cntr)

        self._queue.flush()

        if self._next_fetch_stall is not None:
            self._next_fetch_addr.stall = self._next_fetch_stall
            self._next_fetch_stall = None

        cntr.utilizations[self.name].occupied += len(self._queue)

    # Implements interfaces.FetchUnit
    def branch_resolved(self) -> None:
        """Inform the FU that branch target is now available."""
        self.log("branch resolved")

        assert self._branch_prediction == "none"

        # The branch target might have already been placed in the the fetch
        # queue, so we only clean Nones (fake instructions) from the queue.
        while self._queue.buff and self._queue.buff[0] is None:
            self._queue.buff.popleft()
        while self._queue and self._queue[0] is None:
            self._queue.popleft()

        if self.phase == interfaces.CyclePhase.TICK:
            # During TICK, defer clearing the stall to `tock`.
            self._next_fetch_stall = False
        else:
            assert self.phase == interfaces.CyclePhase.TOCK
            self._next_fetch_addr.stall = False

    # Implements interfaces.FetchUnit
    def print_state_detailed(self, file) -> None:
        """Print the fetch queue contents (placeholders shown as "X")."""
        if self._queue:
            queue_str = ", ".join(str(i) if i else "X"
                                  # pylint: disable-next=bad-reversed-sequence
                                  for i in reversed(self._queue))
        else:
            queue_str = "-"
        print(f"[{self.name}] {queue_str}", file=file)

    # Implements interfaces.FetchUnit
    def get_state_three_valued_header(self) -> Sequence[str]:
        return [self.name]

    # Implements interfaces.FetchUnit
    def get_state_three_valued(self, vals: Sequence[str]) -> Sequence[str]:
        return [self._queue.pp_three_valued(vals)]
diff --git a/functional_trace.py b/functional_trace.py new file mode 100644 index 0000000..3c1da0f --- /dev/null +++ b/functional_trace.py
"""Trace module."""

from typing import Any, IO, Optional

# Generated by `flatc`.
import FBInstruction.Instructions as FBInstrs
from instruction import Instruction
import tbm_options
from utilities import FileFormat

class FunctionalTrace:
    """Class representing a functional simulator trace.

    Instructions are buffered in `self.instrs` in reverse order (the next
    instruction to consume is at the end), and the buffer is refilled lazily
    from `input_file`.
    """

    def __init__(self, input_file: IO[Any], input_format: FileFormat,
                 instructions_range: str) -> None:
        """Construct a trace reader.

        Args:
            input_file: the trace file (text mode for JSON, binary for
                FlatBuffers).
            input_format: FileFormat.JSON or FileFormat.FLATBUFFERS.
            instructions_range: "start:end" slice of the trace to simulate;
                either part may be empty.
        """
        self.input_file = input_file

        self.input_format = input_format
        if input_format == FileFormat.JSON:
            self.read_instructions = self.read_json_instructions
        else:
            assert input_format == FileFormat.FLATBUFFERS
            self.read_instructions = self.read_fb_instructions

        start, _, end = instructions_range.partition(":")
        self.end = int(end) if end else None

        # Number of instructions consumed (or skipped) so far.
        self.instr_count = 0
        self.instrs = []
        self.read_instructions()
        self.skip(int(start))

    @classmethod
    def from_json(cls, input_file: IO[Any], instructions_range: str):
        """Alternate constructor for a JSON-lines trace."""
        return cls(input_file, FileFormat.JSON, instructions_range)

    @classmethod
    def from_fb(cls, input_file: IO[Any], instructions_range: str):
        """Alternate constructor for a FlatBuffers trace."""
        return cls(input_file, FileFormat.FLATBUFFERS, instructions_range)

    def next_addr(self) -> Optional[int]:
        """Return the address of the next instruction, or None at EOF."""
        return self.instrs[-1].addr if self.instrs else None

    def dequeue(self) -> Optional[Instruction]:
        """Consume and return the next instruction, or None at EOF."""
        if self.eof():
            return None

        return_val = self.instrs.pop()
        if not self.instrs:
            # Refill eagerly so `next_addr`/`eof` stay meaningful.
            self.read_instructions()
        self.instr_count += 1

        return return_val

    def skip(self, n: int) -> None:
        """Skip the next `n` instructions (no-op if `n` <= 0 or at EOF)."""
        if self.eof() or n <= 0:
            return

        # Fast path: the whole skip fits in the current buffer.
        if len(self.instrs) >= n:
            self.instr_count += n
            self.instrs = self.instrs[:-n]
            if not self.instrs:
                self.read_instructions()
            return

        n -= len(self.instrs)
        self.instr_count += len(self.instrs)
        self.instrs.clear()

        if self.input_format == FileFormat.JSON:
            # JSON traces hold one instruction per line, so lines can be
            # skipped without parsing them.
            for _ in range(n):
                self.input_file.readline()
            self.instr_count += n
            self.read_instructions()
            return

        assert self.input_format == FileFormat.FLATBUFFERS

        while len(self.instrs) < n:
            n -= len(self.instrs)
            self.instr_count += len(self.instrs)
            self.instrs.clear()
            # TODO(sflur): self.read_fb_instructions() constructs the
            # instructions but we don't use most of them.
            self.read_fb_instructions()
            if not self.instrs:
                return

        if n > 0:
            self.instr_count += n
            self.instrs = self.instrs[:-n]
            # Refill when the skip consumed the whole buffer, like the fast
            # path above does; otherwise `eof` would report True even though
            # the file still has instructions.
            if not self.instrs:
                self.read_fb_instructions()

    def eof(self) -> bool:
        """Return True when no instructions remain (or `end` was reached)."""
        return (not self.instrs or
                (self.end is not None and self.instr_count >= self.end))

    def read_json_instructions(self) -> None:
        """Refill the buffer from a JSON-lines trace (reversed order)."""
        instrs = []
        for _ in range(tbm_options.args.json_trace_buffer_size):
            line = self.input_file.readline()
            if line:
                instrs.append(Instruction.from_json(line))
            else:
                break
        instrs.reverse()
        self.instrs = instrs

    def read_fb_instructions(self) -> None:
        """Refill the buffer from a length-prefixed FlatBuffers trace.

        Each record is a 4-byte little-endian length followed by an
        `Instructions` FlatBuffer of that many bytes.
        """
        length = self.input_file.read(4)
        if not length:
            # EOF: leave the (empty) buffer untouched.
            return

        length = int.from_bytes(length, byteorder="little")

        buf = bytearray(self.input_file.read(length))
        instrs = FBInstrs.Instructions.GetRootAsInstructions(buf, 0)
        self.instrs = [
            Instruction.from_fb(instrs.Instructions(i))
            for i in reversed(range(instrs.InstructionsLength()))
        ]
diff --git a/sched_unit.py b/sched_unit.py new file mode 100644 index 0000000..11998db --- /dev/null +++ b/sched_unit.py
from typing import Any, Dict, Iterable, Sequence

from buffered_queue import BufferedQueue
import counter
from counter import Counter
from instruction import Instruction
import interfaces


# TODO(b/261690182): rename the SchedUnit
class SchedUnit(interfaces.SchedUnit):
    """Issue unit model.

    Dispatches instructions from the fetch queue into per-pipe issue queues,
    stalling on full queues, instruction conflicts, in-flight flushes, and
    unresolved branches.
    """

    def __init__(self, config: Dict[str, Any]):
        """Construct a sched unit.

        Args:
            config: uarch configuration; uses "decode_rate" (optional; a
                falsy value means "drain the whole fetch queue") and
                "branch_prediction".
        """
        super().__init__("SC")

        self._decode_rate = config.get("decode_rate")
        self._branch_prediction = config["branch_prediction"]

        # Set by `connect` before the first cycle.
        self._fetch_unit = None
        self._exec_unit = None

        ## Current states
        self._queues: Dict[str, BufferedQueue[Instruction]] = {}
        self._branch_stalling = False

        ## Next state
        # Stall value to apply at the next `tock` (set by `branch_resolved`
        # when it is called during the TICK phase).
        self._next_branch_stalling = None

    def add_queue(self, uid: str, desc) -> None:
        """Create the issue queue `uid`, sized by `desc`'s "size" entry."""
        self._queues[uid] = BufferedQueue(desc.get("size"))

    def connect(self, fetch_unit: interfaces.FetchUnit,
                exec_unit: interfaces.ExecUnit) -> None:
        """Connect this unit to its upstream fetch and downstream exec units."""
        self._fetch_unit = fetch_unit
        self._exec_unit = exec_unit

    # Implements interfaces.SchedUnit
    @property
    def queues(self) -> Iterable[BufferedQueue[Instruction]]:
        return self._queues.values()

    # Implements interfaces.SchedUnit
    def pending(self) -> int:
        """Return the total number of instructions in all issue queues."""
        return sum(len(q) for q in self._queues.values())

    # Implements interfaces.SchedUnit
    def reset(self, cntr: Counter) -> None:
        super().reset(cntr)
        # TODO(sflur): implement proper reset
        cntr.stalls[self.name] = 0
        for uid, q in self._queues.items():
            cntr.utilizations[uid] = counter.Utilization(q.size)

    # Implements interfaces.SchedUnit
    def tick(self, cntr: Counter) -> None:
        """Dispatch up to `decode_rate` instructions from the fetch queue."""
        super().tick(cntr)

        if self._branch_stalling:
            self.log("queuing stalled: unresolved branch")
            return

        for _ in range(self._decode_rate if self._decode_rate
                       else len(self._fetch_unit.queue)):
            if not self._fetch_unit.queue:
                # Fetch queue is empty
                break

            fetched_instr = self._fetch_unit.queue.peek()
            if not fetched_instr:
                # A None instruction in the fetch queue stands for instruction
                # that the functional simulator did not execute (or fetch), so
                # we don't know what instruction that was, or how it behaved.
                # In a real uarch this instruction will take some resources
                # until the uarch figures out it should be evicted.
                # TODO(sflur): count these instructions and apply some
                # proportional penalty to the performance TBM reports?
                self._fetch_unit.queue.dequeue()
                continue

            # Check if we need to flush pending instructions.
            if fetched_instr.is_flush and (self.pending() or
                                           self._exec_unit.pending()):
                # TODO(sflur): Currently flush instructions wait in the fetch
                # queue, is that the right place to wait in?
                cntr.stalls[self.name] += 1
                self.log(f"queueing stalled: flush in effect: {fetched_instr}")
                break

            if fetched_instr.is_nop:
                self.log(f"retired NOP instruction: {fetched_instr}")
                self._fetch_unit.queue.dequeue()
                cntr.retired_instruction_count += 1
                continue

            qid = self._exec_unit.get_issue_queue_id(fetched_instr)

            # Check if the queue is available.
            if self._queues[qid].is_buffer_full():
                cntr.stalls[self.name] += 1
                self.log(f"queueing stalled: '{qid}' is full")
                break

            # TODO(sflur): instead of check_conflicts, we could add the
            # instructions to the scoreboard at this point.
            if not self.check_conflicts(fetched_instr, qid):
                # TODO(sflur): the blocking instruction is still in the fetch
                # queue, maybe move it somewhere else?
                cntr.stalls[self.name] += 1
                self.log("queueing stalled: conflict with queued instruction")
                break

            # It is safe to queue the instruction.
            self._queues[qid].buffer(fetched_instr)
            self._fetch_unit.queue.dequeue()
            cntr.utilizations[qid].count += 1
            self.log(f"instruction '{fetched_instr}' queued")

            if fetched_instr.is_branch:
                cntr.branch_count += 1

                if self._branch_prediction == "none":
                    # Stop dispatching until the branch is resolved.
                    self._branch_stalling = True
                    break

    # Implements interfaces.SchedUnit
    def tock(self, cntr: Counter) -> None:
        """Commit next state: publish buffered instructions, apply stalls."""
        super().tock(cntr)

        for q in self._queues.values():
            q.flush()

        if self._next_branch_stalling is not None:
            self._branch_stalling = self._next_branch_stalling
            self._next_branch_stalling = None

        for uid, q in self._queues.items():
            cntr.utilizations[uid].occupied += len(q)

    def check_conflicts(self, new_instr: Instruction, qid: str) -> bool:
        """Check if `instr` conflicts with other instructions.

        Check whether it is safe to reorder `instr` wrt the instructions
        already in other queues.
        There is no need to check conflicts with instructions that are already
        in execution pipes, as that is handled by the scoreboard.

        Args:
            new_instr: fetched instructions.
            qid: the dispatch queue the instruction will be placed in.
        Returns:
            True if there are no conflicts, False otherwise.
        """

        for name, q in self._queues.items():
            if name == qid:
                # skip the queue new_instr is going to, as it's an in-order
                # queue.
                continue

            for instr in q.chain():
                if new_instr.conflicts_with(instr):
                    return False

        return True

    # Implements interfaces.SchedUnit
    def branch_resolved(self) -> None:
        """Inform the unit that the pending branch target is now available."""
        if self.phase == interfaces.CyclePhase.TICK:
            # During TICK, defer clearing the stall to `tock`.
            self._next_branch_stalling = False
        else:
            assert self.phase == interfaces.CyclePhase.TOCK
            self._branch_stalling = False

    # Implements interfaces.SchedUnit
    def print_state_detailed(self, file) -> None:
        """Print the contents of every issue queue."""
        for uid, dq in self._queues.items():
            if dq:
                queue_str = ", ".join(str(i) for i in reversed(dq))
            else:
                queue_str = "-"
            print(f"[qu-{uid}] {queue_str}", file=file)

    # Implements interfaces.SchedUnit
    def get_state_three_valued_header(self) -> Sequence[str]:
        # Return a list: the declared Sequence[str] supports indexing, which
        # a dict_keys view does not.
        return list(self._queues.keys())

    # Implements interfaces.SchedUnit
    def get_state_three_valued(self, vals: Sequence[str]) -> Sequence[str]:
        return [q.pp_three_valued(vals) for q in self._queues.values()]