# blob: 0e62adbbaf6b373823f0482fe16b163401a1bcef [file] [log] [blame]
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# BEGIN_TESTCASES_FOR_kelvin_xbar_cocotb
# Names of cocotb test functions defined in this file. The list is delimited
# by the BEGIN_TESTCASES_FOR / END_TESTCASES_FOR marker comments; presumably
# build tooling keys on those markers — TODO confirm before editing by hand.
# NOTE(review): test_ibex_d_to_kelvin_device_csr_read is defined in this file
# but is not listed here — confirm whether the omission is intentional.
KELVIN_XBAR_TESTCASES = [
"test_kelvin_core_to_sram",
"test_ibex_d_to_invalid_addr",
"test_kelvin_core_to_uart1",
"test_ibex_d_to_kelvin_device",
"test_kelvin_core_to_kelvin_device",
"test_ibex_d_to_kelvin_device_specific_addr",
"test_wide_to_narrow_integrity",
]
# END_TESTCASES_FOR_kelvin_xbar_cocotb
import cocotb
from cocotb.clock import Clock
from cocotb.triggers import RisingEdge, ClockCycles, with_timeout
from kelvin_test_utils.TileLinkULInterface import TileLinkULInterface, create_a_channel_req
from kelvin_test_utils.secded_golden import get_cmd_intg, get_data_intg, get_rsp_intg
# --- Configuration Constants ---
# These constants are derived from CrossbarConfig.scala to make tests readable.
# Host name -> index used to form the DUT port prefix "io_hosts_<index>".
HOST_MAP = dict(kelvin_core=0, ibex_core_i=1, ibex_core_d=2)

# Device name -> index used to form the DUT port prefix "io_devices_<index>".
DEVICE_MAP = dict(
    kelvin_device=0,
    rom=1,
    sram=2,
    uart0=3,
    uart1=4,
    spi0=5,
)

# Addresses the tests use to target each device.
SRAM_BASE = 0x2000_0000
UART1_BASE = 0x4001_0000
KELVIN_DEVICE_BASE = 0x0000_0000
SPI0_BASE = 0x4002_0000

# Address the tests expect to be answered with an error response.
INVALID_ADDR = 0xDEAD_BEEF

# Multiplied by the main clock period to bound every awaited bus operation.
TIMEOUT_CYCLES = 500
# --- Test Setup ---
async def setup_dut(dut):
    """Start the clocks, build the TileLink-UL interfaces and reset the DUT.

    Args:
        dut: The cocotb handle of the crossbar under test.

    Returns:
        A ``(interfaces, clock)`` tuple. ``interfaces`` is a dict holding a
        ``"hosts"`` list and a ``"devices"`` list of ``TileLinkULInterface``
        objects, built by iterating ``HOST_MAP`` / ``DEVICE_MAP`` in
        insertion order so that they can be indexed with those maps' values.
        ``clock`` is the ``Clock`` object driving ``io_clk_i``.
    """
    # (clock signal name, reset signal name) of each clock domain.
    main_domain = ("io_clk_i", "io_rst_ni")
    async_host_domain = ("io_async_ports_hosts_0_clock",
                         "io_async_ports_hosts_0_reset")
    async_device_domain = ("io_async_ports_devices_0_clock",
                           "io_async_ports_devices_0_reset")

    # Start the main clock.
    # NOTE(review): no time unit is passed to Clock(), yet the tests multiply
    # clock.period by TIMEOUT_CYCLES and treat the result as nanoseconds —
    # confirm the intended unit.
    clock = Clock(dut.io_clk_i, 5)
    cocotb.start_soon(clock.start())

    # Start the asynchronous SPI clock.
    spi_clock = Clock(dut.io_async_ports_devices_0_clock, 20)
    cocotb.start_soon(spi_clock.start())

    # Start the asynchronous Ibex clock.
    ibex_clock = Clock(dut.io_async_ports_hosts_0_clock, 10)
    cocotb.start_soon(ibex_clock.start())

    # Data bus width, in bits, of every host and device port.
    host_widths = {"kelvin_core": 128, "ibex_core_i": 32, "ibex_core_d": 32}
    device_widths = {
        "kelvin_device": 128,
        "rom": 32,
        "sram": 32,
        "uart0": 32,
        "uart1": 32,
        "spi0": 32,
    }

    # kelvin_core runs on the main clock; the other hosts share the
    # asynchronous host clock/reset pair.
    hosts = []
    for name, index in HOST_MAP.items():
        clock_name, reset_name = (main_domain if name == "kelvin_core" else
                                  async_host_domain)
        hosts.append(
            TileLinkULInterface(dut,
                                host_if_name=f"io_hosts_{index}",
                                clock_name=clock_name,
                                reset_name=reset_name,
                                width=host_widths[name]))

    # Every device runs on the main clock except spi0, which sits on the
    # asynchronous device port. Each interface is constructed exactly once,
    # directly with the clock/reset pair of its own domain.
    devices = []
    for name, index in DEVICE_MAP.items():
        clock_name, reset_name = (async_device_domain
                                  if name == "spi0" else main_domain)
        devices.append(
            TileLinkULInterface(dut,
                                device_if_name=f"io_devices_{index}",
                                clock_name=clock_name,
                                reset_name=reset_name,
                                width=device_widths[name]))

    interfaces = {"hosts": hosts, "devices": devices}

    # Reset the DUT: hold all (active-low) resets at 0 for five main-clock
    # cycles, release them, then wait five more cycles.
    dut.io_rst_ni.value = 0
    dut.io_async_ports_devices_0_reset.value = 0
    dut.io_async_ports_hosts_0_reset.value = 0
    await ClockCycles(dut.io_clk_i, 5)
    dut.io_rst_ni.value = 1
    dut.io_async_ports_devices_0_reset.value = 1
    dut.io_async_ports_hosts_0_reset.value = 1
    await ClockCycles(dut.io_clk_i, 5)

    return interfaces, clock
