|  | # Copyright 2025 Google LLC | 
|  | # | 
|  | # Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | # you may not use this file except in compliance with the License. | 
|  | # You may obtain a copy of the License at | 
|  | # | 
|  | #     https://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  |  | 
|  | import cocotb | 
|  | import enum | 
|  | import random | 
|  |  | 
|  | from cocotb.clock import Clock | 
|  | from cocotb.triggers import ClockCycles, RisingEdge | 
|  |  | 
@enum.unique
class TLUL_OpcodeA(enum.IntEnum):
    """Opcode encodings expected on the TileLink-UL A (request) channel.

    The tests compare ``io_tl_a_bits_opcode`` against these members, so the
    integer values must match what the DUT drives on that port.
    """

    # Expected for AXI writes in test_write_request.
    PutFullData = 0
    # Declared for completeness; not referenced by the tests in this file.
    PutPartialData = 1
    # Expected for AXI reads in the read tests.
    Get = 4
|  |  | 
@enum.unique
class TLUL_OpcodeD(enum.IntEnum):
    """Opcode encodings driven on the TileLink-UL D (response) channel.

    The tests write these members to ``io_tl_d_bits_opcode`` when they play
    the role of the TileLink responder.
    """

    # Response used for writes (no data payload is driven by the write test).
    AccessAck = 0
    # Response used for reads (accompanied by ``io_tl_d_bits_data``).
    AccessAckData = 1
|  |  | 
async def reset_dut(dut):
    """Pulse reset on the DUT with the AXI-side handshake inputs held idle.

    ``reset`` is asserted for two clock cycles while the AXI request
    ``valid`` inputs and the AXI response ``ready`` inputs are driven low.
    Reset is then released and two further clock cycles elapse before the
    coroutine returns.  The TileLink-side inputs are not touched here.

    Args:
        dut: cocotb handle of the design under test.
    """
    # AXI inputs owned by this testbench that must be inactive during reset.
    idle_inputs = (
        "io_axi_read_addr_valid",
        "io_axi_write_addr_valid",
        "io_axi_write_data_valid",
        "io_axi_write_resp_ready",
        "io_axi_read_data_ready",
    )

    dut.reset.value = 1
    for signal_name in idle_inputs:
        getattr(dut, signal_name).value = 0
    await ClockCycles(dut.clock, 2)

    dut.reset.value = 0
    await ClockCycles(dut.clock, 2)
|  |  | 
async def axi_send_write(dut, address, source, size, data, strb, timeout_cycles=1000):
    """Sends an AXI write transaction (one AW beat and one W beat).

    Both channels are presented on the same cycle.  Each channel is tracked
    independently: its ``valid`` is dropped as soon as its own ``ready`` has
    been sampled high at a rising clock edge.  This keeps the driver correct
    for a DUT that accepts the address and the data on different cycles;
    holding ``valid`` high past an accepted handshake would present the same
    beat a second time.  When both channels are accepted on the same edge the
    behaviour is the same as deasserting both together.

    Args:
        dut: cocotb handle of the design under test.
        address: value driven on ``io_axi_write_addr_bits_addr``.
        source: value driven on ``io_axi_write_addr_bits_id``.
        size: value driven on ``io_axi_write_addr_bits_size``.
        data: value driven on ``io_axi_write_data_bits_data``.
        strb: value driven on ``io_axi_write_data_bits_strb``.
        timeout_cycles: maximum number of rising clock edges to wait for both
            channels to be accepted.

    Raises:
        RuntimeError: if either channel is still pending after
            ``timeout_cycles`` rising edges.
    """
    dut.io_axi_write_addr_valid.value = 1
    dut.io_axi_write_addr_bits_addr.value = address
    dut.io_axi_write_addr_bits_id.value = source
    dut.io_axi_write_addr_bits_size.value = size

    dut.io_axi_write_data_valid.value = 1
    dut.io_axi_write_data_bits_data.value = data
    dut.io_axi_write_data_bits_strb.value = strb

    addr_pending = True
    data_pending = True
    for _ in range(timeout_cycles):
        await RisingEdge(dut.clock)
        # A channel transfers on an edge where its valid and ready are both
        # high; valid is known to be high for every channel still pending.
        if addr_pending and dut.io_axi_write_addr_ready.value == 1:
            dut.io_axi_write_addr_valid.value = 0
            addr_pending = False
        if data_pending and dut.io_axi_write_data_ready.value == 1:
            dut.io_axi_write_data_valid.value = 0
            data_pending = False
        if not (addr_pending or data_pending):
            break
    else:
        raise RuntimeError("Timeout waiting for AXI write ready")
