| #!/usr/bin/env python3 |
| # Copyright lowRISC contributors. |
| # Licensed under the Apache License, Version 2.0, see LICENSE for details. |
| # SPDX-License-Identifier: Apache-2.0 |
| |
| import argparse |
| import collections |
| import datetime |
| import io |
| import logging |
| import os.path |
| import re |
| import subprocess |
| import sys |
| import tarfile |
| import time |
| import urllib.request |
| import xml.etree.ElementTree |
| from pathlib import Path |
| from typing import Dict, Set |
| |
# Default location of the bitstreams cache ('~' is expanded by BitstreamCache).
CACHE_DIR = '~/.cache/opentitan-bitstreams'
# Default bucket URL.
BUCKET_URL = 'https://storage.googleapis.com/opentitan-bitstreams/'
# The XML document returned by the bucket is in this namespace; the empty
# prefix makes it the default namespace for ElementTree find/findall queries.
XMLNS = {'': 'http://doc.s3.amazonaws.com/2006-03-01'}

# Command-line interface. The boolean switches use BooleanOptionalAction, so
# each of them also accepts a `--no-<name>` form.
parser = argparse.ArgumentParser(
    description='Bitstream Downloader & Cache manager')
parser.add_argument('--cache', default=CACHE_DIR, help='Cache directory name')
parser.add_argument('--latest-update',
                    default='latest.txt',
                    help='Last time the cache was updated')
parser.add_argument('--bucket-url', default=BUCKET_URL, help='GCP Bucket URL')
parser.add_argument('--build-file',
                    default='BUILD.bazel',
                    help='Name of the generated BUILD file')
parser.add_argument('--list',
                    default=False,
                    action=argparse.BooleanOptionalAction,
                    help='List GCP Bucket contents')
parser.add_argument('--offline',
                    default=False,
                    action=argparse.BooleanOptionalAction,
                    help='Operating in an offline environment')
parser.add_argument('--refresh',
                    default=False,
                    action=argparse.BooleanOptionalAction,
                    help='Force a refresh')
# Compared against the age (in seconds) of the latest-update file.
parser.add_argument('--refresh-time',
                    default=300,
                    type=int,
                    help='How often to check for new bitstreams')
parser.add_argument('--repo',
                    default='',
                    help="Location of the source git repo")
# Optional positional; defaults to the commit currently checked out.
parser.add_argument(
    'bitstream',
    default='HEAD',
    nargs='?',
    help='Bitstream to retrieve: "latest" or git commit identifier')
