blob: 956d21c27227a17d3a7cea5a203ecf9d166450ed [file] [log] [blame]
#! /usr/bin/env python3
"""Trace based model.
Models the microarchitectural behavior of a processor
based on a trace obtained from a functional simulator.
"""
import json
import logging
from typing import Any, Dict, Sequence
import sys
import jsonschema
import yaml
from cpu import CPU
from functional_trace import FunctionalTrace
from memory_system import MemorySystem
from scalar_pipe import ScalarPipe
import scoreboard
import tbm_options
import utilities
from vector_pipe import VectorPipe
# Module-wide logger, registered under the name "tbm"; the functions in this
# file use it for informational, warning and error messages.
logger = logging.getLogger("tbm")
def schema_validator() -> jsonschema.protocols.Validator:
    """Build a validator for the micro-architecture description schema.

    The schema is read from "config/uarch.schema.json" (a path relative to the
    current working directory) and is itself checked for validity before a
    validator is instantiated from it. An invalid schema is reported through
    the logger and terminates the process with exit status 1.

    Returns:
      A validator instance bound to the loaded schema.
    """
    # TODO(b/261619078): use importlib instead of relative path
    schema_path = "config/uarch.schema.json"
    with open(schema_path, "r", encoding="ascii") as stream:
        schema = json.load(stream)

    # Pick the validator class matching the schema, then make sure the
    # schema itself is well-formed before using it.
    validator_cls = jsonschema.validators.validator_for(schema)
    try:
        validator_cls.check_schema(schema)
    except jsonschema.exceptions.SchemaError as err:
        logger.error("in '%s':\n%s", schema_path, err.message)
        sys.exit(1)

    return validator_cls(schema)
def validate_uarch(validator: jsonschema.protocols.Validator,
                   uarch: Dict[str, Any]):
    """Validate a micro-architecture description against the schema.

    Returns normally when `uarch` has no validation errors. Otherwise all
    errors (sorted by their path in the description) are logged in a single
    message and the process exits with status 1.

    Args:
      validator: schema validator used to check the description
      uarch: uArch configuration to be checked
    """
    problems = sorted(validator.iter_errors(uarch),
                      key=lambda problem: problem.path)
    if not problems:
        return

    def describe(problem) -> str:
        # Prefix the message with the "/"-joined location of the offending
        # element, when the error carries one.
        if problem.path:
            location = "/".join(str(step) for step in problem.path)
            return location + ": " + problem.message
        return problem.message

    report = "\n".join([describe(problem) for problem in problems])
    logger.error("Found %d errors in the microarchitecture"
                 " configuration:\n%s",
                 len(problems), report)
    sys.exit(1)
def load_config_file(name: str) -> Dict[str, Any]:
    """Load a configuration file, choosing the parser from the file suffix.

    Files ending in ".json" are parsed as JSON. Everything else is parsed as
    YAML (with the safe loader); a warning is logged first when the suffix is
    not ".yaml".

    Args:
      name: path of the file to load (opened as ASCII text)

    Returns:
      The parsed content of the file.
    """
    looks_like_json = name.endswith(".json")
    looks_like_yaml = name.endswith(".yaml")

    with open(name, "r", encoding="ascii") as stream:
        if looks_like_json:
            return json.load(stream)

        if not looks_like_yaml:
            logger.warning("The file '%s' has an unrecognized suffix (expected"
                           " .json or .yaml). Trying to load it as YAML.",
                           name)
        return yaml.safe_load(stream)
