|  | # Copyright 2025 Google LLC | 
|  | # | 
|  | # Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | # you may not use this file except in compliance with the License. | 
|  | # You may obtain a copy of the License at | 
|  | # | 
|  | #     https://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  |  | 
|  | # BEGIN_TESTCASES_FOR_kelvin_xbar_cocotb | 
|  | KELVIN_XBAR_TESTCASES = [ | 
|  | "test_kelvin_core_to_sram", | 
|  | "test_ibex_d_to_invalid_addr", | 
|  | "test_kelvin_core_to_uart1", | 
|  | "test_ibex_d_to_kelvin_device", | 
|  | "test_kelvin_core_to_kelvin_device", | 
|  | "test_ibex_d_to_kelvin_device_specific_addr", | 
|  | "test_wide_to_narrow_integrity", | 
|  | ] | 
|  | # END_TESTCASES_FOR_kelvin_xbar_cocotb | 
|  |  | 
|  | import cocotb | 
|  | from cocotb.clock import Clock | 
|  | from cocotb.triggers import RisingEdge, ClockCycles, with_timeout | 
|  |  | 
|  | from kelvin_test_utils.TileLinkULInterface import TileLinkULInterface, create_a_channel_req | 
|  | from kelvin_test_utils.secded_golden import get_cmd_intg, get_data_intg, get_rsp_intg | 
|  |  | 
|  | # --- Configuration Constants --- | 
|  | # These constants are derived from CrossbarConfig.scala to make tests readable. | 
|  | HOST_MAP = {"kelvin_core": 0, "ibex_core_i": 1, "ibex_core_d": 2} | 
|  | DEVICE_MAP = { | 
|  | "kelvin_device": 0, | 
|  | "rom": 1, | 
|  | "sram": 2, | 
|  | "uart0": 3, | 
|  | "uart1": 4, | 
|  | "spi0": 5, | 
|  | } | 
|  | SRAM_BASE = 0x20000000 | 
|  | UART1_BASE = 0x40010000 | 
|  | KELVIN_DEVICE_BASE = 0x00000000 | 
|  | SPI0_BASE = 0x40020000 | 
|  | INVALID_ADDR = 0xDEADBEEF | 
|  | TIMEOUT_CYCLES = 500 | 
|  |  | 
|  |  | 
|  | # --- Test Setup --- | 
|  | async def setup_dut(dut): | 
|  | """Common setup logic for all tests.""" | 
|  | # Start the main clock | 
|  | clock = Clock(dut.io_clk_i, 5) | 
|  | cocotb.start_soon(clock.start()) | 
|  |  | 
|  | # Start the asynchronous SPI clock | 
|  | spi_clock = Clock(dut.io_async_ports_devices_0_clock, 20) | 
|  | cocotb.start_soon(spi_clock.start()) | 
|  |  | 
|  | # Start the asynchronous Ibex clock | 
|  | ibex_clock = Clock(dut.io_async_ports_hosts_0_clock, 10) | 
|  | cocotb.start_soon(ibex_clock.start()) | 
|  |  | 
|  | # Create a dictionary of TileLink interfaces for all hosts and devices | 
|  | host_widths = {"kelvin_core": 128, "ibex_core_i": 32, "ibex_core_d": 32} | 
|  | device_widths = { | 
|  | "kelvin_device": 128, | 
|  | "rom": 32, | 
|  | "sram": 32, | 
|  | "uart0": 32, | 
|  | "uart1": 32, | 
|  | "spi0": 32 | 
|  | } | 
|  |  | 
|  | interfaces = { | 
|  | "hosts": [ | 
|  | TileLinkULInterface(dut, | 
|  | host_if_name=f"io_hosts_{i}", | 
|  | clock_name="io_clk_i", | 
|  | reset_name="io_rst_ni", | 
|  | width=host_widths[name]) | 
|  | if name == "kelvin_core" else TileLinkULInterface( | 
|  | dut, | 
|  | host_if_name=f"io_hosts_{i}", | 
|  | clock_name="io_async_ports_hosts_0_clock", | 
|  | reset_name="io_async_ports_hosts_0_reset", | 
|  | width=host_widths[name]) for name, i in HOST_MAP.items() | 
|  | ], | 
|  | "devices": [ | 
|  | TileLinkULInterface(dut, | 
|  | device_if_name=f"io_devices_{i}", | 
|  | clock_name="io_clk_i", | 
|  | reset_name="io_rst_ni", | 
|  | width=device_widths[name]) | 
|  | for name, i in DEVICE_MAP.items() | 
|  | ], | 
|  | } | 
|  | # Special case for the async SPI port | 
|  | interfaces["devices"][DEVICE_MAP["spi0"]] = TileLinkULInterface( | 
|  | dut, | 
|  | device_if_name=f"io_devices_{DEVICE_MAP['spi0']}", | 
|  | clock_name="io_async_ports_devices_0_clock", | 
|  | reset_name="io_async_ports_devices_0_reset", | 
|  | width=32) | 
|  |  | 
|  | # Reset the DUT | 
|  | dut.io_rst_ni.value = 0 | 
|  | dut.io_async_ports_devices_0_reset.value = 0 | 
|  | dut.io_async_ports_hosts_0_reset.value = 0 | 
|  | await ClockCycles(dut.io_clk_i, 5) | 
|  | dut.io_rst_ni.value = 1 | 
|  | dut.io_async_ports_devices_0_reset.value = 1 | 
|  | dut.io_async_ports_hosts_0_reset.value = 1 | 
|  | await ClockCycles(dut.io_clk_i, 5) | 
|  |  | 
|  | return interfaces, clock | 
|  |  | 
|  |  | 
|  | # --- Test Cases --- | 
|  |  | 
|  |  | 
|  | @cocotb.test(timeout_time=10, timeout_unit="us") | 
|  | async def test_kelvin_core_to_sram(dut): | 
|  | """Verify a simple write/read transaction from kelvin_core to sram.""" | 
|  | interfaces, clock = await setup_dut(dut) | 
|  | host_if = interfaces["hosts"][HOST_MAP["kelvin_core"]] | 
|  | device_if = interfaces["devices"][DEVICE_MAP["sram"]] | 
|  | timeout_ns = TIMEOUT_CYCLES * clock.period | 
|  |  | 
|  | # Send a 128-bit write request from the host | 
|  | test_data = 0x112233445566778899AABBCCDDEEFF00 | 
|  | write_txn = create_a_channel_req(address=SRAM_BASE, | 
|  | data=test_data, | 
|  | mask=0xFFFF, | 
|  | width=host_if.width) | 
|  | await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns") | 
|  |  | 
|  | # Expect four 32-bit transactions on the device side, order not guaranteed | 
|  | received_reqs = [] | 
|  | for _ in range(4): | 
|  | req = await with_timeout(device_if.device_get_request(), timeout_ns, | 
|  | "ns") | 
|  | received_reqs.append(req) | 
|  | await with_timeout( | 
|  | device_if.device_respond(opcode=0, | 
|  | param=0, | 
|  | size=req["size"], | 
|  | source=req["source"]), timeout_ns, "ns") | 
|  |  | 
|  | # Sort received requests by address for comparison | 
|  | received_reqs.sort(key=lambda r: r["address"].integer) | 
|  |  | 
|  | # Verify all beats were received correctly | 
|  | for i in range(4): | 
|  | assert received_reqs[i]["address"] == SRAM_BASE + (i * 4) | 
|  | expected_data = (test_data >> (i * 32)) & 0xFFFFFFFF | 
|  | assert received_reqs[i]["data"] == expected_data | 
|  |  | 
|  | # Use the last beat (highest address) for the response source | 
|  | last_req = received_reqs[-1] | 
|  |  | 
|  | # Receive the response on the host side | 
|  | resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns") | 
|  | assert resp["error"] == 0 | 
|  | assert resp["source"] == write_txn["source"] | 
|  |  | 
|  |  | 
|  | @cocotb.test(timeout_time=10, timeout_unit="us") | 
|  | async def test_ibex_d_to_invalid_addr(dut): | 
|  | """Verify that a request to an unmapped address gets an error response.""" | 
|  | interfaces, clock = await setup_dut(dut) | 
|  | host_if = interfaces["hosts"][HOST_MAP["ibex_core_d"]] | 
|  | timeout_ns = TIMEOUT_CYCLES * clock.period | 
|  |  | 
|  | # Send a write request to an invalid address | 
|  | write_txn = create_a_channel_req(address=INVALID_ADDR, | 
|  | data=0, | 
|  | mask=0xF, | 
|  | width=host_if.width) | 
|  | await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns") | 
|  |  | 
|  | # Expect an error response | 
|  | try: | 
|  | resp = await with_timeout(host_if.host_get_response(), timeout_ns, | 
|  | "ns") | 
|  | assert resp["error"] == 1 | 
|  | assert resp["source"] == write_txn["source"] | 
|  | except Exception as e: | 
|  | # Allow the simulation to run for a few more cycles to get a clean waveform | 
|  | await ClockCycles(dut.io_clk_i, 20) | 
|  | raise e | 
|  |  | 
|  | @cocotb.test(timeout_time=10, timeout_unit="us") | 
|  | async def test_kelvin_core_to_uart1(dut): | 
|  | """Verify a 128-bit to 32-bit write transaction.""" | 
|  | interfaces, clock = await setup_dut(dut) | 
|  | host_if = interfaces["hosts"][HOST_MAP["kelvin_core"]] | 
|  | device_if = interfaces["devices"][DEVICE_MAP["uart1"]] | 
|  | timeout_ns = TIMEOUT_CYCLES * clock.period | 
|  |  | 
|  | # Send a 128-bit write request | 
|  | test_data = 0x112233445566778899AABBCCDDEEFF00 | 
|  | write_txn = create_a_channel_req(address=UART1_BASE, | 
|  | data=test_data, | 
|  | mask=0xF0F0, | 
|  | width=host_if.width) | 
|  | await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns") | 
|  |  | 
|  | # Expect four 32-bit transactions on the device side, order not guaranteed | 
|  | received_reqs = [] | 
|  | for i in range(2): | 
|  | req = await with_timeout(device_if.device_get_request(), timeout_ns, | 
|  | "ns") | 
|  | received_reqs.append(req) | 
|  | await with_timeout( | 
|  | device_if.device_respond(opcode=0, | 
|  | param=0, | 
|  | size=req["size"], | 
|  | source=req["source"], | 
|  | width=device_if.width), timeout_ns, "ns") | 
|  |  | 
|  | # Sort received requests by address for comparison | 
|  | received_reqs.sort(key=lambda r: r["address"].integer) | 
|  |  | 
|  | # Verify all beats were received correctly | 
|  | for idx, key in [(0, 1), (1, 3)]: | 
|  | assert received_reqs[idx]["address"] == UART1_BASE + (key * 4) | 
|  | expected_data = (test_data >> (key * 32)) & 0xFFFFFFFF | 
|  | assert received_reqs[idx]["data"] == expected_data | 
|  |  | 
|  | # Use the last beat (highest address) for the response source | 
|  | last_req = received_reqs[-1] | 
|  |  | 
|  | # Receive the response on the host side | 
|  | resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns") | 
|  | assert resp["error"] == 0 | 
|  | assert resp["source"] == write_txn["source"] | 
|  |  | 
|  |  | 
|  | @cocotb.test(timeout_time=10, timeout_unit="us") | 
|  | async def test_ibex_d_to_kelvin_device(dut): | 
|  | """Verify a 32-bit to 128-bit write transaction.""" | 
|  | interfaces, clock = await setup_dut(dut) | 
|  | host_if = interfaces["hosts"][HOST_MAP["ibex_core_d"]] | 
|  | device_if = interfaces["devices"][DEVICE_MAP["kelvin_device"]] | 
|  | timeout_ns = TIMEOUT_CYCLES * clock.period | 
|  |  | 
|  | # Send a 32-bit write request | 
|  | write_txn = create_a_channel_req(address=KELVIN_DEVICE_BASE, | 
|  | data=0x12345678, | 
|  | mask=0xF, | 
|  | width=host_if.width) | 
|  | await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns") | 
|  |  | 
|  | # Expect a single 128-bit transaction on the device side | 
|  | req = await with_timeout(device_if.device_get_request(), timeout_ns, "ns") | 
|  | assert req["address"] == KELVIN_DEVICE_BASE | 
|  | assert req["data"] == 0x12345678 | 
|  |  | 
|  | # Send a response from the device | 
|  | await with_timeout( | 
|  | device_if.device_respond(opcode=0, | 
|  | param=0, | 
|  | size=req["size"], | 
|  | source=req["source"], | 
|  | width=device_if.width), timeout_ns, "ns") | 
|  |  | 
|  | # Expect a single response on the host side | 
|  | resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns") | 
|  | assert resp["error"] == 0 | 
|  |  | 
|  |  | 
|  | @cocotb.test(timeout_time=10, timeout_unit="us") | 
|  | async def test_kelvin_core_to_kelvin_device(dut): | 
|  | """Verify a 128-bit to 128-bit write transaction (no bridge).""" | 
|  | interfaces, clock = await setup_dut(dut) | 
|  | host_if = interfaces["hosts"][HOST_MAP["kelvin_core"]] | 
|  | device_if = interfaces["devices"][DEVICE_MAP["kelvin_device"]] | 
|  | timeout_ns = TIMEOUT_CYCLES * clock.period | 
|  |  | 
|  | # Send a 128-bit write request | 
|  | write_txn = create_a_channel_req(address=KELVIN_DEVICE_BASE, | 
|  | data=0x112233445566778899AABBCCDDEEFF00, | 
|  | mask=0xFFFF, | 
|  | width=host_if.width) | 
|  | await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns") | 
|  |  | 
|  | # Expect a single 128-bit transaction on the device side | 
|  | req = await with_timeout(device_if.device_get_request(), timeout_ns, "ns") | 
|  | assert req["address"] == KELVIN_DEVICE_BASE | 
|  | assert req["data"] == 0x112233445566778899AABBCCDDEEFF00 | 
|  |  | 
|  | # Send a response from the device | 
|  | await with_timeout( | 
|  | device_if.device_respond(opcode=0, | 
|  | param=0, | 
|  | size=req["size"], | 
|  | source=req["source"]), timeout_ns, "ns") | 
|  |  | 
|  | # Expect a single response on the host side | 
|  | resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns") | 
|  | assert resp["error"] == 0 | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  |  | 
|  | @cocotb.test(timeout_time=10, timeout_unit="us") | 
|  | async def test_ibex_d_to_kelvin_device_csr_read(dut): | 
|  | """Verify that Ibex can correctly read a CSR from the Kelvin device. | 
|  |  | 
|  | This test specifically checks the return path through the width bridge. | 
|  | """ | 
|  | interfaces, clock = await setup_dut(dut) | 
|  | host_if = interfaces["hosts"][HOST_MAP["ibex_core_d"]] | 
|  | device_if = interfaces["devices"][DEVICE_MAP["kelvin_device"]] | 
|  | timeout_ns = TIMEOUT_CYCLES * clock.period | 
|  | csr_addr = KELVIN_DEVICE_BASE + 0x8  # Match the CSR address | 
|  | halted_status = 0x1  # Bit 0 for halted | 
|  |  | 
|  | async def device_responder(): | 
|  | """A mock responder for the kelvin_device.""" | 
|  | req = await with_timeout(device_if.device_get_request(), timeout_ns, | 
|  | "ns") | 
|  | assert req["address"] == csr_addr | 
|  | # The CSR data is in the third 32-bit lane of the 128-bit bus. | 
|  | resp_data = halted_status << 64 | 
|  | await with_timeout( | 
|  | device_if.device_respond( | 
|  | opcode=1,  # AccessAckData | 
|  | param=0, | 
|  | size=req["size"], | 
|  | source=req["source"], | 
|  | data=resp_data, | 
|  | width=device_if.width, | 
|  | ), | 
|  | timeout_ns, | 
|  | "ns") | 
|  |  | 
|  | # Start the device responder coroutine | 
|  | cocotb.start_soon(device_responder()) | 
|  |  | 
|  | # Send a 32-bit read request from the host | 
|  | # TODO(atv): Do this thru helper? | 
|  | read_txn = { | 
|  | "opcode": 4,  # Get | 
|  | "param": 0, | 
|  | "size": 2,  # 4 bytes | 
|  | "source": 1, | 
|  | "address": csr_addr, | 
|  | "mask": 0xF, | 
|  | "data": 0, | 
|  | "user": { | 
|  | "cmd_intg": 0, | 
|  | "data_intg": 0, | 
|  | "instr_type": 0, | 
|  | "rsvd": 0 | 
|  | } | 
|  | } | 
|  | read_txn["user"]["cmd_intg"] = get_cmd_intg(read_txn, width=host_if.width) | 
|  | read_txn["user"]["data_intg"] = get_data_intg(read_txn["data"], | 
|  | width=host_if.width) | 
|  | await with_timeout(host_if.host_put(read_txn), timeout_ns, "ns") | 
|  |  | 
|  | # Expect a single response on the host side with the correct data | 
|  | resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns") | 
|  | assert resp["error"] == 0 | 
|  | assert resp[ | 
|  | "data"] == halted_status, f"Expected CSR data {halted_status}, but got {resp['data']}" | 
|  |  | 
|  |  | 
|  | @cocotb.test(timeout_time=10, timeout_unit="us") | 
|  | async def test_ibex_d_to_kelvin_device_specific_addr(dut): | 
|  | """Verify a write to a specific address in the kelvin_device range.""" | 
|  | interfaces, clock = await setup_dut(dut) | 
|  | host_if = interfaces["hosts"][HOST_MAP["ibex_core_d"]] | 
|  | device_if = interfaces["devices"][DEVICE_MAP["kelvin_device"]] | 
|  | timeout_ns = TIMEOUT_CYCLES * clock.period | 
|  |  | 
|  | # Send a 32-bit write request to 0x30000 | 
|  | test_addr = 0x30000 | 
|  | write_txn = create_a_channel_req(address=test_addr, | 
|  | data=0xDEADBEEF, | 
|  | mask=0xF, | 
|  | width=host_if.width) | 
|  | await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns") | 
|  |  | 
|  | # Expect a single 128-bit transaction on the device side | 
|  | req = await with_timeout(device_if.device_get_request(), timeout_ns, "ns") | 
|  | assert req[ | 
|  | "address"] == test_addr, f"Expected address 0x{test_addr:X}, but got 0x{req['address'].integer:X}" | 
|  | assert req["data"] == 0xDEADBEEF | 
|  |  | 
|  | # Send a response from the device | 
|  | await with_timeout( | 
|  | device_if.device_respond(opcode=0, | 
|  | param=0, | 
|  | size=req["size"], | 
|  | source=req["source"], | 
|  | width=device_if.width), timeout_ns, "ns") | 
|  |  | 
|  | # Expect a single response on the host side | 
|  | resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns") | 
|  | assert resp["error"] == 0 | 
|  |  | 
|  |  | 
|  | @cocotb.test(timeout_time=10, timeout_unit="us") | 
|  | async def test_wide_to_narrow_integrity(dut): | 
|  | """Verify integrity is checked and regenerated across the width bridge.""" | 
|  | interfaces, clock = await setup_dut(dut) | 
|  | host_if = interfaces["hosts"][HOST_MAP["kelvin_core"]] | 
|  | device_if = interfaces["devices"][DEVICE_MAP["uart1"]] | 
|  | timeout_ns = TIMEOUT_CYCLES * clock.period | 
|  |  | 
|  | # Send a 128-bit write request from the host with correct integrity | 
|  | test_data = 0x112233445566778899AABBCCDDEEFF00 | 
|  | write_txn = create_a_channel_req(address=UART1_BASE, | 
|  | data=test_data, | 
|  | mask=0xFFFF, | 
|  | width=host_if.width) | 
|  |  | 
|  | await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns") | 
|  |  | 
|  | # Expect four 32-bit transactions on the device side | 
|  | received_reqs = [] | 
|  | for i in range(4): | 
|  | req = await with_timeout(device_if.device_get_request(), timeout_ns, | 
|  | "ns") | 
|  |  | 
|  | # Verify that the bridge regenerated integrity correctly for each beat | 
|  | assert req["user"]["cmd_intg"] == get_cmd_intg(req, | 
|  | width=device_if.width) | 
|  | assert req["user"]["data_intg"] == get_data_intg(req["data"], | 
|  | width=device_if.width) | 
|  |  | 
|  | received_reqs.append(req) | 
|  |  | 
|  | # Create a response with correct integrity | 
|  | resp_beat = { | 
|  | "opcode": 0, | 
|  | "param": 0, | 
|  | "size": req["size"], | 
|  | "source": req["source"], | 
|  | "sink": 0, | 
|  | "data": 0, | 
|  | "error": 0 | 
|  | } | 
|  | resp_beat["user"] = { | 
|  | "rsp_intg": get_rsp_intg(resp_beat, width=device_if.width), | 
|  | "data_intg": get_data_intg(0, width=device_if.width) | 
|  | } | 
|  | await device_if.device_d_fifo.put(resp_beat) | 
|  |  | 
|  | # Receive the final assembled response on the host side | 
|  | resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns") | 
|  |  | 
|  | # Verify that the bridge checked and regenerated integrity correctly | 
|  | expected_resp = resp.copy() | 
|  | expected_resp["error"] = 0 | 
|  | assert resp["user"]["rsp_intg"] == get_rsp_intg(expected_resp, | 
|  | width=host_if.width) | 
|  | assert resp["user"]["data_intg"] == get_data_intg(resp["data"], | 
|  | width=host_if.width) | 
|  | assert resp["error"] == 0 | 
|  |  | 
|  |  | 
|  |  |