| # Copyright 2025 Google LLC | 
 | # | 
 | # Licensed under the Apache License, Version 2.0 (the "License"); | 
 | # you may not use this file except in compliance with the License. | 
 | # You may obtain a copy of the License at | 
 | # | 
 | #     https://www.apache.org/licenses/LICENSE-2.0 | 
 | # | 
 | # Unless required by applicable law or agreed to in writing, software | 
 | # distributed under the License is distributed on an "AS IS" BASIS, | 
 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 | # See the License for the specific language governing permissions and | 
 | # limitations under the License. | 
 |  | 
 | import cocotb | 
 | from cocotb.clock import Clock | 
 | from cocotb.triggers import RisingEdge, ClockCycles | 
 |  | 
 | from kelvin_test_utils.TileLinkULInterface import TileLinkULInterface, create_a_channel_req | 
 |  | 
 |  | 
 | async def setup_dut(dut): | 
 |     """Common setup for all tests.""" | 
 |     h_clock = Clock(dut.io_clk_h_i, 10) | 
 |     d_clock = Clock(dut.io_clk_d_i, 13)  # Asymmetric clocks | 
 |     cocotb.start_soon(h_clock.start()) | 
 |     cocotb.start_soon(d_clock.start()) | 
 |  | 
 |     dut.io_rst_h_i.value = 1 | 
 |     dut.io_rst_d_i.value = 1 | 
 |     await ClockCycles(dut.io_clk_h_i, 2) | 
 |     await ClockCycles(dut.io_clk_d_i, 2) | 
 |     dut.io_rst_h_i.value = 0 | 
 |     dut.io_rst_d_i.value = 0 | 
 |     await RisingEdge(dut.io_clk_h_i) | 
 |     await RisingEdge(dut.io_clk_d_i) | 
 |  | 
 |  | 
 | @cocotb.test() | 
 | async def test_async_crossing(dut): | 
 |     """Verify requests are arbitrated and responses are routed correctly.""" | 
 |     await setup_dut(dut) | 
 |  | 
 |     host_if = TileLinkULInterface(dut, | 
 |                                   host_if_name="io_tl_h", | 
 |                                   clock_name="io_clk_h_i", | 
 |                                   reset_name="io_rst_h_i") | 
 |     device_if = TileLinkULInterface(dut, | 
 |                                     device_if_name="io_tl_d", | 
 |                                     clock_name="io_clk_d_i", | 
 |                                     reset_name="io_rst_d_i") | 
 |  | 
 |     req = create_a_channel_req(address=0x1000, | 
 |                                data=0x11223344, | 
 |                                mask=0xF, | 
 |                                source=1) | 
 |  | 
 |     # Start a concurrent task to handle the device-side interaction | 
 |     async def device_responder(): | 
 |         req_seen = await device_if.device_get_request() | 
 |         assert req_seen["source"] == req["source"] | 
 |         await device_if.device_respond(opcode=0, | 
 |                                        param=0, | 
 |                                        size=req_seen["size"], | 
 |                                        source=req_seen["source"]) | 
 |  | 
 |     device_task = cocotb.start_soon(device_responder()) | 
 |  | 
 |     # Send the request from the host | 
 |     await host_if.host_put(req) | 
 |  | 
 |     # Wait for the response on the host side | 
 |     response = await host_if.host_get_response() | 
 |     assert response["source"] == req["source"] | 
 |  | 
 |     # Wait for the device task to complete | 
 |     await device_task |