feat(soc): Add Spi2TLUL bridge and tests
This commit introduces a new Chisel module, `Spi2TLUL`, which functions as a bridge between a SPI slave interface and a TileLink UL master interface. This allows an external SPI master to initiate TileLink transactions within the SoC.
The bridge includes:
- A register map accessible via SPI for configuring TileLink transactions (address, length, command).
- A data buffer for staging data for both read and write operations.
- Asynchronous queues to handle clock domain crossing between the SPI clock and the main SoC clock.
- State machines to manage SPI commands and TileLink transactions for both reads and writes.
To support verification, this commit also adds:
- A Python-based `SPIMaster` class for cocotb, providing an easy-to-use interface for driving the SPI slave.
- A comprehensive cocotb test suite (`test_spi_to_tlul.py`) with tests for:
- Register read/write access.
- Single and multi-beat TileLink reads.
- Single and multi-beat TileLink writes.
The necessary BUILD file modifications are included to integrate the new module and its tests into the Chisel and cocotb build systems.
Change-Id: Ie1280db53e77cec7b3f734b5bd6d63c8d41b2ca9
diff --git a/hdl/chisel/src/bus/BUILD b/hdl/chisel/src/bus/BUILD
index d793ef6..6c88270 100644
--- a/hdl/chisel/src/bus/BUILD
+++ b/hdl/chisel/src/bus/BUILD
@@ -36,6 +36,7 @@
"KelvinMemIO.scala",
"KelvinToTlul.scala",
"SecdedEncoderTestbench.scala",
+ "Spi2TLUL.scala",
"TileLinkUL.scala",
"TlulFifoAsync.scala",
"TlulFifoSync.scala",
@@ -101,6 +102,21 @@
module_name = "TlulSocketM1_3_128",
)
+chisel_cc_library(
+ name = "spi2tlul_128_cc_library",
+ chisel_lib = ":bus",
+ emit_class = "bus.Spi2TLUL_128_Emitter",
+ module_name = "Spi2TLUL",
+)
+
+verilator_cocotb_model(
+ name = "spi2tlul_128_model",
+ cflags = VERILATOR_BUILD_ARGS,
+ hdl_toplevel = "Spi2TLUL",
+ trace = True,
+ verilog_source = "//hdl/chisel/src/bus:Spi2TLUL.sv",
+)
+
verilator_cocotb_model(
name = "tlul_socket_m1_2_128_model",
cflags = VERILATOR_BUILD_ARGS,
diff --git a/hdl/chisel/src/bus/Spi2TLUL.scala b/hdl/chisel/src/bus/Spi2TLUL.scala
new file mode 100644
index 0000000..a134f93
--- /dev/null
+++ b/hdl/chisel/src/bus/Spi2TLUL.scala
@@ -0,0 +1,343 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bus
+
+import chisel3._
+import chisel3.util._
+import freechips.rocketchip.util._
+
+import kelvin.Parameters
+
+class Spi2TLUL(p: Parameters) extends Module {
+ val tlul_p = new TLULParameters(p)
+ val io = IO(new Bundle {
+ val spi = new Bundle {
+ val clk = Input(Clock())
+ val csb = Input(Bool())
+ val mosi = Input(Bool())
+ val miso = Output(Bool())
+ }
+ val tl = new OpenTitanTileLink.Host2Device(new TLULParameters(p))
+ })
+
+
+ // Synchronize the main asynchronous reset to the SPI clock domain.
+ val spi_domain_reset = withClock(io.spi.clk) {
+ val rst_sync = RegNext(RegNext(reset.asBool, true.B), true.B)
+ rst_sync.asAsyncReset
+ }
+
+ // Combine the main reset with the chip-select reset.
+ // Reset is active when csb is high (inactive) OR when the main reset is active.
+ val combined_reset = io.spi.csb || spi_domain_reset.asBool
+
+ val (mosi_data_reg, miso_data_reg, miso_valid_reg, bit_count_reg, deq_attempted_reg) =
+ withClockAndReset(io.spi.clk, combined_reset.asAsyncReset) {
+ val mosi = RegInit(0.U(8.W))
+ val miso = RegInit(0.U(8.W))
+ // Gate MISO output until the first SPI clock cycle to prevent X propagation.
+ val miso_valid = RegInit(false.B)
+ val bit_count = RegInit(0.U(3.W))
+ val deq_attempted = RegInit(false.B)
+ (mosi, miso, miso_valid, bit_count, deq_attempted)
+ }
+ miso_valid_reg := miso_valid_reg || !io.spi.csb
+ io.spi.miso := Mux(miso_valid_reg, miso_data_reg(7), 0.U)
+
+ val spi2tlul_q = Module(new AsyncQueue(UInt(8.W), AsyncQueueParams(depth = 2, safe = false)))
+ spi2tlul_q.io.enq_clock := io.spi.clk
+ spi2tlul_q.io.enq_reset := reset.asBool
+ spi2tlul_q.io.deq_clock := clock
+ spi2tlul_q.io.deq_reset := reset.asBool
+
+ val completed_byte = Cat(mosi_data_reg(6,0), io.spi.mosi)
+ spi2tlul_q.io.enq.valid := bit_count_reg === 7.U
+ spi2tlul_q.io.enq.bits := completed_byte
+ dontTouch(spi2tlul_q.io.enq)
+
+ object SpiState extends ChiselEnum {
+ val sIDLE, sWAIT_WRITE_DATA, sSEND_READ_DATA = Value
+ }
+ val spi_state_reg = RegInit(SpiState.sIDLE)
+
+ // Define the SPI register map
+ object SpiRegAddress extends ChiselEnum {
+ val TL_ADDR_REG_0 = 0x00.U
+ val TL_ADDR_REG_1 = 0x01.U
+ val TL_ADDR_REG_2 = 0x02.U
+ val TL_ADDR_REG_3 = 0x03.U
+ val TL_LEN_REG = 0x04.U
+ val TL_CMD_REG = 0x05.U
+ val TL_STATUS_REG = 0x06.U
+ val DATA_BUF_PORT = 0x07.U
+ val TL_WRITE_STATUS_REG = 0x08.U
+ }
+
+ // Physical registers backing the map
+ val tl_addr_reg = RegInit(VecInit(Seq.fill(4)(0.U(8.W))))
+ val tl_len_reg = RegInit(0.U(8.W))
+ // Command and Status registers are handled by the TL FSM, not stored directly here.
+ val data_buffer = RegInit(VecInit(Seq.fill(16)(0.U(128.W))))
+ val bulk_read_ptr = RegInit(0.U(8.W)) // Byte pointer into the data buffer
+ val bulk_write_ptr = RegInit(0.U(8.W)) // Byte pointer for writes
+
+ val addr_reg = RegInit(0.U(7.W))
+
+ // === TileLink Read FSM ===
+ object TlReadState extends ChiselEnum {
+ val sIdle, sSendBeat, sWaitBeatAck, sDone, sError = Value
+ }
+ val tl_read_state_reg = RegInit(TlReadState.sIdle)
+
+ // Internal registers for the TL transaction
+ val tl_addr_fsm_reg = RegInit(0.U(32.W))
+ val tl_len_fsm_reg = RegInit(0.U(8.W))
+ val tl_beat_count_reg = RegInit(0.U(8.W))
+
+ // === TileLink Write FSM ===
+ object TlWriteState extends ChiselEnum {
+ val sIdle, sSendBeat, sWaitBeatAck, sDone, sError = Value
+ }
+ val tl_write_state_reg = RegInit(TlWriteState.sIdle)
+
+ // Internal registers for the TL write transaction
+ val tl_write_addr_fsm_reg = RegInit(0.U(32.W))
+ val tl_write_len_fsm_reg = RegInit(0.U(8.W))
+ val tl_write_beat_count_reg = RegInit(0.U(8.W))
+
+ // Wire to detect a write to the command register
+ val do_write = spi_state_reg === SpiState.sWAIT_WRITE_DATA && spi2tlul_q.io.deq.fire
+ val tl_cmd_reg_write = do_write && (addr_reg === SpiRegAddress.TL_CMD_REG.asUInt)
+ val tl_cmd_reg_data = spi2tlul_q.io.deq.bits
+
+ val tlul2spi_q = Module(new AsyncQueue(UInt(8.W), AsyncQueueParams.singleton(safe = false)))
+ tlul2spi_q.io.enq_clock := clock
+ tlul2spi_q.io.enq_reset := reset.asBool
+ tlul2spi_q.io.deq_clock := io.spi.clk
+ tlul2spi_q.io.deq_reset := reset.asBool
+
+ // Add queues for TileLink channels to handle backpressure
+ val tl_a_q = Module(new Queue(new OpenTitanTileLink.A_Channel(tlul_p), 1))
+ val tl_d_q = Module(new Queue(new OpenTitanTileLink.D_Channel(tlul_p), 1))
+ io.tl.a <> tl_a_q.io.deq
+ io.tl.a.bits := RequestIntegrityGen(tlul_p, tl_a_q.io.deq.bits)
+ tl_d_q.io.enq <> io.tl.d
+ tlul2spi_q.io.deq.ready := !io.spi.csb && !deq_attempted_reg
+
+ // FSM logic
+ val deq_ready = spi_state_reg === SpiState.sIDLE ||
+ spi_state_reg === SpiState.sWAIT_WRITE_DATA
+ spi2tlul_q.io.deq.ready := deq_ready
+ tlul2spi_q.io.enq.valid := (spi_state_reg === SpiState.sSEND_READ_DATA)
+
+ val is_write = spi2tlul_q.io.deq.bits(7)
+ val state_next = MuxCase(spi_state_reg, Seq(
+ (spi_state_reg === SpiState.sIDLE && spi2tlul_q.io.deq.fire) ->
+ Mux(is_write, SpiState.sWAIT_WRITE_DATA, SpiState.sSEND_READ_DATA),
+ (spi_state_reg === SpiState.sWAIT_WRITE_DATA && spi2tlul_q.io.deq.fire) ->
+ SpiState.sIDLE,
+ (spi_state_reg === SpiState.sSEND_READ_DATA && tlul2spi_q.io.enq.fire) ->
+ SpiState.sIDLE
+ ))
+ spi_state_reg := state_next
+
+ // sIDLE
+ val addr_reg_next = spi2tlul_q.io.deq.bits(6,0)
+ addr_reg := Mux(spi_state_reg === SpiState.sIDLE && spi2tlul_q.io.deq.fire,
+ addr_reg_next,
+ addr_reg)
+
+ // sWAIT_WRITE_DATA
+ val data = spi2tlul_q.io.deq.bits
+ val writing_addr_reg = spi_state_reg === SpiState.sWAIT_WRITE_DATA && spi2tlul_q.io.deq.fire
+ for (i <- 0 until 4) {
+ tl_addr_reg(i) := Mux(writing_addr_reg && (addr_reg === (SpiRegAddress.TL_ADDR_REG_0 + i.U)), data, tl_addr_reg(i))
+ }
+
+ val writing_len_reg = do_write && addr_reg === SpiRegAddress.TL_LEN_REG.asUInt
+ tl_len_reg := Mux(writing_len_reg, data, tl_len_reg)
+
+ val write_word_index = bulk_write_ptr(7,4)
+ val write_byte_index = bulk_write_ptr(3,0)
+ val write_shift = write_byte_index << 3
+ val write_mask = ~(0xFF.U << write_shift)
+ val write_old_word = data_buffer(write_word_index)
+ val write_new_word = (write_old_word & write_mask) | (data << write_shift)
+ val write_cmd_fire = tl_cmd_reg_write && tl_cmd_reg_data === 2.U
+ val writing_data_buf = do_write && addr_reg === SpiRegAddress.DATA_BUF_PORT.asUInt
+ bulk_write_ptr := Mux(write_cmd_fire, 0.U,
+ Mux(writing_data_buf, bulk_write_ptr + 1.U, bulk_write_ptr))
+
+ // sSEND_READ_DATA
+ val word_index = bulk_read_ptr(7,4)
+ val byte_index = bulk_read_ptr(3,0)
+ val selected_word = data_buffer(word_index)
+
+ val status_map = Seq(
+ TlReadState.sIdle.asUInt -> 0x00.U,
+ TlReadState.sSendBeat.asUInt -> 0x01.U,
+ TlReadState.sWaitBeatAck.asUInt -> 0x01.U,
+ TlReadState.sDone.asUInt -> 0x02.U,
+ TlReadState.sError.asUInt -> 0xFF.U
+ )
+
+ val write_status_map = Seq(
+ TlWriteState.sIdle.asUInt -> 0x00.U,
+ TlWriteState.sSendBeat.asUInt -> 0x01.U,
+ TlWriteState.sWaitBeatAck.asUInt -> 0x01.U,
+ TlWriteState.sDone.asUInt -> 0x02.U,
+ TlWriteState.sError.asUInt -> 0xFF.U
+ )
+
+ val read_map = Seq(
+ SpiRegAddress.TL_ADDR_REG_0.asUInt -> tl_addr_reg(0),
+ SpiRegAddress.TL_ADDR_REG_1.asUInt -> tl_addr_reg(1),
+ SpiRegAddress.TL_ADDR_REG_2.asUInt -> tl_addr_reg(2),
+ SpiRegAddress.TL_ADDR_REG_3.asUInt -> tl_addr_reg(3),
+ SpiRegAddress.TL_LEN_REG.asUInt -> tl_len_reg,
+ SpiRegAddress.TL_STATUS_REG.asUInt -> MuxLookup(tl_read_state_reg.asUInt, 0.U)(status_map),
+ SpiRegAddress.TL_WRITE_STATUS_REG.asUInt ->
+ MuxLookup(tl_write_state_reg.asUInt, 0.U)(write_status_map),
+ SpiRegAddress.DATA_BUF_PORT.asUInt -> (selected_word.asUInt >> (byte_index << 3.U))(7,0),
+ )
+ tlul2spi_q.io.enq.bits := MuxLookup(addr_reg, 0.U(8.W))(read_map)
+
+ val read_cmd_fire = tl_cmd_reg_write && tl_cmd_reg_data === 1.U
+ val reading_data_buf = spi_state_reg === SpiState.sSEND_READ_DATA &&
+ tlul2spi_q.io.enq.fire &&
+ addr_reg === SpiRegAddress.DATA_BUF_PORT.asUInt
+ bulk_read_ptr := Mux(read_cmd_fire, 0.U,
+ Mux(reading_data_buf, bulk_read_ptr + 1.U, bulk_read_ptr))
+
+ withClock(io.spi.clk) {
+ mosi_data_reg := Cat(mosi_data_reg(6,0), io.spi.mosi)
+ bit_count_reg := bit_count_reg + 1.U
+
+ deq_attempted_reg := Mux(bit_count_reg === 0.U, true.B, deq_attempted_reg)
+
+ miso_data_reg := MuxCase(miso_data_reg, Seq(
+ (bit_count_reg === 0.U && tlul2spi_q.io.deq.fire) -> tlul2spi_q.io.deq.bits,
+ (bit_count_reg =/= 0.U) -> Cat(miso_data_reg(6,0), 0.U(1.W)),
+ ))
+ }
+
+ // === TileLink FSM Logic ===
+ val read_fsm_active = tl_read_state_reg =/= TlReadState.sIdle
+ val write_fsm_active = tl_write_state_reg =/= TlWriteState.sIdle
+
+ tl_a_q.io.enq.valid := MuxCase(false.B, Seq(
+ read_fsm_active -> (tl_read_state_reg === TlReadState.sSendBeat),
+ write_fsm_active -> (tl_write_state_reg === TlWriteState.sSendBeat)
+ ))
+
+ tl_d_q.io.deq.ready := MuxCase(false.B, Seq(
+ read_fsm_active -> (tl_read_state_reg === TlReadState.sWaitBeatAck),
+ write_fsm_active -> (tl_write_state_reg === TlWriteState.sWaitBeatAck)
+ ))
+
+ val a_bits = Wire(new OpenTitanTileLink.A_Channel(tlul_p))
+ a_bits.param := 0.U
+ a_bits.size := log2Ceil(tlul_p.w).U
+ a_bits.source := 0.U
+ a_bits.mask := Fill(tlul_p.w, 1.U)
+ a_bits.user := 0.U.asTypeOf(a_bits.user)
+ a_bits.user.instr_type := 9.U // MuBi4False
+
+ a_bits.opcode := Mux(write_fsm_active, TLULOpcodesA.PutFullData.asUInt, TLULOpcodesA.Get.asUInt)
+ a_bits.address := Mux(write_fsm_active,
+ tl_write_addr_fsm_reg + (tl_write_beat_count_reg << log2Ceil(tlul_p.w)),
+ tl_addr_fsm_reg + (tl_beat_count_reg << log2Ceil(tlul_p.w)))
+ a_bits.data := Mux(write_fsm_active, data_buffer(tl_write_beat_count_reg(3,0)), 0.U)
+
+ tl_a_q.io.enq.bits := a_bits
+
+ val reading_tl = tl_read_state_reg === TlReadState.sWaitBeatAck &&
+ tl_d_q.io.deq.fire &&
+ !tl_d_q.io.deq.bits.error
+ for (i <- 0 until data_buffer.length) {
+ val write_to_buffer = i.U === write_word_index && writing_data_buf
+ val read_from_buffer = i.U === tl_beat_count_reg(3,0) && reading_tl
+ data_buffer(i) := MuxCase(data_buffer(i), Seq(
+ write_to_buffer -> write_new_word,
+ read_from_buffer -> tl_d_q.io.deq.bits.data,
+ ))
+ }
+
+ val clear_command = tl_cmd_reg_write && tl_cmd_reg_data === 0.U
+
+ // === TileLink Read FSM Logic ===
+ val tl_state_next = MuxCase(tl_read_state_reg, Seq(
+ (tl_read_state_reg === TlReadState.sIdle && read_cmd_fire) -> TlReadState.sSendBeat,
+ (tl_read_state_reg === TlReadState.sSendBeat && tl_a_q.io.enq.fire) ->
+ TlReadState.sWaitBeatAck,
+ (tl_read_state_reg === TlReadState.sWaitBeatAck && tl_d_q.io.deq.fire) ->
+ MuxCase(TlReadState.sSendBeat, Seq(
+ tl_d_q.io.deq.bits.error -> TlReadState.sError,
+ (tl_beat_count_reg === tl_len_fsm_reg) -> TlReadState.sDone
+ )),
+ (tl_read_state_reg === TlReadState.sDone && clear_command) -> TlReadState.sIdle,
+ (tl_read_state_reg === TlReadState.sError && clear_command) -> TlReadState.sIdle
+ ))
+ tl_read_state_reg := tl_state_next
+
+ val tl_beat_count_next = Mux(tl_read_state_reg === TlReadState.sWaitBeatAck &&
+ tl_d_q.io.deq.fire &&
+ !tl_d_q.io.deq.bits.error,
+ tl_beat_count_reg + 1.U,
+ tl_beat_count_reg)
+ tl_beat_count_reg := Mux(read_cmd_fire, 0.U, tl_beat_count_next)
+
+ tl_addr_fsm_reg := Mux(read_cmd_fire, tl_addr_reg.asUInt, tl_addr_fsm_reg)
+ tl_len_fsm_reg := Mux(read_cmd_fire, tl_len_reg, tl_len_fsm_reg)
+
+ // === TileLink Write FSM Logic ===
+ val tl_write_state_next = MuxCase(tl_write_state_reg, Seq(
+ (tl_write_state_reg === TlWriteState.sIdle && write_cmd_fire) -> TlWriteState.sSendBeat,
+ (tl_write_state_reg === TlWriteState.sSendBeat && tl_a_q.io.enq.fire) ->
+ TlWriteState.sWaitBeatAck,
+ (tl_write_state_reg === TlWriteState.sWaitBeatAck && tl_d_q.io.deq.fire) ->
+ MuxCase(TlWriteState.sSendBeat, Seq(
+ tl_d_q.io.deq.bits.error -> TlWriteState.sError,
+ (tl_write_beat_count_reg === tl_write_len_fsm_reg) -> TlWriteState.sDone
+ )),
+ (tl_write_state_reg === TlWriteState.sDone && clear_command) -> TlWriteState.sIdle,
+ (tl_write_state_reg === TlWriteState.sError && clear_command) -> TlWriteState.sIdle
+ ))
+ tl_write_state_reg := tl_write_state_next
+
+ val tl_write_beat_count_next = Mux(tl_write_state_reg === TlWriteState.sWaitBeatAck &&
+ tl_d_q.io.deq.fire &&
+ !tl_d_q.io.deq.bits.error,
+ tl_write_beat_count_reg + 1.U,
+ tl_write_beat_count_reg)
+ tl_write_beat_count_reg := Mux(write_cmd_fire, 0.U, tl_write_beat_count_next)
+
+ tl_write_addr_fsm_reg := Mux(write_cmd_fire, tl_addr_reg.asUInt, tl_write_addr_fsm_reg)
+ tl_write_len_fsm_reg := Mux(write_cmd_fire, tl_len_reg, tl_write_len_fsm_reg)
+}
+
+import _root_.circt.stage.{ChiselStage,FirtoolOption}
+import chisel3.stage.ChiselGeneratorAnnotation
+import scala.annotation.nowarn
+
+@nowarn
+object Spi2TLUL_128_Emitter extends App {
+ var p = Parameters()
+ p.lsuDataBits = 128
+ (new ChiselStage).execute(
+ Array("--target", "systemverilog") ++ args,
+ Seq(ChiselGeneratorAnnotation(() => new Spi2TLUL(p))) ++ Seq(FirtoolOption("-enable-layers=Verification"))
+ )
+}
diff --git a/hdl/chisel/src/bus/TlulIntegrity.scala b/hdl/chisel/src/bus/TlulIntegrity.scala
index fbde102..ec9a254 100644
--- a/hdl/chisel/src/bus/TlulIntegrity.scala
+++ b/hdl/chisel/src/bus/TlulIntegrity.scala
@@ -112,6 +112,14 @@
/**
* Generates TileLink integrity fields for the A-channel (Request).
*/
+object RequestIntegrityGen {
+ def apply(tlul_p: TLULParameters, a_i: OpenTitanTileLink.A_Channel): OpenTitanTileLink.A_Channel = {
+ val req_intg_gen = Module(new RequestIntegrityGen(tlul_p))
+ req_intg_gen.io.a_i := a_i
+ req_intg_gen.io.a_o
+ }
+}
+
class RequestIntegrityGen(p: TLULParameters) extends Module {
override val desiredName = s"RequestIntegrityGen_${p.w}"
val io = IO(new Bundle {
diff --git a/kelvin_test_utils/BUILD b/kelvin_test_utils/BUILD
index 94c7b6a..89b0269 100644
--- a/kelvin_test_utils/BUILD
+++ b/kelvin_test_utils/BUILD
@@ -25,6 +25,15 @@
)
py_library(
+ name = "spi_master",
+ srcs = ["spi_master.py"],
+ deps = [
+ requirement("cocotb"),
+ ],
+ visibility = ["//visibility:public"],
+)
+
+py_library(
name = "secded_golden",
srcs = ["secded_golden.py"],
visibility = ["//visibility:public"],
diff --git a/kelvin_test_utils/spi_master.py b/kelvin_test_utils/spi_master.py
new file mode 100644
index 0000000..c5182fc
--- /dev/null
+++ b/kelvin_test_utils/spi_master.py
@@ -0,0 +1,136 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import cocotb
+from cocotb.clock import Clock
+from cocotb.triggers import ClockCycles, FallingEdge
+
+class SPIMaster:
+ def __init__(self, clk, csb, mosi, miso, main_clk, log):
+ self.clk = clk
+ self.csb = csb
+ self.mosi = mosi
+ self.miso = miso
+ self.main_clk = main_clk
+ self.log = log
+ self.spi_clk_driver = Clock(self.clk, 10)
+ self.clock_task = None
+
+ # Initialize signal values
+ self.clk.value = 0
+ self.csb.value = 1
+ self.mosi.value = 0
+
+ async def start_clock(self):
+ if self.clock_task is None:
+ self.clock_task = cocotb.start_soon(self.spi_clk_driver.start())
+
+ async def stop_clock(self):
+ if self.clock_task:
+ self.clock_task.kill()
+ self.clock_task = None
+ self.clk.value = 0
+
+ async def _set_cs(self, active):
+ self.csb.value = not active
+
+ async def _clock_byte(self, data_out):
+ data_in = 0
+ for i in range(8):
+ self.mosi.value = (data_out >> (7-i)) & 1
+ await FallingEdge(self.clk)
+ data_in = (data_in << 1) | int(self.miso.value)
+ return data_in
+
+ async def idle_clocking(self, cycles):
+ await self.start_clock()
+ await ClockCycles(self.clk, cycles)
+ await self.stop_clock()
+
+ async def spi_transaction(self, byte_out):
+ # Provide a setup time for CSb before the clock starts
+ await self._set_cs(True)
+ await ClockCycles(self.main_clk, 1)
+
+ await self.start_clock()
+ byte_in = await self._clock_byte(byte_out)
+ await ClockCycles(self.clk, 2)
+ await self.stop_clock()
+
+ # Provide a hold time for CSb after the clock stops
+ await ClockCycles(self.main_clk, 1)
+ await self._set_cs(False)
+ await ClockCycles(self.main_clk, 2) # Small delay between transactions
+ return byte_in
+
+ async def write_reg(self, reg_addr, data, wait_cycles=10):
+ """Writes a byte to a register via SPI."""
+ write_cmd = (1 << 7) | reg_addr
+ await self.spi_transaction(write_cmd)
+ await self.spi_transaction(data)
+ if wait_cycles > 0:
+ await ClockCycles(self.main_clk, wait_cycles)
+
+ async def read_reg(self, reg_addr):
+ """Reads a byte from a register via SPI."""
+ read_cmd = reg_addr # MSB is 0 for read
+ await self.spi_transaction(read_cmd)
+ await ClockCycles(self.main_clk, 10)
+ await self.idle_clocking(5)
+ await ClockCycles(self.main_clk, 10)
+ read_data = await self.spi_transaction(0x00)
+ return read_data
+
+ async def poll_reg_for_value(self, reg_addr, expected_value, max_polls=20):
+ """Polls a register until it reads an expected value."""
+ status = -1
+ for _ in range(max_polls):
+ status = await self.read_reg(reg_addr)
+ if status == expected_value:
+ return True
+ await ClockCycles(self.main_clk, 5) # Wait before next poll
+ self.log.error(f"Timed out after {max_polls} polls waiting for register 0x{reg_addr:x} to be 0x{expected_value:x}, got 0x{status:x}")
+ return False
+
+ async def bulk_read_data(self, reg_addr, num_bytes):
+ """Reads a block of data from a pipelined port."""
+ read_cmd = reg_addr
+
+ # The read pipeline is two stages deep. We need to send two commands
+ # to discard two junk bytes before the first valid data byte is received.
+ for _ in range(2):
+ await self.spi_transaction(read_cmd)
+ await ClockCycles(self.main_clk, 10)
+ await self.idle_clocking(5)
+ await ClockCycles(self.main_clk, 10)
+
+ # Read the valid bytes.
+ received_bytes = []
+ for _ in range(num_bytes):
+ read_byte = await self.spi_transaction(read_cmd)
+ received_bytes.append(read_byte)
+ await ClockCycles(self.main_clk, 5)
+
+ # Assemble the received bytes into a single large integer
+ read_data = 0
+ for i, byte in enumerate(received_bytes):
+ read_data |= (byte << (i * 8))
+
+ return read_data
+
+ async def bulk_write_data(self, reg_addr, data, num_bytes):
+ """Writes a block of data to a port."""
+ for i in range(num_bytes):
+ byte = (data >> (i * 8)) & 0xFF
+ await self.write_reg(reg_addr, byte, wait_cycles=5)
diff --git a/tests/cocotb/tlul/BUILD b/tests/cocotb/tlul/BUILD
index 41cae74..71676eb 100644
--- a/tests/cocotb/tlul/BUILD
+++ b/tests/cocotb/tlul/BUILD
@@ -345,4 +345,36 @@
vcs_build_args = VCS_BUILD_ARGS,
vcs_test_args = VCS_TEST_ARGS,
vcs_defines = VCS_DEFINES,
-)
\ No newline at end of file
+)
+
+# BEGIN_TESTCASES_FOR_spi2tlul_cocotb
+SPI2TLUL_TESTCASES = [
+ "test_register_read_write",
+ "test_tlul_read",
+ "test_tlul_multi_beat_read",
+ "test_tlul_write",
+ "test_tlul_multi_beat_write",
+]
+# END_TESTCASES_FOR_spi2tlul_cocotb
+
+cocotb_test_suite(
+ name = "spi2tlul_cocotb",
+ simulators = ["verilator", "vcs"],
+ testcases = SPI2TLUL_TESTCASES,
+ testcases_vname = "SPI2TLUL_TESTCASES",
+ tests_kwargs = {
+ "hdl_toplevel": "Spi2TLUL",
+ "waves": True,
+ "seed": "42",
+ "test_module": ["test_spi_to_tlul.py"],
+ "deps": [
+ "//kelvin_test_utils:TileLinkULInterface",
+ "//kelvin_test_utils:spi_master",
+ ],
+ },
+ verilator_model = "//hdl/chisel/src/bus:spi2tlul_128_model",
+ vcs_verilog_sources = ["//hdl/chisel/src/bus:Spi2TLUL.sv"],
+ vcs_build_args = VCS_BUILD_ARGS,
+ vcs_test_args = VCS_TEST_ARGS,
+ vcs_defines = VCS_DEFINES,
+)
diff --git a/tests/cocotb/tlul/test_spi_to_tlul.py b/tests/cocotb/tlul/test_spi_to_tlul.py
new file mode 100644
index 0000000..2cc6599
--- /dev/null
+++ b/tests/cocotb/tlul/test_spi_to_tlul.py
@@ -0,0 +1,365 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import cocotb
+import random
+from cocotb.clock import Clock
+from cocotb.triggers import RisingEdge, ClockCycles, FallingEdge
+from kelvin_test_utils.TileLinkULInterface import TileLinkULInterface
+from kelvin_test_utils.spi_master import SPIMaster
+
+async def setup_dut(dut):
+ # Main clock started by the test
+ dut.io_spi_csb.value = 1 # Start with chip select inactive
+ dut.reset.value = 1
+ await ClockCycles(dut.clock, 2)
+ dut.reset.value = 0
+ await RisingEdge(dut.clock)
+
+@cocotb.test()
+async def test_register_read_write(dut):
+ # Start the main clock
+ clock = Clock(dut.clock, 10)
+ cocotb.start_soon(clock.start())
+
+ await setup_dut(dut)
+ spi_master = SPIMaster(
+ clk=dut.io_spi_clk,
+ csb=dut.io_spi_csb,
+ mosi=dut.io_spi_mosi,
+ miso=dut.io_spi_miso,
+ main_clk=dut.clock,
+ log=dut._log
+ )
+
+ # Write Transaction
+ write_data = random.randint(0, 255)
+ await spi_master.write_reg(0x04, write_data)
+
+ # Read Transaction
+ read_data = await spi_master.read_reg(0x04)
+ assert read_data == write_data, f"Read data 0x{read_data:x} does not match written data 0x{write_data:x}"
+
+ await ClockCycles(dut.clock, 20)
+
+@cocotb.test()
+async def test_tlul_read(dut):
+ """Tests back-to-back TileLink UL read transactions initiated via SPI."""
+ # Start the main clock
+ clock = Clock(dut.clock, 10)
+ cocotb.start_soon(clock.start())
+
+ await setup_dut(dut)
+ spi_master = SPIMaster(
+ clk=dut.io_spi_clk,
+ csb=dut.io_spi_csb,
+ mosi=dut.io_spi_mosi,
+ miso=dut.io_spi_miso,
+ main_clk=dut.clock,
+ log=dut._log
+ )
+ tl_device = TileLinkULInterface(dut, device_if_name="io_tl", width=128)
+ await tl_device.init()
+
+ # --- Device Responder Task ---
+ async def device_responder():
+ for i in range(3):
+ req = await tl_device.device_get_request()
+ assert int(req['opcode']) == 4, f"Expected Get opcode (4), got {req['opcode']}"
+
+ # Formulate a unique response for each transaction
+ response_data = 0xDEADBEEF_CAFEF00D_ABAD1DEA_C0DED00D + i
+
+ await tl_device.device_respond(
+ opcode=1, # AccessAckData
+ param=0,
+ size=req['size'],
+ source=req['source'],
+ data=response_data,
+ error=0,
+ width=128
+ )
+
+ responder_task = cocotb.start_soon(device_responder())
+
+ # --- Main Test Logic ---
+ for i in range(3):
+ # 1. Configure the TileLink read via SPI
+ target_addr = 0x40001000 + (i * 16) # Use a new address for each transaction
+ # Write address (32 bits) byte by byte
+ for j in range(4):
+ addr_byte = (target_addr >> (j * 8)) & 0xFF
+ await spi_master.write_reg(0x00 + j, addr_byte)
+
+ # Write length (0 means 1 beat)
+ await spi_master.write_reg(0x04, 0x00)
+
+ # 2. Issue the read command
+ await spi_master.write_reg(0x05, 0x01, wait_cycles=0)
+
+ # --- Verification ---
+ # 1. Poll the status register until the transaction is done
+ assert await spi_master.poll_reg_for_value(0x06, 0x02), "Timed out waiting for status to be Done"
+
+ # 2. Read the data from the buffer port
+ read_data = await spi_master.bulk_read_data(0x07, 16)
+
+ # 3. Compare with expected data
+ expected_data = 0xDEADBEEF_CAFEF00D_ABAD1DEA_C0DED00D + i
+ assert read_data == expected_data
+
+ # 4. Clear the status to return FSM to Idle
+ await spi_master.write_reg(0x05, 0x00)
+
+ await responder_task
+
+@cocotb.test()
+async def test_tlul_multi_beat_read(dut):
+ """Tests a multi-beat TileLink UL read transaction initiated via SPI."""
+ # Start the main clock
+ clock = Clock(dut.clock, 10)
+ cocotb.start_soon(clock.start())
+
+ await setup_dut(dut)
+ spi_master = SPIMaster(
+ clk=dut.io_spi_clk,
+ csb=dut.io_spi_csb,
+ mosi=dut.io_spi_mosi,
+ miso=dut.io_spi_miso,
+ main_clk=dut.clock,
+ log=dut._log
+ )
+ tl_device = TileLinkULInterface(dut, device_if_name="io_tl", width=128)
+ await tl_device.init()
+
+ num_beats = 4
+
+ # --- Device Responder Task ---
+ async def device_responder():
+ for i in range(num_beats):
+ req = await tl_device.device_get_request()
+ assert int(req['opcode']) == 4, f"Expected Get opcode (4), got {req['opcode']}"
+
+ # Formulate a unique response for each transaction
+ response_data = 0xDEADBEEF_CAFEF00D_ABAD1DEA_C0DED00D + i
+
+ await tl_device.device_respond(
+ opcode=1, # AccessAckData
+ param=0,
+ size=req['size'],
+ source=req['source'],
+ data=response_data,
+ error=0,
+ width=128
+ )
+
+ responder_task = cocotb.start_soon(device_responder())
+
+ # --- Main Test Logic ---
+ # 1. Configure the TileLink read via SPI
+ target_addr = 0x40001000
+ # Write address (32 bits) byte by byte
+ for j in range(4):
+ addr_byte = (target_addr >> (j * 8)) & 0xFF
+ await spi_master.write_reg(0x00 + j, addr_byte)
+
+ # Write length (N-1 for N beats)
+ await spi_master.write_reg(0x04, num_beats - 1)
+
+ # 2. Issue the read command
+ await spi_master.write_reg(0x05, 0x01, wait_cycles=0)
+
+ # Add a delay to allow the status to propagate across the CDC
+ await ClockCycles(dut.clock, 20)
+
+ # --- Verification ---
+ # 1. Poll the status register until the transaction is done
+ assert await spi_master.poll_reg_for_value(0x06, 0x02), "Timed out waiting for status to be Done"
+
+ # 2. Read the data from the buffer port
+ bytes_to_read = num_beats * 16
+ read_data = await spi_master.bulk_read_data(0x07, bytes_to_read)
+
+ # 3. Compare with expected data
+ expected_data = 0
+ for i in range(num_beats):
+ word = 0xDEADBEEF_CAFEF00D_ABAD1DEA_C0DED00D + i
+ expected_data |= (word << (i * 128))
+
+ assert read_data == expected_data
+
+ # 4. Clear the status to return FSM to Idle
+ await spi_master.write_reg(0x05, 0x00)
+
+ await responder_task
+
+@cocotb.test()
+async def test_tlul_write(dut):
+ """Tests back-to-back TileLink UL write transactions initiated via SPI."""
+ # Start the main clock
+ clock = Clock(dut.clock, 10)
+ cocotb.start_soon(clock.start())
+
+ await setup_dut(dut)
+ spi_master = SPIMaster(
+ clk=dut.io_spi_clk,
+ csb=dut.io_spi_csb,
+ mosi=dut.io_spi_mosi,
+ miso=dut.io_spi_miso,
+ main_clk=dut.clock,
+ log=dut._log
+ )
+ tl_device = TileLinkULInterface(dut, device_if_name="io_tl", width=128)
+ await tl_device.init()
+
+ # --- Device Responder Task ---
+ # This task will receive the write requests and send acknowledgments.
+ received_data_list = []
+ async def device_responder():
+ for _ in range(3):
+ req = await tl_device.device_get_request()
+
+ # For a 'Put' request, we expect opcode 0 (PutFull) or 1 (PutPartial)
+ assert int(req['opcode']) in [0, 1], f"Expected PutFullData or PutPartialData, got opcode {req['opcode']}"
+
+ # Capture the data for verification
+ received_data_list.append(int(req['data']))
+
+ # A 'Put' operation is acknowledged with a single 'AccessAck'
+ await tl_device.device_respond(
+ opcode=0, # AccessAck
+ param=0,
+ size=req['size'],
+ source=req['source'],
+ error=0,
+ width=128
+ )
+
+ responder_task = cocotb.start_soon(device_responder())
+
+ # --- Main Test Logic ---
+ expected_data_list = []
+ for i in range(3):
+ # 1. Write data to the DUT's internal buffer
+ write_data = 0x11223344_55667788_99AABBCC_DDEEFF00 + i
+ expected_data_list.append(write_data)
+ await spi_master.bulk_write_data(0x07, write_data, 16)
+
+ # 2. Configure the TileLink write via SPI
+ target_addr = 0x40002000 + (i * 16)
+ # Write address (32 bits) byte by byte
+ for j in range(4):
+ addr_byte = (target_addr >> (j * 8)) & 0xFF
+ await spi_master.write_reg(0x00 + j, addr_byte)
+
+ # Write length (0 means 1 beat)
+ await spi_master.write_reg(0x04, 0x00)
+
+ # 3. Issue the write command
+ await spi_master.write_reg(0x05, 0x02, wait_cycles=20) # Start write command
+
+ # --- Verification ---
+ # 1. Poll the status register until the transaction is done
+ assert await spi_master.poll_reg_for_value(0x08, 0x02), "Timed out waiting for write status to be Done"
+
+ # 4. Clear the status to return FSM to Idle
+ await spi_master.write_reg(0x05, 0x00)
+
+ # Wait for the responder to finish handling all requests
+ await responder_task
+
+ # Verify all data received by the responder
+ assert len(received_data_list) == 3, f"Responder received {len(received_data_list)} transactions, expected 3"
+ assert received_data_list == expected_data_list, f"Received data {received_data_list} does not match expected data {expected_data_list}"
+
+@cocotb.test()
+async def test_tlul_multi_beat_write(dut):
+ """Tests a multi-beat TileLink UL write transaction initiated via SPI."""
+ # Start the main clock
+ clock = Clock(dut.clock, 10)
+ cocotb.start_soon(clock.start())
+
+ await setup_dut(dut)
+ spi_master = SPIMaster(
+ clk=dut.io_spi_clk,
+ csb=dut.io_spi_csb,
+ mosi=dut.io_spi_mosi,
+ miso=dut.io_spi_miso,
+ main_clk=dut.clock,
+ log=dut._log
+ )
+ tl_device = TileLinkULInterface(dut, device_if_name="io_tl", width=128)
+ await tl_device.init()
+
+ num_beats = 4
+
+ # --- Device Responder Task ---
+ received_data_list = []
+ async def device_responder():
+ # For a multi-beat write, we expect num_beats requests, with an ack after each.
+ for i in range(num_beats):
+ req = await tl_device.device_get_request()
+ assert int(req['opcode']) in [0, 1], f"Expected PutFullData or PutPartialData, got opcode {req['opcode']}"
+ received_data_list.append(int(req['data']))
+
+ # Send an AccessAck after each beat
+ await tl_device.device_respond(
+ opcode=0, # AccessAck
+ param=0,
+ size=req['size'],
+ source=req['source'],
+ error=0,
+ width=128
+ )
+
+ responder_task = cocotb.start_soon(device_responder())
+
+ # --- Main Test Logic ---
+ # 1. Prepare and write data to the DUT's internal buffer
+ expected_data_list = []
+ full_write_data = 0
+ for i in range(num_beats):
+ word = 0x11223344_55667788_99AABBCC_DDEEFF00 + i
+ expected_data_list.append(word)
+ full_write_data |= (word << (i * 128))
+
+ bytes_to_write = num_beats * 16
+ await spi_master.bulk_write_data(0x07, full_write_data, bytes_to_write)
+
+ # 2. Configure the TileLink write via SPI
+ target_addr = 0x40002000
+ # Write address (32 bits) byte by byte
+ for j in range(4):
+ addr_byte = (target_addr >> (j * 8)) & 0xFF
+ await spi_master.write_reg(0x00 + j, addr_byte)
+
+ # Write length (N-1 for N beats)
+ await spi_master.write_reg(0x04, num_beats - 1)
+
+ # 3. Issue the write command
+ await spi_master.write_reg(0x05, 0x02, wait_cycles=20) # Start write command
+
+ # --- Verification ---
+ # 1. Poll the status register until the transaction is done
+ assert await spi_master.poll_reg_for_value(0x08, 0x02), "Timed out waiting for write status to be Done"
+
+ # 2. Wait for the responder to finish
+ await responder_task
+
+ # 3. Verify the data received by the responder
+ assert len(received_data_list) == num_beats, f"Responder received {len(received_data_list)} beats, expected {num_beats}"
+ assert received_data_list == expected_data_list, f"Received data {received_data_list} does not match expected data {expected_data_list}"
+
+ # 4. Clear the status to return FSM to Idle
+ await spi_master.write_reg(0x05, 0x00)
\ No newline at end of file