|  |  | 
async def axi_send_read(dut, address, source, size, timeout_cycles=1000):
    """Drive one beat on the AXI read-address channel and wait for acceptance.

    The address, id and size fields are presented together with ``valid``.
    ``io_axi_read_addr_ready`` is then sampled after each rising clock edge;
    once it is seen high, ``valid`` is dropped and the coroutine returns.

    Args:
        dut: cocotb handle of the design under test.
        address: value driven on ``io_axi_read_addr_bits_addr``.
        source: value driven on ``io_axi_read_addr_bits_id``.
        size: value driven on ``io_axi_read_addr_bits_size``.
        timeout_cycles: maximum number of rising clock edges to wait.

    Raises:
        RuntimeError: if ``ready`` was not observed within ``timeout_cycles``
            rising edges (``valid`` is left asserted in that case).
    """
    dut.io_axi_read_addr_bits_addr.value = address
    dut.io_axi_read_addr_bits_id.value = source
    dut.io_axi_read_addr_bits_size.value = size
    dut.io_axi_read_addr_valid.value = 1

    edges_seen = 0
    while True:
        if edges_seen >= timeout_cycles:
            raise RuntimeError("Timeout waiting for AXI read ready")
        await RisingEdge(dut.clock)
        edges_seen += 1
        if dut.io_axi_read_addr_ready.value == 1:
            break

    dut.io_axi_read_addr_valid.value = 0
|  |  | 
@cocotb.test()
async def test_write_request(dut):
    """Tests a simple AXI write request.

    Sequence:
      1. Start a 10 us clock and reset the DUT.
      2. Issue one AXI write with a random address, id, size and data while
         the TileLink A channel is stalled (``io_tl_a_ready`` low).
      3. Check that the request appears on the TileLink A channel as
         ``PutFullData`` with matching address, source, size, data and mask.
      4. Accept the A beat, then answer on the D channel with an error-free
         ``AccessAck`` and check the AXI write response (id matches, resp 0).

    Raises:
        RuntimeError: if ``io_tl_d_ready`` is not seen within the timeout.
    """
    clock = Clock(dut.clock, 10, unit="us")
    cocotb.start_soon(clock.start())

    await reset_dut(dut)

    # Keep the TileLink side idle so the A beat stays visible for checking.
    dut.io_tl_a_ready.value = 0
    dut.io_tl_d_valid.value = 0

    addr_width = 32
    source_width = 6
    data_width_bytes = 32
    timeout_cycles = 1000

    # Transfer size is 2**size_power bytes, i.e. 1 to 32 bytes.
    size_power = random.randint(0, 5)
    test_size = size_power
    num_bytes = 2**size_power

    test_addr = random.randint(0, (2**addr_width) - 1)
    test_source = random.randint(0, (2**source_width) - 1)
    test_data = random.randint(0, (2**(data_width_bytes*8)) - 1)
    # One strobe bit per byte, set for the low num_bytes byte lanes.
    test_strb = (1 << num_bytes) - 1

    await axi_send_write(dut, address=test_addr, source=test_source, size=test_size, data=test_data, strb=test_strb, timeout_cycles=timeout_cycles)

    await RisingEdge(dut.clock)
    assert dut.io_tl_a_valid.value, "TL A_VALID should be high"
    assert dut.io_tl_a_bits_opcode.value == TLUL_OpcodeA.PutFullData, "TL A_OPCODE should be PutFullData"
    assert dut.io_tl_a_bits_address.value == test_addr, "TL A_ADDRESS is incorrect"
    assert dut.io_tl_a_bits_source.value == test_source, "TL A_SOURCE is incorrect"
    assert dut.io_tl_a_bits_size.value == test_size, "TL A_SIZE is incorrect"
    assert dut.io_tl_a_bits_data.value == test_data, "TL A_DATA is incorrect"
    assert dut.io_tl_a_bits_mask.value == test_strb, "TL A_MASK is incorrect"

    # Accept the A beat for exactly one cycle.
    dut.io_tl_a_ready.value = 1
    await RisingEdge(dut.clock)
    dut.io_tl_a_ready.value = 0

    dut.io_axi_write_resp_ready.value = 1
    dut.io_tl_d_valid.value = 1
    dut.io_tl_d_bits_opcode.value = TLUL_OpcodeD.AccessAck
    dut.io_tl_d_bits_source.value = test_source
    # Drive the error flag explicitly: BRESP is asserted to be 0 below, and
    # reset_dut() does not touch this input, so it would otherwise be
    # undriven or keep whatever value an earlier test left on it.
    dut.io_tl_d_bits_error.value = 0

    for _ in range(timeout_cycles):
        await RisingEdge(dut.clock)
        if dut.io_tl_d_ready.value:
            assert dut.io_axi_write_resp_valid.value, "AXI BVALID should be high"
            assert dut.io_axi_write_resp_bits_id.value == test_source, "AXI BID is incorrect"
            assert dut.io_axi_write_resp_bits_resp.value == 0, "AXI BRESP is incorrect"
            dut.io_axi_write_resp_ready.value = 0
            break
    else:
        raise RuntimeError("Timeout waiting for io_tl_d_ready")

    await RisingEdge(dut.clock)
    dut.io_tl_d_valid.value = 0

    # Let the DUT run a few idle cycles before the test ends.
    await ClockCycles(dut.clock, 5)
