| #!/usr/bin/env python3 | 
 | # Copyright lowRISC contributors. | 
 | # Licensed under the Apache License, Version 2.0, see LICENSE for details. | 
 | # SPDX-License-Identifier: Apache-2.0 | 
 |  | 
 | import argparse | 
 | import collections | 
 | import datetime | 
 | import io | 
 | import logging | 
 | import os.path | 
 | import re | 
 | import subprocess | 
 | import sys | 
 | import tarfile | 
 | import time | 
 | import urllib.request | 
 | import xml.etree.ElementTree | 
 | from pathlib import Path | 
 | from typing import Dict, Set | 
 |  | 
 | # Default location of the bitstreams cache. | 
 | CACHE_DIR = '~/.cache/opentitan-bitstreams' | 
 | # Default bucket URL. | 
 | BUCKET_URL = 'https://storage.googleapis.com/opentitan-bitstreams/' | 
 | # The xml document returned by the bucket is in this namespace. | 
 | XMLNS = {'': 'http://doc.s3.amazonaws.com/2006-03-01'} | 
 |  | 
 | parser = argparse.ArgumentParser( | 
    description='Bitstream Downloader & Cache Manager')
 | parser.add_argument('--cache', default=CACHE_DIR, help='Cache directory name') | 
 | parser.add_argument('--latest-update', | 
 |                     default='latest.txt', | 
                    help='File recording the last time the cache was updated')
 | parser.add_argument('--bucket-url', default=BUCKET_URL, help='GCP Bucket URL') | 
 | parser.add_argument('--build-file', | 
 |                     default='BUILD.bazel', | 
                    help='Name of the generated BUILD file')
 | parser.add_argument('--list', | 
 |                     default=False, | 
 |                     action=argparse.BooleanOptionalAction, | 
 |                     help='List GCP Bucket contents') | 
 | parser.add_argument('--offline', | 
 |                     default=False, | 
 |                     action=argparse.BooleanOptionalAction, | 
 |                     help='Operating in an offline environment') | 
 | parser.add_argument('--refresh', | 
 |                     default=False, | 
 |                     action=argparse.BooleanOptionalAction, | 
 |                     help='Force a refresh') | 
 | parser.add_argument('--refresh-time', | 
 |                     default=300, | 
 |                     type=int, | 
                    help='How often to check for new bitstreams (seconds)')
 | parser.add_argument('--repo', | 
 |                     default='', | 
 |                     help="Location of the source git repo") | 
 | parser.add_argument( | 
 |     'bitstream', | 
 |     default='HEAD', | 
 |     nargs='?', | 
 |     help='Bitstream to retrieve: "latest" or git commit identifier') | 
 |  | 
 |  | 
 | class BitstreamCache(object): | 
 |  | 
 |     def __init__(self, bucket_url, cachedir, latest_update, offline=False): | 
 |         """Initialize the Bitstream Cache Manager.""" | 
        if not bucket_url.endswith('/'):
            bucket_url += '/'
 |         self.bucket_url = bucket_url | 
 |         cachedir = os.path.expanduser(cachedir) | 
 |         self.cachedir = os.path.join(cachedir, 'cache') | 
 |         latest_update = os.path.join(cachedir, | 
 |                                      os.path.expanduser(latest_update)) | 
 |         self.latest_update = latest_update | 
 |         self.offline = offline | 
 |         self.available = {} | 
 |  | 
 |     def InitRepository(self): | 
 |         """Create the cache directory and symlink it into the bazel repository dir.""" | 
 |         os.makedirs(self.cachedir, exist_ok=True) | 
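        # Symlink the shared user-level cache into the bazel-managed repository
        # directory, so the rest of this script (and the generated BUILD file)
        # can refer to it simply as 'cache'.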
 |         os.symlink(self.cachedir, 'cache') | 
 |  | 
 |     def Touch(self, key): | 
 |         """Set the latest known bitstream. | 
 |  | 
 |         Args: | 
 |             key: str; The git hash of the latest bitstream. | 
 |         """ | 
 |         with open(self.latest_update, 'wt') as f: | 
 |             print(key, file=f) | 
 |  | 
 |     def NeedRefresh(self, interval): | 
 |         """Determine if the cache needs a refresh. | 
 |  | 
 |         Args: | 
            interval: int; Desired interval between refreshes, in seconds.
 |         Returns: | 
 |             bool: whether a refresh is needed. | 
 |         """ | 
 |         try: | 
 |             st = os.stat(self.latest_update) | 
 |             return time.time() - st.st_mtime > interval | 
 |         except FileNotFoundError: | 
 |             return True | 
 |  | 
 |     def Get(self, file): | 
 |         """Perform an HTTP GET from the GCP bitstream bucket. | 
 |  | 
 |         Args: | 
 |             file: Filename in the bucket to retrieve. | 
 |         Returns: | 
            bytes: The raw contents of the requested file.
 |         """ | 
 |         response = urllib.request.urlopen(self.bucket_url + file) | 
 |         return response.read() | 
 |  | 
 |     def GetBitstreamsAvailable(self, refresh): | 
 |         """Inventory which bitstreams are available. | 
 |  | 
 |         Args: | 
 |             refresh: bool; whether to refresh from the network. | 
 |         """ | 
 |         if not refresh: | 
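            # No refresh: treat each directory under the local cache as an
            # available bitstream, keyed by its name (the git hash it was
            # built from).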
 |             for (_, dirnames, _) in os.walk('cache'): | 
 |                 for d in dirnames: | 
 |                     self.available[d] = 'local' | 
 |             try: | 
 |                 with open(self.latest_update, 'rt') as f: | 
 |                     self.available['latest'] = f.read().strip() | 
 |             except FileNotFoundError: | 
 |                 if self.offline: | 
 |                     logging.error( | 
 |                         'Must pre-initialize bitstream cache in offline mode.') | 
 |                 else: | 
 |                     logging.error( | 
 |                         f'Bitstream cache missing {self.latest_update}.') | 
 |                 sys.exit(1) | 
 |             return | 
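        # Online refresh: list the bucket (an S3-style XML index) and record
        # every object that looks like a bitstream tarball, keyed by git hash.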
 |         document = self.Get('').decode('utf-8') | 
 |         et = xml.etree.ElementTree.fromstring(document) | 
 |         for content in et.findall('Contents', XMLNS): | 
 |             for key in content.findall('Key', XMLNS): | 
                m = re.search(r'bitstream-([0-9A-Fa-f]+)\.tar\.gz', key.text)
 |                 if m: | 
 |                     self.available[m.group(1)] = key.text | 
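        # latest.txt identifies the most recent bitstream; the git hash is
        # read from its second line.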
 |         latest = self.Get('master/latest.txt').decode('utf-8').split('\n') | 
 |         self.available['latest'] = latest[1] | 
 |  | 
 |     def GetClosest(self, repodir, key): | 
 |         """Get the best match for a bitstream (exact or next older commit). | 
 |  | 
 |         Args: | 
 |             repodir: path; Path to the repo from which bitstreams are built. | 
 |             key: str; A git hash or identifier of the desired bitstream. | 
 |         Returns: | 
 |             str or None: git hash of the closest bitstream. | 
 |         """ | 
 |         if key in self.available: | 
 |             return key | 
 |         commits = [] | 
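        # `git log` lists the ancestors of `key`, newest first; the first one
        # with a bitstream available is the closest usable match.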
 |         lines = subprocess.check_output( | 
 |             ['git', 'log', '--oneline', '--no-abbrev-commit', key], | 
 |             universal_newlines=True, | 
 |             cwd=repodir) | 
 |         for line in lines.split('\n'): | 
 |             commits.append(line.split(' ')[0]) | 
 |  | 
 |         for commit in commits: | 
 |             if commit in self.available: | 
 |                 return commit | 
 |         return None | 
 |  | 
 |     def _GetFromLocal(self, key): | 
 |         """Get the bitstream files from the local filesystem. | 
 |  | 
 |         Args: | 
 |             key: str; A git hash or the string 'latest'. | 
 |         Returns: | 
 |             list[str]: The requested bitstream files or empty list. | 
 |         """ | 
 |         if key == 'latest': | 
 |             key = self.available['latest'] | 
 |         files = [] | 
 |         local_dir = os.path.join('cache', key) | 
 |         for (dirname, _, filenames) in os.walk(local_dir): | 
 |             files.extend(os.path.join(dirname, f) for f in filenames) | 
 |         return files | 
 |  | 
 |     def _GetFromRemote(self, key): | 
 |         """Get the bitstream files from GCP bucket. | 
 |         The retrieved files are extracted into the cache directory. | 
 |  | 
 |         Args: | 
 |             key: str; A git hash or the string 'latest'. | 
 |         """ | 
 |         if self.offline: | 
 |             return | 
 |         if key == 'latest': | 
 |             latest = self.available['latest'] | 
 |             key = latest | 
 |         else: | 
 |             latest = None | 
 |  | 
 |         remote_filename = self.available[key] | 
 |         local_dir = os.path.join(self.cachedir, key) | 
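        # Download the tarball into memory and unpack it into cache/<git hash>.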
 |         archive = io.BytesIO(self.Get(remote_filename)) | 
        with tarfile.open(fileobj=archive, mode='r|*') as tar:
            tar.extractall(local_dir)
 |         if latest: | 
 |             self.Touch(latest) | 
 |  | 
 |     def GetFromCache(self, key: str) -> Dict[str, Set[str]]: | 
 |         """Get the requested bitstream files. | 
 |  | 
 |         Args: | 
 |             key: A git hash or the string 'latest'. | 
 |         Returns: | 
 |             A dictionary that maps file extensions to the set of bitstream files | 
 |             with that extension. | 
 |         """ | 
 |         files = self._GetFromLocal(key) | 
 |         if not files: | 
 |             self._GetFromRemote(key) | 
 |             files = self._GetFromLocal(key) | 
 |  | 
 |         extension_map: Dict[str, Set[str]] = collections.defaultdict(set) | 
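        # Group the files by their final extension (without the leading dot);
        # e.g. a hypothetical 'cache/<hash>/chip.bit.orig' lands under 'orig'.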
 |         for file_name in files: | 
 |             _, extension = os.path.splitext(file_name) | 
            if not extension.startswith('.'):
 |                 logging.error("Bitstream file has no extension: " + | 
 |                               repr(file_name)) | 
 |                 sys.exit(1) | 
 |             key = extension[1:] | 
 |             extension_map[key] |= {file_name} | 
 |         return extension_map | 
 |  | 
 |     @staticmethod | 
 |     def _GetDateTimeStr(): | 
 |         return datetime.datetime.now().isoformat() | 
 |  | 
 |     def _ConstructBazelString(self, build_file: Path, key: str) -> str: | 
 |         files_by_extension = self.GetFromCache(key) | 
 |  | 
 |         if len(files_by_extension.get('orig', set())) != 1 or \ | 
 |            len(files_by_extension.get('splice', set())) != 1: | 
 |             error_msg_lines = [ | 
 |                 "Could not find the bitstreams to generate a BUILD file:" + | 
 |                 repr(build_file), | 
 |                 "in files_by_extension:" + repr(files_by_extension), | 
 |                 "using key:" + repr(key), | 
 |             ] | 
 |             logging.error('\n'.join(error_msg_lines)) | 
 |             sys.exit(1) | 
 |  | 
 |         (test_rom_file, ) = files_by_extension['orig'] | 
 |         (rom_file, ) = files_by_extension['splice'] | 
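        # The single '.orig' bitstream carries the test ROM and the single
        # '.splice' bitstream the final ROM; they back the bitstream_test_rom
        # and bitstream_rom filegroups emitted below.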
 |  | 
 |         def filegroup_lines(name, src): | 
 |             return [ | 
 |                 'filegroup(', | 
 |                 '    name = "{}",'.format(name), | 
 |                 '    srcs = ["{}"],'.format(src), | 
 |                 ')', | 
 |                 '', | 
 |             ] | 
 |  | 
 |         bazel_lines = [ | 
 |             '# This file was autogenerated. Do not edit!', | 
 |             '# Built at {}.'.format(BitstreamCache._GetDateTimeStr()), | 
 |             '# Configured for bitstream: {}'.format(key), | 
 |             '', | 
 |             'package(default_visibility = ["//visibility:public"])', | 
 |             '', | 
 |             'exports_files(glob(["cache/**"]))', | 
 |             '', | 
 |         ] + filegroup_lines('bitstream_test_rom', test_rom_file) \ | 
 |           + filegroup_lines('bitstream_rom', rom_file) | 
 |  | 
 |         used_target_names: Set[str] = set() | 
 |  | 
 |         for mmi_file in sorted(files_by_extension.get('mmi', set())): | 
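            # Derive a Bazel target name from the file name, e.g. a
            # hypothetical 'foo.mmi' would become 'foo_mmi'.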
 |             target_name = os.path.basename(mmi_file).replace('.', '_') | 
 |  | 
 |             # Only use the top-level files for now. | 
 |             target_directory = "cache/{}".format(key) | 
            if os.path.dirname(mmi_file) != target_directory:
                continue
 |  | 
 |             if target_name in used_target_names: | 
 |                 logging.error( | 
 |                     "Target name {} for file {} would collide with another target" | 
 |                     .format(repr(target_name), repr(mmi_file))) | 
 |                 sys.exit(1) | 
 |             used_target_names.add(target_name) | 
 |  | 
 |             bazel_lines += filegroup_lines(target_name, mmi_file) | 
 |  | 
 |         return '\n'.join(bazel_lines) | 
 |  | 
 |     def WriteBuildFile(self, build_file: Path, key: str) -> str: | 
 |         """Write a BUILD file for the requested bitstream files. | 
 |  | 
 |         Args: | 
            build_file: path; Filename of the BUILD file to write.
 |             key: str; A git hash or the string 'latest'. | 
 |         Returns: | 
 |             Either `key` or the corresponding commit hash if `key` is 'latest'. | 
 |         """ | 
 |         with open(build_file, 'wt') as f: | 
 |             f.write(self._ConstructBazelString(build_file, key)) | 
 |  | 
 |         if key != 'latest': | 
 |             return key | 
 |         return self.available[key] | 
 |  | 
 |  | 
 | def main(argv): | 
 |     # The user can override some basic behaviors with the BITSTREAM | 
 |     # environment variable. | 
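    # For example, BITSTREAM="--refresh HEAD" would force a refresh for the
    # current commit (illustrative; any arguments this parser accepts may be
    # supplied through the environment variable).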
 |     env = os.environ.get('BITSTREAM') | 
 |     if env: | 
        argv.extend(env.split())
 |     args = parser.parse_args(args=argv[1:]) | 
 |     desired_bitstream = args.bitstream | 
 |  | 
 |     # We need to know the location of the main git repo, since this script | 
 |     # will have its CWD in a bazel-managed 'external' directory. | 
 |     # If not provided, we assume this script itself is located in the main | 
 |     # git repository. | 
 |     if args.repo: | 
 |         if os.path.isdir(args.repo): | 
 |             repo = args.repo | 
 |         else: | 
 |             repo = os.path.dirname(args.repo) | 
 |     else: | 
 |         repo = os.path.dirname(argv[0]) | 
 |  | 
 |     cache = BitstreamCache(args.bucket_url, args.cache, args.latest_update, | 
 |                            args.offline) | 
 |     cache.InitRepository() | 
 |  | 
 |     # Do we need a refresh? | 
 |     need_refresh = (args.refresh or desired_bitstream != 'latest' or | 
 |                     cache.NeedRefresh(args.refresh_time)) | 
 |     cache.GetBitstreamsAvailable(need_refresh and not args.offline) | 
 |  | 
 |     # If commanded to print bitstream availability, do so. | 
 |     if args.list: | 
 |         for k, v in cache.available.items(): | 
 |             print('{}: {}'.format(k, v)) | 
 |  | 
 |     # If we aren't getting the latest bitstream, resolve the hash to the closest | 
 |     # bitstream we can find. | 
 |     if desired_bitstream != 'latest': | 
 |         closest_bitstream = cache.GetClosest(repo, desired_bitstream) | 
 |         if closest_bitstream is None: | 
 |             logging.error('Cannot find a bitstream close to {}'.format( | 
 |                 desired_bitstream)) | 
 |             return 1 | 
 |         if closest_bitstream != desired_bitstream: | 
 |             logging.warning('Closest bitstream to {} is {}.'.format( | 
 |                 desired_bitstream, closest_bitstream)) | 
 |             desired_bitstream = closest_bitstream | 
 |  | 
 |     # Write a build file which allows tests to reference the bitstreams with | 
 |     # the labels: | 
 |     #   @bitstreams//:bitstream_test_rom | 
 |     #   @bitstreams//:bitstream_rom | 
    configured_bitstream = cache.WriteBuildFile(args.build_file,
                                                desired_bitstream)
    if desired_bitstream != 'latest' and configured_bitstream != desired_bitstream:
        logging.error(
            'Configured bitstream {} does not match desired bitstream {}.'.format(
                configured_bitstream, desired_bitstream))
        return 1
    logging.info('Configured bitstream {}.'.format(configured_bitstream))
 |  | 
 |     return 0 | 
 |  | 
 |  | 
 | if __name__ == '__main__': | 
 |     sys.exit(main(sys.argv)) |