| # Copyright lowRISC contributors. |
| # Licensed under the Apache License, Version 2.0, see LICENSE for details. |
| # SPDX-License-Identifier: Apache-2.0 |
| r''' |
| This class holds a dict of message buckets according to the format defined |
| upon construction. It is meant to hold all message buckets of a build / tool |
| run, and provides convenience functions that streamline result aggregation |
| and printout in the Dvsim flow classes. |
| ''' |
| |
| import hjson |
| import copy |
| import logging as log |
| |
| from pathlib import Path |
| from typing import Dict, List |
| from utils import print_msg_list |
| from MsgBucket import MsgBucket |
| |
| |
| class MsgBuckets(): |
| def __init__(self, |
| bucket_cfgs: List[Dict]) -> None: |
| self.buckets = {f"{b['category']}_{b['severity']}": |
| MsgBucket(b['category'], b['severity'], b['label']) |
| for b in bucket_cfgs} |
| |
| def clear(self): |
| '''Clear all signatures in all buckets''' |
| for b in self.buckets.values(): |
| b.clear() |
| |
| def get_labels(self, |
| severity_filter: List[str] = []) -> List[str]: |
| ''' |
| Returns all bucket labels as a list. |
| |
| If severity_filter is not empty, only the buckets with the listed |
| severities will be returned. |
| ''' |
| for s in severity_filter: |
| if not MsgBucket.severity_is_known(s): |
| RuntimeError(f'Unknown severity {s}') |
| |
| labels = [] |
| for key, b in self.buckets.items(): |
| if not severity_filter or b.severity in severity_filter: |
| labels.append(b.label) |
| return labels |
| |
| def get_keys(self, |
| severity_filter: List[str] = []) -> List[str]: |
| ''' |
| Returns all bucket keys as a list. |
| |
| If severity_filter is not empty, only the buckets with the listed |
| severities will be returned. |
| ''' |
| for s in severity_filter: |
| if not MsgBucket.severity_is_known(s): |
| RuntimeError(f'Unknown severity {s}') |
| |
| keys = [] |
| for key, b in self.buckets.items(): |
| if not severity_filter or b.severity in severity_filter: |
| keys.append(key) |
| return keys |
| |
| def get_counts(self, |
| keys: List[str] = None, |
| severity_filter: List[str] = []) -> List[int]: |
| ''' |
| Get bucket count totals as a list of integers. |
| |
| The bucket keys can be supplied externally, in which case the |
| severity_filter does not apply. If a specific bucket key does not |
| exist, a 0 count will be returned for that specific key. |
| |
| If severity_filter is not empty, only the buckets with the listed |
| severities will be returned. |
| ''' |
| if not keys: |
| keys = self.get_keys(severity_filter) |
| counts = [] |
| for k in keys: |
| c = self.buckets[k].count() if k in self.buckets else 0 |
| counts.append(c) |
| return counts |
| |
| def get_counts_md(self, |
| keys: List[str] = None, |
| severity_filter: List[str] = [], |
| colmap: bool = True) -> List[str]: |
| ''' |
| Get bucket count totals as a list of strings with optional colormap. |
| |
| The bucket keys can be supplied externally, in which case the |
| severity_filter does not apply. If a specific bucket key does not |
| exist, a "--" string will be returned for that specific key. |
| |
| If severity_filter is not empty, only the buckets with the listed |
| severities will be returned. |
| ''' |
| if not keys: |
| keys = self.get_keys(severity_filter) |
| counts = [] |
| for k in keys: |
| c = self.buckets[k].count_md(colmap) if k in self.buckets else '--' |
| counts.append(c) |
| return counts |
| |
| def has_signatures(self, |
| severity_filter: List[str] = []) -> bool: |
| ''' |
| Checks whether there are any signatures with specified severities. |
| |
| If severity_filter is empty, the method returns true if any of the |
| buckets contains a nonzero amount of signatures. |
| |
| ''' |
| return any(self.get_counts(severity_filter=severity_filter)) |
| |
| def print_signatures_md(self, |
| severity_filter: List[str] = [], |
| max_per_bucket: int = -1) -> str: |
| ''' |
| Render signatures into a string buffer. |
| |
| The bucket labels are used as subtitles in this printout. |
| |
| If severity_filter is not empty, only the buckets with the listed |
| severities will be returned. |
| |
| The number of messages printed per bucket can be limited by |
| setting max_per_bucket to a nonnegative value. |
| ''' |
| msgs = '' |
| keys = self.get_keys(severity_filter) |
| |
| for k in keys: |
| msgs += print_msg_list(f'#### {self.buckets[k].label}', |
| self.buckets[k].signatures, |
| max_per_bucket) |
| return msgs |
| |
| def merge(self, other) -> None: |
| ''' |
| Merge other MsgBuckets object into this one. |
| |
| This will append signatures to the corresponding bucket if it exists. |
| If the bucket does not yet exist it will be created. |
| ''' |
| |
| for k, b in other.buckets.items(): |
| if k in self.buckets: |
| self.buckets[k].merge(b) |
| else: |
| self.buckets.update({k: copy.deepcopy(b)}) |
| |
| def __add__(self, other): |
| ''' |
| Merges two MsgBucket objects into one. |
| |
| Buckets will be uniquified, and signatures in buckets with the same |
| category and severity will be merged. |
| ''' |
| mb = copy.deepcopy(self) |
| mb.merge(other) |
| return mb |
| |
| # TODO(#9079): remove the method below once the log parsing has been |
| # merged into the Dvsim core code. |
| def load_hjson(self, result_path: Path) -> None: |
| '''Clear internal data structure and initialize with values in Hjson''' |
| self.clear() |
| try: |
| with result_path.open() as results_file: |
| results_dict = hjson.load(results_file, use_decimal=True) |
| except IOError as err: |
| log.warning("%s", err) |
| if 'flow_error' not in self.buckets: |
| self.buckets.update({'flow_error': MsgBucket('flow', |
| 'error')}) |
| self.buckets['flow_error'].signatures.append('IOError: %s' % err) |
| return |
| |
| for k, signatures in results_dict.items(): |
| if not isinstance(signatures, List): |
| raise RuntimeError(f'Signatures in {k} must be a ' |
| 'list of strings') |
| self.buckets[k].signatures.extend(signatures) |