def load_uarch() -> Dict[str, Any]:
    """Read, patch and validate the micro-architecture description.

    The base description is loaded from the file named by
    `tbm_options.args.uarch` and validated against the schema. Modifications
    from the `extend` files and then from the `set` command line settings
    (each of the form PATH=JSON_VALUE, with PATH a "."-separated list of
    keys) are applied on top, and the result is validated again if anything
    was applied. Comment entries are then stripped and, unless
    `report_dont_include_cfg` is set, the final configuration is dumped as
    JSON to the report file (if one was given) or to stdout.

    A malformed `set` entry (no "=", or a value that is not valid JSON) is
    reported through the logger and terminates the process with exit
    status 1.

    Returns:
      The patched micro-architecture description, without comment entries.
    """
    # Read in the micro-architecture description.
    uarch_desc = load_config_file(tbm_options.args.uarch)

    # Read in the micro-architecture description schema.
    validator = schema_validator()

    # Check that the original (un-patched) uarch is valid.
    validate_uarch(validator, uarch_desc)

    # Read additional modifications from files.
    for fl in tbm_options.args.extend:
        logger.info("Applying modifications from file '%s'", fl)
        data = load_config_file(fl)
        merge_config(uarch_desc, data)

    # Apply command line modifications.
    for cl_set in tbm_options.args.set:
        logger.info("Applying setting '%s'", cl_set)

        # Split on the first "=" only: the JSON value may itself contain "="
        # (e.g. inside a string).
        path, sep, value = cl_set.partition("=")
        if not sep:
            logger.error("malformed setting '%s' (expected PATH=JSON_VALUE)",
                         cl_set)
            sys.exit(1)

        try:
            parsed_value = json.loads(value)
        except json.JSONDecodeError as e:
            logger.error("setting '%s': value is not valid JSON: %s",
                         cl_set, e)
            sys.exit(1)

        apply_setting(uarch_desc, path.split("."), parsed_value)

    # Check that the patched uarch is valid
    if tbm_options.args.extend or tbm_options.args.set:
        validate_uarch(validator, uarch_desc)

    remove_comments(uarch_desc)

    if not tbm_options.args.report_dont_include_cfg:
        if tbm_options.args.report:
            # Save to file
            with open(tbm_options.args.report, "w", encoding="ascii") as out:
                print("Configuration:", file=out)
                print(json.dumps(uarch_desc, indent=2), file=out)
                print(file=out)
        else:
            # Or print to stdout
            print("Configuration:")
            print(json.dumps(uarch_desc, indent=2))
            print()

    return uarch_desc
def apply_setting(uarch: Dict[str, Any], path: Sequence[str],
                  value: Any) -> None:
    """Modify an element of micro-architectural description.

    Only existing elements can be overridden: if any segment of `path` is
    missing, or an intermediate segment leads to something that is not a
    dictionary, an error is logged and the process exits with status 1.

    Args:
      uarch: uArch configuration to be modified
      path: path through tree of dictionaries
      value: value to be set
    """
    # We expect path to be non-empty
    assert path

    node = uarch
    last_idx = len(path) - 1
    for idx, seg in enumerate(path):
        # The isinstance check keeps a path that runs through a scalar, string
        # or list from raising TypeError (or from doing a substring test on a
        # string); such a path is reported like any other non-existent element.
        if not isinstance(node, dict) or seg not in node:
            logger.error("attempt to override non-existent element: %s",
                         ".".join(path[:idx+1]))
            sys.exit(1)

        # Traverse down `path` to update uarch until the last element.
        if idx < last_idx:
            node = node[seg]
        else:
            # Last element
            logger.info("Changing '%s' to '%s'", seg, value)
            node[seg] = value
def merge_config(uarch: Dict[str, Any],
                 modification: Dict[str, Any]) -> None:
    """Merge modification tree into uarch description.

    Modification is performed by recursing down to leaves
    replacing old entries with entries from modification.

    A dictionary in `modification` is merged into the existing entry of the
    same key, unless it carries a truthy "replace" entry, in which case it
    replaces the existing entry wholesale. The "replace" entry is removed from
    the modification when it is consulted. A dictionary whose key exists in
    `uarch` but whose current value is not a dictionary also replaces that
    value, since there is nothing to merge into.

    Args:
      uarch: uArch configuration to be modified
      modification: configuration to be merged into uarch
    """
    for key, val in modification.items():
        # Keep this evaluation order: the "replace" marker is consumed only
        # when the key already exists and the new value is a dictionary. The
        # final isinstance check prevents recursing into a non-dict entry.
        merge = (key in uarch
                 and isinstance(val, dict)
                 and not val.pop("replace", False)
                 and isinstance(uarch[key], dict))
        if merge:
            merge_config(uarch[key], val)
        else:
            logger.info("  Replacing '%s' with '%s'", key, val)
            uarch[key] = val
def remove_comments(desc: Dict[str, Any]) -> None:
    """Strip commentary entries from a configuration tree, in place.

    Every entry whose key is "description" or starts with "__comment__" is
    deleted, and the same is done recursively for all remaining values that
    are dictionaries. Values of other types (including lists) are left
    untouched.

    Args:
      desc: configuration (sub)tree to be cleaned
    """
    # Collect the keys first: the dictionary must not change size while it is
    # being iterated.
    doomed = []
    for name in desc:
        if name == "description" or name.startswith("__comment__"):
            doomed.append(name)

    for name in doomed:
        desc.pop(name)

    for child in desc.values():
        if isinstance(child, dict):
            remove_comments(child)