# --- Test Cases ---
@cocotb.test(timeout_time=10, timeout_unit="us")
async def test_kelvin_core_to_sram(dut):
    """Verify a 128-bit write from kelvin_core to sram.

    The write is expected to arrive at the 32-bit sram port as four 32-bit
    requests (in any order), after which the host must receive a single,
    error-free response carrying the source ID of the original request.
    """
    interfaces, clock = await setup_dut(dut)
    host_if = interfaces["hosts"][HOST_MAP["kelvin_core"]]
    device_if = interfaces["devices"][DEVICE_MAP["sram"]]
    timeout_ns = TIMEOUT_CYCLES * clock.period
    num_beats = 4  # One 128-bit host word split into 32-bit device words.

    # Send a 128-bit write request from the host.
    test_data = 0x112233445566778899AABBCCDDEEFF00
    write_txn = create_a_channel_req(address=SRAM_BASE,
                                     data=test_data,
                                     mask=0xFFFF,
                                     width=host_if.width)
    await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns")

    # Collect the 32-bit requests on the device side (order not guaranteed)
    # and acknowledge each one as it arrives.
    received_reqs = []
    for _ in range(num_beats):
        req = await with_timeout(device_if.device_get_request(), timeout_ns,
                                 "ns")
        received_reqs.append(req)
        await with_timeout(
            device_if.device_respond(opcode=0,
                                     param=0,
                                     size=req["size"],
                                     source=req["source"],
                                     width=device_if.width), timeout_ns, "ns")

    # Sort received requests by address so they can be compared in order.
    received_reqs.sort(key=lambda r: r["address"].integer)

    # Verify that every beat carries the expected address and data slice.
    for beat, req in enumerate(received_reqs):
        expected_addr = SRAM_BASE + (beat * 4)
        expected_data = (test_data >> (beat * 32)) & 0xFFFFFFFF
        assert req["address"] == expected_addr, (
            f"Beat {beat}: expected address 0x{expected_addr:X}, "
            f"but got 0x{req['address'].integer:X}")
        assert req["data"] == expected_data, (
            f"Beat {beat}: expected data 0x{expected_data:08X}, "
            f"but got {req['data']}")

    # Receive the response on the host side.
    resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns")
    assert resp["error"] == 0, "Unexpected error response on the host side"
    assert resp["source"] == write_txn["source"], (
        "Response source does not match the request source")
@cocotb.test(timeout_time=10, timeout_unit="us")
async def test_ibex_d_to_invalid_addr(dut):
    """Verify that a request to an unmapped address gets an error response.

    A write is sent from ibex_core_d to INVALID_ADDR; the host must receive a
    response with the error flag set and the source ID of the request.
    """
    interfaces, clock = await setup_dut(dut)
    host_if = interfaces["hosts"][HOST_MAP["ibex_core_d"]]
    timeout_ns = TIMEOUT_CYCLES * clock.period

    # Send a write request to an invalid address.
    write_txn = create_a_channel_req(address=INVALID_ADDR,
                                     data=0,
                                     mask=0xF,
                                     width=host_if.width)
    await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns")

    # Expect an error response.
    try:
        resp = await with_timeout(host_if.host_get_response(), timeout_ns,
                                  "ns")
        assert resp["error"] == 1, (
            f"Expected an error response, but got error={resp['error']}")
        assert resp["source"] == write_txn["source"], (
            "Response source does not match the request source")
    except Exception:
        # Allow the simulation to run for a few more cycles to get a clean
        # waveform, then re-raise the original failure with its traceback
        # untouched.
        await ClockCycles(dut.io_clk_i, 20)
        raise