| |
| |
class BitstreamCache(object):
    """Download bitstream archives from a bucket and cache them locally.

    Attributes set by the constructor:
        bucket_url: Bucket URL, always ending in '/'.
        cachedir: `<cache root>/cache`, where archives are extracted.
        latest_update: Path of the file recording the latest bitstream hash;
            its mtime is used to decide when a refresh is due.
        offline: When true, nothing is fetched from the network.
        available: Maps a bitstream hash to either its remote file name or
            the string 'local'; the key 'latest' maps to the latest hash.

    File lookups use the relative path 'cache' (the symlink created by
    InitRepository in the current working directory), so the paths written
    into the generated BUILD file are relative to that directory.
    """

    def __init__(self, bucket_url, cachedir, latest_update, offline=False):
        """Initialize the Bitstream Cache Manager.

        Args:
            bucket_url: str; URL of the bucket ('/' is appended if missing).
            cachedir: str; Root directory of the cache ('~' is expanded).
            latest_update: str; Name of the latest-update file, joined onto
              the cache root.
            offline: bool; Never access the network when true.
        """
        # endswith() also copes with an empty URL, unlike indexing [-1].
        if not bucket_url.endswith('/'):
            bucket_url += '/'
        self.bucket_url = bucket_url
        cachedir = os.path.expanduser(cachedir)
        self.cachedir = os.path.join(cachedir, 'cache')
        self.latest_update = os.path.join(cachedir,
                                          os.path.expanduser(latest_update))
        self.offline = offline
        self.available = {}

    def InitRepository(self):
        """Create the cache directory and symlink it into the bazel repository dir.

        Calling this again in an already initialized directory is a no-op.

        Raises:
            FileExistsError: if 'cache' exists in the current directory and is
              not a symlink to this cache directory.
        """
        os.makedirs(self.cachedir, exist_ok=True)
        try:
            os.symlink(self.cachedir, 'cache')
        except FileExistsError:
            # Tolerate a link left by a previous run, but never hide a
            # conflicting file, directory or link.
            if not (os.path.islink('cache') and
                    os.readlink('cache') == self.cachedir):
                raise

    def Touch(self, key):
        """Set the latest known bitstream.

        Rewriting the file also updates the mtime checked by NeedRefresh.

        Args:
            key: str; The git hash of the latest bitstream.
        """
        with open(self.latest_update, 'wt') as f:
            print(key, file=f)

    def NeedRefresh(self, interval):
        """Determine if the cache needs a refresh.

        Args:
            interval: int; Desired interval between refresh, in seconds.
        Returns:
            bool: True if the latest-update file is missing or older than
            `interval`.
        """
        try:
            st = os.stat(self.latest_update)
        except FileNotFoundError:
            return True
        return time.time() - st.st_mtime > interval

    def Get(self, file):
        """Perform an HTTP GET from the GCP bitstream bucket.

        Args:
            file: Filename in the bucket to retrieve.
        Returns:
            bytes
        """
        # The context manager closes the connection once the body is read.
        with urllib.request.urlopen(self.bucket_url + file) as response:
            return response.read()

    def GetBitstreamsAvailable(self, refresh):
        """Inventory which bitstreams are available.

        Populates `self.available`. Exits the process with status 1 if the
        local latest-update file is missing (without refresh) or if the
        remote latest.txt does not have a second line (with refresh).

        Args:
            refresh: bool; whether to refresh from the network.
        """
        if not refresh:
            # Only the immediate children of 'cache' are bitstream
            # directories; directories nested inside an extracted archive
            # must not be listed as bitstreams. A missing 'cache' yields
            # nothing, exactly like iterating os.walk() would.
            _, dirnames, _ = next(os.walk('cache'), ('cache', [], []))
            for d in dirnames:
                self.available[d] = 'local'
            try:
                with open(self.latest_update, 'rt') as f:
                    self.available['latest'] = f.read().strip()
            except FileNotFoundError:
                if self.offline:
                    logging.error(
                        'Must pre-initialize bitstream cache in offline mode.')
                else:
                    logging.error(
                        f'Bitstream cache missing {self.latest_update}.')
                sys.exit(1)
            return
        document = self.Get('').decode('utf-8')
        et = xml.etree.ElementTree.fromstring(document)
        for content in et.findall('Contents', XMLNS):
            for key in content.findall('Key', XMLNS):
                # Dots are escaped so only a literal '.tar.gz' matches; an
                # empty <Key/> element has text None.
                m = re.search(r'bitstream-([0-9A-Fa-f]+)\.tar\.gz',
                              key.text or '')
                if m:
                    self.available[m.group(1)] = key.text
        latest = self.Get('master/latest.txt').decode('utf-8').split('\n')
        # The second line is taken as the hash of the latest bitstream.
        if len(latest) < 2:
            logging.error('Malformed master/latest.txt in bitstream bucket.')
            sys.exit(1)
        self.available['latest'] = latest[1].strip()

    def GetClosest(self, repodir, key):
        """Get the best match for a bitstream (exact or next older commit).

        Args:
            repodir: path; Path to the repo from which bitstreams are built.
            key: str; A git hash or identifier of the desired bitstream.
        Returns:
            str or None: git hash of the closest bitstream.
        Raises:
            subprocess.CalledProcessError: if `git log` fails for `key`.
        """
        if key in self.available:
            return key
        log = subprocess.check_output(
            ['git', 'log', '--oneline', '--no-abbrev-commit', key],
            universal_newlines=True,
            cwd=repodir)
        # `git log` lists newest first, so the first hit is the closest
        # ancestor that has a bitstream.
        for line in log.splitlines():
            commit = line.split(' ', 1)[0]
            if commit and commit in self.available:
                return commit
        return None

    def _GetFromLocal(self, key):
        """Get the bitstream files from the local filesystem.

        Args:
            key: str; A git hash or the string 'latest'.
        Returns:
            list[str]: The requested bitstream files or empty list.
        """
        if key == 'latest':
            key = self.available['latest']
        # Deliberately relative ('cache' symlink), so that the resulting
        # paths can be used as-is in the generated BUILD file.
        local_dir = os.path.join('cache', key)
        return [
            os.path.join(dirname, f)
            for (dirname, _, filenames) in os.walk(local_dir)
            for f in filenames
        ]

    def _GetFromRemote(self, key):
        """Get the bitstream files from GCP bucket.
        The retrieved files are extracted into the cache directory.

        Does nothing in offline mode, or when no remote archive is known for
        `key`.

        Args:
            key: str; A git hash or the string 'latest'.
        """
        if self.offline:
            return
        latest = None
        if key == 'latest':
            latest = key = self.available['latest']

        remote_filename = self.available.get(key)
        # 'local' marks a directory found on disk without a bucket listing;
        # it is not the name of an object that could be downloaded.
        if remote_filename is None or remote_filename == 'local':
            logging.error('No remote bitstream archive known for {}.'.format(
                repr(key)))
            return
        local_dir = os.path.join(self.cachedir, key)
        archive = io.BytesIO(self.Get(remote_filename))
        with tarfile.open(fileobj=archive, mode='r|*') as tar:
            tar.extractall(local_dir)
        if latest:
            self.Touch(latest)

    def GetFromCache(self, key: str) -> Dict[str, Set[str]]:
        """Get the requested bitstream files.

        Files missing locally are fetched from the bucket first. Exits the
        process with status 1 if a file has no extension.

        Args:
            key: A git hash or the string 'latest'.
        Returns:
            A dictionary that maps file extensions to the set of bitstream files
            with that extension.
        """
        files = self._GetFromLocal(key)
        if not files:
            self._GetFromRemote(key)
            files = self._GetFromLocal(key)

        extension_map: Dict[str, Set[str]] = collections.defaultdict(set)
        for file_name in files:
            _, extension = os.path.splitext(file_name)
            # splitext() returns '' when there is no extension; indexing
            # [0] on it would raise instead of reporting the problem.
            if not extension:
                logging.error("Bitstream file has no extension: " +
                              repr(file_name))
                sys.exit(1)
            extension_map[extension[1:]].add(file_name)
        return extension_map

    @staticmethod
    def _GetDateTimeStr():
        """Return the current local time as an ISO 8601 string."""
        return datetime.datetime.now().isoformat()

    def _ConstructBazelString(self, build_file: Path, key: str) -> str:
        """Build the contents of the BUILD file for a bitstream.

        Exits the process with status 1 unless exactly one '.orig' and one
        '.splice' file are found, or if two mmi targets would share a name.

        Args:
            build_file: Name of the BUILD file (only used in error messages).
            key: A git hash or the string 'latest'.
        Returns:
            The text of the BUILD file.
        """
        files_by_extension = self.GetFromCache(key)

        if len(files_by_extension.get('orig', set())) != 1 or \
           len(files_by_extension.get('splice', set())) != 1:
            error_msg_lines = [
                "Could not find the bitstreams to generate a BUILD file:" +
                repr(build_file),
                "in files_by_extension:" + repr(files_by_extension),
                "using key:" + repr(key),
            ]
            logging.error('\n'.join(error_msg_lines))
            sys.exit(1)

        (test_rom_file, ) = files_by_extension['orig']
        (rom_file, ) = files_by_extension['splice']

        def filegroup_lines(name, src):
            return [
                'filegroup(',
                '    name = "{}",'.format(name),
                '    srcs = ["{}"],'.format(src),
                ')',
                '',
            ]

        bazel_lines = [
            '# This file was autogenerated. Do not edit!',
            '# Built at {}.'.format(BitstreamCache._GetDateTimeStr()),
            '# Configured for bitstream: {}'.format(key),
            '',
            'package(default_visibility = ["//visibility:public"])',
            '',
            'exports_files(glob(["cache/**"]))',
            '',
        ] + filegroup_lines('bitstream_test_rom', test_rom_file) \
          + filegroup_lines('bitstream_rom', rom_file)

        # Files are stored under the commit hash, also when `key` is the
        # alias 'latest'; compare directories against the resolved hash.
        resolved_key = self.available['latest'] if key == 'latest' else key
        # Only use the top-level files for now.
        target_directory = "cache/{}".format(resolved_key)

        used_target_names: Set[str] = set()

        for mmi_file in sorted(files_by_extension.get('mmi', set())):
            if os.path.dirname(mmi_file) != target_directory:
                continue

            target_name = os.path.basename(mmi_file).replace('.', '_')
            if target_name in used_target_names:
                logging.error(
                    "Target name {} for file {} would collide with another target"
                    .format(repr(target_name), repr(mmi_file)))
                sys.exit(1)
            used_target_names.add(target_name)

            bazel_lines += filegroup_lines(target_name, mmi_file)

        return '\n'.join(bazel_lines)

    def WriteBuildFile(self, build_file: Path, key: str) -> str:
        """Write a BUILD file for the requested bitstream files.

        Args:
            build_file: path; Filename of the BUILD file to write.
            key: str; A git hash or the string 'latest'.
        Returns:
            Either `key` or the corresponding commit hash if `key` is 'latest'.
        """
        # Build the text first: a failure there must not leave a truncated
        # BUILD file behind.
        contents = self._ConstructBazelString(build_file, key)
        with open(build_file, 'wt') as f:
            f.write(contents)

        if key != 'latest':
            return key
        return self.available[key]