|  |  | 
@cocotb.test()
async def test_read_request(dut):
    """Checks that an AXI read is forwarded to TileLink and answered cleanly.

    A random read (address, id, size) is issued on the AXI side while the
    TileLink A channel is stalled.  The A beat is checked to be a ``Get``
    carrying the same address, source and size, and is then accepted for one
    cycle.  The test answers on the D channel with ``AccessAckData`` and no
    error, and checks that the AXI read response carries the same id, the
    same data and a response code of 0.

    Raises:
        RuntimeError: if ``io_tl_d_ready`` is not seen within the timeout.
    """
    clk_gen = Clock(dut.clock, 10, unit="us")
    cocotb.start_soon(clk_gen.start())
    await reset_dut(dut)

    # TileLink side starts idle: A is stalled and no D beat is offered.
    dut.io_tl_a_ready.value = 0
    dut.io_tl_d_valid.value = 0

    addr_bits, source_bits, data_bytes = 32, 6, 32
    max_wait = 1000

    # Draw order (size, address, source, data) is kept fixed so a given
    # random seed produces the same stimulus.
    req_size = random.randint(0, 5)
    req_addr = random.randint(0, (1 << addr_bits) - 1)
    req_source = random.randint(0, (1 << source_bits) - 1)
    resp_data = random.randint(0, (1 << (data_bytes * 8)) - 1)

    await axi_send_read(
        dut,
        address=req_addr,
        source=req_source,
        size=req_size,
        timeout_cycles=max_wait,
    )

    # One cycle after the AXI handshake the request must be on the A channel.
    await RisingEdge(dut.clock)
    assert dut.io_tl_a_valid.value, "TL A_VALID should be high"
    for port, expected, message in (
        ("io_tl_a_bits_opcode", TLUL_OpcodeA.Get, "TL A_OPCODE should be Get"),
        ("io_tl_a_bits_address", req_addr, "TL A_ADDRESS is incorrect"),
        ("io_tl_a_bits_source", req_source, "TL A_SOURCE is incorrect"),
        ("io_tl_a_bits_size", req_size, "TL A_SIZE is incorrect"),
    ):
        assert getattr(dut, port).value == expected, message

    # Accept the A beat for exactly one cycle.
    dut.io_tl_a_ready.value = 1
    await RisingEdge(dut.clock)
    dut.io_tl_a_ready.value = 0

    # Offer the D response and let the AXI side take the read data.
    await RisingEdge(dut.clock)
    dut.io_tl_d_bits_opcode.value = TLUL_OpcodeD.AccessAckData
    dut.io_tl_d_bits_source.value = req_source
    dut.io_tl_d_bits_data.value = resp_data
    dut.io_tl_d_bits_error.value = 0
    dut.io_tl_d_valid.value = 1
    dut.io_axi_read_data_ready.value = 1

    for _ in range(max_wait):
        await RisingEdge(dut.clock)
        if dut.io_tl_d_ready.value:
            break
    else:
        raise RuntimeError("Timeout waiting for io_tl_d_ready")

    # Checked in the same time step in which io_tl_d_ready was seen high.
    assert dut.io_axi_read_data_valid.value, "AXI RVALID should be high"
    for port, expected, message in (
        ("io_axi_read_data_bits_id", req_source, "AXI RID is incorrect"),
        ("io_axi_read_data_bits_data", resp_data, "AXI RDATA is incorrect"),
        ("io_axi_read_data_bits_resp", 0, "AXI RRESP is incorrect"),
    ):
        assert getattr(dut, port).value == expected, message
    dut.io_axi_read_data_ready.value = 0

    await RisingEdge(dut.clock)
    dut.io_tl_d_valid.value = 0

    # Let the DUT run a few idle cycles before the test ends.
    await ClockCycles(dut.clock, 5)