def create_scoreboard(uid: str, desc: Dict[str, Any],
                      config_desc: Dict[str, Any]):
    """Create the scoreboard for one register file.

    Args:
      uid: identifier of the register file
      desc: description of the register file; its "type" entry selects the
        scoreboard flavour ("scalar" or "vector")
      config_desc: global configuration; "vector_slices" is read from it for
        vector register files

    Returns:
      A `scoreboard.Preemptive` for scalar register files, or a
      `scoreboard.VecPreemptive` for vector register files.

    Raises:
      AssertionError: if the register file type is neither "scalar" nor
        "vector".
    """
    kind = desc["type"]

    if kind == "scalar":
        return scoreboard.Preemptive(uid, desc)

    if kind == "vector":
        return scoreboard.VecPreemptive(uid, desc, config_desc["vector_slices"])

    # Raised explicitly (rather than `assert False`) so that the check is not
    # stripped when Python runs with -O, which would silently return None.
    raise AssertionError(f"register file '{uid}': unknown type '{kind}'")
def create_cpu(uarch_desc: Dict[str, Any], in_trace: FunctionalTrace) -> CPU:
    """Assemble a CPU model from a micro-architecture description.

    The pipe-map files listed in the description are merged into a single
    instruction mapping (an instruction appearing in more than one file is an
    error: it is logged and the process exits with status 1), and entries
    mapped to "UNKNOWN" are dropped. Scoreboards, the memory system, issue
    queues and functional-unit pipes are then created from the corresponding
    sections of the description.

    Args:
      uarch_desc: micro-architecture description
      in_trace: functional trace handed to the CPU

    Returns:
      The fully populated CPU object.
    """
    # Read in the pipe maps.
    pipe_map = {}
    for pm_path in uarch_desc["pipe_maps"]:
        with open(pm_path, "r", encoding="ascii") as pm_stream:
            mapping = json.load(pm_stream)
        mapping.pop("__comment__", None)

        clashes = pipe_map.keys() & mapping.keys()
        if clashes:
            logger.error("instruction(s) with multiple mappings: %s",
                         ", ".join(clashes))
            sys.exit(1)

        pipe_map.update(mapping)

    pipe_map = {
        inst: pipe for inst, pipe in pipe_map.items() if pipe != "UNKNOWN"
    }

    rf_scoreboards = {}
    for uid, rf_desc in uarch_desc["register_files"].items():
        rf_scoreboards[uid] = create_scoreboard(uid, rf_desc,
                                                uarch_desc["config"])

    mem_sys = MemorySystem(uarch_desc["memory_system"])

    cpu = CPU(pipe_map, rf_scoreboards, mem_sys, uarch_desc["config"], in_trace)

    for uid, iq_desc in uarch_desc["issue_queues"].items():
        cpu.sched_unit.add_queue(uid, iq_desc)

    # One pipe per unit instance; instances are named "<kind><index>".
    for kind, fu_desc in uarch_desc["functional_units"].items():
        if fu_desc["type"] == "scalar":
            pipes = [
                ScalarPipe(f"{kind}{i}", kind, fu_desc, cpu.mem_sys,
                           rf_scoreboards)
                for i in range(fu_desc.get("count", 1))
            ]
        else:
            assert fu_desc["type"] == "vector"
            pipes = [
                VectorPipe(f"{kind}{i}", kind, fu_desc,
                           uarch_desc["config"]["vector_slices"], cpu.mem_sys,
                           rf_scoreboards)
                for i in range(fu_desc.get("count", 1))
            ]
        cpu.exec_unit.add_pipe(kind, pipes)

    return cpu
def main(argv: Sequence[str]) -> int:
    """Run the model: parse options, load the uArch, simulate the trace.

    The trace is read from stdin (as JSON) when no trace file was given.
    Otherwise it is read from the trace file, either as JSON text when the
    `json_trace` option is set, or in binary mode through
    `FunctionalTrace.from_fb`. A trace file stays open for the duration of
    the simulation.

    Args:
      argv: command line arguments, without the program name

    Returns:
      0, to be used as the process exit status.
    """
    tbm_options.parse_args(argv, description=__doc__)

    # This assert convinces pytype that args is not None.
    assert tbm_options.args is not None
    args = tbm_options.args

    utilities.logging_config(
        logging.INFO if args.verbose > 0 else logging.WARNING)

    uarch = load_uarch()

    def run(trace_io, loader) -> int:
        # Build the trace from the given stream, then simulate it.
        trace = loader(trace_io, args.instructions)
        cpu = create_cpu(uarch, trace)
        cpu.simulate()
        return 0

    if args.trace is None:
        return run(sys.stdin, FunctionalTrace.from_json)

    if args.json_trace:
        with open(args.trace, "r", encoding="ascii") as in_trace:
            return run(in_trace, FunctionalTrace.from_json)

    with open(args.trace, "rb") as in_trace:
        return run(in_trace, FunctionalTrace.from_fb)
# Script entry point: pass the command line arguments (without the program
# name) to main() and use its return value as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))