feat(dv): Add Python-based SPI loader for Verilator
This commit introduces a Python-based toolchain for loading and running software on the Verilator simulation of the Kelvin SoC. This toolchain interacts with the `spi_dpi_master` via its TCP socket interface.
The new toolchain in `utils/kelvin_soc_loader/` includes:
- **`spi_driver.py`**: A Python client library that connects to the DPI server and provides a high-level API for SPI operations (read/write/poll registers, bulk data transfer).
- **`loader.py`**: An application that uses the `spi_driver` to parse an ELF file, load its segments into the SoC's memory through the simulated SPI bridge, and then start the core's execution.
- **`run_simulation.py`**: An orchestration script that manages the entire simulation and loading flow. It starts the Verilator binary, waits for the DPI server to initialize, executes the loader script, and manages the simulation runtime.
This provides a complete, scriptable workflow for running software tests on the hardware design in a simulation environment, greatly improving the development and verification loop.
Change-Id: I950efdff040c49502cc74e4b7ad71ed5e3c9124c
diff --git a/utils/kelvin_soc_loader/BUILD b/utils/kelvin_soc_loader/BUILD
new file mode 100644
index 0000000..d009353
--- /dev/null
+++ b/utils/kelvin_soc_loader/BUILD
@@ -0,0 +1,48 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+load("@kelvin_hw//third_party/python:requirements.bzl", "requirement")
+
+package(default_visibility = ["//visibility:public"])
+
+# Library for interacting with the SPI DPI master via ctypes.
+py_library(
+ name = "spi_driver",
+ srcs = ["spi_driver.py"],
+)
+
+# Binary to load a program onto the Kelvin SoC and start execution.
+py_binary(
+ name = "loader",
+ srcs = ["loader.py"],
+ deps = [
+ ":spi_driver",
+ "//kelvin_test_utils:spi_constants",
+ requirement("pyelftools"),
+ ],
+)
+
+# Binary to run the full SoC simulation and monitor its execution.
+py_binary(
+ name = "run_simulation",
+ srcs = ["run_simulation.py"],
+ data = [
+ ":loader",
+ "//fpga:copy_chip_verilator_binary",
+ ],
+ tags = ["manual"],
+ deps = [
+ "@bazel_tools//tools/python/runfiles",
+ ],
+)
diff --git a/utils/kelvin_soc_loader/loader.py b/utils/kelvin_soc_loader/loader.py
new file mode 100644
index 0000000..b58d202
--- /dev/null
+++ b/utils/kelvin_soc_loader/loader.py
@@ -0,0 +1,175 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import logging
+import time
+
+from elftools.elf.elffile import ELFFile
+from spi_driver import SPIDriver
+from kelvin_test_utils.spi_constants import SpiRegAddress, SpiCommand, TlStatus
+
+def write_line_via_spi(driver: SPIDriver, address: int, data: int):
+ """Writes a 16-byte bus line to a given address via the SPI bridge."""
+ # 1. Use the packed write transaction for efficiency
+ driver.packed_write_transaction(address, 1, data)
+
+ # 2. Poll status register until the transaction is done
+ if not driver.poll_reg_for_value(SpiRegAddress.TL_WRITE_STATUS_REG, TlStatus.DONE):
+ raise RuntimeError(f"Timed out waiting for SPI write to 0x{address:08x} to complete")
+
+ # 3. Clear the status to return FSM to Idle
+ driver.write_reg(SpiRegAddress.TL_CMD_REG, TlStatus.IDLE)
+
+def write_lines_via_spi(driver: SPIDriver, address: int, data_bytes: bytes):
+ """Writes multiple 16-byte bus lines to a given address via the SPI bridge."""
+ if len(data_bytes) % 16 != 0:
+ raise ValueError("Data length must be a multiple of 16 bytes")
+ num_lines = len(data_bytes) // 16
+ if num_lines == 0:
+ return
+
+ data_int = int.from_bytes(data_bytes, byteorder='little')
+
+ # 1. Use the packed write transaction for efficiency
+ driver.packed_write_transaction(address, num_lines, data_int)
+
+ # 2. Poll status register until the transaction is done
+ if not driver.poll_reg_for_value(SpiRegAddress.TL_WRITE_STATUS_REG, TlStatus.DONE):
+ raise RuntimeError(f"Timed out waiting for SPI write to 0x{address:08x} to complete")
+
+ # 3. Clear the status to return FSM to Idle
+ driver.write_reg(SpiRegAddress.TL_CMD_REG, TlStatus.IDLE)
+
+
+def read_line_via_spi(driver: SPIDriver, address: int) -> int:
+ """Reads a single 128-bit line from memory via SPI."""
+ # 1. Configure the read
+ driver.write_reg(SpiRegAddress.TL_ADDR_REG_0, (address >> 0) & 0xFF)
+ driver.write_reg(SpiRegAddress.TL_ADDR_REG_1, (address >> 8) & 0xFF)
+ driver.write_reg(SpiRegAddress.TL_ADDR_REG_2, (address >> 16) & 0xFF)
+ driver.write_reg(SpiRegAddress.TL_ADDR_REG_3, (address >> 24) & 0xFF)
+ driver.write_reg_16b(SpiRegAddress.TL_LEN_REG_L, 0) # 1 beat
+
+ # 2. Issue the read command
+ driver.write_reg(SpiRegAddress.TL_CMD_REG, SpiCommand.CMD_READ_START)
+
+ # 3. Poll for completion
+ if not driver.poll_reg_for_value(SpiRegAddress.TL_STATUS_REG, TlStatus.DONE):
+ raise RuntimeError(f"Timed out waiting for TL read at address 0x{address:x} to complete.")
+
+ # 4. Check bytes available and read the data using the new method
+ bytes_available = driver.read_spi_domain_reg_16b(SpiRegAddress.BULK_READ_STATUS_REG_L)
+ if bytes_available != 16:
+ raise RuntimeError(f"Expected 16 bytes, but status reg reported {bytes_available}")
+ read_data_bytes = driver.bulk_read(bytes_available)
+ read_data = int.from_bytes(bytes(read_data_bytes), 'little')
+
+ # 5. Clear the command register
+ driver.write_reg(SpiRegAddress.TL_CMD_REG, SpiCommand.CMD_NULL)
+ return read_data
+
+def write_word_via_spi(driver: SPIDriver, address: int, data: int):
+ """Writes a 32-bit value by performing a read-modify-write on a 16-byte line."""
+ line_addr = (address // 16) * 16
+ offset = address % 16
+
+ # Read the current line
+ line_data = read_line_via_spi(driver, line_addr)
+
+ # Create a 16-byte mask for the 4 bytes we want to change
+ mask = 0xFFFFFFFF << (offset * 8)
+
+ # Clear the bits we want to change, then OR in the new data
+ updated_data = (line_data & ~mask) | (data << (offset * 8))
+
+ # Write the modified line back
+ write_line_via_spi(driver, line_addr, updated_data)
+
+def main():
+ parser = argparse.ArgumentParser(description="Load an ELF binary to the Kelvin SoC.")
+ parser.add_argument("binary", help="Path to the ELF binary to load.")
+ args = parser.parse_args()
+
+ driver = None
+ try:
+ driver = SPIDriver()
+
+ # Send a few idle clock cycles to flush any reset synchronizers
+ # in the DUT before starting the first real transaction.
+ logging.warning("LOADER: Sending initial idle clocks to flush reset...")
+ driver.idle_clocking(20)
+
+ logging.warning("LOADER: Waiting for SPI bridge to be ready...")
+ if not driver.poll_reg_for_value(SpiRegAddress.TL_STATUS_REG, 0):
+ raise RuntimeError("Timed out waiting for SPI bridge to become ready.")
+ logging.warning("LOADER: SPI bridge is ready.")
+
+ entry_point = 0
+ logging.warning(f"LOADER: Opening ELF file: {args.binary}")
+ with open(args.binary, 'rb') as f:
+ elffile = ELFFile(f)
+ entry_point = elffile.header.e_entry
+
+ for segment in elffile.iter_segments():
+ if segment['p_type'] != 'PT_LOAD':
+ continue
+
+ paddr = segment['p_paddr']
+ data = segment.data()
+ logging.warning(f"LOADER: Loading segment to address 0x{paddr:08x}, size {len(data)} bytes")
+
+ # Load data in pages of up to 16 lines (256 bytes)
+ original_len = len(data)
+ # Pad data to be a multiple of 16 bytes (a line)
+ if len(data) % 16 != 0:
+ data += b'\x00' * (16 - (len(data) % 16))
+
+ page_size = 4096
+ for i in range(0, len(data), page_size):
+ page_addr = paddr + i
+ page_data_bytes = data[i:i+page_size]
+
+ write_lines_via_spi(driver, page_addr, page_data_bytes)
+
+ bytes_written = min(i + len(page_data_bytes), original_len)
+ logging.warning(f" ... wrote {bytes_written}/{original_len} bytes")
+ logging.warning(f" ... wrote {original_len}/{original_len} bytes")
+
+ logging.warning("LOADER: Binary loaded successfully.")
+
+ # --- Execute Program ---
+ kelvin_pc_csr_addr = 0x30004
+ kelvin_reset_csr_addr = 0x30000
+
+ logging.warning(f"LOADER: Programming start PC to 0x{entry_point:08x}")
+ write_word_via_spi(driver, kelvin_pc_csr_addr, entry_point)
+
+ logging.warning("LOADER: Releasing clock gate...")
+ write_word_via_spi(driver, kelvin_reset_csr_addr, 1)
+
+ logging.warning("LOADER: Releasing reset...")
+ write_word_via_spi(driver, kelvin_reset_csr_addr, 0)
+
+ logging.warning("LOADER: Execution started.")
+
+ except Exception as e:
+ logging.error(f"An error occurred: {e}")
+ finally:
+ if driver:
+ logging.info("LOADER: Closing connection.")
+ driver.close()
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/utils/kelvin_soc_loader/run_simulation.py b/utils/kelvin_soc_loader/run_simulation.py
new file mode 100644
index 0000000..3bf83c9
--- /dev/null
+++ b/utils/kelvin_soc_loader/run_simulation.py
@@ -0,0 +1,160 @@
+#!/usr/bin/env python3
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import logging
+import os
+import signal
+import socket
+import subprocess
+import sys
+import threading
+import time
+
+
+from bazel_tools.tools.python.runfiles import runfiles
+
+
+def find_free_port():
+ """Finds a free TCP port on the system."""
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+ s.bind(("", 0))
+ return s.getsockname()[1]
+
+
+def stream_reader(pipe, prefix, ready_event=None, ready_line=None):
+ """Reads and prints lines from a subprocess pipe."""
+ try:
+ for line in iter(pipe.readline, ""):
+ logging.warning(f"[{prefix}] {line.strip()}")
+ if ready_event and ready_line and ready_line in line:
+ ready_event.set()
+ finally:
+ pipe.close()
+
+
+def main():
+ """The main entry point for the script."""
+ parser = argparse.ArgumentParser(description="Run the Kelvin SoC simulation and load an ELF binary.")
+ parser.add_argument("--elf_file", required=True, help="Path to the ELF binary to load.")
+ parser.add_argument("--trace_file", help="Optional: Path to save a waveform trace file (.fst).")
+ parser.add_argument("--run_time", type=int, default=10, help="Optional: Time in seconds to run simulation after loading.")
+ args = parser.parse_args()
+
+ r = runfiles.Create()
+ # The genrule copies the binary to a predictable path.
+ sim_bin_path = r.Rlocation("kelvin_hw/fpga/Vchip_verilator")
+ if not sim_bin_path or not os.path.exists(sim_bin_path):
+ # As a fallback, let's try the longer path. This can happen if the
+ # genrule is not correctly configured.
+ long_path = "kelvin_hw/fpga/build_chip_verilator/com.google.kelvin_fpga_chip_verilator_0.1/sim-verilator/Vchip_verilator"
+ sim_bin_path = r.Rlocation(long_path)
+ if not sim_bin_path or not os.path.exists(sim_bin_path):
+ raise FileNotFoundError(f"Could not find simulator binary in runfiles at default or fallback paths.")
+
+ port = find_free_port()
+ logging.warning(f"RUNNER: Found free TCP port: {port}")
+
+ sim_env = os.environ.copy()
+ sim_env["SPI_DPI_PORT"] = str(port)
+
+ sim_proc = None
+ loader_proc = None
+ threads = []
+
+ try:
+ sim_cmd = [sim_bin_path]
+ if args.trace_file:
+ sim_cmd.append(f"--trace={args.trace_file}")
+ logging.warning(f"RUNNER: Tracing enabled, waveform will be saved to {args.trace_file}")
+
+ logging.warning(f"RUNNER: Starting simulation: {' '.join(sim_cmd)}")
+ sim_proc = subprocess.Popen(
+ sim_cmd,
+ env=sim_env,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True,
+ )
+
+ sim_ready_event = threading.Event()
+
+ # Start threads to monitor simulator output
+ threads.append(threading.Thread(target=stream_reader, args=(sim_proc.stdout, "SIM", sim_ready_event, f"DPI: Server listening on port {port}")))
+ threads.append(threading.Thread(target=stream_reader, args=(sim_proc.stderr, "SIM_ERR")))
+ for t in threads:
+ t.start()
+
+ logging.warning("RUNNER: Waiting for simulation to be ready...")
+ if not sim_ready_event.wait(timeout=60):
+ raise RuntimeError("Timeout waiting for simulator to become ready.")
+ logging.warning("RUNNER: Simulation is ready.")
+
+ loader_script_path = r.Rlocation("kelvin_hw/utils/kelvin_soc_loader/loader")
+ if not loader_script_path or not os.path.exists(loader_script_path):
+ raise FileNotFoundError("Could not find loader binary in runfiles.")
+
+ logging.warning(f"RUNNER: Starting ELF loader: {loader_script_path}")
+ loader_proc = subprocess.Popen(
+ [loader_script_path, args.elf_file],
+ env=sim_env,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ text=True,
+ )
+
+ # Start threads for loader output
+ loader_stdout_thread = threading.Thread(target=stream_reader, args=(loader_proc.stdout, "LOADER"))
+ loader_stderr_thread = threading.Thread(target=stream_reader, args=(loader_proc.stderr, "LOADER_ERR"))
+ loader_stdout_thread.start()
+ loader_stderr_thread.start()
+ threads.extend([loader_stdout_thread, loader_stderr_thread])
+
+ # Wait for processes to complete
+ loader_proc.wait(timeout=300)
+ logging.warning(f"RUNNER: Loader finished. Running simulation for {args.run_time} seconds...")
+ time.sleep(args.run_time)
+
+ logging.warning("RUNNER: Sending SIGINT to simulator for graceful shutdown...")
+ sim_proc.send_signal(signal.SIGINT)
+ sim_proc.wait(timeout=10)
+ logging.warning("RUNNER: Simulation finished.")
+
+ except (subprocess.TimeoutExpired, RuntimeError) as e:
+ logging.error(f"RUNNER: An error occurred: {e}", file=sys.stderr)
+ if sim_proc:
+ sim_proc.kill()
+ if loader_proc:
+ loader_proc.kill()
+ sys.exit(1)
+ finally:
+ for t in threads:
+ t.join()
+ logging.warning("RUNNER: All processes terminated.")
+
+ if loader_proc and loader_proc.returncode != 0:
+ logging.error(f"RUNNER: Loader exited with non-zero status: {loader_proc.returncode}", file=sys.stderr)
+ sys.exit(loader_proc.returncode)
+
+ if sim_proc and sim_proc.returncode != 0 and sim_proc.returncode != -15: # -15 is SIGTERM
+ logging.error(f"RUNNER: Simulator exited with non-zero status: {sim_proc.returncode}", file=sys.stderr)
+ sys.exit(sim_proc.returncode)
+
+ logging.warning("RUNNER: Simulation completed successfully.")
+
+
+
+if __name__ == "__main__":
+ main()
diff --git a/utils/kelvin_soc_loader/spi_driver.py b/utils/kelvin_soc_loader/spi_driver.py
new file mode 100644
index 0000000..a19a365
--- /dev/null
+++ b/utils/kelvin_soc_loader/spi_driver.py
@@ -0,0 +1,101 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import socket
+import struct
+
+class SPIDriver:
+ """A driver that mimics the cocotb SPIMaster API and communicates with a
+ DPI-based server in the simulation over a TCP socket."""
+
+ class CommandType:
+ WRITE_REG = 0
+ POLL_REG = 1
+ IDLE_CLOCKING = 2
+ PACKED_WRITE = 3
+ BULK_READ = 4
+ READ_SPI_DOMAIN_REG = 5
+ WRITE_REG_16B = 6
+ READ_SPI_DOMAIN_REG_16B = 7
+
+ # Format: < (little-endian), B (u8), I (u32), Q (u64), I (u32)
+ COMMAND_FORMAT = "<BIQI"
+ # Format: < (little-endian), Q (u64), B (u8)
+ RESPONSE_FORMAT = "<QB"
+
+ def __init__(self, port: int = 5555):
+ port_str = os.environ.get("SPI_DPI_PORT")
+ self.port = int(port_str) if port_str else port
+ print(f"SPI_DRIVER: Connecting to localhost:{self.port}...")
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.connect(("localhost", self.port))
+ print("SPI_DRIVER: Connected.")
+
+ def close(self):
+ if self.sock:
+ print("SPI_DRIVER: Closing socket.")
+ self.sock.close()
+ self.sock = None
+
+ def _send_command(self, cmd_type, addr=0, data=0, count=0, payload=b''):
+ cmd_header = struct.pack(self.COMMAND_FORMAT, cmd_type, addr, data, count)
+ self.sock.sendall(cmd_header)
+
+ if payload:
+ self.sock.sendall(payload)
+
+ response_data = self.sock.recv(struct.calcsize(self.RESPONSE_FORMAT))
+ if not response_data:
+ raise ConnectionAbortedError("Socket connection broken.")
+
+ unpacked = struct.unpack(self.RESPONSE_FORMAT, response_data)
+ if not unpacked[1]: # success flag
+ raise RuntimeError(f"SPI command {cmd_type} failed in simulation.")
+ return unpacked[0] # data
+
+ def write_reg(self, reg_addr, data):
+ self._send_command(self.CommandType.WRITE_REG, addr=reg_addr, data=data)
+
+ def poll_reg_for_value(self, reg_addr, expected_value, max_polls=20):
+ """Sends a single command to the DPI server to perform a polling loop."""
+ response = self._send_command(self.CommandType.POLL_REG, addr=reg_addr, data=expected_value, count=max_polls)
+ return response == 1
+
+ def idle_clocking(self, cycles):
+ """Sends a command to toggle the SPI clock for a number of cycles."""
+ self._send_command(self.CommandType.IDLE_CLOCKING, count=cycles)
+
+ def packed_write_transaction(self, target_addr, num_beats, data):
+ payload = data.to_bytes(num_beats * 16, 'little')
+ self._send_command(self.CommandType.PACKED_WRITE, addr=target_addr, count=num_beats, payload=payload)
+
+ def read_spi_domain_reg(self, reg_addr):
+ """Sends a command to read a register in the SPI clock domain."""
+ return self._send_command(self.CommandType.READ_SPI_DOMAIN_REG, addr=reg_addr)
+
+ def write_reg_16b(self, reg_addr, data):
+ """Sends a command to write a 16-bit value to a register pair."""
+ self._send_command(self.CommandType.WRITE_REG_16B, addr=reg_addr, data=data)
+
+ def read_spi_domain_reg_16b(self, reg_addr):
+ """Sends a command to read a 16-bit register pair in the SPI clock domain."""
+ return self._send_command(self.CommandType.READ_SPI_DOMAIN_REG_16B, addr=reg_addr)
+
+ def bulk_read(self, num_bytes):
+ """Sends the new bulk read command and receives the data payload."""
+ self._send_command(self.CommandType.BULK_READ, count=num_bytes)
+ # After the command is acknowledged, the server sends the raw data payload.
+ read_payload = self.sock.recv(num_bytes)
+ return list(read_payload)