@cocotb.test(timeout_time=10, timeout_unit="us")
async def test_kelvin_core_to_uart1(dut):
    """Verify a 128-bit to 32-bit write transaction with a sparse byte mask.

    The write mask enables only some of the four 32-bit lanes of the 128-bit
    word; one 32-bit request is expected on the uart1 port per enabled lane.
    """
    interfaces, clock = await setup_dut(dut)
    host_if = interfaces["hosts"][HOST_MAP["kelvin_core"]]
    device_if = interfaces["devices"][DEVICE_MAP["uart1"]]
    timeout_ns = TIMEOUT_CYCLES * clock.period

    # Send a 128-bit write request.
    test_data = 0x112233445566778899AABBCCDDEEFF00
    write_mask = 0xF0F0
    write_txn = create_a_channel_req(address=UART1_BASE,
                                     data=test_data,
                                     mask=write_mask,
                                     width=host_if.width)
    await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns")

    # Each 32-bit lane owns four mask bits; only lanes with at least one
    # enabled byte are expected on the device side (lanes 1 and 3 for 0xF0F0).
    active_lanes = [
        lane for lane in range(4) if (write_mask >> (lane * 4)) & 0xF
    ]

    # Expect one 32-bit transaction per active lane, order not guaranteed.
    received_reqs = []
    for _ in active_lanes:
        req = await with_timeout(device_if.device_get_request(), timeout_ns,
                                 "ns")
        received_reqs.append(req)
        await with_timeout(
            device_if.device_respond(opcode=0,
                                     param=0,
                                     size=req["size"],
                                     source=req["source"],
                                     width=device_if.width), timeout_ns, "ns")

    # Sort received requests by address so they line up with active_lanes.
    received_reqs.sort(key=lambda r: r["address"].integer)

    # Verify that every received beat matches its lane's address and data.
    for req, lane in zip(received_reqs, active_lanes):
        expected_addr = UART1_BASE + (lane * 4)
        expected_data = (test_data >> (lane * 32)) & 0xFFFFFFFF
        assert req["address"] == expected_addr, (
            f"Lane {lane}: expected address 0x{expected_addr:X}, "
            f"but got 0x{req['address'].integer:X}")
        assert req["data"] == expected_data, (
            f"Lane {lane}: expected data 0x{expected_data:08X}, "
            f"but got {req['data']}")

    # Receive the response on the host side.
    resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns")
    assert resp["error"] == 0, "Unexpected error response on the host side"
    assert resp["source"] == write_txn["source"], (
        "Response source does not match the request source")
@cocotb.test(timeout_time=10, timeout_unit="us")
async def test_ibex_d_to_kelvin_device(dut):
    """Verify a 32-bit to 128-bit write transaction."""
    ifaces, main_clock = await setup_dut(dut)
    ibex_d = ifaces["hosts"][HOST_MAP["ibex_core_d"]]
    kelvin_dev = ifaces["devices"][DEVICE_MAP["kelvin_device"]]
    op_timeout_ns = TIMEOUT_CYCLES * main_clock.period

    def bounded(awaitable):
        """Wrap one bus operation in the per-operation timeout."""
        return with_timeout(awaitable, op_timeout_ns, "ns")

    # Host side: issue the 32-bit write.
    payload = 0x12345678
    write_txn = create_a_channel_req(
        address=KELVIN_DEVICE_BASE,
        data=payload,
        mask=0xF,
        width=ibex_d.width,
    )
    await bounded(ibex_d.host_put(write_txn))

    # Device side: exactly one request is fetched and checked.
    dev_req = await bounded(kelvin_dev.device_get_request())
    assert dev_req["address"] == KELVIN_DEVICE_BASE
    assert dev_req["data"] == payload

    # Device side: acknowledge the write.
    ack = kelvin_dev.device_respond(
        opcode=0,
        param=0,
        size=dev_req["size"],
        source=dev_req["source"],
        width=kelvin_dev.width,
    )
    await bounded(ack)

    # Host side: the response must come back without an error.
    host_resp = await bounded(ibex_d.host_get_response())
    assert host_resp["error"] == 0
