blob: e5f829529b390879ab24d853160845f93ce5c62a [file] [log] [blame] [edit]
# Copyright lowRISC contributors.
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
r'''
This class holds a dict of message buckets according to the format defined
upon construction. It is meant to hold all message buckets of a build / tool
run, and provides convenience functions that streamline result aggregation
and printout in the Dvsim flow classes.
'''
import hjson
import copy
import logging as log
from pathlib import Path
from typing import Dict, List
from utils import print_msg_list
from MsgBucket import MsgBucket
class MsgBuckets():
def __init__(self,
bucket_cfgs: List[Dict]) -> None:
self.buckets = {f"{b['category']}_{b['severity']}":
MsgBucket(b['category'], b['severity'], b['label'])
for b in bucket_cfgs}
def clear(self):
'''Clear all signatures in all buckets'''
for b in self.buckets.values():
b.clear()
def get_labels(self,
severity_filter: List[str] = []) -> List[str]:
'''
Returns all bucket labels as a list.
If severity_filter is not empty, only the buckets with the listed
severities will be returned.
'''
for s in severity_filter:
if not MsgBucket.severity_is_known(s):
RuntimeError(f'Unknown severity {s}')
labels = []
for key, b in self.buckets.items():
if not severity_filter or b.severity in severity_filter:
labels.append(b.label)
return labels
def get_keys(self,
severity_filter: List[str] = []) -> List[str]:
'''
Returns all bucket keys as a list.
If severity_filter is not empty, only the buckets with the listed
severities will be returned.
'''
for s in severity_filter:
if not MsgBucket.severity_is_known(s):
RuntimeError(f'Unknown severity {s}')
keys = []
for key, b in self.buckets.items():
if not severity_filter or b.severity in severity_filter:
keys.append(key)
return keys
def get_counts(self,
keys: List[str] = None,
severity_filter: List[str] = []) -> List[int]:
'''
Get bucket count totals as a list of integers.
The bucket keys can be supplied externally, in which case the
severity_filter does not apply. If a specific bucket key does not
exist, a 0 count will be returned for that specific key.
If severity_filter is not empty, only the buckets with the listed
severities will be returned.
'''
if not keys:
keys = self.get_keys(severity_filter)
counts = []
for k in keys:
c = self.buckets[k].count() if k in self.buckets else 0
counts.append(c)
return counts
def get_counts_md(self,
keys: List[str] = None,
severity_filter: List[str] = [],
colmap: bool = True) -> List[str]:
'''
Get bucket count totals as a list of strings with optional colormap.
The bucket keys can be supplied externally, in which case the
severity_filter does not apply. If a specific bucket key does not
exist, a "--" string will be returned for that specific key.
If severity_filter is not empty, only the buckets with the listed
severities will be returned.
'''
if not keys:
keys = self.get_keys(severity_filter)
counts = []
for k in keys:
c = self.buckets[k].count_md(colmap) if k in self.buckets else '--'
counts.append(c)
return counts
def has_signatures(self,
severity_filter: List[str] = []) -> bool:
'''
Checks whether there are any signatures with specified severities.
If severity_filter is empty, the method returns true if any of the
buckets contains a nonzero amount of signatures.
'''
return any(self.get_counts(severity_filter=severity_filter))
def print_signatures_md(self,
severity_filter: List[str] = [],
max_per_bucket: int = -1) -> str:
'''
Render signatures into a string buffer.
The bucket labels are used as subtitles in this printout.
If severity_filter is not empty, only the buckets with the listed
severities will be returned.
The number of messages printed per bucket can be limited by
setting max_per_bucket to a nonnegative value.
'''
msgs = ''
keys = self.get_keys(severity_filter)
for k in keys:
msgs += print_msg_list(f'#### {self.buckets[k].label}',
self.buckets[k].signatures,
max_per_bucket)
return msgs
def merge(self, other) -> None:
'''
Merge other MsgBuckets object into this one.
This will append signatures to the corresponding bucket if it exists.
If the bucket does not yet exist it will be created.
'''
for k, b in other.buckets.items():
if k in self.buckets:
self.buckets[k].merge(b)
else:
self.buckets.update({k: copy.deepcopy(b)})
def __add__(self, other):
'''
Merges two MsgBucket objects into one.
Buckets will be uniquified, and signatures in buckets with the same
category and severity will be merged.
'''
mb = copy.deepcopy(self)
mb.merge(other)
return mb
# TODO(#9079): remove the method below once the log parsing has been
# merged into the Dvsim core code.
def load_hjson(self, result_path: Path) -> None:
'''Clear internal data structure and initialize with values in Hjson'''
self.clear()
try:
with result_path.open() as results_file:
results_dict = hjson.load(results_file, use_decimal=True)
except IOError as err:
log.warning("%s", err)
if 'flow_error' not in self.buckets:
self.buckets.update({'flow_error': MsgBucket('flow',
'error')})
self.buckets['flow_error'].signatures.append('IOError: %s' % err)
return
for k, signatures in results_dict.items():
if not isinstance(signatures, List):
raise RuntimeError(f'Signatures in {k} must be a '
'list of strings')
self.buckets[k].signatures.extend(signatures)