| |
| |
def main(argv):
    """Resolve the requested bitstream and write the BUILD file for it.

    Args:
        argv: list[str]; The command line, with the program name in argv[0].
          The list is not modified.
    Returns:
        int: 0 on success, 1 if no suitable bitstream could be configured.
    """
    # Work on a copy so the caller's list (normally sys.argv) stays intact.
    argv = list(argv)
    # The user can override some basic behaviors with the BITSTREAM
    # environment variable.
    env = os.environ.get('BITSTREAM')
    if env:
        # split() without a separator collapses runs of whitespace, so
        # repeated spaces do not turn into empty-string arguments.
        argv.extend(env.split())
    args = parser.parse_args(args=argv[1:])
    desired_bitstream = args.bitstream

    # We need to know the location of the main git repo, since this script
    # will have its CWD in a bazel-managed 'external' directory.
    # If not provided, we assume this script itself is located in the main
    # git repository.
    if args.repo:
        if os.path.isdir(args.repo):
            repo = args.repo
        else:
            repo = os.path.dirname(args.repo)
    else:
        repo = os.path.dirname(argv[0])
    # dirname() of a bare file name is ''; that is not a usable working
    # directory for the git subprocess, so fall back to the current one.
    repo = repo or os.curdir

    cache = BitstreamCache(args.bucket_url, args.cache, args.latest_update,
                           args.offline)
    cache.InitRepository()

    # Do we need a refresh? A specific commit always needs the bucket listing.
    need_refresh = (args.refresh or desired_bitstream != 'latest' or
                    cache.NeedRefresh(args.refresh_time))
    cache.GetBitstreamsAvailable(need_refresh and not args.offline)

    # If commanded to print bitstream availability, do so.
    if args.list:
        for k, v in cache.available.items():
            print('{}: {}'.format(k, v))

    # If we aren't getting the latest bitstream, resolve the hash to the closest
    # bitstream we can find.
    if desired_bitstream != 'latest':
        closest_bitstream = cache.GetClosest(repo, desired_bitstream)
        if closest_bitstream is None:
            logging.error('Cannot find a bitstream close to {}'.format(
                desired_bitstream))
            return 1
        if closest_bitstream != desired_bitstream:
            logging.warning('Closest bitstream to {} is {}.'.format(
                desired_bitstream, closest_bitstream))
            desired_bitstream = closest_bitstream

    # Write a build file which allows tests to reference the bitstreams with
    # the labels:
    #   @bitstreams//:bitstream_test_rom
    #   @bitstreams//:bitstream_rom
    configured_bitstream = cache.WriteBuildFile(args.build_file,
                                                desired_bitstream)
    if (desired_bitstream != 'latest' and
            configured_bitstream != desired_bitstream):
        logging.error(
            'Configured bitstream {} does not match desired bitstream {}.'.
            format(configured_bitstream, desired_bitstream))
        return 1
    logging.info(
        'Configured latest bitstream as {}.'.format(configured_bitstream))

    return 0
| |
| |
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    raise SystemExit(main(sys.argv))