@cocotb.test(timeout_time=10, timeout_unit="us")
async def test_kelvin_core_to_kelvin_device(dut):
    """Verify a 128-bit to 128-bit write transaction (no bridge).

    A full-width write from kelvin_core must arrive at kelvin_device as a
    single request with unchanged address and data, and the host must get an
    error-free response.
    """
    interfaces, clock = await setup_dut(dut)
    host_if = interfaces["hosts"][HOST_MAP["kelvin_core"]]
    device_if = interfaces["devices"][DEVICE_MAP["kelvin_device"]]
    timeout_ns = TIMEOUT_CYCLES * clock.period

    # Send a 128-bit write request.
    test_data = 0x112233445566778899AABBCCDDEEFF00
    write_txn = create_a_channel_req(address=KELVIN_DEVICE_BASE,
                                     data=test_data,
                                     mask=0xFFFF,
                                     width=host_if.width)
    await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns")

    # Expect a single 128-bit transaction on the device side.
    req = await with_timeout(device_if.device_get_request(), timeout_ns, "ns")
    assert req["address"] == KELVIN_DEVICE_BASE, (
        f"Expected address 0x{KELVIN_DEVICE_BASE:X}, "
        f"but got 0x{req['address'].integer:X}")
    assert req["data"] == test_data, (
        f"Expected data 0x{test_data:032X}, but got {req['data']}")

    # Send a response from the device. The device's own bus width is passed
    # explicitly, as the other kelvin_device tests in this file do.
    await with_timeout(
        device_if.device_respond(opcode=0,
                                 param=0,
                                 size=req["size"],
                                 source=req["source"],
                                 width=device_if.width), timeout_ns, "ns")

    # Expect a single response on the host side.
    resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns")
    assert resp["error"] == 0, "Unexpected error response on the host side"
@cocotb.test(timeout_time=10, timeout_unit="us")
async def test_ibex_d_to_kelvin_device_csr_read(dut):
    """Verify that Ibex can correctly read a CSR from the Kelvin device.

    The read data travels from the 128-bit device port back to the 32-bit
    host port, so this checks the return path through the width bridge.
    """
    interfaces, clock = await setup_dut(dut)
    host_if = interfaces["hosts"][HOST_MAP["ibex_core_d"]]
    device_if = interfaces["devices"][DEVICE_MAP["kelvin_device"]]
    timeout_ns = TIMEOUT_CYCLES * clock.period

    def bounded(awaitable):
        """Wrap one bus operation in the per-operation timeout."""
        return with_timeout(awaitable, timeout_ns, "ns")

    csr_addr = KELVIN_DEVICE_BASE + 0x8  # Match the CSR address
    halted_status = 0x1  # Bit 0 for halted

    async def device_responder():
        """Mock kelvin_device: answer one request with the CSR value."""
        req = await bounded(device_if.device_get_request())
        assert req["address"] == csr_addr
        # Byte offset 0x8 is the third 32-bit lane of the 128-bit bus, so the
        # CSR value is placed at bit 64 of the response data.
        lane_aligned_data = halted_status << 64
        reply = device_if.device_respond(
            opcode=1,  # AccessAckData
            param=0,
            size=req["size"],
            source=req["source"],
            data=lane_aligned_data,
            width=device_if.width,
        )
        await bounded(reply)

    # Run the mock device concurrently with the host-side sequence.
    cocotb.start_soon(device_responder())

    # Build the 32-bit read request by hand.
    # TODO(atv): Do this thru helper?
    user_bits = {"cmd_intg": 0, "data_intg": 0, "instr_type": 0, "rsvd": 0}
    read_txn = {
        "opcode": 4,  # Get
        "param": 0,
        "size": 2,  # 4 bytes
        "source": 1,
        "address": csr_addr,
        "mask": 0xF,
        "data": 0,
        "user": user_bits,
    }
    # Fill in the integrity fields once the rest of the request is in place.
    user_bits["cmd_intg"] = get_cmd_intg(read_txn, width=host_if.width)
    user_bits["data_intg"] = get_data_intg(read_txn["data"],
                                           width=host_if.width)
    await bounded(host_if.host_put(read_txn))

    # The host must see one error-free response carrying the CSR value.
    resp = await bounded(host_if.host_get_response())
    assert resp["error"] == 0
    assert resp[
        "data"] == halted_status, f"Expected CSR data {halted_status}, but got {resp['data']}"
