feat(soc): Add data-driven TileLink-UL crossbar
This commit introduces a data-driven TileLink-UL crossbar for the
Kelvin SoC. The entire crossbar topology is defined in a single
`CrossbarConfig.scala` file, which serves as the single source of
truth for hosts, devices, address maps, and connections.
The `KelvinXbar.scala` module programmatically generates the crossbar
by instantiating and connecting the necessary TileLink primitives
(sockets, FIFOs, width bridges) based on the configuration. This
approach provides a flexible and maintainable way to manage the SoC's
interconnect.
Key features:
- Centralized configuration in `CrossbarConfig.scala`.
- A validator to check for configuration errors, such as overlapping
address ranges.
- Automatic instantiation of TileLink primitives.
- Programmatic address decoding and wiring.
- Support for multiple, asynchronous clock domains.
- A comprehensive cocotb test suite (`kelvin_xbar_test.py`) that
verifies various data paths, including width and clock domain
crossings, error responses, and integrity checks.
Change-Id: I6b341aadfabcc9c2220c1818246989c35bba8ad5
diff --git a/hdl/chisel/src/soc/BUILD b/hdl/chisel/src/soc/BUILD
new file mode 100644
index 0000000..f01f1e7
--- /dev/null
+++ b/hdl/chisel/src/soc/BUILD
@@ -0,0 +1,46 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+load("@kelvin_hw//rules:chisel.bzl", "chisel_binary", "chisel_cc_library", "chisel_library")
+
+package(default_visibility = ["//visibility:public"])
+
+chisel_library(
+ name = "crossbar_config_lib",
+ srcs = ["CrossbarConfig.scala"],
+)
+
+chisel_binary(
+ name = "validate_crossbar_config",
+ main_class = "kelvin.soc.CrossbarConfigValidator",
+ deps = [":crossbar_config_lib"],
+)
+
+chisel_library(
+ name = "kelvin_xbar_lib",
+ srcs = ["KelvinXbar.scala"],
+ deps = [
+ ":crossbar_config_lib",
+ "//hdl/chisel/src/bus",
+ "//hdl/chisel/src/kelvin",
+ "//hdl/chisel/src/kelvin:kelvin_params",
+ ],
+)
+
+chisel_cc_library(
+ name = "kelvin_xbar_cc_library",
+ chisel_lib = ":kelvin_xbar_lib",
+ emit_class = "kelvin.soc.KelvinXbarEmitter",
+ module_name = "KelvinXbar",
+)
diff --git a/hdl/chisel/src/soc/CrossbarConfig.scala b/hdl/chisel/src/soc/CrossbarConfig.scala
new file mode 100644
index 0000000..cc5f74e
--- /dev/null
+++ b/hdl/chisel/src/soc/CrossbarConfig.scala
@@ -0,0 +1,160 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kelvin.soc
+
+import chisel3._
+
+/**
+ * A simple case class for defining memory regions.
+ *
+ * @param base The base address of the memory region.
+ * @param size The size of the memory region in bytes.
+ */
+case class AddressRange(base: BigInt, size: BigInt) {
+ /**
+ * Checks if a given dynamic address is within this range.
+ * @param addr The address to check.
+ * @return A Chisel Bool indicating if the address is contained.
+ */
+ def contains(addr: UInt): Bool = {
+ (addr >= base.U) && (addr < (base + size).U)
+ }
+}
+
+/**
+ * Defines the parameters for a host (master) in the crossbar.
+ * @param name The unique name of the host.
+ * @param width The data width of the host interface.
+ */
+case class HostConfig(name: String, width: Int, clockDomain: String = "main")
+
+/**
+ * Defines the parameters for a device (slave) in the crossbar.
+ * @param name The unique name of the device.
+ * @param addr A sequence of AddressRanges that this device occupies.
+ * @param clockDomain An identifier for the clock domain this device belongs to.
+ * @param width The data width of the device interface.
+ */
+case class DeviceConfig(
+ name: String,
+ addr: Seq[AddressRange],
+ clockDomain: String = "main",
+ width: Int = 32
+)
+
+/**
+ * This object contains the complete, concrete configuration for the Kelvin SoC crossbar,
+ * translated from the original tl_config.hjson file.
+ */
+object CrossbarConfig {
+ // List of all host (master) interfaces.
+ val hosts = Seq(
+ HostConfig("kelvin_core", width = 128),
+ HostConfig("ibex_core_i", width = 32, clockDomain = "ibex"),
+ HostConfig("ibex_core_d", width = 32, clockDomain = "ibex")
+ )
+
+ // List of all device (slave) interfaces with their address maps.
+ val devices = Seq(
+ DeviceConfig("kelvin_device", Seq(
+ AddressRange(0x00000000, 0x2000), // 8kB
+ AddressRange(0x00010000, 0x8000), // 32kB
+ AddressRange(0x00030000, 0x1000) // 4kB
+ ), width = 128),
+ DeviceConfig("rom", Seq(AddressRange(0x10000000, 0x8000))), // 32kB
+ DeviceConfig("sram", Seq(AddressRange(0x20000000, 0x400000))), // 4MB
+ DeviceConfig("uart0", Seq(AddressRange(0x40000000, 0x1000))),
+ DeviceConfig("uart1", Seq(AddressRange(0x40010000, 0x1000))),
+ DeviceConfig(
+ name = "spi0",
+ addr = Seq(AddressRange(0x40020000, 0x1000)),
+ clockDomain = "spi" // This device is on a separate clock domain
+ )
+ )
+
+ // A map defining which hosts are allowed to connect to which devices.
+ val connections = Map(
+ "kelvin_core" -> Seq("sram", "uart1", "spi0", "kelvin_device"),
+ "ibex_core_i" -> Seq("rom", "sram"),
+ "ibex_core_d" -> Seq("rom", "sram", "uart0", "kelvin_device")
+ )
+}
+
+/**
+ * A standalone validator for the CrossbarConfig.
+ *
+ * This object can be run to check for configuration errors, such as overlapping
+ * address ranges between devices.
+ */
+object CrossbarConfigValidator extends App {
+ val devices = CrossbarConfig.devices
+
+ println("Running CrossbarConfig validation...")
+
+ // Check for address range collisions
+ for (i <- devices.indices) {
+ for (j <- i + 1 until devices.length) {
+ val dev1 = devices(i)
+ val dev2 = devices(j)
+
+ for (range1 <- dev1.addr) {
+ for (range2 <- dev2.addr) {
+ val start1 = range1.base
+ val end1 = range1.base + range1.size
+ val start2 = range2.base
+ val end2 = range2.base + range2.size
+
+ // Check for overlap: max(start1, start2) < min(end1, end2)
+ val overlap = (start1 < end2) && (start2 < end1)
+
+ if (overlap) {
+ val errorMsg =
+ s"""
+ |FATAL: Address range collision detected!
+ | Device 1: ${dev1.name} -> Range [0x${start1.toString(16)}, 0x${(end1 - 1).toString(16)}]
+ | Device 2: ${dev2.name} -> Range [0x${start2.toString(16)}, 0x${(end2 - 1).toString(16)}]
+ """.stripMargin
+ System.err.println(errorMsg)
+ throw new Exception("Crossbar configuration validation failed.")
+ }
+ }
+ }
+ }
+ }
+
+ println("Validation successful: No address range collisions found.")
+
+ // Pretty-print the configuration
+ println("\n--- Crossbar Configuration ---")
+ println("Hosts:")
+ CrossbarConfig.hosts.foreach(h => println(s" - ${h.name}"))
+
+ println("\nDevices:")
+ CrossbarConfig.devices.foreach {
+ d =>
+ println(s" - ${d.name} (${d.clockDomain} clock domain)")
+ d.addr.foreach {
+ a =>
+ println(f" - 0x${a.base}%08x - 0x${a.base + a.size - 1}%08x (Size: ${a.size / 1024}kB)")
+ }
+ }
+
+ println("\nConnections:")
+ CrossbarConfig.connections.foreach {
+ case (host, devices) =>
+ println(s" - ${host} -> [${devices.mkString(", ")}]")
+ }
+ println("\n--------------------------")
+}
diff --git a/hdl/chisel/src/soc/KelvinXbar.scala b/hdl/chisel/src/soc/KelvinXbar.scala
new file mode 100644
index 0000000..ed81b5f
--- /dev/null
+++ b/hdl/chisel/src/soc/KelvinXbar.scala
@@ -0,0 +1,287 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kelvin.soc
+
+import chisel3._
+import chisel3.util.{MixedVec, MuxCase}
+import bus._
+import bus.TlulWidthBridge
+
+/**
+ * A dynamically generated IO bundle for the KelvinXbar.
+ *
+ * This bundle's ports are derived from the CrossbarConfig object. It automatically
+ * creates clock and reset ports for any asynchronous domains defined in the config.
+ *
+ * @param hostParams A sequence of TileLink parameters, one for each host.
+ * @param deviceParams A sequence of TileLink parameters, one for each device.
+ */
+class KelvinXbarIO(val hostParams: Seq[bus.TLULParameters], val deviceParams: Seq[bus.TLULParameters]) extends Bundle {
+ val cfg = CrossbarConfig
+
+ // --- Primary Clock and Reset ---
+ val clk_i = Input(Clock())
+ val rst_ni = Input(AsyncReset()) // Use AsyncReset for concrete reset type
+
+ // --- Host (Master) Ports ---
+ val hosts = Flipped(MixedVec(hostParams.map(p => new OpenTitanTileLink.Host2Device(p))))
+
+ // --- Device (Slave) Ports ---
+ val devices = MixedVec(deviceParams.map(p => new OpenTitanTileLink.Host2Device(p)))
+
+ // --- Dynamic Asynchronous Clock/Reset Ports ---
+ // Find all unique clock domains from the config, excluding the main one.
+ val asyncDeviceDomains = cfg.devices.map(_.clockDomain).distinct.filter(_ != "main")
+ val asyncHostDomains = cfg.hosts.map(_.clockDomain).distinct.filter(_ != "main")
+
+ // Create a Vec of Bundles for clock and reset inputs for each async domain.
+ val async_ports_devices = Input(Vec(asyncDeviceDomains.length, new Bundle {
+ val clock = Clock()
+ val reset = AsyncReset()
+ }))
+
+ val async_ports_hosts = Input(Vec(asyncHostDomains.length, new Bundle {
+ val clock = Clock()
+ val reset = AsyncReset()
+ }))
+}
+
+/**
+ * A data-driven TileLink crossbar generator for the Kelvin SoC.
+ *
+ * This RawModule constructs a crossbar by interpreting the CrossbarConfig object.
+ * This gives explicit control over clock and reset signals, which is critical
+ * for a multi-domain design.
+ *
+ * @param p The TileLink UL parameters for the bus.
+ */
+class KelvinXbar(val hostParams: Seq[bus.TLULParameters], val deviceParams: Seq[bus.TLULParameters]) extends RawModule {
+ // Load the single source of truth for the crossbar configuration.
+ val cfg = CrossbarConfig
+
+ // Create simple maps from name to index for easy port access.
+ val hostMap = cfg.hosts.map(_.name).zipWithIndex.toMap
+ val deviceMap = cfg.devices.map(_.name).zipWithIndex.toMap
+
+ // Instantiate the dynamically generated IO bundle.
+ val io = IO(new KelvinXbarIO(hostParams, deviceParams))
+
+ // Find all unique clock domains from the config, excluding the main one.
+ val asyncDeviceDomains = cfg.devices.map(_.clockDomain).distinct.filter(_ != "main")
+ val asyncHostDomains = cfg.hosts.map(_.clockDomain).distinct.filter(_ != "main")
+
+ // --- 1. Graph Analysis ---
+ // Analyze the configuration to understand the connection topology. This will be
+ // used to determine the size of sockets and how to wire them up.
+ val hostConnections = cfg.connections
+ val deviceFanIn = cfg.devices.map { device =>
+ device.name -> cfg.hosts.filter(h => hostConnections(h.name).contains(device.name))
+ }.toMap
+
+ // --- 2. Programmatic Instantiation (within the main clock domain) ---
+ // We use withClockAndReset to provide the explicit clock and reset signals
+ // required by the child modules, as KelvinXbar itself is a RawModule.
+ // The top-level reset is active-low, so we invert it for the active-high
+ // modules instantiated within this block.
+ val (hostSockets, deviceSockets, asyncDeviceFifos, asyncHostFifos, widthBridges) = withClockAndReset(io.clk_i, (!io.rst_ni.asBool).asAsyncReset) {
+ // Create a 1-to-N socket for each host.
+ val hostSocket = hostConnections.map { case (name, devices) =>
+ val hostId = hostMap(name)
+ name -> Module(new TlulSocket1N(hostParams(hostId), N = devices.length))
+ }.toMap
+
+ // Create an M-to-1 socket for each device with more than one master.
+ val deviceSocket = deviceFanIn.collect { case (name, hosts) if hosts.length > 1 =>
+ val deviceId = deviceMap(name)
+ name -> Module(new TlulSocketM1(deviceParams(deviceId), M = hosts.length))
+ }.toMap
+
+ // Create an asynchronous FIFO for each device in a different clock domain.
+ val asyncDeviceFifo = cfg.devices.filter(_.clockDomain != "main").map { device =>
+ val deviceId = deviceMap(device.name)
+ device.name -> Module(new TlulFifoAsync(deviceParams(deviceId)))
+ }.toMap
+
+ // Create an asynchronous FIFO for each host in a different clock domain.
+ val asyncHostFifo = cfg.hosts.filter(_.clockDomain != "main").map { host =>
+ val hostId = hostMap(host.name)
+ host.name -> Module(new TlulFifoAsync(hostParams(hostId)))
+ }.toMap
+
+ // Create a width bridge for each host-device connection with mismatched widths.
+ val widthBridge = hostConnections.flatMap { case (hostName, deviceNames) =>
+ deviceNames.map { deviceName =>
+ val hostId = hostMap(hostName)
+ val deviceId = deviceMap(deviceName)
+ val hostWidth = hostParams(hostId).w * 8
+ val deviceWidth = deviceParams(deviceId).w * 8
+ if (hostWidth != deviceWidth) {
+ val bridge = Module(new TlulWidthBridge(hostParams(hostId), deviceParams(deviceId)))
+ bridge.io.clk_i := io.clk_i
+ bridge.io.rst_ni := io.rst_ni
+ Some((s"${hostName}_to_${deviceName}", bridge))
+ } else {
+ None
+ }
+ }
+ }.flatten.toMap
+ (hostSocket, deviceSocket, asyncDeviceFifo, asyncHostFifo, widthBridge)
+ }
+
+ // --- 3. Programmatic Address Decoding ---
+ // Generate the dev_select logic for each host socket from the address map.
+ hostSockets.foreach { case (hostName, socket) =>
+ // Get the address from the host socket's input channel.
+ val address = socket.io.tl_h.a.bits.address
+ // Find the list of devices this host is allowed to connect to.
+ val connectedDevices = hostConnections(hostName)
+ // The default selection is an index one beyond the number of connected
+ // devices, which routes the request to the internal error responder.
+ val errorIdx = connectedDevices.length.U
+
+ socket.io.dev_select_i := errorIdx
+ when(socket.io.tl_h.a.valid) {
+ socket.io.dev_select_i := MuxCase(errorIdx,
+ connectedDevices.zipWithIndex.map { case (devName, idx) =>
+ val devConfig = cfg.devices.find(_.name == devName).get
+ // Check if the address falls within any of the device's address ranges.
+ val addrMatch = devConfig.addr.map(_.contains(address)).reduce(_ || _)
+ addrMatch -> idx.U
+ }
+ )
+ }
+ }
+
+ // --- 4. Programmatic Wiring ---
+ // This section programmatically connects the entire crossbar graph.
+
+ // A map from async domain name to its index in the IO bundle's Vec.
+ val asyncDeviceDomainMap = asyncDeviceDomains.zipWithIndex.toMap
+ val asyncHostDomainMap = asyncHostDomains.zipWithIndex.toMap
+
+ // Connect top-level host IOs to the host-side of the 1-to-N sockets.
+ for ((hostName, socket) <- hostSockets) {
+ val hostConfig = cfg.hosts.find(_.name == hostName).get
+ if (hostConfig.clockDomain != "main") {
+ asyncHostFifos(hostName).io.tl_h <> io.hosts(hostMap(hostName))
+ socket.io.tl_h <> asyncHostFifos(hostName).io.tl_d
+ } else {
+ socket.io.tl_h <> io.hosts(hostMap(hostName))
+ }
+ }
+
+ // Connect the async FIFOs to their specific clocks and resets.
+ for ((deviceName, fifo) <- asyncDeviceFifos) {
+ val deviceConfig = cfg.devices.find(_.name == deviceName).get
+ val domainIndex = asyncDeviceDomainMap(deviceConfig.clockDomain)
+ fifo.io.clk_h_i := io.clk_i
+ fifo.io.rst_h_i := !io.rst_ni.asBool
+ fifo.io.clk_d_i := io.async_ports_devices(domainIndex).clock
+ fifo.io.rst_d_i := !io.async_ports_devices(domainIndex).reset.asBool
+ }
+
+ for ((hostName, fifo) <- asyncHostFifos) {
+ val hostConfig = cfg.hosts.find(_.name == hostName).get
+ val domainIndex = asyncHostDomainMap(hostConfig.clockDomain)
+ fifo.io.clk_h_i := io.async_ports_hosts(domainIndex).clock
+ fifo.io.rst_h_i := !io.async_ports_hosts(domainIndex).reset.asBool
+ fifo.io.clk_d_i := io.clk_i
+ fifo.io.rst_d_i := !io.rst_ni.asBool
+ }
+
+ // Connect the device-side outputs of the M-to-1 sockets.
+ for ((deviceName, socket) <- deviceSockets) {
+ val deviceConfig = cfg.devices.find(_.name == deviceName).get
+ if (deviceConfig.clockDomain != "main") {
+ // If the device is async, connect the socket to the async FIFO.
+ asyncDeviceFifos(deviceName).io.tl_h <> socket.io.tl_d
+ } else {
+ // Otherwise, connect it directly to the top-level device IO.
+ io.devices(deviceMap(deviceName)) <> socket.io.tl_d
+ }
+ }
+
+ // Connect the device-side of the async FIFOs to the top-level device IOs.
+ for ((deviceName, fifo) <- asyncDeviceFifos) {
+ io.devices(deviceMap(deviceName)) <> fifo.io.tl_d
+ }
+
+ // Connect the device-side outputs of the 1-to-N host sockets.
+ for ((hostName, hostSocket) <- hostSockets) {
+ val connections = hostConnections(hostName)
+ for ((deviceName, portIndex) <- connections.zipWithIndex) {
+ val deviceConfig = cfg.devices.find(_.name == deviceName).get
+ val fanIn = deviceFanIn(deviceName).length
+
+ val hostWidth = hostParams(hostMap(hostName)).w * 8
+ val deviceWidth = deviceParams(deviceMap(deviceName)).w * 8
+
+ val socket_out = Wire(new OpenTitanTileLink.Host2Device(hostParams(hostMap(hostName))))
+ socket_out <> hostSocket.io.tl_d(portIndex)
+
+ val finalPort =
+ if (fanIn > 1) {
+ deviceSockets(deviceName).io.tl_h(deviceFanIn(deviceName).indexWhere(_.name == hostName))
+ } else if (deviceConfig.clockDomain != "main") {
+ asyncDeviceFifos(deviceName).io.tl_h
+ } else {
+ io.devices(deviceMap(deviceName))
+ }
+
+ if (hostWidth != deviceWidth) {
+ val bridge = widthBridges(s"${hostName}_to_${deviceName}")
+ bridge.io.tl_h <> socket_out
+ finalPort <> bridge.io.tl_d
+ } else {
+ finalPort <> socket_out
+ }
+ }
+ }
+}
+
+import _root_.circt.stage.{ChiselStage, FirtoolOption}
+import chisel3.stage.ChiselGeneratorAnnotation
+import kelvin.Parameters
+import scala.annotation.nowarn
+
+/**
+ * A standalone main object to generate the SystemVerilog for the KelvinXbar.
+ *
+ * This can be run via Bazel to produce the final Verilog output.
+ */
+@nowarn
+object KelvinXbarEmitter extends App {
+ // Create a sequence of TLULParameters for hosts and devices based on the config.
+ val hostParams = CrossbarConfig.hosts.map { host =>
+ val p = new Parameters
+ p.lsuDataBits = host.width
+ new bus.TLULParameters(p)
+ }
+ val deviceParams = CrossbarConfig.devices.map { device =>
+ val p = new Parameters
+ p.lsuDataBits = device.width
+ new bus.TLULParameters(p)
+ }
+
+ // Use ChiselStage to generate the Verilog.
+ (new ChiselStage).execute(
+ Array("--target", "systemverilog") ++ args,
+ Seq(
+ ChiselGeneratorAnnotation(() =>
+ new KelvinXbar(hostParams, deviceParams)
+ )
+ ) ++ Seq(FirtoolOption("-enable-layers=Verification"))
+ )
+}
diff --git a/tests/cocotb/tlul/BUILD b/tests/cocotb/tlul/BUILD
index 33c324b..0e8417c 100644
--- a/tests/cocotb/tlul/BUILD
+++ b/tests/cocotb/tlul/BUILD
@@ -291,3 +291,47 @@
vcs_test_args = VCS_TEST_ARGS,
vcs_defines = VCS_DEFINES,
)
+
+# --- Rules for KelvinXbar cocotb test ---
+
+verilator_cocotb_model(
+ name = "kelvin_xbar_model",
+ cflags = ["-Wno-fatal"],
+ hdl_toplevel = "KelvinXbar",
+ trace = True,
+ verilog_source = "//hdl/chisel/src/soc:KelvinXbar.sv",
+)
+
+# BEGIN_TESTCASES_FOR_kelvin_xbar_cocotb
+KELVIN_XBAR_TESTCASES = [
+ "test_kelvin_core_to_sram",
+ "test_ibex_d_to_kelvin_device_csr_read",
+ "test_wide_to_narrow_integrity",
+ "test_kelvin_core_to_kelvin_device",
+ "test_kelvin_core_to_uart1",
+ "test_ibex_d_to_kelvin_device",
+ "test_ibex_d_to_invalid_addr",
+]
+# END_TESTCASES_FOR_kelvin_xbar_cocotb
+
+cocotb_test_suite(
+ name = "kelvin_xbar_cocotb",
+ simulators = ["verilator", "vcs"],
+ testcases = KELVIN_XBAR_TESTCASES,
+ testcases_vname = "KELVIN_XBAR_TESTCASES",
+ tests_kwargs = {
+ "hdl_toplevel": "KelvinXbar",
+ "waves": True,
+ "seed": "42",
+ "test_module": ["kelvin_xbar_test.py"],
+ "deps": [
+ "//kelvin_test_utils:TileLinkULInterface",
+ "//kelvin_test_utils:secded_golden",
+ ],
+ },
+ verilator_model = ":kelvin_xbar_model",
+ vcs_verilog_sources = ["//hdl/chisel/src/soc:kelvin_xbar_cc_library_verilog"],
+ vcs_build_args = VCS_BUILD_ARGS,
+ vcs_test_args = VCS_TEST_ARGS,
+ vcs_defines = VCS_DEFINES,
+)
\ No newline at end of file
diff --git a/tests/cocotb/tlul/kelvin_xbar_test.py b/tests/cocotb/tlul/kelvin_xbar_test.py
new file mode 100644
index 0000000..0e62adb
--- /dev/null
+++ b/tests/cocotb/tlul/kelvin_xbar_test.py
@@ -0,0 +1,471 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# BEGIN_TESTCASES_FOR_kelvin_xbar_cocotb
+KELVIN_XBAR_TESTCASES = [
+ "test_kelvin_core_to_sram",
+ "test_ibex_d_to_invalid_addr",
+ "test_kelvin_core_to_uart1",
+ "test_ibex_d_to_kelvin_device",
+ "test_kelvin_core_to_kelvin_device",
+ "test_ibex_d_to_kelvin_device_specific_addr",
+ "test_wide_to_narrow_integrity",
+]
+# END_TESTCASES_FOR_kelvin_xbar_cocotb
+
+import cocotb
+from cocotb.clock import Clock
+from cocotb.triggers import RisingEdge, ClockCycles, with_timeout
+
+from kelvin_test_utils.TileLinkULInterface import TileLinkULInterface, create_a_channel_req
+from kelvin_test_utils.secded_golden import get_cmd_intg, get_data_intg, get_rsp_intg
+
+# --- Configuration Constants ---
+# These constants are derived from CrossbarConfig.scala to make tests readable.
+HOST_MAP = {"kelvin_core": 0, "ibex_core_i": 1, "ibex_core_d": 2}
+DEVICE_MAP = {
+ "kelvin_device": 0,
+ "rom": 1,
+ "sram": 2,
+ "uart0": 3,
+ "uart1": 4,
+ "spi0": 5,
+}
+SRAM_BASE = 0x20000000
+UART1_BASE = 0x40010000
+KELVIN_DEVICE_BASE = 0x00000000
+SPI0_BASE = 0x40020000
+INVALID_ADDR = 0xDEADBEEF
+TIMEOUT_CYCLES = 500
+
+
+# --- Test Setup ---
+async def setup_dut(dut):
+ """Common setup logic for all tests."""
+ # Start the main clock
+ clock = Clock(dut.io_clk_i, 5)
+ cocotb.start_soon(clock.start())
+
+ # Start the asynchronous SPI clock
+ spi_clock = Clock(dut.io_async_ports_devices_0_clock, 20)
+ cocotb.start_soon(spi_clock.start())
+
+ # Start the asynchronous Ibex clock
+ ibex_clock = Clock(dut.io_async_ports_hosts_0_clock, 10)
+ cocotb.start_soon(ibex_clock.start())
+
+ # Create a dictionary of TileLink interfaces for all hosts and devices
+ host_widths = {"kelvin_core": 128, "ibex_core_i": 32, "ibex_core_d": 32}
+ device_widths = {
+ "kelvin_device": 128,
+ "rom": 32,
+ "sram": 32,
+ "uart0": 32,
+ "uart1": 32,
+ "spi0": 32
+ }
+
+ interfaces = {
+ "hosts": [
+ TileLinkULInterface(dut,
+ host_if_name=f"io_hosts_{i}",
+ clock_name="io_clk_i",
+ reset_name="io_rst_ni",
+ width=host_widths[name])
+ if name == "kelvin_core" else TileLinkULInterface(
+ dut,
+ host_if_name=f"io_hosts_{i}",
+ clock_name="io_async_ports_hosts_0_clock",
+ reset_name="io_async_ports_hosts_0_reset",
+ width=host_widths[name]) for name, i in HOST_MAP.items()
+ ],
+ "devices": [
+ TileLinkULInterface(dut,
+ device_if_name=f"io_devices_{i}",
+ clock_name="io_clk_i",
+ reset_name="io_rst_ni",
+ width=device_widths[name])
+ for name, i in DEVICE_MAP.items()
+ ],
+ }
+ # Special case for the async SPI port
+ interfaces["devices"][DEVICE_MAP["spi0"]] = TileLinkULInterface(
+ dut,
+ device_if_name=f"io_devices_{DEVICE_MAP['spi0']}",
+ clock_name="io_async_ports_devices_0_clock",
+ reset_name="io_async_ports_devices_0_reset",
+ width=32)
+
+ # Reset the DUT
+ dut.io_rst_ni.value = 0
+ dut.io_async_ports_devices_0_reset.value = 0
+ dut.io_async_ports_hosts_0_reset.value = 0
+ await ClockCycles(dut.io_clk_i, 5)
+ dut.io_rst_ni.value = 1
+ dut.io_async_ports_devices_0_reset.value = 1
+ dut.io_async_ports_hosts_0_reset.value = 1
+ await ClockCycles(dut.io_clk_i, 5)
+
+ return interfaces, clock
+
+
+# --- Test Cases ---
+
+
+@cocotb.test(timeout_time=10, timeout_unit="us")
+async def test_kelvin_core_to_sram(dut):
+ """Verify a simple write/read transaction from kelvin_core to sram."""
+ interfaces, clock = await setup_dut(dut)
+ host_if = interfaces["hosts"][HOST_MAP["kelvin_core"]]
+ device_if = interfaces["devices"][DEVICE_MAP["sram"]]
+ timeout_ns = TIMEOUT_CYCLES * clock.period
+
+ # Send a 128-bit write request from the host
+ test_data = 0x112233445566778899AABBCCDDEEFF00
+ write_txn = create_a_channel_req(address=SRAM_BASE,
+ data=test_data,
+ mask=0xFFFF,
+ width=host_if.width)
+ await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns")
+
+ # Expect four 32-bit transactions on the device side, order not guaranteed
+ received_reqs = []
+ for _ in range(4):
+ req = await with_timeout(device_if.device_get_request(), timeout_ns,
+ "ns")
+ received_reqs.append(req)
+ await with_timeout(
+ device_if.device_respond(opcode=0,
+ param=0,
+ size=req["size"],
+ source=req["source"]), timeout_ns, "ns")
+
+ # Sort received requests by address for comparison
+ received_reqs.sort(key=lambda r: r["address"].integer)
+
+ # Verify all beats were received correctly
+ for i in range(4):
+ assert received_reqs[i]["address"] == SRAM_BASE + (i * 4)
+ expected_data = (test_data >> (i * 32)) & 0xFFFFFFFF
+ assert received_reqs[i]["data"] == expected_data
+
+ # Use the last beat (highest address) for the response source
+ last_req = received_reqs[-1]
+
+ # Receive the response on the host side
+ resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns")
+ assert resp["error"] == 0
+ assert resp["source"] == write_txn["source"]
+
+
+@cocotb.test(timeout_time=10, timeout_unit="us")
+async def test_ibex_d_to_invalid_addr(dut):
+ """Verify that a request to an unmapped address gets an error response."""
+ interfaces, clock = await setup_dut(dut)
+ host_if = interfaces["hosts"][HOST_MAP["ibex_core_d"]]
+ timeout_ns = TIMEOUT_CYCLES * clock.period
+
+ # Send a write request to an invalid address
+ write_txn = create_a_channel_req(address=INVALID_ADDR,
+ data=0,
+ mask=0xF,
+ width=host_if.width)
+ await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns")
+
+ # Expect an error response
+ try:
+ resp = await with_timeout(host_if.host_get_response(), timeout_ns,
+ "ns")
+ assert resp["error"] == 1
+ assert resp["source"] == write_txn["source"]
+ except Exception as e:
+ # Allow the simulation to run for a few more cycles to get a clean waveform
+ await ClockCycles(dut.io_clk_i, 20)
+ raise e
+
+@cocotb.test(timeout_time=10, timeout_unit="us")
+async def test_kelvin_core_to_uart1(dut):
+ """Verify a 128-bit to 32-bit write transaction."""
+ interfaces, clock = await setup_dut(dut)
+ host_if = interfaces["hosts"][HOST_MAP["kelvin_core"]]
+ device_if = interfaces["devices"][DEVICE_MAP["uart1"]]
+ timeout_ns = TIMEOUT_CYCLES * clock.period
+
+ # Send a 128-bit write request
+ test_data = 0x112233445566778899AABBCCDDEEFF00
+ write_txn = create_a_channel_req(address=UART1_BASE,
+ data=test_data,
+ mask=0xF0F0,
+ width=host_if.width)
+ await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns")
+
+ # Expect four 32-bit transactions on the device side, order not guaranteed
+ received_reqs = []
+ for i in range(2):
+ req = await with_timeout(device_if.device_get_request(), timeout_ns,
+ "ns")
+ received_reqs.append(req)
+ await with_timeout(
+ device_if.device_respond(opcode=0,
+ param=0,
+ size=req["size"],
+ source=req["source"],
+ width=device_if.width), timeout_ns, "ns")
+
+ # Sort received requests by address for comparison
+ received_reqs.sort(key=lambda r: r["address"].integer)
+
+ # Verify all beats were received correctly
+ for idx, key in [(0, 1), (1, 3)]:
+ assert received_reqs[idx]["address"] == UART1_BASE + (key * 4)
+ expected_data = (test_data >> (key * 32)) & 0xFFFFFFFF
+ assert received_reqs[idx]["data"] == expected_data
+
+ # Use the last beat (highest address) for the response source
+ last_req = received_reqs[-1]
+
+ # Receive the response on the host side
+ resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns")
+ assert resp["error"] == 0
+ assert resp["source"] == write_txn["source"]
+
+
+@cocotb.test(timeout_time=10, timeout_unit="us")
+async def test_ibex_d_to_kelvin_device(dut):
+ """Verify a 32-bit to 128-bit write transaction."""
+ interfaces, clock = await setup_dut(dut)
+ host_if = interfaces["hosts"][HOST_MAP["ibex_core_d"]]
+ device_if = interfaces["devices"][DEVICE_MAP["kelvin_device"]]
+ timeout_ns = TIMEOUT_CYCLES * clock.period
+
+ # Send a 32-bit write request
+ write_txn = create_a_channel_req(address=KELVIN_DEVICE_BASE,
+ data=0x12345678,
+ mask=0xF,
+ width=host_if.width)
+ await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns")
+
+ # Expect a single 128-bit transaction on the device side
+ req = await with_timeout(device_if.device_get_request(), timeout_ns, "ns")
+ assert req["address"] == KELVIN_DEVICE_BASE
+ assert req["data"] == 0x12345678
+
+ # Send a response from the device
+ await with_timeout(
+ device_if.device_respond(opcode=0,
+ param=0,
+ size=req["size"],
+ source=req["source"],
+ width=device_if.width), timeout_ns, "ns")
+
+ # Expect a single response on the host side
+ resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns")
+ assert resp["error"] == 0
+
+
+@cocotb.test(timeout_time=10, timeout_unit="us")
+async def test_kelvin_core_to_kelvin_device(dut):
+ """Verify a 128-bit to 128-bit write transaction (no bridge)."""
+ interfaces, clock = await setup_dut(dut)
+ host_if = interfaces["hosts"][HOST_MAP["kelvin_core"]]
+ device_if = interfaces["devices"][DEVICE_MAP["kelvin_device"]]
+ timeout_ns = TIMEOUT_CYCLES * clock.period
+
+ # Send a 128-bit write request
+ write_txn = create_a_channel_req(address=KELVIN_DEVICE_BASE,
+ data=0x112233445566778899AABBCCDDEEFF00,
+ mask=0xFFFF,
+ width=host_if.width)
+ await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns")
+
+ # Expect a single 128-bit transaction on the device side
+ req = await with_timeout(device_if.device_get_request(), timeout_ns, "ns")
+ assert req["address"] == KELVIN_DEVICE_BASE
+ assert req["data"] == 0x112233445566778899AABBCCDDEEFF00
+
+ # Send a response from the device
+ await with_timeout(
+ device_if.device_respond(opcode=0,
+ param=0,
+ size=req["size"],
+ source=req["source"]), timeout_ns, "ns")
+
+ # Expect a single response on the host side
+ resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns")
+ assert resp["error"] == 0
+
+
+
+
+
+@cocotb.test(timeout_time=10, timeout_unit="us")
+async def test_ibex_d_to_kelvin_device_csr_read(dut):
+ """Verify that Ibex can correctly read a CSR from the Kelvin device.
+
+ This test specifically checks the return path through the width bridge.
+ """
+ interfaces, clock = await setup_dut(dut)
+ host_if = interfaces["hosts"][HOST_MAP["ibex_core_d"]]
+ device_if = interfaces["devices"][DEVICE_MAP["kelvin_device"]]
+ timeout_ns = TIMEOUT_CYCLES * clock.period
+ csr_addr = KELVIN_DEVICE_BASE + 0x8 # Match the CSR address
+ halted_status = 0x1 # Bit 0 for halted
+
+ async def device_responder():
+ """A mock responder for the kelvin_device."""
+ req = await with_timeout(device_if.device_get_request(), timeout_ns,
+ "ns")
+ assert req["address"] == csr_addr
+ # The CSR data is in the third 32-bit lane of the 128-bit bus.
+ resp_data = halted_status << 64
+ await with_timeout(
+ device_if.device_respond(
+ opcode=1, # AccessAckData
+ param=0,
+ size=req["size"],
+ source=req["source"],
+ data=resp_data,
+ width=device_if.width,
+ ),
+ timeout_ns,
+ "ns")
+
+ # Start the device responder coroutine
+ cocotb.start_soon(device_responder())
+
+ # Send a 32-bit read request from the host
+ # TODO(atv): Do this thru helper?
+ read_txn = {
+ "opcode": 4, # Get
+ "param": 0,
+ "size": 2, # 4 bytes
+ "source": 1,
+ "address": csr_addr,
+ "mask": 0xF,
+ "data": 0,
+ "user": {
+ "cmd_intg": 0,
+ "data_intg": 0,
+ "instr_type": 0,
+ "rsvd": 0
+ }
+ }
+ read_txn["user"]["cmd_intg"] = get_cmd_intg(read_txn, width=host_if.width)
+ read_txn["user"]["data_intg"] = get_data_intg(read_txn["data"],
+ width=host_if.width)
+ await with_timeout(host_if.host_put(read_txn), timeout_ns, "ns")
+
+ # Expect a single response on the host side with the correct data
+ resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns")
+ assert resp["error"] == 0
+ assert resp[
+ "data"] == halted_status, f"Expected CSR data {halted_status}, but got {resp['data']}"
+
+
+@cocotb.test(timeout_time=10, timeout_unit="us")
+async def test_ibex_d_to_kelvin_device_specific_addr(dut):
+ """Verify a write to a specific address in the kelvin_device range."""
+ interfaces, clock = await setup_dut(dut)
+ host_if = interfaces["hosts"][HOST_MAP["ibex_core_d"]]
+ device_if = interfaces["devices"][DEVICE_MAP["kelvin_device"]]
+ timeout_ns = TIMEOUT_CYCLES * clock.period
+
+ # Send a 32-bit write request to 0x30000
+ test_addr = 0x30000
+ write_txn = create_a_channel_req(address=test_addr,
+ data=0xDEADBEEF,
+ mask=0xF,
+ width=host_if.width)
+ await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns")
+
+ # Expect a single 128-bit transaction on the device side
+ req = await with_timeout(device_if.device_get_request(), timeout_ns, "ns")
+ assert req[
+ "address"] == test_addr, f"Expected address 0x{test_addr:X}, but got 0x{req['address'].integer:X}"
+ assert req["data"] == 0xDEADBEEF
+
+ # Send a response from the device
+ await with_timeout(
+ device_if.device_respond(opcode=0,
+ param=0,
+ size=req["size"],
+ source=req["source"],
+ width=device_if.width), timeout_ns, "ns")
+
+ # Expect a single response on the host side
+ resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns")
+ assert resp["error"] == 0
+
+
+@cocotb.test(timeout_time=10, timeout_unit="us")
+async def test_wide_to_narrow_integrity(dut):
+ """Verify integrity is checked and regenerated across the width bridge."""
+ interfaces, clock = await setup_dut(dut)
+ host_if = interfaces["hosts"][HOST_MAP["kelvin_core"]]
+ device_if = interfaces["devices"][DEVICE_MAP["uart1"]]
+ timeout_ns = TIMEOUT_CYCLES * clock.period
+
+ # Send a 128-bit write request from the host with correct integrity
+ test_data = 0x112233445566778899AABBCCDDEEFF00
+ write_txn = create_a_channel_req(address=UART1_BASE,
+ data=test_data,
+ mask=0xFFFF,
+ width=host_if.width)
+
+ await with_timeout(host_if.host_put(write_txn), timeout_ns, "ns")
+
+ # Expect four 32-bit transactions on the device side
+ received_reqs = []
+ for i in range(4):
+ req = await with_timeout(device_if.device_get_request(), timeout_ns,
+ "ns")
+
+ # Verify that the bridge regenerated integrity correctly for each beat
+ assert req["user"]["cmd_intg"] == get_cmd_intg(req,
+ width=device_if.width)
+ assert req["user"]["data_intg"] == get_data_intg(req["data"],
+ width=device_if.width)
+
+ received_reqs.append(req)
+
+ # Create a response with correct integrity
+ resp_beat = {
+ "opcode": 0,
+ "param": 0,
+ "size": req["size"],
+ "source": req["source"],
+ "sink": 0,
+ "data": 0,
+ "error": 0
+ }
+ resp_beat["user"] = {
+ "rsp_intg": get_rsp_intg(resp_beat, width=device_if.width),
+ "data_intg": get_data_intg(0, width=device_if.width)
+ }
+ await device_if.device_d_fifo.put(resp_beat)
+
+ # Receive the final assembled response on the host side
+ resp = await with_timeout(host_if.host_get_response(), timeout_ns, "ns")
+
+ # Verify that the bridge checked and regenerated integrity correctly
+ expected_resp = resp.copy()
+ expected_resp["error"] = 0
+ assert resp["user"]["rsp_intg"] == get_rsp_intg(expected_resp,
+ width=host_if.width)
+ assert resp["user"]["data_intg"] == get_data_intg(resp["data"],
+ width=host_if.width)
+ assert resp["error"] == 0
+
+
+