blob: 57c1cb5b34a54c901868d04463be0acf852b80f0 [file] [log] [blame]
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import cocotb
from cocotb.clock import Clock
from cocotb.queue import Queue
from cocotb.triggers import FallingEdge, RisingEdge, with_timeout
import cocotb.result
import math
from kelvin_test_utils.secded_golden import get_cmd_intg, get_data_intg, get_rsp_intg
def create_a_channel_req(address,
data,
mask,
source=1,
size=None,
param=0,
width=32):
"""Creates a standard TileLink-UL PutFullData request dictionary."""
num_bytes = width // 8
if size is None:
size = int(math.log2(num_bytes))
full_mask = (1 << num_bytes) - 1
opcode = 0 if mask == full_mask else 1 # PutFull vs PutPartial
txn = {
"opcode": opcode,
"param": param,
"size": size,
"source": source,
"address": address,
"mask": mask,
"data": data,
"user": {
"cmd_intg": 0,
"data_intg": 0,
"instr_type": 0,
"rsvd": 0
}
}
txn["user"]["cmd_intg"] = get_cmd_intg(txn, width=width)
txn["user"]["data_intg"] = get_data_intg(txn["data"], width=width)
return txn
class TileLinkULInterface:
"""A testbench interface for a TileLink-UL bus.
This class provides a high-level, transaction-based interface to a TileLink-UL
bus in the DUT. It uses cocotb queues and background coroutines ("agents")
to handle the low-level signal handshaking.
Args:
dut: The cocotb DUT object.
host_if_name (str, optional): The prefix for the host-side interface signals.
device_if_name (str, optional): The prefix for the device-side interface signals.
"""
def __init__(self,
dut,
host_if_name=None,
device_if_name=None,
clock_name="clock",
reset_name="reset",
width=32):
self.dut = dut
self.clock = getattr(dut, clock_name)
self.reset = getattr(dut, reset_name)
self.width = width
self.name = host_if_name or device_if_name
if host_if_name is None and device_if_name is None:
raise ValueError(
"At least one of host_if_name or device_if_name must be provided."
)
self._agents = []
if host_if_name:
self.host_a_fifo = Queue()
self.host_d_fifo = Queue()
self._agents.append(
cocotb.start_soon(self._host_a_driver(host_if_name)))
self._agents.append(
cocotb.start_soon(self._host_d_monitor(host_if_name)))
if device_if_name:
self.device_a_fifo = Queue()
self.device_d_fifo = Queue()
self._device_a_ready = True # Default to being ready
self._agents.append(
cocotb.start_soon(self._device_a_monitor(device_if_name)))
self._agents.append(
cocotb.start_soon(self._device_d_driver(device_if_name)))
def device_a_set_ready(self, value):
"""Set the ready signal for the device A channel monitor."""
self._device_a_ready = value
async def init(self):
"""Starts the agents."""
# This method is currently a placeholder for starting agents.
# In this implementation, agents are started in the constructor.
# This can be extended if more complex initialization is needed.
pass
# --- Private Methods (Agents) ---
# slave_a{r|w}agent
async def _host_a_driver(self, prefix, timeout=4096):
"""Drives the host A channel from the host_a_fifo."""
a_valid = getattr(self.dut, f"{prefix}_a_valid")
a_ready = getattr(self.dut, f"{prefix}_a_ready")
a_valid.value = 0
for prop in ["opcode", "param", "size", "source", "address", "mask", "data"]:
getattr(self.dut, f"{prefix}_a_bits_{prop}").value = 0
while True:
while True:
await RisingEdge(self.clock)
a_valid.value = 0
if self.host_a_fifo.qsize():
break
txn = await self.host_a_fifo.get()
a_valid.value = 1
for prop in ["opcode", "param", "size", "source", "address", "mask", "data"]:
getattr(self.dut, f"{prefix}_a_bits_{prop}").value = txn[prop]
for field, value in txn["user"].items():
getattr(self.dut,
f"{prefix}_a_bits_user_{field}").value = value
await FallingEdge(self.clock)
timeout_count = 0
while a_ready.value == 0:
await FallingEdge(self.clock)
timeout_count += 1
if timeout_count >= timeout:
assert False, "timeout waiting for a_ready"
# slave_bagent
async def _host_d_monitor(self, prefix):
"""Monitors the host D channel and puts transactions into host_d_fifo."""
d_valid = getattr(self.dut, f"{prefix}_d_valid")
d_ready = getattr(self.dut, f"{prefix}_d_ready")
d_ready.value = 1
while True:
await RisingEdge(self.clock)
try:
if d_valid.value:
# Capture the transaction
txn = {'user': {}}
for prop in ["opcode", "param", "size", "source", "sink", "data", "error"]:
txn[prop] = getattr(self.dut, f"{prefix}_d_bits_{prop}").value
user_fields = ["rsp_intg", "data_intg"]
for field in user_fields:
signal_name = f"{prefix}_d_bits_user_{field}"
if hasattr(self.dut, signal_name):
txn["user"][field] = getattr(self.dut, signal_name).value
await self.host_d_fifo.put(txn)
except Exception as e:
print('X seen in _host_d_monitor: ' + str(e) + ' ' + prefix)
# raise e
# master_aragent
async def _device_a_monitor(self, prefix):
"""Monitors the device A channel and puts transactions into device_a_fifo."""
a_valid = getattr(self.dut, f"{prefix}_a_valid")
a_ready = getattr(self.dut, f"{prefix}_a_ready")
a_ready.value = 1
while True:
await RisingEdge(self.clock)
try:
if a_valid.value:
txn = {"user": {}}
for prop in ["opcode", "param", "size", "source", "address", "mask", "data"]:
txn[prop] = getattr(self.dut, f"{prefix}_a_bits_{prop}").value
user_fields = ["cmd_intg", "data_intg", "instr_type", "rsvd"]
for field in user_fields:
signal_name = f"{prefix}_a_bits_user_{field}"
if hasattr(self.dut, signal_name):
txn["user"][field] = getattr(self.dut,
signal_name).value
await self.device_a_fifo.put(txn)
except Exception as e:
print('X seen in _device_a_monitor: ' + str(e) + ' ' + prefix)
# master_bagent
async def _device_d_driver(self, prefix, timeout=4096):
"""Drives the device D channel from the device_d_fifo."""
d_valid = getattr(self.dut, f"{prefix}_d_valid")
d_ready = getattr(self.dut, f"{prefix}_d_ready")
d_valid.value = 0
for prop in ["opcode", "param", "size", "source", "sink", "data", "error"]:
getattr(self.dut, f"{prefix}_d_bits_{prop}").value = 0
while True:
while True:
await RisingEdge(self.clock)
d_valid.value = 0
if self.device_d_fifo.qsize():
break
txn = await self.device_d_fifo.get()
d_valid.value = 1
for prop in ["opcode", "param", "size", "source", "sink", "data", "error"]:
getattr(self.dut, f"{prefix}_d_bits_{prop}").value = txn[prop]
for field, value in txn["user"].items():
getattr(self.dut,
f"{prefix}_d_bits_user_{field}").value = value
await FallingEdge(self.clock)
timeout_count = 0
while d_ready.value == 0:
await FallingEdge(self.clock)
timeout_count += 1
if timeout_count >= timeout:
assert False, "timeout waiting for d_ready"
# --- Public API Methods ---
async def host_put(self, txn):
"""Send a PutFullData or PutPartialData request from the host."""
await self.host_a_fifo.put(txn)
async def host_get_response(self):
"""Get a response from the host D channel."""
return await self.host_d_fifo.get()
async def device_get_request(self):
"""Get a request from the device A channel."""
return await self.device_a_fifo.get()
async def device_respond(self,
opcode,
param,
size,
source,
sink=0,
data=0,
error=0,
width=32):
"""Send a response from the device."""
txn = {
"opcode": opcode,
"param": param,
"size": size,
"source": source,
"sink": sink,
"data": data,
"error": error,
"user": {
"rsp_intg": 0,
"data_intg": 0
}
}
txn["user"]["rsp_intg"] = get_rsp_intg(txn, width)
txn["user"]["data_intg"] = get_data_intg(txn["data"], width)
await self.device_d_fifo.put(txn)