@cocotb.test(timeout_time=10, timeout_unit="us")
async def test_ibex_d_to_kelvin_device_specific_addr(dut):
    """Verify a write to a specific address in the kelvin_device range."""
    interfaces, clock = await setup_dut(dut)
    host_if = interfaces["hosts"][HOST_MAP["ibex_core_d"]]
    device_if = interfaces["devices"][DEVICE_MAP["kelvin_device"]]
    timeout_ns = TIMEOUT_CYCLES * clock.period

    def bounded(awaitable):
        """Wrap one bus operation in the per-operation timeout."""
        return with_timeout(awaitable, timeout_ns, "ns")

    # Host side: 32-bit write to address 0x30000.
    test_addr = 0x30000
    payload = 0xDEADBEEF
    write_txn = create_a_channel_req(
        address=test_addr,
        data=payload,
        mask=0xF,
        width=host_if.width,
    )
    await bounded(host_if.host_put(write_txn))

    # Device side: one request is fetched; address and data must match.
    req = await bounded(device_if.device_get_request())
    assert req[
        "address"] == test_addr, f"Expected address 0x{test_addr:X}, but got 0x{req['address'].integer:X}"
    assert req["data"] == payload

    # Device side: acknowledge the write.
    ack = device_if.device_respond(
        opcode=0,
        param=0,
        size=req["size"],
        source=req["source"],
        width=device_if.width,
    )
    await bounded(ack)

    # Host side: the response must come back without an error.
    resp = await bounded(host_if.host_get_response())
    assert resp["error"] == 0
@cocotb.test(timeout_time=10, timeout_unit="us")
async def test_wide_to_narrow_integrity(dut):
    """Verify integrity is checked and regenerated across the width bridge.

    A 128-bit write from kelvin_core to uart1 is expected as four 32-bit
    requests whose integrity fields match the golden model for the device
    width. Each request is answered with a response carrying golden
    integrity, and the response seen by the host must be error-free with
    integrity fields matching the golden model for the host width.
    """
    interfaces, clock = await setup_dut(dut)
    host_if = interfaces["hosts"][HOST_MAP["kelvin_core"]]
    device_if = interfaces["devices"][DEVICE_MAP["uart1"]]
    timeout_ns = TIMEOUT_CYCLES * clock.period

    # Send a 128-bit write request from the host with correct integrity.
    test_data = 0x112233445566778899AABBCCDDEEFF00
    write_txn = create_a_channel_req(address=UART1_BASE,
                                     data=test_data,
                                     mask=0xFFFF,
                                     width=host_if.width)
    await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns")

    # Expect four 32-bit transactions on the device side.
    for _ in range(4):
        req = await with_timeout(device_if.device_get_request(), timeout_ns,
                                 "ns")

        # Verify that the bridge regenerated integrity correctly for the beat.
        assert req["user"]["cmd_intg"] == get_cmd_intg(
            req, width=device_if.width), (
                "Command integrity mismatch on a device-side request")
        assert req["user"]["data_intg"] == get_data_intg(
            req["data"], width=device_if.width), (
                "Data integrity mismatch on a device-side request")

        # Build the response by hand so its integrity fields are computed
        # here from the golden model, then queue it on the device D channel.
        resp_beat = {
            "opcode": 0,
            "param": 0,
            "size": req["size"],
            "source": req["source"],
            "sink": 0,
            "data": 0,
            "error": 0,
        }
        resp_beat["user"] = {
            "rsp_intg": get_rsp_intg(resp_beat, width=device_if.width),
            "data_intg": get_data_intg(0, width=device_if.width),
        }
        await device_if.device_d_fifo.put(resp_beat)

    # Receive the final assembled response on the host side.
    resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns")

    # Check the error flag first: the integrity expectations below are
    # computed for an error-free response, so an error must be reported as
    # such rather than as an integrity mismatch.
    assert resp["error"] == 0, "Unexpected error response on the host side"

    # Verify that the bridge checked and regenerated integrity correctly.
    expected_resp = resp.copy()
    expected_resp["error"] = 0
    assert resp["user"]["rsp_intg"] == get_rsp_intg(
        expected_resp, width=host_if.width), (
            "Response integrity mismatch on the host side")
    assert resp["user"]["data_intg"] == get_data_intg(
        resp["data"], width=host_if.width), (
            "Data integrity mismatch on the host side")