# blob: a4b9f26ea28c5d7dd4d2cb5bb3ef1d445f422e47 [file] [log] [blame]
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import cocotb
import glob
import numpy as np
import os
import tqdm
import random
from kelvin_test_utils.core_mini_axi_interface import AxiBurst, AxiResp,CoreMiniAxiInterface
from bazel_tools.tools.python.runfiles import runfiles
@cocotb.test()
async def core_mini_axi_basic_write_read_memory(dut):
    """Basic test to check if TCM memory can be written and read back.

    Exercises word writes, a multi-burst write, unaligned reads and writes,
    and finally a sweep that fills 8 KiB at 0x0 and 32 KiB at 0x10000 with
    every power-of-two transaction size from 1 to 4096 bytes, reading each
    transaction back and comparing it with what was written.
    """
    core_mini_axi = CoreMiniAxiInterface(dut)
    await core_mini_axi.init()
    await core_mini_axi.reset()
    cocotb.start_soon(core_mini_axi.clock.start())

    # Test reading/writing words
    await core_mini_axi.write_word(0x100, 0x42)
    await core_mini_axi.write_word(0x104, 0x43)
    rdata = (await core_mini_axi.read(0x100, 16)).view(np.uint32)
    assert (rdata[0:2] == np.array([0x42, 0x43])).all()

    # Three write/read data burst
    wdata = np.arange(48, dtype=np.uint8)
    await core_mini_axi.write(0x0, wdata)

    # Unaligned read, taking two bursts
    rdata = await core_mini_axi.read(0x8, 16)
    assert (np.arange(8, 24, dtype=np.uint8) == rdata).all()

    # Unaligned write, taking two bursts
    wdata = np.arange(20, dtype=np.uint8)
    await core_mini_axi.write(0x204, wdata)
    rdata = await core_mini_axi.read(0x200, 32)
    assert (wdata == rdata[4:24]).all()

    # Iterate over both TCMs with all valid AXI sizes.
    # Each entry is (base address, region size in bytes).
    regions = ((0x0, 8 * 1024), (0x10000, 32 * 1024))
    for size in range(13):
        txn_bytes = 2 ** size
        # randint's upper bound is exclusive, so 256 is required for 0xFF
        # bytes to appear in the pattern.
        wdata = np.random.randint(0, 256, txn_bytes, dtype=np.uint8)

        # Fill both regions first, then verify both.
        for base, region_bytes in regions:
            for i in tqdm.tqdm(range(region_bytes // txn_bytes)):
                await core_mini_axi.write(base + (i * txn_bytes), wdata)

        for base, region_bytes in regions:
            for i in tqdm.tqdm(range(region_bytes // txn_bytes)):
                rdata = await core_mini_axi.read(
                    base + (i * txn_bytes), txn_bytes)
                assert (rdata == wdata).all()
@cocotb.test()
async def core_mini_axi_run_wfi_in_all_slots(dut):
    """Tests the WFI instruction in each of the 4 issue slots.

    For every slot, the matching wfi_slot_<n>.elf is loaded and started after
    a reset; the test then waits for WFI, raises an interrupt, and waits for
    the core to halt.
    """
    axi = CoreMiniAxiInterface(dut)
    cocotb.start_soon(axi.clock.start())
    await axi.init()

    locator = runfiles.Create()
    for slot in range(4):
        elf_path = locator.Rlocation(
            f"kelvin_hw/tests/cocotb/wfi_slot_{slot}.elf")
        with open(elf_path, "rb") as elf_file:
            await axi.reset()
            start_pc = await axi.load_elf(elf_file)
            await axi.execute_from(start_pc)

            await axi.wait_for_wfi()
            await axi.raise_irq()
            await axi.wait_for_halted()
@cocotb.test()
async def core_mini_axi_slow_bready(dut):
    """Test that BVALID stays high until BREADY is presented.

    Writes the same 16-byte pattern to 100 addresses spaced 32 bytes apart,
    delaying BREADY by a random 0-50 (units defined by the interface's
    `delay_bready` argument) on each write, then reads every one of those
    addresses back and checks the data.
    """
    core_mini_axi = CoreMiniAxiInterface(dut)
    await core_mini_axi.init()
    await core_mini_axi.reset()
    cocotb.start_soon(core_mini_axi.clock.start())

    wdata = np.arange(16, dtype=np.uint8)
    for i in tqdm.trange(100):
        bready_delay = random.randint(0, 50)
        await core_mini_axi.write(i * 32, wdata, delay_bready=bready_delay)

    # Verify each written location. The loop index must be bound here;
    # reusing the stale index from the write loop would only ever re-read
    # the last address.
    for i in tqdm.trange(100):
        rdata = await core_mini_axi.read(i * 32, 16)
        assert (wdata == rdata).all()
@cocotb.test()
async def core_mini_axi_write_read_memory_stress_test(dut):
    """Stress test reading/writing from DTCM.

    Starts stress_test.elf, then performs 1000 random-length AXI reads and
    writes inside the 0x2000-byte buffer at the ELF's `dtcm_vec` symbol.
    A software copy of the buffer is updated on every write and compared on
    every read. Finally 1 is written to the ELF's `halt` symbol and the test
    waits for the core to halt.
    """
    core_mini_axi = CoreMiniAxiInterface(dut)
    await core_mini_axi.init()
    await core_mini_axi.reset()
    cocotb.start_soon(core_mini_axi.clock.start())

    r = runfiles.Create()
    with open(r.Rlocation("kelvin_hw/tests/cocotb/stress_test.elf"), "rb") as f:
        halt = core_mini_axi.lookup_symbol(f, "halt")
        dtcm_vec = core_mini_axi.lookup_symbol(f, "dtcm_vec")
        entry_point = await core_mini_axi.load_elf(f)
    await core_mini_axi.execute_from(entry_point)

    # Range for a DTCM buffer we can read/write to.
    DTCM_START = dtcm_vec
    DTCM_SIZE = 0x2000
    DTCM_END = DTCM_START + DTCM_SIZE

    # Seed the reference model with the current contents of the buffer.
    dtcm_model_buffer = await core_mini_axi.read(DTCM_START, DTCM_SIZE)

    for _ in tqdm.trange(1000):
        start_addr = random.randint(DTCM_START, DTCM_END - 2)
        end_addr = random.randint(start_addr, DTCM_END - 1)
        # end_addr is exclusive, so the length may be 0 when both draws match.
        transaction_length = end_addr - start_addr
        model_slice = slice(start_addr - DTCM_START, end_addr - DTCM_START)

        if random.randint(0, 1) == 1:
            wdata = np.random.randint(
                0, 256, transaction_length, dtype=np.uint8)
            await core_mini_axi.write(start_addr, wdata)
            dtcm_model_buffer[model_slice] = wdata
        else:
            expected = dtcm_model_buffer[model_slice]
            rdata = await core_mini_axi.read(start_addr, transaction_length)
            assert (expected == rdata).all()

    # Ask the running program to stop via its `halt` symbol.
    await core_mini_axi.write_word(halt, 1)
    try:
        await core_mini_axi.wait_for_halted()
    except Exception:
        # Best effort: if waiting fails (presumably a timeout - verify against
        # wait_for_halted), force the halt. Only Exception is caught so that
        # KeyboardInterrupt and task cancellation still propagate.
        await core_mini_axi.halt()
@cocotb.test()
async def core_mini_axi_master_write_alignment(dut):
    """Test data alignment during AXI master writes.

    Runs align_test.elf until it halts (semihost-aware wait) and checks that
    the DUT's io_fault output is 0 afterwards.
    """
    axi = CoreMiniAxiInterface(dut)
    await axi.init()
    await axi.reset()
    cocotb.start_soon(axi.clock.start())

    elf_path = runfiles.Create().Rlocation(
        "kelvin_hw/tests/cocotb/align_test.elf")
    with open(elf_path, "rb") as elf_file:
        start_pc = await axi.load_elf(elf_file)
        await axi.execute_from(start_pc)
        # The semihost wait takes the ELF handle, so the file stays open here.
        await axi.wait_for_halted_semihost(elf_file)
        assert axi.dut.io_fault.value == 0
@cocotb.test()
async def core_mini_axi_finish_txn_before_halt_test(dut):
    """Run finish_txn_before_halt.elf and check the master FIFOs are drained.

    After the core halts, the combined qsize() of the interface's
    master_arfifo, master_rfifo, master_awfifo, master_wfifo and master_bfifo
    must be zero.
    """
    axi = CoreMiniAxiInterface(dut)
    await axi.init()
    await axi.reset()
    cocotb.start_soon(axi.clock.start())

    elf_path = runfiles.Create().Rlocation(
        "kelvin_hw/tests/cocotb/finish_txn_before_halt.elf")
    with open(elf_path, "rb") as elf_file:
        start_pc = await axi.load_elf(elf_file)
        await axi.execute_from(start_pc)
        await axi.wait_for_halted()

        master_fifos = (
            axi.master_arfifo,
            axi.master_rfifo,
            axi.master_awfifo,
            axi.master_wfifo,
            axi.master_bfifo,
        )
        outstanding = sum(fifo.qsize() for fifo in master_fifos)
        assert outstanding == 0
@cocotb.test()
async def core_mini_axi_riscv_tests(dut):
    """Run every .elf in the riscv-tests runfiles directory.

    Each binary is loaded after a fresh reset and executed until the core
    halts; the DUT's io_fault output must be 0 after each one.
    """
    core_mini_axi = CoreMiniAxiInterface(dut)
    await core_mini_axi.init()
    cocotb.start_soon(core_mini_axi.clock.start())

    r = runfiles.Create()
    riscv_test_path = r.Rlocation("kelvin_hw/tests/cocotb/riscv-tests")
    # os.listdir returns entries in arbitrary order; sort so that the run
    # order (and therefore any failure) is reproducible.
    riscv_test_elfs = sorted(
        os.path.join(riscv_test_path, name)
        for name in os.listdir(riscv_test_path)
        if name.endswith(".elf")
    )

    for elf in tqdm.tqdm(riscv_test_elfs):
        with open(elf, "rb") as f:
            await core_mini_axi.reset()
            entry_point = await core_mini_axi.load_elf(f)
            await core_mini_axi.execute_from(entry_point)
            await core_mini_axi.wait_for_halted()
            assert core_mini_axi.dut.io_fault.value == 0
@cocotb.test()
async def core_mini_axi_riscv_dv(dut):
    """Run every .o binary in the riscv-dv runfiles directory.

    Each binary is loaded after a fresh reset and executed until the
    semihost-aware halt wait returns. The progress bar shows the name of the
    binary currently running.
    """
    core_mini_axi = CoreMiniAxiInterface(dut)
    await core_mini_axi.init()
    cocotb.start_soon(core_mini_axi.clock.start())

    r = runfiles.Create()
    riscv_dv_path = r.Rlocation("kelvin_hw/tests/cocotb/riscv-dv")
    # os.listdir returns entries in arbitrary order; sort for reproducibility.
    riscv_dv_elfs = sorted(
        os.path.join(riscv_dv_path, name)
        for name in os.listdir(riscv_dv_path)
        if name.endswith(".o")
    )

    with tqdm.tqdm(riscv_dv_elfs) as t:
        # Iterate the bar itself so the bar that carries the postfix is the
        # one that advances; wrapping the list in a second tqdm would leave
        # `t` stuck at 0 and draw two bars.
        for elf in t:
            t.set_postfix({"binary": os.path.basename(elf)})
            with open(elf, "rb") as f:
                await core_mini_axi.reset()
                entry_point = await core_mini_axi.load_elf(f)
                await core_mini_axi.execute_from(entry_point)
                await core_mini_axi.wait_for_halted_semihost(f)
@cocotb.test()
async def core_mini_axi_csr_test(dut):
    """Exercises the CoreAxiCSR module.

    Writes random words to the CSRs at 0x30000 and 0x30004 and checks they
    read back, writes random data to 0x30008/0x3000c and checks the word read
    from 0x30008 stays <= 3, reads the eight words starting at 0x30100, and
    finally checks that reads of the remaining probed offsets return SLVERR.
    """
    core_mini_axi = CoreMiniAxiInterface(dut)
    await core_mini_axi.init()
    await core_mini_axi.reset()
    cocotb.start_soon(core_mini_axi.clock.start())

    # NOTE: randint's upper bound is exclusive, so 256 is needed for 0xFF
    # bytes to be generated.
    for _ in tqdm.tqdm(range(10000)):
        reset_csr_wdata = np.random.randint(0, 256, 4, dtype=np.uint8)
        await core_mini_axi.write(0x30000, reset_csr_wdata)
        reset_csr_rdata = await core_mini_axi.read_word(0x30000)
        assert (reset_csr_wdata == reset_csr_rdata).all()

    for _ in tqdm.tqdm(range(10000)):
        pc_start_csr_wdata = np.random.randint(0, 256, 4, dtype=np.uint8)
        await core_mini_axi.write(0x30004, pc_start_csr_wdata)
        pc_start_csr_rdata = await core_mini_axi.read_word(0x30004)
        assert (pc_start_csr_wdata == pc_start_csr_rdata).all()

    # Neither of these are valid CSRs, but this will exercise the top half of
    # the wdata field.
    for _ in tqdm.tqdm(range(10000)):
        csr_wdata = np.random.randint(0, 256, 4, dtype=np.uint8)
        await core_mini_axi.write(0x30008, csr_wdata)
        await core_mini_axi.write(0x3000c, csr_wdata)
        status_reg_csr_rdata = await core_mini_axi.read_word(0x30008)
        # Because we write a random value to the reset CSR, it's possible
        # for this register to either be 0, 1, or 3.
        assert (status_reg_csr_rdata.view(np.uint32) <= 3)

    # Read valid CSRs; only the (default) response check matters, so the
    # returned data is discarded.
    for i in range(8):
        await core_mini_axi.read_word(0x30100 + (4 * i))

    # Read invalid CSRs, expect error response
    for i in range(3, 0x100 // 4):
        await core_mini_axi.read_word(
            0x30000 + (4 * i), expected_resp=AxiResp.SLVERR)
    for i in range(8, 0x2000 // 4):
        await core_mini_axi.read_word(
            0x30100 + (4 * i), expected_resp=AxiResp.SLVERR)
@cocotb.test()
async def core_mini_axi_exceptions_test(dut):
    """Run every .elf in the exceptions runfiles directory.

    Each binary is loaded after a fresh reset and executed until the core
    halts; the DUT's io_fault output must be 0 after each one. The progress
    bar shows the name of the binary currently running.
    """
    core_mini_axi = CoreMiniAxiInterface(dut)
    await core_mini_axi.init()
    await core_mini_axi.reset()
    cocotb.start_soon(core_mini_axi.clock.start())

    r = runfiles.Create()
    exceptions_path = r.Rlocation("kelvin_hw/tests/cocotb/exceptions")
    # os.listdir returns entries in arbitrary order; sort for reproducibility.
    exceptions_elfs = sorted(
        os.path.join(exceptions_path, name)
        for name in os.listdir(exceptions_path)
        if name.endswith(".elf")
    )

    with tqdm.tqdm(exceptions_elfs) as t:
        # Iterate the bar itself so the bar that carries the postfix is the
        # one that advances; wrapping the list in a second tqdm would leave
        # `t` stuck at 0 and draw two bars.
        for elf in t:
            t.set_postfix({"binary": os.path.basename(elf)})
            with open(elf, "rb") as f:
                await core_mini_axi.reset()
                entry_point = await core_mini_axi.load_elf(f)
                await core_mini_axi.execute_from(entry_point)
                await core_mini_axi.wait_for_halted()
                assert core_mini_axi.dut.io_fault.value == 0
@cocotb.test()
async def core_mini_axi_kelvin_isa_test(dut):
    """Run every .elf in the kelvin_isa runfiles directory.

    Each binary is loaded after a fresh reset and executed until the core
    halts; the DUT's io_fault output must be 0 after each one.
    """
    core_mini_axi = CoreMiniAxiInterface(dut)
    await core_mini_axi.init()
    await core_mini_axi.reset()
    cocotb.start_soon(core_mini_axi.clock.start())

    r = runfiles.Create()
    kelvin_isa_path = r.Rlocation("kelvin_hw/tests/cocotb/kelvin_isa")
    # os.listdir returns entries in arbitrary order; sort so that the run
    # order (and therefore any failure) is reproducible.
    kelvin_isa_elfs = sorted(
        os.path.join(kelvin_isa_path, name)
        for name in os.listdir(kelvin_isa_path)
        if name.endswith(".elf")
    )

    for elf in tqdm.tqdm(kelvin_isa_elfs):
        with open(elf, "rb") as f:
            await core_mini_axi.reset()
            entry_point = await core_mini_axi.load_elf(f)
            await core_mini_axi.execute_from(entry_point)
            await core_mini_axi.wait_for_halted()
            assert core_mini_axi.dut.io_fault.value == 0
@cocotb.test()
async def core_mini_axi_rand_instr_test(dut):
    """Execute 1000 random 32-bit instruction words, one per reset.

    Each iteration writes the stream [mpause, <random>, mpause, mpause] at
    address 0, starts execution at address 4 (the random word), and waits up
    to 100 cycles for the core to halt, forcing a halt if that wait fails.
    """
    core_mini_axi = CoreMiniAxiInterface(dut)
    await core_mini_axi.init()
    cocotb.start_soon(core_mini_axi.clock.start())

    # Loop-invariant: the mpause encoding used to pad the instruction stream.
    mpause = np.array([0x8000073], dtype=np.uint32)

    for _ in tqdm.tqdm(range(1000)):
        instr = np.random.randint(0, 2**32, 1, dtype=np.uint32)
        # For our instruction stream, set mpause as instr 0.
        # If we have an exception, we should jump to 0 due to
        # the default `mtvec` being 0, and halt.
        wdata = np.concatenate([mpause, instr, mpause, mpause])

        await core_mini_axi.reset()
        await core_mini_axi.write(0, wdata)
        await core_mini_axi.execute_from(4)
        try:
            await core_mini_axi.wait_for_halted(timeout_cycles=100)
        except Exception:
            # Best effort: a random word may never reach a halt (e.g. a
            # backwards branch), so force one. Only Exception is caught so
            # that KeyboardInterrupt and task cancellation still propagate.
            await core_mini_axi.halt()
@cocotb.test()
async def core_mini_axi_burst_types_test(dut):
    """Exercise FIXED, INCR and WRAP AXI bursts with random data.

    For each burst type, 1000 iterations write a random payload of 2-255
    16-byte beats and read data back with the same burst type, comparing it
    against the expected portion of the payload.
    """
    core_mini_axi = CoreMiniAxiInterface(dut)
    await core_mini_axi.init()
    await core_mini_axi.reset()
    cocotb.start_soon(core_mini_axi.clock.start())

    # NOTE: randint's upper bound is exclusive, so 256 is needed for 0xFF
    # bytes to be generated.

    # AxiBurst.FIXED
    for _ in tqdm.trange(1000):
        beats = random.randint(2, 255)
        wdata = np.random.randint(0, 256, 16 * beats, dtype=np.uint8)
        await core_mini_axi.write(0, wdata, burst=AxiBurst.FIXED)
        rdata = await core_mini_axi.read(0, 16, burst=AxiBurst.FIXED)
        # Every beat targets the same address, so only the last beat remains.
        assert (wdata[((beats - 1) * 16):(beats * 16)] == rdata).all()

    # AxiBurst.INCR
    for _ in tqdm.trange(1000):
        beats = random.randint(2, 255)
        wdata = np.random.randint(0, 256, 16 * beats, dtype=np.uint8)
        await core_mini_axi.write(0, wdata, burst=AxiBurst.INCR)
        rdata = await core_mini_axi.read(0, beats * 16, burst=AxiBurst.INCR)
        assert (wdata == rdata).all()

    # AxiBurst.WRAP
    for _ in tqdm.trange(1000):
        beats = random.randint(2, 255)
        wdata = np.random.randint(0, 256, 16 * beats, dtype=np.uint8)
        write_offset = random.randint(1, 15)
        read_offset = random.randint(1, 15)
        await core_mini_axi.write(write_offset, wdata, burst=AxiBurst.WRAP)
        rdata = await core_mini_axi.read(read_offset, 16, burst=AxiBurst.WRAP)
        # Expected 16-byte line: the final write_offset bytes of the payload
        # followed by the rest of the payload's last 16 bytes; the read data
        # is rotated by read_offset before comparing.
        expected = np.concatenate(
            [wdata[-write_offset:], wdata[-16:-write_offset]])
        assert (expected == np.roll(rdata, read_offset)).all()
@cocotb.test()
async def core_mini_axi_float_csr_test(dut):
    """Run float_csr_interlock_test.elf to completion.

    The binary is loaded and executed until the core halts, after which the
    DUT's io_fault output must be 0.
    """
    axi = CoreMiniAxiInterface(dut)
    await axi.init()
    await axi.reset()
    cocotb.start_soon(axi.clock.start())

    elf_path = runfiles.Create().Rlocation(
        "kelvin_hw/tests/cocotb/float_csr_interlock_test.elf")
    with open(elf_path, "rb") as elf_file:
        start_pc = await axi.load_elf(elf_file)
        await axi.execute_from(start_pc)
        await axi.wait_for_halted()
        assert axi.dut.io_fault.value == 0