blob: 0b4059b98bab7facd70aa5afc2a3353225e0319f [file] [log] [blame]
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import cocotb
import random
from cocotb.clock import Clock
from cocotb.triggers import RisingEdge, ClockCycles, FallingEdge
from kelvin_test_utils.TileLinkULInterface import TileLinkULInterface
from kelvin_test_utils.spi_master import SPIMaster
async def setup_dut(dut):
# Main clock started by the test
dut.io_spi_csb.value = 1 # Start with chip select inactive
dut.reset.value = 1
await ClockCycles(dut.clock, 2)
dut.reset.value = 0
await RisingEdge(dut.clock)
@cocotb.test()
async def test_register_read_write(dut):
# Start the main clock
clock = Clock(dut.clock, 10)
cocotb.start_soon(clock.start())
await setup_dut(dut)
spi_master = SPIMaster(
clk=dut.io_spi_clk,
csb=dut.io_spi_csb,
mosi=dut.io_spi_mosi,
miso=dut.io_spi_miso,
main_clk=dut.clock,
log=dut._log
)
# Write Transaction
write_data = random.randint(0, 255)
await spi_master.write_reg(0x04, write_data)
# Read Transaction
read_data = await spi_master.read_reg(0x04)
assert read_data == write_data, f"Read data 0x{read_data:x} does not match written data 0x{write_data:x}"
await ClockCycles(dut.clock, 20)
@cocotb.test()
async def test_tlul_read(dut):
"""Tests back-to-back TileLink UL read transactions initiated via SPI."""
# Start the main clock
clock = Clock(dut.clock, 10)
cocotb.start_soon(clock.start())
await setup_dut(dut)
spi_master = SPIMaster(
clk=dut.io_spi_clk,
csb=dut.io_spi_csb,
mosi=dut.io_spi_mosi,
miso=dut.io_spi_miso,
main_clk=dut.clock,
log=dut._log
)
tl_device = TileLinkULInterface(dut, device_if_name="io_tl", width=128)
await tl_device.init()
# --- Device Responder Task ---
async def device_responder():
for i in range(3):
req = await tl_device.device_get_request()
assert int(req['opcode']) == 4, f"Expected Get opcode (4), got {req['opcode']}"
# Formulate a unique response for each transaction
response_data = 0xDEADBEEF_CAFEF00D_ABAD1DEA_C0DED00D + i
await tl_device.device_respond(
opcode=1, # AccessAckData
param=0,
size=req['size'],
source=req['source'],
data=response_data,
error=0,
width=128
)
responder_task = cocotb.start_soon(device_responder())
# --- Main Test Logic ---
for i in range(3):
# 1. Configure the TileLink read via SPI
target_addr = 0x40001000 + (i * 16) # Use a new address for each transaction
# Write address (32 bits) byte by byte
for j in range(4):
addr_byte = (target_addr >> (j * 8)) & 0xFF
await spi_master.write_reg(0x00 + j, addr_byte)
# Write length (0 means 1 beat)
await spi_master.write_reg(0x04, 0x00)
# 2. Issue the read command
await spi_master.write_reg(0x05, 0x01, wait_cycles=0)
# --- Verification ---
# 1. Poll the status register until the transaction is done
assert await spi_master.poll_reg_for_value(0x06, 0x02), "Timed out waiting for status to be Done"
# 2. Read the data from the buffer port
read_data = await spi_master.bulk_read_data(0x07, 16)
# 3. Compare with expected data
expected_data = 0xDEADBEEF_CAFEF00D_ABAD1DEA_C0DED00D + i
assert read_data == expected_data
# 4. Clear the status to return FSM to Idle
await spi_master.write_reg(0x05, 0x00)
await responder_task
@cocotb.test()
async def test_tlul_multi_beat_read(dut):
"""Tests a multi-beat TileLink UL read transaction initiated via SPI."""
# Start the main clock
clock = Clock(dut.clock, 10)
cocotb.start_soon(clock.start())
await setup_dut(dut)
spi_master = SPIMaster(
clk=dut.io_spi_clk,
csb=dut.io_spi_csb,
mosi=dut.io_spi_mosi,
miso=dut.io_spi_miso,
main_clk=dut.clock,
log=dut._log
)
tl_device = TileLinkULInterface(dut, device_if_name="io_tl", width=128)
await tl_device.init()
num_beats = 4
# --- Device Responder Task ---
async def device_responder():
for i in range(num_beats):
req = await tl_device.device_get_request()
assert int(req['opcode']) == 4, f"Expected Get opcode (4), got {req['opcode']}"
# Formulate a unique response for each transaction
response_data = 0xDEADBEEF_CAFEF00D_ABAD1DEA_C0DED00D + i
await tl_device.device_respond(
opcode=1, # AccessAckData
param=0,
size=req['size'],
source=req['source'],
data=response_data,
error=0,
width=128
)
responder_task = cocotb.start_soon(device_responder())
# --- Main Test Logic ---
# 1. Configure the TileLink read via SPI
target_addr = 0x40001000
# Write address (32 bits) byte by byte
for j in range(4):
addr_byte = (target_addr >> (j * 8)) & 0xFF
await spi_master.write_reg(0x00 + j, addr_byte)
# Write length (N-1 for N beats)
await spi_master.write_reg(0x04, num_beats - 1)
# 2. Issue the read command
await spi_master.write_reg(0x05, 0x01, wait_cycles=0)
# Add a delay to allow the status to propagate across the CDC
await ClockCycles(dut.clock, 20)
# --- Verification ---
# 1. Poll the status register until the transaction is done
assert await spi_master.poll_reg_for_value(0x06, 0x02), "Timed out waiting for status to be Done"
# 2. Read the data from the buffer port
bytes_to_read = num_beats * 16
read_data = await spi_master.bulk_read_data(0x07, bytes_to_read)
# 3. Compare with expected data
expected_data = 0
for i in range(num_beats):
word = 0xDEADBEEF_CAFEF00D_ABAD1DEA_C0DED00D + i
expected_data |= (word << (i * 128))
assert read_data == expected_data
# 4. Clear the status to return FSM to Idle
await spi_master.write_reg(0x05, 0x00)
await responder_task
@cocotb.test()
async def test_tlul_write(dut):
"""Tests back-to-back TileLink UL write transactions initiated via SPI."""
# Start the main clock
clock = Clock(dut.clock, 10)
cocotb.start_soon(clock.start())
await setup_dut(dut)
spi_master = SPIMaster(
clk=dut.io_spi_clk,
csb=dut.io_spi_csb,
mosi=dut.io_spi_mosi,
miso=dut.io_spi_miso,
main_clk=dut.clock,
log=dut._log
)
tl_device = TileLinkULInterface(dut, device_if_name="io_tl", width=128)
await tl_device.init()
# --- Device Responder Task ---
# This task will receive the write requests and send acknowledgments.
received_data_list = []
async def device_responder():
for _ in range(3):
req = await tl_device.device_get_request()
# For a 'Put' request, we expect opcode 0 (PutFull) or 1 (PutPartial)
assert int(req['opcode']) in [0, 1], f"Expected PutFullData or PutPartialData, got opcode {req['opcode']}"
# Capture the data for verification
received_data_list.append(int(req['data']))
# A 'Put' operation is acknowledged with a single 'AccessAck'
await tl_device.device_respond(
opcode=0, # AccessAck
param=0,
size=req['size'],
source=req['source'],
error=0,
width=128
)
responder_task = cocotb.start_soon(device_responder())
# --- Main Test Logic ---
expected_data_list = []
for i in range(3):
# 1. Write data to the DUT's internal buffer
write_data = 0x11223344_55667788_99AABBCC_DDEEFF00 + i
expected_data_list.append(write_data)
await spi_master.bulk_write_data(0x07, write_data, 16)
# 2. Configure the TileLink write via SPI
target_addr = 0x40002000 + (i * 16)
# Write address (32 bits) byte by byte
for j in range(4):
addr_byte = (target_addr >> (j * 8)) & 0xFF
await spi_master.write_reg(0x00 + j, addr_byte)
# Write length (0 means 1 beat)
await spi_master.write_reg(0x04, 0x00)
# 3. Issue the write command
await spi_master.write_reg(0x05, 0x02, wait_cycles=20) # Start write command
# --- Verification ---
# 1. Poll the status register until the transaction is done
assert await spi_master.poll_reg_for_value(0x08, 0x02), "Timed out waiting for write status to be Done"
# 4. Clear the status to return FSM to Idle
await spi_master.write_reg(0x05, 0x00)
# Wait for the responder to finish handling all requests
await responder_task
# Verify all data received by the responder
assert len(received_data_list) == 3, f"Responder received {len(received_data_list)} transactions, expected 3"
assert received_data_list == expected_data_list, f"Received data {received_data_list} does not match expected data {expected_data_list}"
@cocotb.test()
async def test_tlul_multi_beat_write(dut):
"""Tests a multi-beat TileLink UL write transaction initiated via SPI."""
# Start the main clock
clock = Clock(dut.clock, 10)
cocotb.start_soon(clock.start())
await setup_dut(dut)
spi_master = SPIMaster(
clk=dut.io_spi_clk,
csb=dut.io_spi_csb,
mosi=dut.io_spi_mosi,
miso=dut.io_spi_miso,
main_clk=dut.clock,
log=dut._log
)
tl_device = TileLinkULInterface(dut, device_if_name="io_tl", width=128)
await tl_device.init()
num_beats = 4
# --- Device Responder Task ---
received_data_list = []
async def device_responder():
# For a multi-beat write, we expect num_beats requests, with an ack after each.
for i in range(num_beats):
req = await tl_device.device_get_request()
assert int(req['opcode']) in [0, 1], f"Expected PutFullData or PutPartialData, got opcode {req['opcode']}"
received_data_list.append(int(req['data']))
# Send an AccessAck after each beat
await tl_device.device_respond(
opcode=0, # AccessAck
param=0,
size=req['size'],
source=req['source'],
error=0,
width=128
)
responder_task = cocotb.start_soon(device_responder())
# --- Main Test Logic ---
# 1. Prepare and write data to the DUT's internal buffer
expected_data_list = []
full_write_data = 0
for i in range(num_beats):
word = 0x11223344_55667788_99AABBCC_DDEEFF00 + i
expected_data_list.append(word)
full_write_data |= (word << (i * 128))
bytes_to_write = num_beats * 16
await spi_master.bulk_write_data(0x07, full_write_data, bytes_to_write)
# 2. Configure the TileLink write via SPI
target_addr = 0x40002000
# Write address (32 bits) byte by byte
for j in range(4):
addr_byte = (target_addr >> (j * 8)) & 0xFF
await spi_master.write_reg(0x00 + j, addr_byte)
# Write length (N-1 for N beats)
await spi_master.write_reg(0x04, num_beats - 1)
# 3. Issue the write command
await spi_master.write_reg(0x05, 0x02, wait_cycles=20) # Start write command
# --- Verification ---
# 1. Poll the status register until the transaction is done
assert await spi_master.poll_reg_for_value(0x08, 0x02), "Timed out waiting for write status to be Done"
# 2. Wait for the responder to finish
await responder_task
# 3. Verify the data received by the responder
assert len(received_data_list) == num_beats, f"Responder received {len(received_data_list)} beats, expected {num_beats}"
assert received_data_list == expected_data_list, f"Received data {received_data_list} does not match expected data {expected_data_list}"
# 4. Clear the status to return FSM to Idle
await spi_master.write_reg(0x05, 0x00)
@cocotb.test()
async def test_packed_write_transaction(dut):
clock = Clock(dut.clock, 10)
cocotb.start_soon(clock.start())
await setup_dut(dut)
spi_master = SPIMaster(
clk=dut.io_spi_clk,
csb=dut.io_spi_csb,
mosi=dut.io_spi_mosi,
miso=dut.io_spi_miso,
main_clk=dut.clock,
log=dut._log
)
tl_device = TileLinkULInterface(dut, device_if_name="io_tl", width=128)
await tl_device.init()
num_beats = 16
async def device_responder():
for i in range(num_beats):
req = await tl_device.device_get_request()
assert int(req['opcode']) in [0, 1], f"Expected PutFullData or PutPartialData, got opcode {req['opcode']}"
assert req['data'] == 0xDEADBEEF_CAFEF00D_ABAD1DEA_C0DED00D + i
# Send an AccessAck after each beat
await tl_device.device_respond(
opcode=0, # AccessAck
param=0,
size=req['size'],
source=req['source'],
error=0,
width=128
)
responder_task = cocotb.start_soon(device_responder())
def data_generator(beat):
return 0xDEADBEEF_CAFEF00D_ABAD1DEA_C0DED00D + beat
await spi_master.packed_write_transaction(
target_addr=0x40001000,
num_beats=num_beats,
data_generator=data_generator
)
assert await spi_master.poll_reg_for_value(0x08, 0x02), "Timed out waiting for write status to be Done"
await spi_master.write_reg(0x05, 0x00)
await responder_task