| # Copyright 2025 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import cocotb |
| import glob |
| import numpy as np |
| import os |
| import tqdm |
| import random |
| |
| from kelvin_test_utils.core_mini_axi_interface import AxiBurst, AxiResp,CoreMiniAxiInterface |
| from bazel_tools.tools.python.runfiles import runfiles |
| |
| |
| @cocotb.test() |
| async def core_mini_axi_basic_write_read_memory(dut): |
| """Basic test to check if TCM memory can be written and read back.""" |
| core_mini_axi = CoreMiniAxiInterface(dut) |
| await core_mini_axi.init() |
| await core_mini_axi.reset() |
| cocotb.start_soon(core_mini_axi.clock.start()) |
| |
| # Test reading/writing words |
| await core_mini_axi.write_word(0x100, 0x42) |
| await core_mini_axi.write_word(0x104, 0x43) |
| rdata = (await core_mini_axi.read(0x100, 16)).view(np.uint32) |
| assert (rdata[0:2] == np.array([0x42, 0x43])).all() |
| |
| # Three write/read data burst |
| wdata = np.arange(48, dtype=np.uint8) |
| await core_mini_axi.write(0x0, wdata) |
| |
| # Unaligned read, taking two bursts |
| rdata = await core_mini_axi.read(0x8, 16) |
| assert (np.arange(8, 24, dtype=np.uint8) == rdata).all() |
| |
| # Unaligned write, taking two bursts |
| wdata = np.arange(20, dtype=np.uint8) |
| await core_mini_axi.write(0x204, wdata) |
| rdata = await core_mini_axi.read(0x200, 32) |
| assert (wdata == rdata[4:24]).all() |
| |
| # Iterate over both TCMs with all valid AXI sizes |
| for size in range(13): |
| txn_bytes = 2 ** size |
| wdata = np.random.randint(0, 255, txn_bytes, dtype=np.uint8) |
| for i in tqdm.tqdm(range((8 * 1024) // txn_bytes)): |
| await core_mini_axi.write(i * txn_bytes, wdata) |
| for i in tqdm.tqdm(range((32 * 1024) // txn_bytes)): |
| await core_mini_axi.write(0x10000 + (i * txn_bytes), wdata) |
| |
| for i in tqdm.tqdm(range((8 * 1024) // txn_bytes)): |
| rdata = await core_mini_axi.read(i * txn_bytes, txn_bytes) |
| assert(rdata == wdata).all() |
| for i in tqdm.tqdm(range((32 * 1024) // txn_bytes)): |
| rdata = await core_mini_axi.read(0x10000 + (i * txn_bytes), txn_bytes) |
| assert(rdata == wdata).all() |
| |
| @cocotb.test() |
| async def core_mini_axi_run_wfi_in_all_slots(dut): |
| """Tests the WFI instruction in each of the 4 issue slots.""" |
| core_mini_axi = CoreMiniAxiInterface(dut) |
| cocotb.start_soon(core_mini_axi.clock.start()) |
| await core_mini_axi.init() |
| r = runfiles.Create() |
| |
| for slot in range(0,4): |
| with open(r.Rlocation(f"kelvin_hw/tests/cocotb/wfi_slot_{slot}.elf"), "rb") as f: |
| await core_mini_axi.reset() |
| entry_point = await core_mini_axi.load_elf(f) |
| await core_mini_axi.execute_from(entry_point) |
| |
| await core_mini_axi.wait_for_wfi() |
| await core_mini_axi.raise_irq() |
| await core_mini_axi.wait_for_halted() |
| |
| @cocotb.test() |
| async def core_mini_axi_slow_bready(dut): |
| """Test that BVALID stays high until BREADY is presented""" |
| core_mini_axi = CoreMiniAxiInterface(dut) |
| await core_mini_axi.init() |
| await core_mini_axi.reset() |
| cocotb.start_soon(core_mini_axi.clock.start()) |
| |
| wdata = np.arange(16, dtype=np.uint8) |
| for i in tqdm.trange(100): |
| bready_delay = random.randint(0, 50) |
| await core_mini_axi.write(i*32, wdata, delay_bready=bready_delay) |
| |
| for _ in tqdm.trange(100): |
| rdata = await core_mini_axi.read(i*32, 16) |
| assert (wdata == rdata).all() |
| |
| @cocotb.test() |
| async def core_mini_axi_write_read_memory_stress_test(dut): |
| """Stress test reading/writing from DTCM.""" |
| core_mini_axi = CoreMiniAxiInterface(dut) |
| await core_mini_axi.init() |
| await core_mini_axi.reset() |
| cocotb.start_soon(core_mini_axi.clock.start()) |
| r = runfiles.Create() |
| |
| with open(r.Rlocation("kelvin_hw/tests/cocotb/stress_test.elf"), "rb") as f: |
| halt = core_mini_axi.lookup_symbol(f, "halt") |
| dtcm_vec = core_mini_axi.lookup_symbol(f, "dtcm_vec") |
| entry_point = await core_mini_axi.load_elf(f) |
| await core_mini_axi.execute_from(entry_point) |
| |
| # Range for a DTCM buffer we can read/write too. |
| DTCM_START = dtcm_vec |
| DTCM_SIZE = 0x2000 |
| DTCM_END = DTCM_START + DTCM_SIZE |
| dtcm_model_buffer = await core_mini_axi.read(DTCM_START, DTCM_SIZE) |
| |
| for i in tqdm.trange(1000): |
| start_addr = random.randint(DTCM_START, DTCM_END-2) |
| end_addr = random.randint(start_addr, DTCM_END-1) |
| transaction_length = end_addr - start_addr |
| |
| if random.randint(0, 1) == 1: |
| wdata = np.random.randint(0, 256, transaction_length, dtype=np.uint8) |
| await core_mini_axi.write(start_addr, wdata) |
| dtcm_model_buffer[start_addr-DTCM_START: end_addr-DTCM_START] = wdata |
| else: |
| expected = dtcm_model_buffer[start_addr-DTCM_START: end_addr-DTCM_START] |
| rdata = await core_mini_axi.read(start_addr, transaction_length) |
| assert (expected == rdata).all() |
| |
| await core_mini_axi.write_word(halt, 1) |
| try: |
| await core_mini_axi.wait_for_halted() |
| except: |
| await core_mini_axi.halt() |
| |
| @cocotb.test() |
| async def core_mini_axi_master_write_alignment(dut): |
| """Test data alignment during AXI master writes""" |
| core_mini_axi = CoreMiniAxiInterface(dut) |
| await core_mini_axi.init() |
| await core_mini_axi.reset() |
| cocotb.start_soon(core_mini_axi.clock.start()) |
| r = runfiles.Create() |
| |
| with open(r.Rlocation("kelvin_hw/tests/cocotb/align_test.elf"), "rb") as f: |
| entry_point = await core_mini_axi.load_elf(f) |
| await core_mini_axi.execute_from(entry_point) |
| |
| await core_mini_axi.wait_for_halted_semihost(f) |
| assert core_mini_axi.dut.io_fault.value == 0 |
| |
| @cocotb.test() |
| async def core_mini_axi_finish_txn_before_halt_test(dut): |
| core_mini_axi = CoreMiniAxiInterface(dut) |
| await core_mini_axi.init() |
| await core_mini_axi.reset() |
| cocotb.start_soon(core_mini_axi.clock.start()) |
| r = runfiles.Create() |
| |
| with open(r.Rlocation("kelvin_hw/tests/cocotb/finish_txn_before_halt.elf"), "rb") as f: |
| entry_point = await core_mini_axi.load_elf(f) |
| await core_mini_axi.execute_from(entry_point) |
| await core_mini_axi.wait_for_halted() |
| |
| assert (core_mini_axi.master_arfifo.qsize() + \ |
| core_mini_axi.master_rfifo.qsize() + \ |
| core_mini_axi.master_awfifo.qsize() + \ |
| core_mini_axi.master_wfifo.qsize() + \ |
| core_mini_axi.master_bfifo.qsize()) == 0 |
| |
| @cocotb.test() |
| async def core_mini_axi_riscv_tests(dut): |
| core_mini_axi = CoreMiniAxiInterface(dut) |
| await core_mini_axi.init() |
| cocotb.start_soon(core_mini_axi.clock.start()) |
| r = runfiles.Create() |
| |
| riscv_test_path = r.Rlocation("kelvin_hw/tests/cocotb/riscv-tests") |
| riscv_test_elfs = [os.path.join(riscv_test_path, f) for f in os.listdir(riscv_test_path) if f.endswith(".elf")] |
| for elf in tqdm.tqdm(riscv_test_elfs): |
| with open(elf, "rb") as f: |
| await core_mini_axi.reset() |
| entry_point = await core_mini_axi.load_elf(f) |
| await core_mini_axi.execute_from(entry_point) |
| await core_mini_axi.wait_for_halted() |
| assert core_mini_axi.dut.io_fault.value == 0 |
| |
| @cocotb.test() |
| async def core_mini_axi_riscv_dv(dut): |
| core_mini_axi = CoreMiniAxiInterface(dut) |
| await core_mini_axi.init() |
| cocotb.start_soon(core_mini_axi.clock.start()) |
| r = runfiles.Create() |
| |
| riscv_dv_path = r.Rlocation("kelvin_hw/tests/cocotb/riscv-dv") |
| riscv_dv_elfs = [os.path.join(riscv_dv_path, f) for f in os.listdir(riscv_dv_path) if f.endswith(".o")] |
| with tqdm.tqdm(riscv_dv_elfs) as t: |
| for elf in tqdm.tqdm(riscv_dv_elfs): |
| t.set_postfix({"binary": os.path.basename(elf)}) |
| with open(elf, "rb") as f: |
| await core_mini_axi.reset() |
| entry_point = await core_mini_axi.load_elf(f) |
| await core_mini_axi.execute_from(entry_point) |
| await core_mini_axi.wait_for_halted_semihost(f) |
| |
| @cocotb.test() |
| async def core_mini_axi_csr_test(dut): |
| """Exercises the CoreAxiCSR module.""" |
| core_mini_axi = CoreMiniAxiInterface(dut) |
| await core_mini_axi.init() |
| await core_mini_axi.reset() |
| cocotb.start_soon(core_mini_axi.clock.start()) |
| |
| for _ in tqdm.tqdm(range(10000)): |
| reset_csr_wdata = np.random.randint(0, 255, 4, dtype=np.uint8) |
| await core_mini_axi.write(0x30000, reset_csr_wdata) |
| reset_csr_rdata = await core_mini_axi.read_word(0x30000) |
| assert (reset_csr_wdata == reset_csr_rdata).all() |
| |
| for _ in tqdm.tqdm(range(10000)): |
| pc_start_csr_wdata = np.random.randint(0, 255, 4, dtype=np.uint8) |
| await core_mini_axi.write(0x30004, pc_start_csr_wdata) |
| pc_start_csr_rdata = await core_mini_axi.read_word(0x30004) |
| assert (pc_start_csr_wdata == pc_start_csr_rdata).all() |
| |
| # Neither of these are valid CSRs, but this will exercise the top half of the wdata field. |
| for _ in tqdm.tqdm(range(10000)): |
| csr_wdata = np.random.randint(0, 255, 4, dtype=np.uint8) |
| await core_mini_axi.write(0x30008, csr_wdata) |
| await core_mini_axi.write(0x3000c, csr_wdata) |
| |
| status_reg_csr_rdata = await core_mini_axi.read_word(0x30008) |
| # Because we write a random value to the reset CSR, it's possible |
| # for this register to either be 0, 1, or 3. |
| assert (status_reg_csr_rdata.view(np.uint32) <= 3) |
| |
| # Read valid CSRs |
| for i in range(8): |
| misc_csr_rdata = await core_mini_axi.read_word(0x30100 + (4 * i)) |
| # Read invalid CSRs, expect error response |
| for i in range(3, 0x100 // 4): |
| misc_csr_rdata = await core_mini_axi.read_word(0x30000 + (4 * i), expected_resp=AxiResp.SLVERR) |
| for i in range(8, 0x2000 // 4): |
| misc_csr_rdata = await core_mini_axi.read_word(0x30100 + (4 * i), expected_resp=AxiResp.SLVERR) |
| |
| @cocotb.test() |
| async def core_mini_axi_exceptions_test(dut): |
| core_mini_axi = CoreMiniAxiInterface(dut) |
| await core_mini_axi.init() |
| await core_mini_axi.reset() |
| cocotb.start_soon(core_mini_axi.clock.start()) |
| r = runfiles.Create() |
| |
| exceptions_path = r.Rlocation("kelvin_hw/tests/cocotb/exceptions") |
| exceptions_elfs = [os.path.join(exceptions_path, f) for f in os.listdir(exceptions_path) if f.endswith(".elf")] |
| with tqdm.tqdm(exceptions_elfs) as t: |
| for elf in tqdm.tqdm(exceptions_elfs): |
| t.set_postfix({"binary": os.path.basename(elf)}) |
| with open(elf, "rb") as f: |
| await core_mini_axi.reset() |
| entry_point = await core_mini_axi.load_elf(f) |
| await core_mini_axi.execute_from(entry_point) |
| await core_mini_axi.wait_for_halted() |
| assert core_mini_axi.dut.io_fault.value == 0 |
| |
| @cocotb.test() |
| async def core_mini_axi_kelvin_isa_test(dut): |
| core_mini_axi = CoreMiniAxiInterface(dut) |
| await core_mini_axi.init() |
| await core_mini_axi.reset() |
| cocotb.start_soon(core_mini_axi.clock.start()) |
| r = runfiles.Create() |
| |
| kelvin_isa_path = r.Rlocation("kelvin_hw/tests/cocotb/kelvin_isa") |
| kelvin_isa_elfs = [os.path.join(kelvin_isa_path, f) for f in os.listdir(kelvin_isa_path) if f.endswith(".elf")] |
| for elf in tqdm.tqdm(kelvin_isa_elfs): |
| with open(elf, "rb") as f: |
| await core_mini_axi.reset() |
| entry_point = await core_mini_axi.load_elf(f) |
| await core_mini_axi.execute_from(entry_point) |
| await core_mini_axi.wait_for_halted() |
| assert core_mini_axi.dut.io_fault.value == 0 |
| |
| @cocotb.test() |
| async def core_mini_axi_rand_instr_test(dut): |
| core_mini_axi = CoreMiniAxiInterface(dut) |
| await core_mini_axi.init() |
| cocotb.start_soon(core_mini_axi.clock.start()) |
| |
| |
| for _ in tqdm.tqdm(range(1000)): |
| instr = np.random.randint(0, 2**32, 1, dtype=np.uint32) |
| mpause = np.array([0x8000073], dtype=np.uint32) |
| # For our instruction stream, set mpause as instr 0. |
| # If we have an exception, we should jump to 0 due to |
| # the default `mtvec` being 0, and halt. |
| wdata = np.concatenate([mpause, instr, mpause, mpause]) |
| await core_mini_axi.reset() |
| await core_mini_axi.write(0, wdata) |
| await core_mini_axi.execute_from(4) |
| try: |
| await core_mini_axi.wait_for_halted(timeout_cycles=100) |
| except: |
| await core_mini_axi.halt() |
| |
| @cocotb.test() |
| async def core_mini_axi_burst_types_test(dut): |
| core_mini_axi = CoreMiniAxiInterface(dut) |
| await core_mini_axi.init() |
| await core_mini_axi.reset() |
| cocotb.start_soon(core_mini_axi.clock.start()) |
| |
| # AxiBurst.FIXED |
| for _ in tqdm.trange(1000): |
| beats = random.randint(2, 255) |
| wdata = np.random.randint(0, 255, 16 * beats, dtype=np.uint8) |
| await core_mini_axi.write(0, wdata, burst=AxiBurst.FIXED) |
| rdata = await core_mini_axi.read(0, 16, burst=AxiBurst.FIXED) |
| assert (wdata[((beats - 1) * 16):(beats * 16)] == rdata).all() |
| |
| # AxiBurst.INCR |
| for _ in tqdm.trange(1000): |
| beats = random.randint(2, 255) |
| wdata = np.random.randint(0, 255, 16 * beats, dtype=np.uint8) |
| await core_mini_axi.write(0, wdata, burst=AxiBurst.INCR) |
| rdata = await core_mini_axi.read(0, beats * 16, burst=AxiBurst.INCR) |
| assert (wdata == rdata).all() |
| |
| # AxiBurst.WRAP |
| for _ in tqdm.trange(1000): |
| beats = random.randint(2, 255) |
| wdata = np.random.randint(0, 255, 16 * beats, dtype=np.uint8) |
| write_offset = random.randint(1, 15) |
| read_offset = random.randint(1, 15) |
| await core_mini_axi.write(write_offset, wdata, burst=AxiBurst.WRAP) |
| rdata = await core_mini_axi.read(read_offset, 16, burst=AxiBurst.WRAP) |
| expected = np.concatenate([wdata[-write_offset:], wdata[-16:-write_offset]]) |
| assert (expected == np.roll(rdata, read_offset)).all() |
| |
| @cocotb.test() |
| async def core_mini_axi_float_csr_test(dut): |
| core_mini_axi = CoreMiniAxiInterface(dut) |
| await core_mini_axi.init() |
| await core_mini_axi.reset() |
| cocotb.start_soon(core_mini_axi.clock.start()) |
| r = runfiles.Create() |
| |
| with open(r.Rlocation("kelvin_hw/tests/cocotb/float_csr_interlock_test.elf"), "rb") as f: |
| entry_point = await core_mini_axi.load_elf(f) |
| await core_mini_axi.execute_from(entry_point) |
| |
| await core_mini_axi.wait_for_halted() |
| assert core_mini_axi.dut.io_fault.value == 0 |