feat(bus): Add SECDED integrity for TileLink-UL
This commit implements SECDED (Single Error Correction, Double Error
Detection) for the TileLink-UL bus to ensure data integrity. The
implementation is compatible with OpenTitan's `prim_secded_inv` logic.
Key changes include:
- A parameterized `SecdedEncoder` Chisel module that supports 32-bit,
57-bit, and 128-bit data widths. The 128-bit implementation uses a
folded ECC scheme.
- `RequestIntegrityGen/Check` and `ResponseIntegrityGen/Check` modules
to generate and verify integrity codes for the TileLink A and D
channels.
- A Python-based golden model (`secded_golden.py`) for the SECDED
logic to ensure correctness.
- A new `TileLinkULInterface` cocotb utility for simplified,
transaction-based testing of the TileLink bus.
- Comprehensive cocotb tests that verify the `SecdedEncoder` against
the golden model and test the full `TlulIntegrity` functionality,
including fault injection.
Change-Id: I20a059b78a47699f145ae397b0e037d8c56dab69
diff --git a/hdl/chisel/src/bus/BUILD b/hdl/chisel/src/bus/BUILD
index 0a257c2..48737ef 100644
--- a/hdl/chisel/src/bus/BUILD
+++ b/hdl/chisel/src/bus/BUILD
@@ -22,6 +22,11 @@
package(default_visibility = ["//visibility:public"])
+load(
+ "//tests/cocotb:build_defs.bzl",
+ "VERILATOR_BUILD_ARGS",
+)
+
chisel_library(
name = "bus",
srcs = [
@@ -30,7 +35,10 @@
"Axi2TLUL.scala",
"KelvinMemIO.scala",
"KelvinToTlul.scala",
+ "SecdedEncoderTestbench.scala",
"TileLinkUL.scala",
+ "TlulIntegrity.scala",
+ "TlulIntegrityTestbench.scala",
],
deps = [
"//hdl/chisel/src/kelvin:kelvin_params",
@@ -121,4 +129,64 @@
},
vcs_verilog_sources = ["//hdl/chisel/src/bus:axi2tlul_cc_library_verilog"],
verilator_model = ":axi2tlul_model",
+)
+
+chisel_cc_library(
+ name = "tlul_integrity_testbench_cc_library",
+ chisel_lib = ":bus",
+ emit_class = "bus.EmitTlulIntegrityTestbench",
+ module_name = "TlulIntegrityTestbench",
+)
+
+verilator_cocotb_model(
+ name = "tlul_integrity_testbench_model",
+ cflags = VERILATOR_BUILD_ARGS,
+ hdl_toplevel = "TlulIntegrityTestbench",
+ trace = True,
+ verilog_source = "//hdl/chisel/src/bus:TlulIntegrityTestbench.sv",
+)
+
+chisel_cc_library(
+ name = "secded_encoder_testbench_cc_library",
+ chisel_lib = ":bus",
+ emit_class = "bus.EmitSecdedEncoderTestbench",
+ module_name = "SecdedEncoderTestbench128",
+)
+
+verilator_cocotb_model(
+ name = "secded_encoder_testbench_model",
+ cflags = VERILATOR_BUILD_ARGS,
+ hdl_toplevel = "SecdedEncoderTestbench128",
+ trace = True,
+ verilog_source = "//hdl/chisel/src/bus:SecdedEncoderTestbench128.sv",
+)
+
+chisel_cc_library(
+ name = "secded_encoder_testbench_32_cc_library",
+ chisel_lib = ":bus",
+ emit_class = "bus.EmitSecdedEncoderTestbench32",
+ module_name = "SecdedEncoderTestbench32",
+)
+
+verilator_cocotb_model(
+ name = "secded_encoder_testbench_32_model",
+ cflags = VERILATOR_BUILD_ARGS,
+ hdl_toplevel = "SecdedEncoderTestbench32",
+ trace = True,
+ verilog_source = "//hdl/chisel/src/bus:SecdedEncoderTestbench32.sv",
+)
+
+chisel_cc_library(
+ name = "secded_encoder_testbench_57_cc_library",
+ chisel_lib = ":bus",
+ emit_class = "bus.EmitSecdedEncoderTestbench57",
+ module_name = "SecdedEncoderTestbench57",
+)
+
+verilator_cocotb_model(
+ name = "secded_encoder_testbench_57_model",
+ cflags = VERILATOR_BUILD_ARGS,
+ hdl_toplevel = "SecdedEncoderTestbench57",
+ trace = True,
+ verilog_source = "//hdl/chisel/src/bus:SecdedEncoderTestbench57.sv",
)
\ No newline at end of file
diff --git a/hdl/chisel/src/bus/SecdedEncoderTestbench.scala b/hdl/chisel/src/bus/SecdedEncoderTestbench.scala
new file mode 100644
index 0000000..f35df89
--- /dev/null
+++ b/hdl/chisel/src/bus/SecdedEncoderTestbench.scala
@@ -0,0 +1,65 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bus
+
+import chisel3._
+import kelvin.Parameters
+
+/**
+ * A testbench DUT that instantiates a single SecdedEncoder so it can be
+ * tested with cocotb.
+ */
+class SecdedEncoderTestbench(val w: Int, val moduleName: String) extends Module {
+ override def desiredName = moduleName
+
+ val io = IO(new Bundle {
+ val data_i = Input(UInt(w.W))
+ val ecc_o = Output(UInt(7.W))
+ })
+
+ val encoder = Module(new SecdedEncoder(w))
+ encoder.io.data_i := io.data_i
+ io.ecc_o := encoder.io.ecc_o
+}
+
+import _root_.circt.stage.{ChiselStage,FirtoolOption}
+import chisel3.stage.ChiselGeneratorAnnotation
+import scala.annotation.nowarn
+
+@nowarn
+object EmitSecdedEncoderTestbench extends App {
+ val p = new Parameters
+ p.lsuDataBits = 128
+ (new ChiselStage).execute(
+ Array("--target", "systemverilog") ++ args,
+ Seq(ChiselGeneratorAnnotation(() => new SecdedEncoderTestbench(p.lsuDataBits, "SecdedEncoderTestbench128"))) ++ Seq(FirtoolOption("-enable-layers=Verification"))
+ )
+}
+
+@nowarn
+object EmitSecdedEncoderTestbench32 extends App {
+ (new ChiselStage).execute(
+ Array("--target", "systemverilog") ++ args,
+ Seq(ChiselGeneratorAnnotation(() => new SecdedEncoderTestbench(32, "SecdedEncoderTestbench32"))) ++ Seq(FirtoolOption("-enable-layers=Verification"))
+ )
+}
+
+@nowarn
+object EmitSecdedEncoderTestbench57 extends App {
+ (new ChiselStage).execute(
+ Array("--target", "systemverilog") ++ args,
+ Seq(ChiselGeneratorAnnotation(() => new SecdedEncoderTestbench(57, "SecdedEncoderTestbench57"))) ++ Seq(FirtoolOption("-enable-layers=Verification"))
+ )
+}
\ No newline at end of file
diff --git a/hdl/chisel/src/bus/TlulIntegrity.scala b/hdl/chisel/src/bus/TlulIntegrity.scala
new file mode 100644
index 0000000..fbde102
--- /dev/null
+++ b/hdl/chisel/src/bus/TlulIntegrity.scala
@@ -0,0 +1,253 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bus
+
+import chisel3._
+import chisel3.util._
+
+
+/**
+ * Contains pure combinational functions for calculating SECDED ECC codes.
+ * These are based on the `prim_secded_inv` functions from OpenTitan to ensure
+ * compatibility.
+ */
+object Secded {
+ /**
+ * Calculates a 39-bit word (32-bit data, 7-bit ECC) using the same
+ * logic as OpenTitan's `prim_secded_inv_39_32_enc`.
+ */
+ def ecc39_32(data: UInt): UInt = {
+ val checksum = Wire(Vec(7, Bool()))
+
+ // ECC bit calculation based on Verilog implementation.
+ checksum(0) := (data & "h002606BD25".U).xorR
+ checksum(1) := (data & "h00DEBA8050".U).xorR
+ checksum(2) := (data & "h00413D89AA".U).xorR
+ checksum(3) := (data & "h0031234ED1".U).xorR
+ checksum(4) := (data & "h00C2C1323B".U).xorR
+ checksum(5) := (data & "h002DCC624C".U).xorR
+ checksum(6) := (data & "h0098505586".U).xorR
+
+ // Final inversion for `secded_inv` compatibility.
+ val data_o = Cat(checksum.asUInt, data)
+ data_o.asUInt ^ "h2A00000000".U
+ }
+
+ /**
+ * Calculates a 64-bit word (57-bit data, 7-bit ECC) using the same
+ * logic as OpenTitan's `prim_secded_inv_64_57_enc`.
+ */
+ def ecc64_57(data: UInt): UInt = {
+ val checksum = Wire(Vec(7, Bool()))
+
+ // ECC bit calculation based on Verilog implementation.
+ checksum(0) := (data & "h0103FFF800007FFF".U).xorR
+ checksum(1) := (data & "h017C1FF801FF801F".U).xorR
+ checksum(2) := (data & "h01BDE1F87E0781E1".U).xorR
+ checksum(3) := (data & "h01DEEE3B8E388E22".U).xorR
+ checksum(4) := (data & "h01EF76CDB2C93244".U).xorR
+ checksum(5) := (data & "h01F7BB56D5525488".U).xorR
+ checksum(6) := (data & "h01FBDDA769A46910".U).xorR
+
+ // Final inversion for `secded_inv` compatibility.
+ val data_o = Cat(checksum.asUInt, data)
+ data_o.asUInt ^ "h5400000000000000".U
+ }
+}
+
+/**
+ * A parameterized SECDED encoder.
+ *
+ * @param DATA_W The width of the data input. Supported values are 32, 57, 128.
+ */
+class SecdedEncoder(val DATA_W: Int) extends Module {
+ override val desiredName = s"SecdedEncoder_${DATA_W}"
+ val IO_W = DATA_W match {
+ case 32 => 39
+ case 57 => 64
+ case 128 => 128 + 7 // 128-bit data uses a 7-bit folded ECC.
+ }
+ val ECC_W = IO_W - DATA_W
+
+ val io = IO(new Bundle {
+ val data_i = Input(UInt(DATA_W.W))
+ val data_o = Output(UInt(IO_W.W))
+ val ecc_o = Output(UInt(ECC_W.W))
+ })
+
+ if (DATA_W == 32) {
+ io.data_o := Secded.ecc39_32(io.data_i)
+ } else if (DATA_W == 57) {
+ io.data_o := Secded.ecc64_57(io.data_i)
+ } else if (DATA_W == 128) {
+ // For 128-bit data, we use the "folding" scheme: the data is split into
+ // four 32-bit chunks, and their 7-bit ECC codes are XORed together.
+ val ecc0 = Secded.ecc39_32(io.data_i(31, 0))(38, 32)
+ val ecc1 = Secded.ecc39_32(io.data_i(63, 32))(38, 32)
+ val ecc2 = Secded.ecc39_32(io.data_i(95, 64))(38, 32)
+ val ecc3 = Secded.ecc39_32(io.data_i(127, 96))(38, 32)
+ io.data_o := Cat(ecc0 ^ ecc1 ^ ecc2 ^ ecc3, io.data_i)
+ } else {
+ // Ensure we don't try to synthesize for an unsupported width.
+ assert(false, "Unsupported DATA_W for SecdedEncoder")
+ io.data_o := 0.U // Default assignment to avoid compilation errors
+ }
+
+ // Convenient output for just the ECC bits.
+ io.ecc_o := io.data_o(IO_W - 1, DATA_W)
+}
+
+/**
+ * Generates TileLink integrity fields for the A-channel (Request).
+ */
+class RequestIntegrityGen(p: TLULParameters) extends Module {
+ override val desiredName = s"RequestIntegrityGen_${p.w}"
+ val io = IO(new Bundle {
+ val a_i = Input(new OpenTitanTileLink.A_Channel(p))
+ val a_o = Output(new OpenTitanTileLink.A_Channel(p))
+ })
+ // Ensure that we don't optimize out any parts of the bundle, at least
+ // via the Chisel toolchain.
+ dontTouch(io.a_i)
+ dontTouch(io.a_o)
+
+ // Passthrough for most fields.
+ io.a_o := io.a_i
+
+ // Recreate the tl_h2d_cmd_intg_t struct for command integrity.
+ val cmd_w = 57
+ val cmd_data = Wire(UInt(cmd_w.W))
+ cmd_data := Cat(
+ io.a_i.user.instr_type,
+ io.a_i.address,
+ io.a_i.opcode,
+ io.a_i.mask
+ )
+
+ val cmd_encoder = Module(new SecdedEncoder(cmd_w))
+ cmd_encoder.io.data_i := cmd_data
+ io.a_o.user.cmd_intg := cmd_encoder.io.ecc_o
+
+ // Data integrity calculation.
+ val data_encoder = Module(new SecdedEncoder(p.w * 8))
+ data_encoder.io.data_i := io.a_i.data
+ io.a_o.user.data_intg := data_encoder.io.ecc_o
+}
+
+/**
+ * Checks TileLink integrity fields for the A-channel (Request).
+ */
+class RequestIntegrityCheck(p: TLULParameters) extends Module {
+ override val desiredName = s"RequestIntegrityCheck_${p.w}"
+ val io = IO(new Bundle {
+ val a_i = Input(new OpenTitanTileLink.A_Channel(p))
+ val fault = Output(Bool())
+ })
+
+ // Recreate the tl_h2d_cmd_intg_t struct for command integrity.
+ val cmd_w = 57
+ val cmd_data = Wire(UInt(cmd_w.W))
+ cmd_data := Cat(
+ io.a_i.user.instr_type,
+ io.a_i.address,
+ io.a_i.opcode,
+ io.a_i.mask
+ )
+
+ val cmd_encoder = Module(new SecdedEncoder(cmd_w))
+ cmd_encoder.io.data_i := cmd_data
+ val expected_cmd_intg = cmd_encoder.io.ecc_o
+
+ // Data integrity calculation.
+ val data_encoder = Module(new SecdedEncoder(p.w * 8))
+ data_encoder.io.data_i := io.a_i.data
+ val expected_data_intg = data_encoder.io.ecc_o
+
+ // A fault is generated if the received integrity does not match the
+ // calculated integrity.
+ io.fault := (expected_cmd_intg =/= io.a_i.user.cmd_intg) ||
+ (expected_data_intg =/= io.a_i.user.data_intg)
+}
+
+/**
+ * Generates TileLink integrity fields for the D-channel (Response).
+ */
+class ResponseIntegrityGen(p: TLULParameters) extends Module {
+ override val desiredName = s"ResponseIntegrityGen_${p.w}"
+ val io = IO(new Bundle {
+ val d_i = Input(new OpenTitanTileLink.D_Channel(p))
+ val d_o = Output(new OpenTitanTileLink.D_Channel(p))
+ })
+ // Ensure that we don't optimize out any parts of the bundle, at least
+ // via the Chisel toolchain.
+ dontTouch(io.d_i)
+ dontTouch(io.d_o)
+
+ // Passthrough for most fields.
+ io.d_o := io.d_i
+
+ // Recreate the tl_d2h_rsp_intg_t struct for response integrity.
+ val rsp_w = 57
+ val rsp_data = Wire(UInt(rsp_w.W))
+ rsp_data := Cat(
+ io.d_i.opcode,
+ io.d_i.size,
+ io.d_i.error
+ )
+
+
+ val rsp_encoder = Module(new SecdedEncoder(rsp_w))
+ rsp_encoder.io.data_i := rsp_data
+ io.d_o.user.rsp_intg := rsp_encoder.io.ecc_o
+
+ // Data integrity calculation.
+ val data_encoder = Module(new SecdedEncoder(p.w * 8))
+ data_encoder.io.data_i := io.d_i.data
+ io.d_o.user.data_intg := data_encoder.io.ecc_o
+}
+
+/**
+ * Checks TileLink integrity fields for the D-channel (Response).
+ */
+class ResponseIntegrityCheck(p: TLULParameters) extends Module {
+ override val desiredName = s"ResponseIntegrityCheck_${p.w}"
+ val io = IO(new Bundle {
+ val d_i = Input(new OpenTitanTileLink.D_Channel(p))
+ val fault = Output(Bool())
+ })
+
+ // Recreate the tl_d2h_rsp_intg_t struct for response integrity.
+ val rsp_w = 57
+ val rsp_data = Wire(UInt(rsp_w.W))
+ rsp_data := Cat(
+ io.d_i.opcode,
+ io.d_i.size,
+ io.d_i.error
+ )
+
+ val rsp_encoder = Module(new SecdedEncoder(rsp_w))
+ rsp_encoder.io.data_i := rsp_data
+ val expected_rsp_intg = rsp_encoder.io.ecc_o
+
+ // Data integrity calculation.
+ val data_encoder = Module(new SecdedEncoder(p.w * 8))
+ data_encoder.io.data_i := io.d_i.data
+ val expected_data_intg = data_encoder.io.ecc_o
+
+ // A fault is generated if the received integrity does not match the
+ // calculated integrity.
+ io.fault := (expected_rsp_intg =/= io.d_i.user.rsp_intg) ||
+ (expected_data_intg =/= io.d_i.user.data_intg)
+}
\ No newline at end of file
diff --git a/hdl/chisel/src/bus/TlulIntegrityTestbench.scala b/hdl/chisel/src/bus/TlulIntegrityTestbench.scala
new file mode 100644
index 0000000..285e109
--- /dev/null
+++ b/hdl/chisel/src/bus/TlulIntegrityTestbench.scala
@@ -0,0 +1,85 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package bus
+
+import chisel3._
+import chisel3.util._
+import kelvin.Parameters
+
+/**
+ * A testbench DUT that instantiates all TlulIntegrity modules so they can be
+ * tested with cocotb in a single simulation.
+ */
+class TlulIntegrityTestbench(p: TLULParameters) extends Module {
+ val io = IO(new Bundle {
+ // 1. RequestIntegrityGen instance
+ val req_gen_a_i = Flipped(Decoupled(new OpenTitanTileLink.A_Channel(p)))
+ val req_gen_a_o = Decoupled(new OpenTitanTileLink.A_Channel(p))
+
+ // 2. RequestIntegrityCheck instance
+ val req_check_a_i = Flipped(Decoupled(new OpenTitanTileLink.A_Channel(p)))
+ val req_check_fault = Output(Bool())
+
+ // 4. ResponseIntegrityGen instance
+ val rsp_gen_d_i = Flipped(Decoupled(new OpenTitanTileLink.D_Channel(p)))
+ val rsp_gen_d_o = Decoupled(new OpenTitanTileLink.D_Channel(p))
+
+ // 5. ResponseIntegrityCheck instance
+ val rsp_check_d_i = Flipped(Decoupled(new OpenTitanTileLink.D_Channel(p)))
+ val rsp_check_fault = Output(Bool())
+
+ })
+
+ // 1. RequestIntegrityGen instance
+ val req_gen = Module(new RequestIntegrityGen(p))
+ req_gen.io.a_i <> io.req_gen_a_i.bits
+ io.req_gen_a_o.bits := req_gen.io.a_o
+ io.req_gen_a_o.valid := io.req_gen_a_i.valid
+ io.req_gen_a_i.ready := io.req_gen_a_o.ready
+
+ // 2. RequestIntegrityCheck instance
+ val req_check = Module(new RequestIntegrityCheck(p))
+ req_check.io.a_i := io.req_check_a_i.bits
+ io.req_check_fault := req_check.io.fault
+ io.req_check_a_i.ready := true.B // Always ready to check
+
+ // 4. ResponseIntegrityGen instance
+ val rsp_gen = Module(new ResponseIntegrityGen(p))
+ rsp_gen.io.d_i := io.rsp_gen_d_i.bits
+ io.rsp_gen_d_o.bits := rsp_gen.io.d_o
+ io.rsp_gen_d_o.valid := io.rsp_gen_d_i.valid
+ io.rsp_gen_d_i.ready := io.rsp_gen_d_o.ready
+
+ // 5. ResponseIntegrityCheck instance
+ val rsp_check = Module(new ResponseIntegrityCheck(p))
+ rsp_check.io.d_i := io.rsp_check_d_i.bits
+ io.rsp_check_fault := rsp_check.io.fault
+ io.rsp_check_d_i.ready := true.B
+
+}
+
+import _root_.circt.stage.{ChiselStage,FirtoolOption}
+import chisel3.stage.ChiselGeneratorAnnotation
+import scala.annotation.nowarn
+
+@nowarn
+object EmitTlulIntegrityTestbench extends App {
+ val p = new Parameters
+ p.lsuDataBits = 128
+ (new ChiselStage).execute(
+ Array("--target", "systemverilog") ++ args,
+ Seq(ChiselGeneratorAnnotation(() => new TlulIntegrityTestbench(new bus.TLULParameters(p)))) ++ Seq(FirtoolOption("-enable-layers=Verification"))
+ )
+}
diff --git a/kelvin_test_utils/BUILD b/kelvin_test_utils/BUILD
index a358b45..94c7b6a 100644
--- a/kelvin_test_utils/BUILD
+++ b/kelvin_test_utils/BUILD
@@ -15,6 +15,22 @@
load("@kelvin_hw//third_party/python:requirements.bzl", "requirement")
py_library(
+ name = "TileLinkULInterface",
+ srcs = ["TileLinkULInterface.py"],
+ deps = [
+ requirement("cocotb"),
+ "//kelvin_test_utils:secded_golden",
+ ],
+ visibility = ["//visibility:public"],
+)
+
+py_library(
+ name = "secded_golden",
+ srcs = ["secded_golden.py"],
+ visibility = ["//visibility:public"],
+)
+
+py_library(
name = "core_mini_axi_sim_interface",
srcs = [
"core_mini_axi_interface.py",
diff --git a/kelvin_test_utils/TileLinkULInterface.py b/kelvin_test_utils/TileLinkULInterface.py
new file mode 100644
index 0000000..57c1cb5
--- /dev/null
+++ b/kelvin_test_utils/TileLinkULInterface.py
@@ -0,0 +1,273 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import cocotb
+from cocotb.clock import Clock
+from cocotb.queue import Queue
+from cocotb.triggers import FallingEdge, RisingEdge, with_timeout
+import cocotb.result
+import math
+
+from kelvin_test_utils.secded_golden import get_cmd_intg, get_data_intg, get_rsp_intg
+
+
+def create_a_channel_req(address,
+ data,
+ mask,
+ source=1,
+ size=None,
+ param=0,
+ width=32):
+ """Creates a standard TileLink-UL PutFullData request dictionary."""
+ num_bytes = width // 8
+ if size is None:
+ size = int(math.log2(num_bytes))
+
+ full_mask = (1 << num_bytes) - 1
+ opcode = 0 if mask == full_mask else 1 # PutFull vs PutPartial
+ txn = {
+ "opcode": opcode,
+ "param": param,
+ "size": size,
+ "source": source,
+ "address": address,
+ "mask": mask,
+ "data": data,
+ "user": {
+ "cmd_intg": 0,
+ "data_intg": 0,
+ "instr_type": 0,
+ "rsvd": 0
+ }
+ }
+ txn["user"]["cmd_intg"] = get_cmd_intg(txn, width=width)
+ txn["user"]["data_intg"] = get_data_intg(txn["data"], width=width)
+ return txn
+
+
+class TileLinkULInterface:
+ """A testbench interface for a TileLink-UL bus.
+
+ This class provides a high-level, transaction-based interface to a TileLink-UL
+ bus in the DUT. It uses cocotb queues and background coroutines ("agents")
+ to handle the low-level signal handshaking.
+
+ Args:
+ dut: The cocotb DUT object.
+ host_if_name (str, optional): The prefix for the host-side interface signals.
+ device_if_name (str, optional): The prefix for the device-side interface signals.
+ """
+
+ def __init__(self,
+ dut,
+ host_if_name=None,
+ device_if_name=None,
+ clock_name="clock",
+ reset_name="reset",
+ width=32):
+ self.dut = dut
+ self.clock = getattr(dut, clock_name)
+ self.reset = getattr(dut, reset_name)
+ self.width = width
+ self.name = host_if_name or device_if_name
+
+ if host_if_name is None and device_if_name is None:
+ raise ValueError(
+ "At least one of host_if_name or device_if_name must be provided."
+ )
+
+ self._agents = []
+
+ if host_if_name:
+ self.host_a_fifo = Queue()
+ self.host_d_fifo = Queue()
+ self._agents.append(
+ cocotb.start_soon(self._host_a_driver(host_if_name)))
+ self._agents.append(
+ cocotb.start_soon(self._host_d_monitor(host_if_name)))
+
+ if device_if_name:
+ self.device_a_fifo = Queue()
+ self.device_d_fifo = Queue()
+ self._device_a_ready = True # Default to being ready
+ self._agents.append(
+ cocotb.start_soon(self._device_a_monitor(device_if_name)))
+ self._agents.append(
+ cocotb.start_soon(self._device_d_driver(device_if_name)))
+
+ def device_a_set_ready(self, value):
+ """Set the ready signal for the device A channel monitor."""
+ self._device_a_ready = value
+
+ async def init(self):
+ """Starts the agents."""
+ # This method is currently a placeholder for starting agents.
+ # In this implementation, agents are started in the constructor.
+ # This can be extended if more complex initialization is needed.
+ pass
+
+ # --- Private Methods (Agents) ---
+
+ # slave_a{r|w}agent
+ async def _host_a_driver(self, prefix, timeout=4096):
+ """Drives the host A channel from the host_a_fifo."""
+ a_valid = getattr(self.dut, f"{prefix}_a_valid")
+ a_ready = getattr(self.dut, f"{prefix}_a_ready")
+
+ a_valid.value = 0
+ for prop in ["opcode", "param", "size", "source", "address", "mask", "data"]:
+ getattr(self.dut, f"{prefix}_a_bits_{prop}").value = 0
+
+ while True:
+ while True:
+ await RisingEdge(self.clock)
+ a_valid.value = 0
+ if self.host_a_fifo.qsize():
+ break
+ txn = await self.host_a_fifo.get()
+ a_valid.value = 1
+ for prop in ["opcode", "param", "size", "source", "address", "mask", "data"]:
+ getattr(self.dut, f"{prefix}_a_bits_{prop}").value = txn[prop]
+ for field, value in txn["user"].items():
+ getattr(self.dut,
+ f"{prefix}_a_bits_user_{field}").value = value
+ await FallingEdge(self.clock)
+ timeout_count = 0
+ while a_ready.value == 0:
+ await FallingEdge(self.clock)
+ timeout_count += 1
+ if timeout_count >= timeout:
+ assert False, "timeout waiting for a_ready"
+
+ # slave_bagent
+ async def _host_d_monitor(self, prefix):
+ """Monitors the host D channel and puts transactions into host_d_fifo."""
+ d_valid = getattr(self.dut, f"{prefix}_d_valid")
+ d_ready = getattr(self.dut, f"{prefix}_d_ready")
+
+ d_ready.value = 1
+ while True:
+ await RisingEdge(self.clock)
+ try:
+ if d_valid.value:
+ # Capture the transaction
+ txn = {'user': {}}
+ for prop in ["opcode", "param", "size", "source", "sink", "data", "error"]:
+ txn[prop] = getattr(self.dut, f"{prefix}_d_bits_{prop}").value
+ user_fields = ["rsp_intg", "data_intg"]
+ for field in user_fields:
+ signal_name = f"{prefix}_d_bits_user_{field}"
+ if hasattr(self.dut, signal_name):
+ txn["user"][field] = getattr(self.dut, signal_name).value
+
+ await self.host_d_fifo.put(txn)
+ except Exception as e:
+ print('X seen in _host_d_monitor: ' + str(e) + ' ' + prefix)
+ # raise e
+
+ # master_aragent
+ async def _device_a_monitor(self, prefix):
+ """Monitors the device A channel and puts transactions into device_a_fifo."""
+ a_valid = getattr(self.dut, f"{prefix}_a_valid")
+ a_ready = getattr(self.dut, f"{prefix}_a_ready")
+
+ a_ready.value = 1
+ while True:
+ await RisingEdge(self.clock)
+ try:
+ if a_valid.value:
+ txn = {"user": {}}
+ for prop in ["opcode", "param", "size", "source", "address", "mask", "data"]:
+ txn[prop] = getattr(self.dut, f"{prefix}_a_bits_{prop}").value
+ user_fields = ["cmd_intg", "data_intg", "instr_type", "rsvd"]
+ for field in user_fields:
+ signal_name = f"{prefix}_a_bits_user_{field}"
+ if hasattr(self.dut, signal_name):
+ txn["user"][field] = getattr(self.dut,
+ signal_name).value
+ await self.device_a_fifo.put(txn)
+ except Exception as e:
+ print('X seen in _device_a_monitor: ' + str(e) + ' ' + prefix)
+
+ # master_bagent
+ async def _device_d_driver(self, prefix, timeout=4096):
+ """Drives the device D channel from the device_d_fifo."""
+ d_valid = getattr(self.dut, f"{prefix}_d_valid")
+ d_ready = getattr(self.dut, f"{prefix}_d_ready")
+
+ d_valid.value = 0
+ for prop in ["opcode", "param", "size", "source", "sink", "data", "error"]:
+ getattr(self.dut, f"{prefix}_d_bits_{prop}").value = 0
+
+ while True:
+ while True:
+ await RisingEdge(self.clock)
+ d_valid.value = 0
+ if self.device_d_fifo.qsize():
+ break
+ txn = await self.device_d_fifo.get()
+ d_valid.value = 1
+ for prop in ["opcode", "param", "size", "source", "sink", "data", "error"]:
+ getattr(self.dut, f"{prefix}_d_bits_{prop}").value = txn[prop]
+ for field, value in txn["user"].items():
+ getattr(self.dut,
+ f"{prefix}_d_bits_user_{field}").value = value
+ await FallingEdge(self.clock)
+ timeout_count = 0
+ while d_ready.value == 0:
+ await FallingEdge(self.clock)
+ timeout_count += 1
+ if timeout_count >= timeout:
+ assert False, "timeout waiting for d_ready"
+
+ # --- Public API Methods ---
+
+ async def host_put(self, txn):
+ """Send a PutFullData or PutPartialData request from the host."""
+ await self.host_a_fifo.put(txn)
+
+ async def host_get_response(self):
+ """Get a response from the host D channel."""
+ return await self.host_d_fifo.get()
+
+ async def device_get_request(self):
+ """Get a request from the device A channel."""
+ return await self.device_a_fifo.get()
+
+ async def device_respond(self,
+ opcode,
+ param,
+ size,
+ source,
+ sink=0,
+ data=0,
+ error=0,
+ width=32):
+ """Send a response from the device."""
+ txn = {
+ "opcode": opcode,
+ "param": param,
+ "size": size,
+ "source": source,
+ "sink": sink,
+ "data": data,
+ "error": error,
+ "user": {
+ "rsp_intg": 0,
+ "data_intg": 0
+ }
+ }
+ txn["user"]["rsp_intg"] = get_rsp_intg(txn, width)
+ txn["user"]["data_intg"] = get_data_intg(txn["data"], width)
+ await self.device_d_fifo.put(txn)
diff --git a/kelvin_test_utils/secded_golden.py b/kelvin_test_utils/secded_golden.py
new file mode 100644
index 0000000..c4d2877
--- /dev/null
+++ b/kelvin_test_utils/secded_golden.py
@@ -0,0 +1,136 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Golden model for TileLink-UL integrity calculations."""
+import struct
+
+
+def _parity(n):
+ """Calculates the parity of an integer."""
+ p = 0
+ while n > 0:
+ p ^= (n & 1)
+ n >>= 1
+ return p
+
+
+def secded_inv_39_32_enc(data):
+ """Golden model for prim_secded_inv_39_32_enc. Returns 7-bit ECC."""
+ data_o = data # Start with the 32-bit data
+
+ # Calculate parity bits based on the initial 32-bit data
+ p0 = _parity(data_o & 0x002606BD25)
+ p1 = _parity(data_o & 0x00DEBA8050)
+ p2 = _parity(data_o & 0x00413D89AA)
+ p3 = _parity(data_o & 0x0031234ED1)
+ p4 = _parity(data_o & 0x00C2C1323B)
+ p5 = _parity(data_o & 0x002DCC624C)
+ p6 = _parity(data_o & 0x0098505586)
+
+ # Assemble the 39-bit word
+ data_o |= p0 << 32
+ data_o |= p1 << 33
+ data_o |= p2 << 34
+ data_o |= p3 << 35
+ data_o |= p4 << 36
+ data_o |= p5 << 37
+ data_o |= p6 << 38
+
+ # XOR the full 39-bit word with the inversion constant
+ inverted_data = data_o ^ 0x2A00000000
+
+ # Return the top 7 bits (the ECC)
+ return inverted_data >> 32
+
+
+def secded_inv_64_57_enc(data):
+ """Golden model for prim_secded_inv_64_57_enc. Returns 7-bit ECC."""
+ data_o = data # Start with the 57-bit data
+
+ # Calculate parity bits based on the initial 57-bit data
+ p0 = _parity(data_o & 0x0103FFF800007FFF)
+ p1 = _parity(data_o & 0x017C1FF801FF801F)
+ p2 = _parity(data_o & 0x01BDE1F87E0781E1)
+ p3 = _parity(data_o & 0x01DEEE3B8E388E22)
+ p4 = _parity(data_o & 0x01EF76CDB2C93244)
+ p5 = _parity(data_o & 0x01F7BB56D5525488)
+ p6 = _parity(data_o & 0x01FBDDA769A46910)
+
+ # Assemble the 64-bit word
+ data_o |= p0 << 57
+ data_o |= p1 << 58
+ data_o |= p2 << 59
+ data_o |= p3 << 60
+ data_o |= p4 << 61
+ data_o |= p5 << 62
+ data_o |= p6 << 63
+
+ # XOR the full 64-bit word with the inversion constant
+ inverted_data = data_o ^ 0x5400000000000000
+
+ # Return the top 7 bits (the ECC)
+ return inverted_data >> 57
+
+
+def get_cmd_intg(a_channel, width=128):
+ """Packs A-channel fields and returns the command integrity."""
+ # Packing order (MSB to LSB) from TlulIntegrity.scala
+ # Cat(instr_type, address, opcode, mask)
+ # instr_type: 4 bits
+ # address: 32 bits
+ # opcode: 3 bits
+ # mask: variable bits
+ mask_width = width // 8
+ packed = ((int(a_channel["user"]["instr_type"]) << (32 + 3 + mask_width)) |
+ (int(a_channel["address"]) <<
+ (3 + mask_width)) | (int(a_channel["opcode"]) << mask_width) |
+ (int(a_channel["mask"])))
+
+ return secded_inv_64_57_enc(packed)
+
+
+def get_data_intg(data, width=32):
+ """Returns the data integrity."""
+ dataint = int(data)
+ if width == 32:
+ return secded_inv_39_32_enc(dataint)
+ elif width == 128:
+ # Folded scheme
+ d0 = dataint & 0xFFFFFFFF
+ d1 = (dataint >> 32) & 0xFFFFFFFF
+ d2 = (dataint >> 64) & 0xFFFFFFFF
+ d3 = (dataint >> 96) & 0xFFFFFFFF
+ ecc0 = secded_inv_39_32_enc(d0)
+ ecc1 = secded_inv_39_32_enc(d1)
+ ecc2 = secded_inv_39_32_enc(d2)
+ ecc3 = secded_inv_39_32_enc(d3)
+ return ecc0 ^ ecc1 ^ ecc2 ^ ecc3
+ else:
+ raise ValueError(f"Unsupported data width: {width}")
+
+
+import math
+
+
+def get_rsp_intg(d_channel, width=128):
+ """Packs D-channel fields and returns the response integrity."""
+ # Packing order (MSB to LSB) from TlulIntegrity.scala
+ # Cat(opcode, size, error)
+ # opcode: 3 bits
+ # size: variable bits
+ # error: 1 bit
+ size_width = math.ceil(math.log2(width // 8))
+ packed = ((int(d_channel["opcode"]) << (size_width + 1)) |
+ (int(d_channel["size"]) << 1) | (int(d_channel["error"])))
+
+ return secded_inv_64_57_enc(packed)
diff --git a/tests/cocotb/tlul/BUILD b/tests/cocotb/tlul/BUILD
new file mode 100644
index 0000000..9b16b9c
--- /dev/null
+++ b/tests/cocotb/tlul/BUILD
@@ -0,0 +1,123 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+load("@kelvin_hw//rules:coco_tb.bzl", "cocotb_test_suite", "verilator_cocotb_model")
+load("@kelvin_hw//third_party/python:requirements.bzl", "requirement")
+
+package(default_visibility = ["//visibility:public"])
+
+load(
+ "//tests/cocotb:build_defs.bzl",
+ "VCS_BUILD_ARGS",
+ "VCS_DEFINES",
+ "VCS_TEST_ARGS",
+)
+
+# BEGIN_TESTCASES_FOR_tlul_integrity_cocotb_test
+TLUL_INTEGRITY_TESTCASES = [
+ "test_request_integrity_gen",
+ "test_request_integrity_check",
+ "test_response_integrity_gen",
+ "test_response_integrity_check",
+]
+# END_TESTCASES_FOR_tlul_integrity_cocotb_test
+
+cocotb_test_suite(
+ name = "tlul_integrity_cocotb_test",
+ simulators = ["verilator", "vcs"],
+ testcases = TLUL_INTEGRITY_TESTCASES,
+ testcases_vname = "TLUL_INTEGRITY_TESTCASES",
+ tests_kwargs = {
+ "hdl_toplevel": "TlulIntegrityTestbench",
+ "test_module": ["test_tlul_integrity.py"],
+ "deps": [
+ "//kelvin_test_utils:TileLinkULInterface",
+ "//kelvin_test_utils:secded_golden",
+ ],
+ "waves": True,
+ },
+ verilator_model = "//hdl/chisel/src/bus:tlul_integrity_testbench_model",
+ vcs_verilog_sources = ["//hdl/chisel/src/bus:tlul_integrity_testbench_cc_library_verilog"],
+ vcs_build_args = VCS_BUILD_ARGS,
+ vcs_test_args = VCS_TEST_ARGS,
+ vcs_defines = VCS_DEFINES,
+)
+
+
+
+# BEGIN_TESTCASES_FOR_secded_encoder_cocotb_test
+SECDED_ENCODER_TESTCASES = [
+ "test_secded_encoder",
+]
+# END_TESTCASES_FOR_secded_encoder_cocotb_test
+
+cocotb_test_suite(
+ name = "secded_encoder_cocotb_test",
+ simulators = ["verilator", "vcs"],
+ testcases = SECDED_ENCODER_TESTCASES,
+ testcases_vname = "SECDED_ENCODER_TESTCASES",
+ tests_kwargs = {
+ "hdl_toplevel": "SecdedEncoderTestbench128",
+ "test_module": ["test_secded_encoder.py"],
+ "deps": [
+ "//kelvin_test_utils:secded_golden",
+ ],
+ "waves": True,
+ },
+ verilator_model = "//hdl/chisel/src/bus:secded_encoder_testbench_model",
+ vcs_verilog_sources = ["//hdl/chisel/src/bus:secded_encoder_testbench_cc_library_verilog"],
+ vcs_build_args = VCS_BUILD_ARGS,
+ vcs_test_args = VCS_TEST_ARGS,
+ vcs_defines = VCS_DEFINES,
+)
+
+cocotb_test_suite(
+ name = "secded_encoder_32_cocotb_test",
+ simulators = ["verilator", "vcs"],
+ testcases = SECDED_ENCODER_TESTCASES,
+ testcases_vname = "SECDED_ENCODER_TESTCASES",
+ tests_kwargs = {
+ "hdl_toplevel": "SecdedEncoderTestbench32",
+ "test_module": ["test_secded_encoder.py"],
+ "deps": [
+ "//kelvin_test_utils:secded_golden",
+ ],
+ "waves": True,
+ },
+ verilator_model = "//hdl/chisel/src/bus:secded_encoder_testbench_32_model",
+ vcs_verilog_sources = ["//hdl/chisel/src/bus:secded_encoder_testbench_32_cc_library_verilog"],
+ vcs_build_args = VCS_BUILD_ARGS,
+ vcs_test_args = VCS_TEST_ARGS,
+ vcs_defines = VCS_DEFINES,
+)
+
+cocotb_test_suite(
+ name = "secded_encoder_57_cocotb_test",
+ simulators = ["verilator", "vcs"],
+ testcases = SECDED_ENCODER_TESTCASES,
+ testcases_vname = "SECDED_ENCODER_TESTCASES",
+ tests_kwargs = {
+ "hdl_toplevel": "SecdedEncoderTestbench57",
+ "test_module": ["test_secded_encoder.py"],
+ "deps": [
+ "//kelvin_test_utils:secded_golden",
+ ],
+ "waves": True,
+ },
+ verilator_model = "//hdl/chisel/src/bus:secded_encoder_testbench_57_model",
+ vcs_verilog_sources = ["//hdl/chisel/src/bus:secded_encoder_testbench_57_cc_library_verilog"],
+ vcs_build_args = VCS_BUILD_ARGS,
+ vcs_test_args = VCS_TEST_ARGS,
+ vcs_defines = VCS_DEFINES,
+)
diff --git a/tests/cocotb/tlul/test_secded_encoder.py b/tests/cocotb/tlul/test_secded_encoder.py
new file mode 100644
index 0000000..175e59f
--- /dev/null
+++ b/tests/cocotb/tlul/test_secded_encoder.py
@@ -0,0 +1,69 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import cocotb
+from cocotb.clock import Clock
+from cocotb.triggers import RisingEdge, ClockCycles
+import random
+
+from kelvin_test_utils.secded_golden import get_data_intg, secded_inv_39_32_enc, secded_inv_64_57_enc
+
+
+async def setup_dut(dut):
+ """Common setup for all tests."""
+ clock = Clock(dut.clock, 10, unit="us")
+ cocotb.start_soon(clock.start())
+
+ dut.reset.value = 1
+ await ClockCycles(dut.clock, 2)
+ dut.reset.value = 0
+ await RisingEdge(dut.clock)
+
+
+@cocotb.test()
+async def test_secded_encoder(dut):
+ """Test that the SecdedEncoder module matches the golden model for random data."""
+ await setup_dut(dut)
+
+ # Determine the data width from the DUT.
+ data_width = len(dut.io_data_i)
+ num_iterations = 1000
+
+ for i in range(num_iterations):
+ # Generate a random integer of the correct width.
+ random_data = random.getrandbits(data_width)
+
+ # Drive the random data into the DUT.
+ dut.io_data_i.value = random_data
+ await RisingEdge(dut.clock)
+
+ # Get the ECC from the DUT.
+ dut_ecc = dut.io_ecc_o.value
+
+ # Calculate the expected ECC using the golden model.
+ if data_width == 32:
+ golden_ecc = secded_inv_39_32_enc(random_data)
+ elif data_width == 57:
+ golden_ecc = secded_inv_64_57_enc(random_data)
+ elif data_width == 128:
+ golden_ecc = get_data_intg(random_data, width=data_width)
+ else:
+ raise ValueError(f"Unsupported data width: {data_width}")
+
+ # Compare the DUT's output with the golden model.
+ assert dut_ecc == golden_ecc, f"Mismatch on iteration {i}: data={hex(random_data)}, dut_ecc={hex(dut_ecc)}, golden_ecc={hex(golden_ecc)}"
+
+ dut._log.info(
+ f"Successfully compared {num_iterations} random data values for data width {data_width}."
+ )
diff --git a/tests/cocotb/tlul/test_tlul_integrity.py b/tests/cocotb/tlul/test_tlul_integrity.py
new file mode 100644
index 0000000..13cce8e
--- /dev/null
+++ b/tests/cocotb/tlul/test_tlul_integrity.py
@@ -0,0 +1,280 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import cocotb
+from cocotb.clock import Clock
+from cocotb.triggers import RisingEdge, ClockCycles, with_timeout
+
+from kelvin_test_utils.TileLinkULInterface import TileLinkULInterface, create_a_channel_req
+from kelvin_test_utils.secded_golden import get_cmd_intg, get_data_intg, get_rsp_intg
+
+
+def create_d_channel_rsp(opcode,
+ data,
+ size,
+ source,
+ param=0,
+ sink=0,
+ error=False):
+ """Creates a standard TileLink-UL D-channel response dictionary."""
+ return {
+ "opcode": opcode,
+ "param": param,
+ "size": size,
+ "source": source,
+ "sink": sink,
+ "data": data,
+ "error": error,
+ "user": {
+ "rsp_intg": 0,
+ "data_intg": 0
+ }
+ }
+
+
+async def setup_dut(dut):
+ """Common setup for all tests."""
+ clock = Clock(dut.clock, 10, unit="us")
+ cocotb.start_soon(clock.start())
+
+ dut.reset.value = 1
+ await ClockCycles(dut.clock, 2)
+ dut.reset.value = 0
+ await RisingEdge(dut.clock)
+
+
+@cocotb.test()
+async def test_request_integrity_gen(dut):
+ """Test that the RequestIntegrityGen module generates correct integrity."""
+ await setup_dut(dut)
+
+ # Drive the input A-channel
+ req = create_a_channel_req(address=0x1000,
+ data=0x112233445566778899aabbccddeeff00,
+ mask=0xFFFF,
+ width=128)
+ dut.io_req_gen_a_i_valid.value = 1
+ dut.io_req_gen_a_i_bits_opcode.value = req["opcode"]
+ dut.io_req_gen_a_i_bits_param.value = req["param"]
+ dut.io_req_gen_a_i_bits_size.value = req["size"]
+ dut.io_req_gen_a_i_bits_source.value = req["source"]
+ dut.io_req_gen_a_i_bits_address.value = req["address"]
+ dut.io_req_gen_a_i_bits_mask.value = req["mask"]
+ dut.io_req_gen_a_i_bits_data.value = req["data"]
+ dut.io_req_gen_a_i_bits_user_cmd_intg.value = get_cmd_intg(req)
+ dut.io_req_gen_a_i_bits_user_data_intg.value = get_data_intg(req["data"],
+ width=128)
+ dut.io_req_gen_a_i_bits_user_rsvd.value = 0
+ dut.io_req_gen_a_i_bits_user_instr_type.value = 0
+
+ # Signal that we are ready to accept the output.
+ dut.io_req_gen_a_o_ready.value = 1
+
+ # TODO: Timeout loop
+ await RisingEdge(dut.clock)
+
+ # Check the output A-channel
+ assert dut.io_req_gen_a_o_valid.value
+ assert dut.io_req_gen_a_o_bits_opcode.value == req["opcode"]
+ assert dut.io_req_gen_a_o_bits_address.value == req["address"]
+ assert dut.io_req_gen_a_o_bits_data.value == req["data"]
+
+ assert dut.io_req_gen_a_o_bits_user_cmd_intg.value == get_cmd_intg(req)
+ assert dut.io_req_gen_a_o_bits_user_data_intg.value == get_data_intg(
+ req["data"], width=128)
+
+
+@cocotb.test()
+async def test_request_integrity_check(dut):
+ """Test that the RequestIntegrityCheck module correctly identifies faults."""
+ await setup_dut(dut)
+ req = create_a_channel_req(address=0x1000,
+ data=0x112233445566778899aabbccddeeff00,
+ mask=0xFFFF,
+ width=128)
+
+ # --- Transaction 1: Correct integrity ---
+ dut.io_req_check_a_i_valid.value = 1
+ dut.io_req_check_a_i_bits_opcode.value = req["opcode"]
+ dut.io_req_check_a_i_bits_param.value = req["param"]
+ dut.io_req_check_a_i_bits_size.value = req["size"]
+ dut.io_req_check_a_i_bits_source.value = req["source"]
+ dut.io_req_check_a_i_bits_address.value = req["address"]
+ dut.io_req_check_a_i_bits_mask.value = req["mask"]
+ dut.io_req_check_a_i_bits_data.value = req["data"]
+ dut.io_req_check_a_i_bits_user_cmd_intg.value = get_cmd_intg(req)
+ dut.io_req_check_a_i_bits_user_data_intg.value = get_data_intg(req["data"],
+ width=128)
+ dut.io_req_check_a_i_bits_user_rsvd.value = 0
+ dut.io_req_check_a_i_bits_user_instr_type.value = 0
+
+ for _ in range(10):
+ if dut.io_req_check_a_i_ready.value:
+ break
+ await RisingEdge(dut.clock)
+ else:
+ assert False, "Timeout waiting for dut.io_req_check_a_i_ready"
+
+ await RisingEdge(dut.clock)
+ dut.io_req_check_a_i_valid.value = 0
+ await RisingEdge(dut.clock)
+ assert not dut.io_req_check_fault.value
+ await ClockCycles(dut.clock, 5) # Delay for clarity in logs/waves
+
+ # --- Transaction 2: Command integrity fault ---
+ dut.io_req_check_a_i_valid.value = 1
+ dut.io_req_check_a_i_bits_data.value = req["data"]
+ correct_cmd_intg = get_cmd_intg(req)
+ dut.io_req_check_a_i_bits_user_cmd_intg.value = ~correct_cmd_intg & 0x7F
+
+ for _ in range(10):
+ if dut.io_req_check_a_i_ready.value:
+ break
+ await RisingEdge(dut.clock)
+ else:
+ assert False, "Timeout waiting for dut.io_req_check_a_i_ready"
+
+ await RisingEdge(dut.clock)
+ dut.io_req_check_a_i_valid.value = 0
+ await RisingEdge(dut.clock)
+ assert dut.io_req_check_fault.value
+ await ClockCycles(dut.clock, 5) # Delay for clarity in logs/waves
+
+ # --- Transaction 3: Data integrity fault ---
+ dut.io_req_check_a_i_valid.value = 1
+ dut.io_req_check_a_i_bits_data.value = req["data"]
+ dut.io_req_check_a_i_bits_user_cmd_intg.value = get_cmd_intg(
+ req) # Restore cmd_intg
+ correct_data_intg = get_data_intg(req["data"], width=128)
+ dut.io_req_check_a_i_bits_user_data_intg.value = ~correct_data_intg & 0x7F
+
+ for _ in range(10):
+ if dut.io_req_check_a_i_ready.value:
+ break
+ await RisingEdge(dut.clock)
+ else:
+ assert False, "Timeout waiting for dut.io_req_check_a_i_ready"
+
+ await RisingEdge(dut.clock)
+ dut.io_req_check_a_i_valid.value = 0
+ await RisingEdge(dut.clock)
+ assert dut.io_req_check_fault.value
+ await RisingEdge(dut.clock)
+
+
+@cocotb.test()
+async def test_response_integrity_gen(dut):
+ """Test that the ResponseIntegrityGen module generates correct integrity."""
+ await setup_dut(dut)
+
+ # Drive the input D-channel
+ rsp = create_d_channel_rsp(opcode=1,
+ data=0x112233445566778899aabbccddeeff00,
+ size=4,
+ source=1)
+ dut.io_rsp_gen_d_i_valid.value = 1
+ dut.io_rsp_gen_d_i_bits_opcode.value = rsp["opcode"]
+ dut.io_rsp_gen_d_i_bits_param.value = rsp["param"]
+ dut.io_rsp_gen_d_i_bits_size.value = rsp["size"]
+ dut.io_rsp_gen_d_i_bits_source.value = rsp["source"]
+ dut.io_rsp_gen_d_i_bits_sink.value = rsp["sink"]
+ dut.io_rsp_gen_d_i_bits_data.value = rsp["data"]
+ dut.io_rsp_gen_d_i_bits_error.value = rsp["error"]
+
+ # Signal that we are ready to accept the output.
+ dut.io_rsp_gen_d_o_ready.value = 1
+
+ await RisingEdge(dut.clock)
+
+ # Check the output D-channel
+ assert dut.io_rsp_gen_d_o_valid.value
+ assert dut.io_rsp_gen_d_o_bits_opcode.value == rsp["opcode"]
+ assert dut.io_rsp_gen_d_o_bits_data.value == rsp["data"]
+
+ assert dut.io_rsp_gen_d_o_bits_user_rsp_intg.value == get_rsp_intg(rsp)
+ assert dut.io_rsp_gen_d_o_bits_user_data_intg.value == get_data_intg(
+ rsp["data"], width=128)
+
+
+@cocotb.test()
+async def test_response_integrity_check(dut):
+ """Test that the ResponseIntegrityCheck module correctly identifies faults."""
+ await setup_dut(dut)
+ rsp = create_d_channel_rsp(opcode=1,
+ data=0x112233445566778899aabbccddeeff00,
+ size=4,
+ source=1)
+
+ # --- Transaction 1: Correct integrity ---
+ dut.io_rsp_check_d_i_valid.value = 1
+ dut.io_rsp_check_d_i_bits_opcode.value = rsp["opcode"]
+ dut.io_rsp_check_d_i_bits_param.value = rsp["param"]
+ dut.io_rsp_check_d_i_bits_size.value = rsp["size"]
+ dut.io_rsp_check_d_i_bits_source.value = rsp["source"]
+ dut.io_rsp_check_d_i_bits_sink.value = rsp["sink"]
+ dut.io_rsp_check_d_i_bits_data.value = rsp["data"]
+ dut.io_rsp_check_d_i_bits_error.value = rsp["error"]
+ dut.io_rsp_check_d_i_bits_user_rsp_intg.value = get_rsp_intg(rsp)
+ dut.io_rsp_check_d_i_bits_user_data_intg.value = get_data_intg(rsp["data"],
+ width=128)
+
+ for _ in range(10):
+ if dut.io_rsp_check_d_i_ready.value:
+ break
+ await RisingEdge(dut.clock)
+ else:
+ assert False, "Timeout waiting for dut.io_rsp_check_d_i_ready"
+ await RisingEdge(dut.clock)
+ dut.io_rsp_check_d_i_valid.value = 0
+ await RisingEdge(dut.clock)
+ assert not dut.io_rsp_check_fault.value
+ await ClockCycles(dut.clock, 5) # Delay for clarity in logs/waves
+
+ # --- Transaction 2: Response integrity fault ---
+ dut.io_rsp_check_d_i_valid.value = 1
+ dut.io_rsp_check_d_i_bits_data.value = rsp["data"] # Keep data same
+ correct_rsp_intg = get_rsp_intg(rsp)
+ dut.io_rsp_check_d_i_bits_user_rsp_intg.value = ~correct_rsp_intg & 0x7F
+
+ for _ in range(10):
+ if dut.io_rsp_check_d_i_ready.value:
+ break
+ await RisingEdge(dut.clock)
+ else:
+ assert False, "Timeout waiting for dut.io_rsp_check_d_i_ready"
+ await RisingEdge(dut.clock)
+ dut.io_rsp_check_d_i_valid.value = 0
+ await RisingEdge(dut.clock)
+ assert dut.io_rsp_check_fault.value
+ await ClockCycles(dut.clock, 5) # Delay for clarity in logs/waves
+
+ # --- Transaction 3: Data integrity fault ---
+ dut.io_rsp_check_d_i_valid.value = 1
+ dut.io_rsp_check_d_i_bits_data.value = rsp["data"]
+ dut.io_rsp_check_d_i_bits_user_rsp_intg.value = get_rsp_intg(
+ rsp) # Restore rsp_intg
+ correct_data_intg = get_data_intg(rsp["data"], width=128)
+ dut.io_rsp_check_d_i_bits_user_data_intg.value = ~correct_data_intg & 0x7F
+
+ for _ in range(10):
+ if dut.io_rsp_check_d_i_ready.value:
+ break
+ await RisingEdge(dut.clock)
+ else:
+ assert False, "Timeout waiting for dut.io_rsp_check_d_i_ready"
+ await RisingEdge(dut.clock)
+ dut.io_rsp_check_d_i_valid.value = 0
+ await RisingEdge(dut.clock)
+ assert dut.io_rsp_check_fault.value
+ await RisingEdge(dut.clock)