|  |  | 
|  |  | 
@cocotb.test()
async def test_read_error(dut):
    """Checks that a TileLink error on a read surfaces in the AXI response.

    The flow is the same as a normal read: a random AXI read is issued, the
    resulting ``Get`` on the A channel is checked and accepted, and an
    ``AccessAckData`` is returned on the D channel.  Here the D beat has
    ``io_tl_d_bits_error`` set, and the AXI read response is expected to
    carry the same id and data with a response code of 2.

    Raises:
        RuntimeError: if ``io_tl_d_ready`` is not seen within the timeout.
    """
    clk_gen = Clock(dut.clock, 10, unit="us")
    cocotb.start_soon(clk_gen.start())
    await reset_dut(dut)

    # TileLink side starts idle: A is stalled and no D beat is offered.
    dut.io_tl_a_ready.value = 0
    dut.io_tl_d_valid.value = 0

    addr_bits, source_bits, data_bytes = 32, 6, 32
    max_wait = 1000

    # Draw order (size, address, source, data) is kept fixed so a given
    # random seed produces the same stimulus.
    req_size = random.randint(0, 5)
    req_addr = random.randint(0, (1 << addr_bits) - 1)
    req_source = random.randint(0, (1 << source_bits) - 1)
    resp_data = random.randint(0, (1 << (data_bytes * 8)) - 1)

    await axi_send_read(
        dut,
        address=req_addr,
        source=req_source,
        size=req_size,
        timeout_cycles=max_wait,
    )

    # One cycle after the AXI handshake the request must be on the A channel.
    await RisingEdge(dut.clock)
    assert dut.io_tl_a_valid.value, "TL A_VALID should be high"
    for port, expected, message in (
        ("io_tl_a_bits_opcode", TLUL_OpcodeA.Get, "TL A_OPCODE should be Get"),
        ("io_tl_a_bits_address", req_addr, "TL A_ADDRESS is incorrect"),
        ("io_tl_a_bits_source", req_source, "TL A_SOURCE is incorrect"),
        ("io_tl_a_bits_size", req_size, "TL A_SIZE is incorrect"),
    ):
        assert getattr(dut, port).value == expected, message

    # Accept the A beat for exactly one cycle.
    dut.io_tl_a_ready.value = 1
    await RisingEdge(dut.clock)
    dut.io_tl_a_ready.value = 0

    # Offer an erroring D response and let the AXI side take the read data.
    await RisingEdge(dut.clock)
    dut.io_tl_d_bits_opcode.value = TLUL_OpcodeD.AccessAckData
    dut.io_tl_d_bits_source.value = req_source
    dut.io_tl_d_bits_data.value = resp_data
    dut.io_tl_d_bits_error.value = 1
    dut.io_tl_d_valid.value = 1
    dut.io_axi_read_data_ready.value = 1

    for _ in range(max_wait):
        await RisingEdge(dut.clock)
        if dut.io_tl_d_ready.value:
            break
    else:
        raise RuntimeError("Timeout waiting for io_tl_d_ready")

    # Checked in the same time step in which io_tl_d_ready was seen high.
    assert dut.io_axi_read_data_valid.value, "AXI RVALID should be high"
    for port, expected, message in (
        ("io_axi_read_data_bits_id", req_source, "AXI RID is incorrect"),
        ("io_axi_read_data_bits_data", resp_data, "AXI RDATA is incorrect"),
        ("io_axi_read_data_bits_resp", 2, "AXI RRESP is incorrect"),
    ):
        assert getattr(dut, port).value == expected, message
    dut.io_axi_read_data_ready.value = 0

    await RisingEdge(dut.clock)
    dut.io_tl_d_valid.value = 0

    # Let the DUT run a few idle cycles before the test ends.
    await ClockCycles(dut.clock, 5)