blob: 6061ef265fbc29b12b166e1595618918faf76db1 [file] [log] [blame]
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import cocotb
from cocotb.clock import Clock
from cocotb.triggers import RisingEdge, ClockCycles, with_timeout
import math
import random
from kelvin_test_utils.TileLinkULInterface import TileLinkULInterface, create_a_channel_req
async def setup_dut(dut):
    """Start the clock and pulse reset; shared preamble for the tests here.

    Launches a ``Clock`` on ``dut.clock`` with a period of 10 (in ``Clock``'s
    default time unit), drives ``dut.reset`` to 1 while awaiting
    ``ClockCycles(dut.clock, 2)``, drives it back to 0, and returns after one
    further rising edge of ``dut.clock``.
    """
    # The clock generator runs in the background for the rest of the test.
    cocotb.start_soon(Clock(dut.clock, 10).start())

    reset = dut.reset
    reset.value = 1
    await ClockCycles(dut.clock, 2)
    reset.value = 0

    # Return only after a clock edge has passed with reset deasserted.
    await RisingEdge(dut.clock)
@cocotb.test()
async def test_arbitration(dut):
    """Verify requests are arbitrated and responses are routed correctly.

    The number of host ports ``M`` is discovered by probing the DUT for
    ``io_tl_h_<i>_a_valid`` attributes. Every host sends one request whose
    ``source`` equals its own index. On the device side the test expects the
    low ``StIdW`` bits of the observed ``source`` to hold the index of the
    originating host and the remaining upper bits to hold that host's
    original ``source``. The device responder echoes the observed ``source``
    in its response, and each host then checks that the response it receives
    carries the ``source`` it originally sent.
    """
    await setup_dut(dut)

    # Discover how many host ports this DUT build exposes.
    M = 0
    while hasattr(dut, f"io_tl_h_{M}_a_valid"):
        M += 1
    # Fail with a clear message instead of a math-domain error further down.
    assert M > 0, "DUT exposes no io_tl_h_<i>_a_valid host ports"

    # Number of bits needed to encode a host index. Integer-exact equivalent
    # of ceil(log2(M)) for M >= 1 (0 when there is a single host).
    StIdW = (M - 1).bit_length()
    host_index_mask = (1 << StIdW) - 1

    host_ifs = [
        TileLinkULInterface(dut, host_if_name=f"io_tl_h_{i}")
        for i in range(M)
    ]
    device_if = TileLinkULInterface(dut, device_if_name="io_tl_d")

    # One distinct request per host, keyed by host index.
    reqs = {
        i: create_a_channel_req(
            address=0x1000 + i * 0x100,
            data=0x11223344 + i,
            mask=0xF,
            source=i,
        )
        for i in range(M)
    }
    received_reqs = {}

    async def device_responder():
        """Serve device-side requests until every host index has been seen."""
        while len(received_reqs) < M:
            req_seen = await device_if.device_get_request()
            source = req_seen["source"].to_unsigned()
            host_index = source & host_index_mask
            # When M is not a power of two the index field can encode values
            # that match no host; report that instead of raising KeyError.
            assert host_index in reqs, (
                f"device saw host index {host_index}, expected < {M}")
            assert source >> StIdW == reqs[host_index]["source"], (
                f"device saw source {source:#x} for host {host_index}; "
                f"upper bits do not match the source that host sent")
            received_reqs[host_index] = req_seen
            # Echo size and source back so the response can be matched to
            # the request it answers.
            await device_if.device_respond(opcode=0,
                                           param=0,
                                           size=req_seen["size"],
                                           source=req_seen["source"])

    device_task = cocotb.start_soon(device_responder())

    # Hand every host its request, then collect one response per host.
    for i, host_if in enumerate(host_ifs):
        await host_if.host_put(reqs[i])
    for i, host_if in enumerate(host_ifs):
        response = await host_if.host_get_response()
        assert response["source"] == reqs[i]["source"], (
            f"host {i} got response source {response['source']}, "
            f"expected {reqs[i]['source']}")

    # NOTE(review): this timeout only bounds the final wait on the responder
    # task; the host loops above are awaited without any timeout.
    await with_timeout(device_task, 1000)