feat(soc): Add Spi2TLUL bridge and tests This commit introduces a new Chisel module, `Spi2TLUL`, which functions as a bridge between a SPI slave interface and a TileLink UL master interface. This allows an external SPI master to initiate TileLink transactions within the SoC. The bridge includes: - A register map accessible via SPI for configuring TileLink transactions (address, length, command). - A data buffer for staging data for both read and write operations. - Asynchronous queues to handle clock domain crossing between the SPI clock and the main SoC clock. - State machines to manage SPI commands and TileLink transactions for both reads and writes. To support verification, this commit also adds: - A Python-based `SPIMaster` class for cocotb, providing an easy-to-use interface for driving the SPI slave. - A comprehensive cocotb test suite (`test_spi_to_tlul.py`) with tests for: - Register read/write access. - Single and multi-beat TileLink reads. - Single and multi-beat TileLink writes. The necessary BUILD file modifications are included to integrate the new module and its tests into the Chisel and cocotb build systems. Change-Id: Ie1280db53e77cec7b3f734b5bd6d63c8d41b2ca9
diff --git a/hdl/chisel/src/bus/BUILD b/hdl/chisel/src/bus/BUILD index d793ef6..6c88270 100644 --- a/hdl/chisel/src/bus/BUILD +++ b/hdl/chisel/src/bus/BUILD
@@ -36,6 +36,7 @@ "KelvinMemIO.scala", "KelvinToTlul.scala", "SecdedEncoderTestbench.scala", + "Spi2TLUL.scala", "TileLinkUL.scala", "TlulFifoAsync.scala", "TlulFifoSync.scala", @@ -101,6 +102,21 @@ module_name = "TlulSocketM1_3_128", ) +chisel_cc_library( + name = "spi2tlul_128_cc_library", + chisel_lib = ":bus", + emit_class = "bus.Spi2TLUL_128_Emitter", + module_name = "Spi2TLUL", +) + +verilator_cocotb_model( + name = "spi2tlul_128_model", + cflags = VERILATOR_BUILD_ARGS, + hdl_toplevel = "Spi2TLUL", + trace = True, + verilog_source = "//hdl/chisel/src/bus:Spi2TLUL.sv", +) + verilator_cocotb_model( name = "tlul_socket_m1_2_128_model", cflags = VERILATOR_BUILD_ARGS,
diff --git a/hdl/chisel/src/bus/Spi2TLUL.scala b/hdl/chisel/src/bus/Spi2TLUL.scala new file mode 100644 index 0000000..a134f93 --- /dev/null +++ b/hdl/chisel/src/bus/Spi2TLUL.scala
@@ -0,0 +1,343 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bus + +import chisel3._ +import chisel3.util._ +import freechips.rocketchip.util._ + +import kelvin.Parameters + +class Spi2TLUL(p: Parameters) extends Module { + val tlul_p = new TLULParameters(p) + val io = IO(new Bundle { + val spi = new Bundle { + val clk = Input(Clock()) + val csb = Input(Bool()) + val mosi = Input(Bool()) + val miso = Output(Bool()) + } + val tl = new OpenTitanTileLink.Host2Device(new TLULParameters(p)) + }) + + + // Synchronize the main asynchronous reset to the SPI clock domain. + val spi_domain_reset = withClock(io.spi.clk) { + val rst_sync = RegNext(RegNext(reset.asBool, true.B), true.B) + rst_sync.asAsyncReset + } + + // Combine the main reset with the chip-select reset. + // Reset is active when csb is high (inactive) OR when the main reset is active. + val combined_reset = io.spi.csb || spi_domain_reset.asBool + + val (mosi_data_reg, miso_data_reg, miso_valid_reg, bit_count_reg, deq_attempted_reg) = + withClockAndReset(io.spi.clk, combined_reset.asAsyncReset) { + val mosi = RegInit(0.U(8.W)) + val miso = RegInit(0.U(8.W)) + // Gate MISO output until the first SPI clock cycle to prevent X propagation. + val miso_valid = RegInit(false.B) + val bit_count = RegInit(0.U(3.W)) + val deq_attempted = RegInit(false.B) + (mosi, miso, miso_valid, bit_count, deq_attempted) + } + miso_valid_reg := miso_valid_reg || !io.spi.csb + io.spi.miso := Mux(miso_valid_reg, miso_data_reg(7), 0.U) + + val spi2tlul_q = Module(new AsyncQueue(UInt(8.W), AsyncQueueParams(depth = 2, safe = false))) + spi2tlul_q.io.enq_clock := io.spi.clk + spi2tlul_q.io.enq_reset := reset.asBool + spi2tlul_q.io.deq_clock := clock + spi2tlul_q.io.deq_reset := reset.asBool + + val completed_byte = Cat(mosi_data_reg(6,0), io.spi.mosi) + spi2tlul_q.io.enq.valid := bit_count_reg === 7.U + spi2tlul_q.io.enq.bits := completed_byte + dontTouch(spi2tlul_q.io.enq) + + object SpiState extends ChiselEnum { + val sIDLE, sWAIT_WRITE_DATA, sSEND_READ_DATA = Value + } + val spi_state_reg = RegInit(SpiState.sIDLE) + + // Define the SPI register map + object SpiRegAddress extends ChiselEnum { + val TL_ADDR_REG_0 = 0x00.U + val TL_ADDR_REG_1 = 0x01.U + val TL_ADDR_REG_2 = 0x02.U + val TL_ADDR_REG_3 = 0x03.U + val TL_LEN_REG = 0x04.U + val TL_CMD_REG = 0x05.U + val TL_STATUS_REG = 0x06.U + val DATA_BUF_PORT = 0x07.U + val TL_WRITE_STATUS_REG = 0x08.U + } + + // Physical registers backing the map + val tl_addr_reg = RegInit(VecInit(Seq.fill(4)(0.U(8.W)))) + val tl_len_reg = RegInit(0.U(8.W)) + // Command and Status registers are handled by the TL FSM, not stored directly here. + val data_buffer = RegInit(VecInit(Seq.fill(16)(0.U(128.W)))) + val bulk_read_ptr = RegInit(0.U(8.W)) // Byte pointer into the data buffer + val bulk_write_ptr = RegInit(0.U(8.W)) // Byte pointer for writes + + val addr_reg = RegInit(0.U(7.W)) + + // === TileLink Read FSM === + object TlReadState extends ChiselEnum { + val sIdle, sSendBeat, sWaitBeatAck, sDone, sError = Value + } + val tl_read_state_reg = RegInit(TlReadState.sIdle) + + // Internal registers for the TL transaction + val tl_addr_fsm_reg = RegInit(0.U(32.W)) + val tl_len_fsm_reg = RegInit(0.U(8.W)) + val tl_beat_count_reg = RegInit(0.U(8.W)) + + // === TileLink Write FSM === + object TlWriteState extends ChiselEnum { + val sIdle, sSendBeat, sWaitBeatAck, sDone, sError = Value + } + val tl_write_state_reg = RegInit(TlWriteState.sIdle) + + // Internal registers for the TL write transaction + val tl_write_addr_fsm_reg = RegInit(0.U(32.W)) + val tl_write_len_fsm_reg = RegInit(0.U(8.W)) + val tl_write_beat_count_reg = RegInit(0.U(8.W)) + + // Wire to detect a write to the command register + val do_write = spi_state_reg === SpiState.sWAIT_WRITE_DATA && spi2tlul_q.io.deq.fire + val tl_cmd_reg_write = do_write && (addr_reg === SpiRegAddress.TL_CMD_REG.asUInt) + val tl_cmd_reg_data = spi2tlul_q.io.deq.bits + + val tlul2spi_q = Module(new AsyncQueue(UInt(8.W), AsyncQueueParams.singleton(safe = false))) + tlul2spi_q.io.enq_clock := clock + tlul2spi_q.io.enq_reset := reset.asBool + tlul2spi_q.io.deq_clock := io.spi.clk + tlul2spi_q.io.deq_reset := reset.asBool + + // Add queues for TileLink channels to handle backpressure + val tl_a_q = Module(new Queue(new OpenTitanTileLink.A_Channel(tlul_p), 1)) + val tl_d_q = Module(new Queue(new OpenTitanTileLink.D_Channel(tlul_p), 1)) + io.tl.a <> tl_a_q.io.deq + io.tl.a.bits := RequestIntegrityGen(tlul_p, tl_a_q.io.deq.bits) + tl_d_q.io.enq <> io.tl.d + tlul2spi_q.io.deq.ready := !io.spi.csb && !deq_attempted_reg + + // FSM logic + val deq_ready = spi_state_reg === SpiState.sIDLE || + spi_state_reg === SpiState.sWAIT_WRITE_DATA + spi2tlul_q.io.deq.ready := deq_ready + tlul2spi_q.io.enq.valid := (spi_state_reg === SpiState.sSEND_READ_DATA) + + val is_write = spi2tlul_q.io.deq.bits(7) + val state_next = MuxCase(spi_state_reg, Seq( + (spi_state_reg === SpiState.sIDLE && spi2tlul_q.io.deq.fire) -> + Mux(is_write, SpiState.sWAIT_WRITE_DATA, SpiState.sSEND_READ_DATA), + (spi_state_reg === SpiState.sWAIT_WRITE_DATA && spi2tlul_q.io.deq.fire) -> + SpiState.sIDLE, + (spi_state_reg === SpiState.sSEND_READ_DATA && tlul2spi_q.io.enq.fire) -> + SpiState.sIDLE + )) + spi_state_reg := state_next + + // sIDLE + val addr_reg_next = spi2tlul_q.io.deq.bits(6,0) + addr_reg := Mux(spi_state_reg === SpiState.sIDLE && spi2tlul_q.io.deq.fire, + addr_reg_next, + addr_reg) + + // sWAIT_WRITE_DATA + val data = spi2tlul_q.io.deq.bits + val writing_addr_reg = spi_state_reg === SpiState.sWAIT_WRITE_DATA && spi2tlul_q.io.deq.fire + for (i <- 0 until 4) { + tl_addr_reg(i) := Mux(writing_addr_reg && (addr_reg === (SpiRegAddress.TL_ADDR_REG_0 + i.U)), data, tl_addr_reg(i)) + } + + val writing_len_reg = do_write && addr_reg === SpiRegAddress.TL_LEN_REG.asUInt + tl_len_reg := Mux(writing_len_reg, data, tl_len_reg) + + val write_word_index = bulk_write_ptr(7,4) + val write_byte_index = bulk_write_ptr(3,0) + val write_shift = write_byte_index << 3 + val write_mask = ~(0xFF.U << write_shift) + val write_old_word = data_buffer(write_word_index) + val write_new_word = (write_old_word & write_mask) | (data << write_shift) + val write_cmd_fire = tl_cmd_reg_write && tl_cmd_reg_data === 2.U + val writing_data_buf = do_write && addr_reg === SpiRegAddress.DATA_BUF_PORT.asUInt + bulk_write_ptr := Mux(write_cmd_fire, 0.U, + Mux(writing_data_buf, bulk_write_ptr + 1.U, bulk_write_ptr)) + + // sSEND_READ_DATA + val word_index = bulk_read_ptr(7,4) + val byte_index = bulk_read_ptr(3,0) + val selected_word = data_buffer(word_index) + + val status_map = Seq( + TlReadState.sIdle.asUInt -> 0x00.U, + TlReadState.sSendBeat.asUInt -> 0x01.U, + TlReadState.sWaitBeatAck.asUInt -> 0x01.U, + TlReadState.sDone.asUInt -> 0x02.U, + TlReadState.sError.asUInt -> 0xFF.U + ) + + val write_status_map = Seq( + TlWriteState.sIdle.asUInt -> 0x00.U, + TlWriteState.sSendBeat.asUInt -> 0x01.U, + TlWriteState.sWaitBeatAck.asUInt -> 0x01.U, + TlWriteState.sDone.asUInt -> 0x02.U, + TlWriteState.sError.asUInt -> 0xFF.U + ) + + val read_map = Seq( + SpiRegAddress.TL_ADDR_REG_0.asUInt -> tl_addr_reg(0), + SpiRegAddress.TL_ADDR_REG_1.asUInt -> tl_addr_reg(1), + SpiRegAddress.TL_ADDR_REG_2.asUInt -> tl_addr_reg(2), + SpiRegAddress.TL_ADDR_REG_3.asUInt -> tl_addr_reg(3), + SpiRegAddress.TL_LEN_REG.asUInt -> tl_len_reg, + SpiRegAddress.TL_STATUS_REG.asUInt -> MuxLookup(tl_read_state_reg.asUInt, 0.U)(status_map), + SpiRegAddress.TL_WRITE_STATUS_REG.asUInt -> + MuxLookup(tl_write_state_reg.asUInt, 0.U)(write_status_map), + SpiRegAddress.DATA_BUF_PORT.asUInt -> (selected_word.asUInt >> (byte_index << 3.U))(7,0), + ) + tlul2spi_q.io.enq.bits := MuxLookup(addr_reg, 0.U(8.W))(read_map) + + val read_cmd_fire = tl_cmd_reg_write && tl_cmd_reg_data === 1.U + val reading_data_buf = spi_state_reg === SpiState.sSEND_READ_DATA && + tlul2spi_q.io.enq.fire && + addr_reg === SpiRegAddress.DATA_BUF_PORT.asUInt + bulk_read_ptr := Mux(read_cmd_fire, 0.U, + Mux(reading_data_buf, bulk_read_ptr + 1.U, bulk_read_ptr)) + + withClock(io.spi.clk) { + mosi_data_reg := Cat(mosi_data_reg(6,0), io.spi.mosi) + bit_count_reg := bit_count_reg + 1.U + + deq_attempted_reg := Mux(bit_count_reg === 0.U, true.B, deq_attempted_reg) + + miso_data_reg := MuxCase(miso_data_reg, Seq( + (bit_count_reg === 0.U && tlul2spi_q.io.deq.fire) -> tlul2spi_q.io.deq.bits, + (bit_count_reg =/= 0.U) -> Cat(miso_data_reg(6,0), 0.U(1.W)), + )) + } + + // === TileLink FSM Logic === + val read_fsm_active = tl_read_state_reg =/= TlReadState.sIdle + val write_fsm_active = tl_write_state_reg =/= TlWriteState.sIdle + + tl_a_q.io.enq.valid := MuxCase(false.B, Seq( + read_fsm_active -> (tl_read_state_reg === TlReadState.sSendBeat), + write_fsm_active -> (tl_write_state_reg === TlWriteState.sSendBeat) + )) + + tl_d_q.io.deq.ready := MuxCase(false.B, Seq( + read_fsm_active -> (tl_read_state_reg === TlReadState.sWaitBeatAck), + write_fsm_active -> (tl_write_state_reg === TlWriteState.sWaitBeatAck) + )) + + val a_bits = Wire(new OpenTitanTileLink.A_Channel(tlul_p)) + a_bits.param := 0.U + a_bits.size := log2Ceil(tlul_p.w).U + a_bits.source := 0.U + a_bits.mask := Fill(tlul_p.w, 1.U) + a_bits.user := 0.U.asTypeOf(a_bits.user) + a_bits.user.instr_type := 9.U // MuBi4False + + a_bits.opcode := Mux(write_fsm_active, TLULOpcodesA.PutFullData.asUInt, TLULOpcodesA.Get.asUInt) + a_bits.address := Mux(write_fsm_active, + tl_write_addr_fsm_reg + (tl_write_beat_count_reg << log2Ceil(tlul_p.w)), + tl_addr_fsm_reg + (tl_beat_count_reg << log2Ceil(tlul_p.w))) + a_bits.data := Mux(write_fsm_active, data_buffer(tl_write_beat_count_reg(3,0)), 0.U) + + tl_a_q.io.enq.bits := a_bits + + val reading_tl = tl_read_state_reg === TlReadState.sWaitBeatAck && + tl_d_q.io.deq.fire && + !tl_d_q.io.deq.bits.error + for (i <- 0 until data_buffer.length) { + val write_to_buffer = i.U === write_word_index && writing_data_buf + val read_from_buffer = i.U === tl_beat_count_reg(3,0) && reading_tl + data_buffer(i) := MuxCase(data_buffer(i), Seq( + write_to_buffer -> write_new_word, + read_from_buffer -> tl_d_q.io.deq.bits.data, + )) + } + + val clear_command = tl_cmd_reg_write && tl_cmd_reg_data === 0.U + + // === TileLink Read FSM Logic === + val tl_state_next = MuxCase(tl_read_state_reg, Seq( + (tl_read_state_reg === TlReadState.sIdle && read_cmd_fire) -> TlReadState.sSendBeat, + (tl_read_state_reg === TlReadState.sSendBeat && tl_a_q.io.enq.fire) -> + TlReadState.sWaitBeatAck, + (tl_read_state_reg === TlReadState.sWaitBeatAck && tl_d_q.io.deq.fire) -> + MuxCase(TlReadState.sSendBeat, Seq( + tl_d_q.io.deq.bits.error -> TlReadState.sError, + (tl_beat_count_reg === tl_len_fsm_reg) -> TlReadState.sDone + )), + (tl_read_state_reg === TlReadState.sDone && clear_command) -> TlReadState.sIdle, + (tl_read_state_reg === TlReadState.sError && clear_command) -> TlReadState.sIdle + )) + tl_read_state_reg := tl_state_next + + val tl_beat_count_next = Mux(tl_read_state_reg === TlReadState.sWaitBeatAck && + tl_d_q.io.deq.fire && + !tl_d_q.io.deq.bits.error, + tl_beat_count_reg + 1.U, + tl_beat_count_reg) + tl_beat_count_reg := Mux(read_cmd_fire, 0.U, tl_beat_count_next) + + tl_addr_fsm_reg := Mux(read_cmd_fire, tl_addr_reg.asUInt, tl_addr_fsm_reg) + tl_len_fsm_reg := Mux(read_cmd_fire, tl_len_reg, tl_len_fsm_reg) + + // === TileLink Write FSM Logic === + val tl_write_state_next = MuxCase(tl_write_state_reg, Seq( + (tl_write_state_reg === TlWriteState.sIdle && write_cmd_fire) -> TlWriteState.sSendBeat, + (tl_write_state_reg === TlWriteState.sSendBeat && tl_a_q.io.enq.fire) -> + TlWriteState.sWaitBeatAck, + (tl_write_state_reg === TlWriteState.sWaitBeatAck && tl_d_q.io.deq.fire) -> + MuxCase(TlWriteState.sSendBeat, Seq( + tl_d_q.io.deq.bits.error -> TlWriteState.sError, + (tl_write_beat_count_reg === tl_write_len_fsm_reg) -> TlWriteState.sDone + )), + (tl_write_state_reg === TlWriteState.sDone && clear_command) -> TlWriteState.sIdle, + (tl_write_state_reg === TlWriteState.sError && clear_command) -> TlWriteState.sIdle + )) + tl_write_state_reg := tl_write_state_next + + val tl_write_beat_count_next = Mux(tl_write_state_reg === TlWriteState.sWaitBeatAck && + tl_d_q.io.deq.fire && + !tl_d_q.io.deq.bits.error, + tl_write_beat_count_reg + 1.U, + tl_write_beat_count_reg) + tl_write_beat_count_reg := Mux(write_cmd_fire, 0.U, tl_write_beat_count_next) + + tl_write_addr_fsm_reg := Mux(write_cmd_fire, tl_addr_reg.asUInt, tl_write_addr_fsm_reg) + tl_write_len_fsm_reg := Mux(write_cmd_fire, tl_len_reg, tl_write_len_fsm_reg) +} + +import _root_.circt.stage.{ChiselStage,FirtoolOption} +import chisel3.stage.ChiselGeneratorAnnotation +import scala.annotation.nowarn + +@nowarn +object Spi2TLUL_128_Emitter extends App { + var p = Parameters() + p.lsuDataBits = 128 + (new ChiselStage).execute( + Array("--target", "systemverilog") ++ args, + Seq(ChiselGeneratorAnnotation(() => new Spi2TLUL(p))) ++ Seq(FirtoolOption("-enable-layers=Verification")) + ) +}
diff --git a/hdl/chisel/src/bus/TlulIntegrity.scala b/hdl/chisel/src/bus/TlulIntegrity.scala index fbde102..ec9a254 100644 --- a/hdl/chisel/src/bus/TlulIntegrity.scala +++ b/hdl/chisel/src/bus/TlulIntegrity.scala
@@ -112,6 +112,14 @@ /** * Generates TileLink integrity fields for the A-channel (Request). */ +object RequestIntegrityGen { + def apply(tlul_p: TLULParameters, a_i: OpenTitanTileLink.A_Channel): OpenTitanTileLink.A_Channel = { + val req_intg_gen = Module(new RequestIntegrityGen(tlul_p)) + req_intg_gen.io.a_i := a_i + req_intg_gen.io.a_o + } +} + class RequestIntegrityGen(p: TLULParameters) extends Module { override val desiredName = s"RequestIntegrityGen_${p.w}" val io = IO(new Bundle {
diff --git a/kelvin_test_utils/BUILD b/kelvin_test_utils/BUILD index 94c7b6a..89b0269 100644 --- a/kelvin_test_utils/BUILD +++ b/kelvin_test_utils/BUILD
@@ -25,6 +25,15 @@ ) py_library( + name = "spi_master", + srcs = ["spi_master.py"], + deps = [ + requirement("cocotb"), + ], + visibility = ["//visibility:public"], +) + +py_library( name = "secded_golden", srcs = ["secded_golden.py"], visibility = ["//visibility:public"],
diff --git a/kelvin_test_utils/spi_master.py b/kelvin_test_utils/spi_master.py new file mode 100644 index 0000000..c5182fc --- /dev/null +++ b/kelvin_test_utils/spi_master.py
@@ -0,0 +1,136 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import cocotb +from cocotb.clock import Clock +from cocotb.triggers import ClockCycles, FallingEdge + +class SPIMaster: + def __init__(self, clk, csb, mosi, miso, main_clk, log): + self.clk = clk + self.csb = csb + self.mosi = mosi + self.miso = miso + self.main_clk = main_clk + self.log = log + self.spi_clk_driver = Clock(self.clk, 10) + self.clock_task = None + + # Initialize signal values + self.clk.value = 0 + self.csb.value = 1 + self.mosi.value = 0 + + async def start_clock(self): + if self.clock_task is None: + self.clock_task = cocotb.start_soon(self.spi_clk_driver.start()) + + async def stop_clock(self): + if self.clock_task: + self.clock_task.kill() + self.clock_task = None + self.clk.value = 0 + + async def _set_cs(self, active): + self.csb.value = not active + + async def _clock_byte(self, data_out): + data_in = 0 + for i in range(8): + self.mosi.value = (data_out >> (7-i)) & 1 + await FallingEdge(self.clk) + data_in = (data_in << 1) | int(self.miso.value) + return data_in + + async def idle_clocking(self, cycles): + await self.start_clock() + await ClockCycles(self.clk, cycles) + await self.stop_clock() + + async def spi_transaction(self, byte_out): + # Provide a setup time for CSb before the clock starts + await self._set_cs(True) + await ClockCycles(self.main_clk, 1) + + await self.start_clock() + byte_in = await self._clock_byte(byte_out) + await ClockCycles(self.clk, 2) + await self.stop_clock() + + # Provide a hold time for CSb after the clock stops + await ClockCycles(self.main_clk, 1) + await self._set_cs(False) + await ClockCycles(self.main_clk, 2) # Small delay between transactions + return byte_in + + async def write_reg(self, reg_addr, data, wait_cycles=10): + """Writes a byte to a register via SPI.""" + write_cmd = (1 << 7) | reg_addr + await self.spi_transaction(write_cmd) + await self.spi_transaction(data) + if wait_cycles > 0: + await ClockCycles(self.main_clk, wait_cycles) + + async def read_reg(self, reg_addr): + """Reads a byte from a register via SPI.""" + read_cmd = reg_addr # MSB is 0 for read + await self.spi_transaction(read_cmd) + await ClockCycles(self.main_clk, 10) + await self.idle_clocking(5) + await ClockCycles(self.main_clk, 10) + read_data = await self.spi_transaction(0x00) + return read_data + + async def poll_reg_for_value(self, reg_addr, expected_value, max_polls=20): + """Polls a register until it reads an expected value.""" + status = -1 + for _ in range(max_polls): + status = await self.read_reg(reg_addr) + if status == expected_value: + return True + await ClockCycles(self.main_clk, 5) # Wait before next poll + self.log.error(f"Timed out after {max_polls} polls waiting for register 0x{reg_addr:x} to be 0x{expected_value:x}, got 0x{status:x}") + return False + + async def bulk_read_data(self, reg_addr, num_bytes): + """Reads a block of data from a pipelined port.""" + read_cmd = reg_addr + + # The read pipeline is two stages deep. We need to send two commands + # to discard two junk bytes before the first valid data byte is received. + for _ in range(2): + await self.spi_transaction(read_cmd) + await ClockCycles(self.main_clk, 10) + await self.idle_clocking(5) + await ClockCycles(self.main_clk, 10) + + # Read the valid bytes. + received_bytes = [] + for _ in range(num_bytes): + read_byte = await self.spi_transaction(read_cmd) + received_bytes.append(read_byte) + await ClockCycles(self.main_clk, 5) + + # Assemble the received bytes into a single large integer + read_data = 0 + for i, byte in enumerate(received_bytes): + read_data |= (byte << (i * 8)) + + return read_data + + async def bulk_write_data(self, reg_addr, data, num_bytes): + """Writes a block of data to a port.""" + for i in range(num_bytes): + byte = (data >> (i * 8)) & 0xFF + await self.write_reg(reg_addr, byte, wait_cycles=5)
diff --git a/tests/cocotb/tlul/BUILD b/tests/cocotb/tlul/BUILD index 41cae74..71676eb 100644 --- a/tests/cocotb/tlul/BUILD +++ b/tests/cocotb/tlul/BUILD
@@ -345,4 +345,36 @@ vcs_build_args = VCS_BUILD_ARGS, vcs_test_args = VCS_TEST_ARGS, vcs_defines = VCS_DEFINES, -) \ No newline at end of file +) + +# BEGIN_TESTCASES_FOR_spi2tlul_cocotb +SPI2TLUL_TESTCASES = [ + "test_register_read_write", + "test_tlul_read", + "test_tlul_multi_beat_read", + "test_tlul_write", + "test_tlul_multi_beat_write", +] +# END_TESTCASES_FOR_spi2tlul_cocotb + +cocotb_test_suite( + name = "spi2tlul_cocotb", + simulators = ["verilator", "vcs"], + testcases = SPI2TLUL_TESTCASES, + testcases_vname = "SPI2TLUL_TESTCASES", + tests_kwargs = { + "hdl_toplevel": "Spi2TLUL", + "waves": True, + "seed": "42", + "test_module": ["test_spi_to_tlul.py"], + "deps": [ + "//kelvin_test_utils:TileLinkULInterface", + "//kelvin_test_utils:spi_master", + ], + }, + verilator_model = "//hdl/chisel/src/bus:spi2tlul_128_model", + vcs_verilog_sources = ["//hdl/chisel/src/bus:Spi2TLUL.sv"], + vcs_build_args = VCS_BUILD_ARGS, + vcs_test_args = VCS_TEST_ARGS, + vcs_defines = VCS_DEFINES, +)
diff --git a/tests/cocotb/tlul/test_spi_to_tlul.py b/tests/cocotb/tlul/test_spi_to_tlul.py new file mode 100644 index 0000000..2cc6599 --- /dev/null +++ b/tests/cocotb/tlul/test_spi_to_tlul.py
@@ -0,0 +1,365 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import cocotb +import random +from cocotb.clock import Clock +from cocotb.triggers import RisingEdge, ClockCycles, FallingEdge +from kelvin_test_utils.TileLinkULInterface import TileLinkULInterface +from kelvin_test_utils.spi_master import SPIMaster + +async def setup_dut(dut): + # Main clock started by the test + dut.io_spi_csb.value = 1 # Start with chip select inactive + dut.reset.value = 1 + await ClockCycles(dut.clock, 2) + dut.reset.value = 0 + await RisingEdge(dut.clock) + +@cocotb.test() +async def test_register_read_write(dut): + # Start the main clock + clock = Clock(dut.clock, 10) + cocotb.start_soon(clock.start()) + + await setup_dut(dut) + spi_master = SPIMaster( + clk=dut.io_spi_clk, + csb=dut.io_spi_csb, + mosi=dut.io_spi_mosi, + miso=dut.io_spi_miso, + main_clk=dut.clock, + log=dut._log + ) + + # Write Transaction + write_data = random.randint(0, 255) + await spi_master.write_reg(0x04, write_data) + + # Read Transaction + read_data = await spi_master.read_reg(0x04) + assert read_data == write_data, f"Read data 0x{read_data:x} does not match written data 0x{write_data:x}" + + await ClockCycles(dut.clock, 20) + +@cocotb.test() +async def test_tlul_read(dut): + """Tests back-to-back TileLink UL read transactions initiated via SPI.""" + # Start the main clock + clock = Clock(dut.clock, 10) + cocotb.start_soon(clock.start()) + + await setup_dut(dut) + spi_master = SPIMaster( + clk=dut.io_spi_clk, + csb=dut.io_spi_csb, + mosi=dut.io_spi_mosi, + miso=dut.io_spi_miso, + main_clk=dut.clock, + log=dut._log + ) + tl_device = TileLinkULInterface(dut, device_if_name="io_tl", width=128) + await tl_device.init() + + # --- Device Responder Task --- + async def device_responder(): + for i in range(3): + req = await tl_device.device_get_request() + assert int(req['opcode']) == 4, f"Expected Get opcode (4), got {req['opcode']}" + + # Formulate a unique response for each transaction + response_data = 0xDEADBEEF_CAFEF00D_ABAD1DEA_C0DED00D + i + + await tl_device.device_respond( + opcode=1, # AccessAckData + param=0, + size=req['size'], + source=req['source'], + data=response_data, + error=0, + width=128 + ) + + responder_task = cocotb.start_soon(device_responder()) + + # --- Main Test Logic --- + for i in range(3): + # 1. Configure the TileLink read via SPI + target_addr = 0x40001000 + (i * 16) # Use a new address for each transaction + # Write address (32 bits) byte by byte + for j in range(4): + addr_byte = (target_addr >> (j * 8)) & 0xFF + await spi_master.write_reg(0x00 + j, addr_byte) + + # Write length (0 means 1 beat) + await spi_master.write_reg(0x04, 0x00) + + # 2. Issue the read command + await spi_master.write_reg(0x05, 0x01, wait_cycles=0) + + # --- Verification --- + # 1. Poll the status register until the transaction is done + assert await spi_master.poll_reg_for_value(0x06, 0x02), "Timed out waiting for status to be Done" + + # 2. Read the data from the buffer port + read_data = await spi_master.bulk_read_data(0x07, 16) + + # 3. Compare with expected data + expected_data = 0xDEADBEEF_CAFEF00D_ABAD1DEA_C0DED00D + i + assert read_data == expected_data + + # 4. Clear the status to return FSM to Idle + await spi_master.write_reg(0x05, 0x00) + + await responder_task + +@cocotb.test() +async def test_tlul_multi_beat_read(dut): + """Tests a multi-beat TileLink UL read transaction initiated via SPI.""" + # Start the main clock + clock = Clock(dut.clock, 10) + cocotb.start_soon(clock.start()) + + await setup_dut(dut) + spi_master = SPIMaster( + clk=dut.io_spi_clk, + csb=dut.io_spi_csb, + mosi=dut.io_spi_mosi, + miso=dut.io_spi_miso, + main_clk=dut.clock, + log=dut._log + ) + tl_device = TileLinkULInterface(dut, device_if_name="io_tl", width=128) + await tl_device.init() + + num_beats = 4 + + # --- Device Responder Task --- + async def device_responder(): + for i in range(num_beats): + req = await tl_device.device_get_request() + assert int(req['opcode']) == 4, f"Expected Get opcode (4), got {req['opcode']}" + + # Formulate a unique response for each transaction + response_data = 0xDEADBEEF_CAFEF00D_ABAD1DEA_C0DED00D + i + + await tl_device.device_respond( + opcode=1, # AccessAckData + param=0, + size=req['size'], + source=req['source'], + data=response_data, + error=0, + width=128 + ) + + responder_task = cocotb.start_soon(device_responder()) + + # --- Main Test Logic --- + # 1. Configure the TileLink read via SPI + target_addr = 0x40001000 + # Write address (32 bits) byte by byte + for j in range(4): + addr_byte = (target_addr >> (j * 8)) & 0xFF + await spi_master.write_reg(0x00 + j, addr_byte) + + # Write length (N-1 for N beats) + await spi_master.write_reg(0x04, num_beats - 1) + + # 2. Issue the read command + await spi_master.write_reg(0x05, 0x01, wait_cycles=0) + + # Add a delay to allow the status to propagate across the CDC + await ClockCycles(dut.clock, 20) + + # --- Verification --- + # 1. Poll the status register until the transaction is done + assert await spi_master.poll_reg_for_value(0x06, 0x02), "Timed out waiting for status to be Done" + + # 2. Read the data from the buffer port + bytes_to_read = num_beats * 16 + read_data = await spi_master.bulk_read_data(0x07, bytes_to_read) + + # 3. Compare with expected data + expected_data = 0 + for i in range(num_beats): + word = 0xDEADBEEF_CAFEF00D_ABAD1DEA_C0DED00D + i + expected_data |= (word << (i * 128)) + + assert read_data == expected_data + + # 4. Clear the status to return FSM to Idle + await spi_master.write_reg(0x05, 0x00) + + await responder_task + +@cocotb.test() +async def test_tlul_write(dut): + """Tests back-to-back TileLink UL write transactions initiated via SPI.""" + # Start the main clock + clock = Clock(dut.clock, 10) + cocotb.start_soon(clock.start()) + + await setup_dut(dut) + spi_master = SPIMaster( + clk=dut.io_spi_clk, + csb=dut.io_spi_csb, + mosi=dut.io_spi_mosi, + miso=dut.io_spi_miso, + main_clk=dut.clock, + log=dut._log + ) + tl_device = TileLinkULInterface(dut, device_if_name="io_tl", width=128) + await tl_device.init() + + # --- Device Responder Task --- + # This task will receive the write requests and send acknowledgments. + received_data_list = [] + async def device_responder(): + for _ in range(3): + req = await tl_device.device_get_request() + + # For a 'Put' request, we expect opcode 0 (PutFull) or 1 (PutPartial) + assert int(req['opcode']) in [0, 1], f"Expected PutFullData or PutPartialData, got opcode {req['opcode']}" + + # Capture the data for verification + received_data_list.append(int(req['data'])) + + # A 'Put' operation is acknowledged with a single 'AccessAck' + await tl_device.device_respond( + opcode=0, # AccessAck + param=0, + size=req['size'], + source=req['source'], + error=0, + width=128 + ) + + responder_task = cocotb.start_soon(device_responder()) + + # --- Main Test Logic --- + expected_data_list = [] + for i in range(3): + # 1. Write data to the DUT's internal buffer + write_data = 0x11223344_55667788_99AABBCC_DDEEFF00 + i + expected_data_list.append(write_data) + await spi_master.bulk_write_data(0x07, write_data, 16) + + # 2. Configure the TileLink write via SPI + target_addr = 0x40002000 + (i * 16) + # Write address (32 bits) byte by byte + for j in range(4): + addr_byte = (target_addr >> (j * 8)) & 0xFF + await spi_master.write_reg(0x00 + j, addr_byte) + + # Write length (0 means 1 beat) + await spi_master.write_reg(0x04, 0x00) + + # 3. Issue the write command + await spi_master.write_reg(0x05, 0x02, wait_cycles=20) # Start write command + + # --- Verification --- + # 1. Poll the status register until the transaction is done + assert await spi_master.poll_reg_for_value(0x08, 0x02), "Timed out waiting for write status to be Done" + + # 4. Clear the status to return FSM to Idle + await spi_master.write_reg(0x05, 0x00) + + # Wait for the responder to finish handling all requests + await responder_task + + # Verify all data received by the responder + assert len(received_data_list) == 3, f"Responder received {len(received_data_list)} transactions, expected 3" + assert received_data_list == expected_data_list, f"Received data {received_data_list} does not match expected data {expected_data_list}" + +@cocotb.test() +async def test_tlul_multi_beat_write(dut): + """Tests a multi-beat TileLink UL write transaction initiated via SPI.""" + # Start the main clock + clock = Clock(dut.clock, 10) + cocotb.start_soon(clock.start()) + + await setup_dut(dut) + spi_master = SPIMaster( + clk=dut.io_spi_clk, + csb=dut.io_spi_csb, + mosi=dut.io_spi_mosi, + miso=dut.io_spi_miso, + main_clk=dut.clock, + log=dut._log + ) + tl_device = TileLinkULInterface(dut, device_if_name="io_tl", width=128) + await tl_device.init() + + num_beats = 4 + + # --- Device Responder Task --- + received_data_list = [] + async def device_responder(): + # For a multi-beat write, we expect num_beats requests, with an ack after each. + for i in range(num_beats): + req = await tl_device.device_get_request() + assert int(req['opcode']) in [0, 1], f"Expected PutFullData or PutPartialData, got opcode {req['opcode']}" + received_data_list.append(int(req['data'])) + + # Send an AccessAck after each beat + await tl_device.device_respond( + opcode=0, # AccessAck + param=0, + size=req['size'], + source=req['source'], + error=0, + width=128 + ) + + responder_task = cocotb.start_soon(device_responder()) + + # --- Main Test Logic --- + # 1. Prepare and write data to the DUT's internal buffer + expected_data_list = [] + full_write_data = 0 + for i in range(num_beats): + word = 0x11223344_55667788_99AABBCC_DDEEFF00 + i + expected_data_list.append(word) + full_write_data |= (word << (i * 128)) + + bytes_to_write = num_beats * 16 + await spi_master.bulk_write_data(0x07, full_write_data, bytes_to_write) + + # 2. Configure the TileLink write via SPI + target_addr = 0x40002000 + # Write address (32 bits) byte by byte + for j in range(4): + addr_byte = (target_addr >> (j * 8)) & 0xFF + await spi_master.write_reg(0x00 + j, addr_byte) + + # Write length (N-1 for N beats) + await spi_master.write_reg(0x04, num_beats - 1) + + # 3. Issue the write command + await spi_master.write_reg(0x05, 0x02, wait_cycles=20) # Start write command + + # --- Verification --- + # 1. Poll the status register until the transaction is done + assert await spi_master.poll_reg_for_value(0x08, 0x02), "Timed out waiting for write status to be Done" + + # 2. Wait for the responder to finish + await responder_task + + # 3. Verify the data received by the responder + assert len(received_data_list) == num_beats, f"Responder received {len(received_data_list)} beats, expected {num_beats}" + assert received_data_list == expected_data_list, f"Received data {received_data_list} does not match expected data {expected_data_list}" + + # 4. Clear the status to return FSM to Idle + await spi_master.write_reg(0x05, 0x00) \ No newline at end of file