#!/usr/bin/env python3
# Copyright lowRISC contributors.
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0
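#
# Usage sketch (the script name below is illustrative, not necessarily this
# file's actual name):
#
#   ./bitstreams.py --list        # List the available bitstreams.
#   ./bitstreams.py latest        # Fetch and configure the latest bitstream.
#   ./bitstreams.py <git-hash>    # Fetch the bitstream closest to <git-hash>.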
import argparse
import collections
import datetime
import io
import json
import logging
import os.path
import re
import subprocess
import sys
import tarfile
import time
import urllib.parse
import urllib.request
import xml.etree.ElementTree
from pathlib import Path
from typing import Dict, List, Set
import jsonschema
# The manifest schema version assumed for legacy cache entries and for JSON
# files missing a version entry. Entries that declare a higher schema version
# than is supported here are also parsed as this version, in case they are
# forwards compatible.
MANIFEST_SCHEMA_VERSION = 1
# Default location of the bitstreams cache.
CACHE_DIR = '~/.cache/opentitan-bitstreams'
# Default bucket URL.
BUCKET_URL = 'https://storage.googleapis.com/opentitan-bitstreams/'
# The xml document returned by the bucket is in this namespace.
XMLNS = {'': 'http://doc.s3.amazonaws.com/2006-03-01'}
# Manifest schema directory
MANIFESTS_DIR = os.path.dirname(__file__) if __file__ else os.path.dirname(sys.argv[0])
parser = argparse.ArgumentParser(
    description='Bitstream downloader and cache manager')
parser.add_argument('--cache', default=CACHE_DIR, help='Cache directory name')
parser.add_argument('--latest-update',
default='latest.txt',
help='Last time the cache was updated')
parser.add_argument('--bucket-url', default=BUCKET_URL, help='GCP Bucket URL')
parser.add_argument('--build-file',
default='BUILD.bazel',
                    help='Name of the generated BUILD file')
parser.add_argument('--list',
default=False,
action=argparse.BooleanOptionalAction,
help='List GCP Bucket contents')
parser.add_argument('--offline',
default=False,
action=argparse.BooleanOptionalAction,
help='Operating in an offline environment')
parser.add_argument('--refresh',
default=False,
action=argparse.BooleanOptionalAction,
help='Force a refresh')
parser.add_argument('--refresh-time',
default=300,
type=int,
                    help='How often to check for new bitstreams (in seconds)')
parser.add_argument('--repo',
default='',
help="Location of the source git repo")
parser.add_argument(
'bitstream',
default='HEAD',
nargs='?',
help='Bitstream to retrieve: "latest" or git commit identifier')
class BitstreamCache(object):
def __init__(self, bucket_url, cachedir, latest_update, offline=False):
"""Initialize the Bitstream Cache Manager."""
if bucket_url[-1] != '/':
bucket_url += '/'
self.bucket_url = bucket_url
cachedir = os.path.expanduser(cachedir)
self.cachedir = os.path.join(cachedir, 'cache')
latest_update = os.path.join(cachedir,
os.path.expanduser(latest_update))
self.latest_update = latest_update
self.offline = offline
self.available = {}
@staticmethod
def MakeWithDefaults() -> 'BitstreamCache':
"""Create a BitstreamCache with default parameters."""
args = parser.parse_args([])
cache = BitstreamCache(args.bucket_url, args.cache, args.latest_update,
args.offline)
return cache
def InitRepository(self):
"""Create the cache directory and symlink it into the bazel repository dir."""
os.makedirs(self.cachedir, exist_ok=True)
os.symlink(self.cachedir, 'cache')
def Touch(self, key):
"""Set the latest known bitstream.
Args:
key: str; The git hash of the latest bitstream.
"""
with open(self.latest_update, 'wt') as f:
print(key, file=f)
def NeedRefresh(self, interval):
"""Determine if the cache needs a refresh.
Args:
            interval: int; Desired interval between refreshes, in seconds.
Returns:
bool: whether a refresh is needed.
"""
try:
st = os.stat(self.latest_update)
return time.time() - st.st_mtime > interval
except FileNotFoundError:
return True
def Get(self, file, **kwargs):
"""Perform an HTTP GET from the GCP bitstream bucket.
Args:
file: Filename in the bucket to retrieve.
            kwargs: Any query parameters to include in the request.
        Returns:
            bytes: The raw response body.
"""
url_parts = urllib.parse.urlparse(self.bucket_url)
path = urllib.parse.quote(url_parts.path + file)
query = urllib.parse.urlencode(kwargs)
url = url_parts._replace(path=path, query=query).geturl()
response = urllib.request.urlopen(url)
return response.read()
def GetBitstreamsAvailable(self, refresh):
"""Inventory which bitstreams are available.
Args:
refresh: bool; whether to refresh from the network.
"""
        if not refresh:
            # Only the top-level directories of the cache are bitstream keys;
            # cache entries may contain subdirectories of their own.
            for (_, dirnames, _) in os.walk('cache'):
                for d in dirnames:
                    self.available[d] = 'local'
                break
try:
with open(self.latest_update, 'rt') as f:
self.available['latest'] = f.read().strip()
except FileNotFoundError:
if self.offline:
logging.error(
'Must pre-initialize bitstream cache in offline mode.')
else:
logging.error(
f'Bitstream cache missing {self.latest_update}.')
sys.exit(1)
return
# Fetching the list of all entries in the cache may require multiple
        # requests since GCP will paginate long responses. Markers are used to
# fetch subsequent pages.
# See the GCP docs and the XML REST API reference for more details:
# https://cloud.google.com/storage/docs/paginate-results
# https://cloud.google.com/storage/docs/xml-api/get-bucket-list
marker = ''
while True:
document = self.Get('', marker=marker).decode('utf-8')
et = xml.etree.ElementTree.fromstring(document)
for content in et.findall('Contents', XMLNS):
for key in content.findall('Key', XMLNS):
                    m = re.search(r'bitstream-([0-9A-Fa-f]+)\.tar\.gz', key.text)
if m:
self.available[m.group(1)] = key.text
# Handle any pagination
is_truncated_elt = et.find('IsTruncated', XMLNS)
if (is_truncated_elt is not None) and \
(is_truncated_elt.text == 'true'):
marker = et.find('NextMarker', XMLNS).text
else:
break
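        # latest.txt stores the git hash of the latest bitstream on its second
        # line; the first line is ignored here.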
latest = self.Get('master/latest.txt').decode('utf-8').split('\n')
self.available['latest'] = latest[1]
def GetClosest(self, repodir, key):
"""Get the best match for a bitstream (exact or next older commit).
Args:
repodir: path; Path to the repo from which bitstreams are built.
key: str; A git hash or identifier of the desired bitstream.
Returns:
str or None: git hash of the closest bitstream.
"""
if key in self.available:
return key
commits = []
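        # Walk the history of `key` from newest to oldest; the first commit
        # that also has a cached bitstream is the closest match.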
lines = subprocess.check_output(
['git', 'log', '--oneline', '--no-abbrev-commit', key],
universal_newlines=True,
cwd=repodir)
for line in lines.split('\n'):
commits.append(line.split(' ')[0])
for commit in commits:
if commit in self.available:
return commit
return None
def _GetFromLocal(self, key):
"""Get the bitstream files from the local filesystem.
Args:
key: str; A git hash or the string 'latest'.
Returns:
list[str]: The requested bitstream files or empty list.
"""
if key == 'latest':
key = self.available['latest']
files = []
local_dir = os.path.join('cache', key)
for (dirname, _, filenames) in os.walk(local_dir):
files.extend(os.path.join(dirname, f) for f in filenames)
return files
def _GetFromRemote(self, key):
"""Get the bitstream files from GCP bucket.
The retrieved files are extracted into the cache directory.
Args:
key: str; A git hash or the string 'latest'.
"""
if self.offline:
return
if key == 'latest':
latest = self.available['latest']
key = latest
else:
latest = None
remote_filename = self.available[key]
local_dir = os.path.join(self.cachedir, key)
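        # Download the tarball into memory and extract it into the per-commit
        # cache directory.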
archive = io.BytesIO(self.Get(remote_filename))
        with tarfile.open(fileobj=archive, mode='r|*') as tar:
            tar.extractall(local_dir)
if latest:
self.Touch(latest)
if not os.path.exists(self.latest_update):
self.Touch(key)
    def _GenerateLegacyManifest(self, key: str, files: List[str]) -> Dict:
        """Generate a manifest for legacy cache entries that lack one.
Args:
key: A git hash
files: A list of file paths
Returns:
A dictionary mapping file paths to manifest entries
"""
output_files = dict()
legacy_files = {
"lowrisc_systems_chip_earlgrey_cw310_0.1.bit.orig": {
"buildTarget": "//hw/bitstream/vivado:fpga_cw310",
"outputInfo": {
"@type": "bitstreamInfo",
"design": "chip_earlgrey_cw310"
}
},
"otp.mmi": {
"buildTarget": "//hw/bitstream/vivado:fpga_cw310",
"outputInfo": {
"@type": "memoryMapInfo",
"design": "chip_earlgrey_cw310",
"memoryId": "otp"
}
},
"rom.mmi": {
"buildTarget": "//hw/bitstream/vivado:fpga_cw310",
"outputInfo": {
"@type": "memoryMapInfo",
"design": "chip_earlgrey_cw310",
"memoryId": "rom"
}
},
"chip_earlgrey_cw310_hyperdebug/"
"lowrisc_systems_chip_earlgrey_cw310_hyperdebug_0.1.bit": {
"buildTarget": "//hw/bitstream/vivado:fpga_cw310_hyperdebug",
"outputInfo": {
"@type": "bitstreamInfo",
"design": "chip_earlgrey_cw310_hyperdebug"
}
},
"chip_earlgrey_cw310_hyperdebug/otp.mmi": {
"buildTarget": "//hw/bitstream/vivado:fpga_cw310_hyperdebug",
"outputInfo": {
"@type": "memoryMapInfo",
"design": "chip_earlgrey_cw310_hyperdebug",
"memoryId": "otp"
}
},
"chip_earlgrey_cw310_hyperdebug/rom.mmi": {
"buildTarget": "//hw/bitstream/vivado:fpga_cw310_hyperdebug",
"outputInfo": {
"@type": "memoryMapInfo",
"design": "chip_earlgrey_cw310_hyperdebug",
"memoryId": "rom"
}
},
}
for legacy_file in legacy_files:
if os.path.join("cache", key, legacy_file) in files:
output_files[legacy_file] = legacy_files[legacy_file]
manifest = {"schemaVersion": MANIFEST_SCHEMA_VERSION, "buildId": key,
"outputFiles": output_files}
return manifest
def GetFromCache(self, key: str) -> Dict:
"""Get the requested bitstream files manifest.
Args:
key: A git hash or the string 'latest'.
Returns:
A dictionary that represents a bitstream cache manifest.
"""
files = self._GetFromLocal(key)
if not files:
self._GetFromRemote(key)
files = self._GetFromLocal(key)
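        # Newer cache entries ship a manifest.json describing their contents;
        # older entries get a manifest synthesized from the known legacy paths.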
manifest_path = os.path.join("cache", key, "manifest.json")
if manifest_path not in files:
logging.warning("No manifest found."
" Attempting to generate manifest from legacy file"
" paths.")
return self._GenerateLegacyManifest(key, files)
with open(manifest_path, "r") as manifest_file:
manifest = json.load(manifest_file)
if "schemaVersion" not in manifest:
logging.error("schema is missing a version number."
" Generating legacy manifest instead...")
return self._GenerateLegacyManifest(key, files)
return manifest
@staticmethod
def _GetDateTimeStr():
return datetime.datetime.now().isoformat()
def _ConstructBazelString(self, build_file: Path, key: str) -> str:
        # If `key` is "latest", resolve it to the git hash that "latest"
        # points to.
if key == 'latest':
key = self.available['latest']
manifest = self.GetFromCache(key)
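        # Group output files by design name; each design maps a target suffix
        # ("bitstream" or "<memoryId>_mmi") to the corresponding file path.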
designs = collections.defaultdict(dict)
if manifest["schemaVersion"] > MANIFEST_SCHEMA_VERSION:
logging.warning("Warning: Manifest is newer than available schemas")
logging.warning("Will try parsing an available schema with highest version")
manifest["schemaVersion"] = MANIFEST_SCHEMA_VERSION
if manifest["schemaVersion"] == 1:
schema_path = os.path.join(MANIFESTS_DIR, "bitstreams_manifest.schema.json")
with open(schema_path) as schema_file:
schema = json.load(schema_file)
jsonschema.validate(manifest, schema)
output_files = manifest["outputFiles"]
for output in output_files:
metadata = output_files[output]["outputInfo"]
design_name = metadata["design"]
design = designs[design_name]
if metadata["@type"] == "bitstreamInfo":
design["bitstream"] = output
elif metadata["@type"] == "memoryMapInfo":
mmi_id = metadata["memoryId"] + "_mmi"
if mmi_id in design:
logging.error("Unexpected duplicate memoryId " + metadata["memoryId"])
sys.exit(1)
design[mmi_id] = output
bazel_lines = [
'# This file was autogenerated. Do not edit!',
'# Built at {}.'.format(BitstreamCache._GetDateTimeStr()),
'# Configured for bitstream: {}'.format(key),
'',
'package(default_visibility = ["//visibility:public"])',
'',
'exports_files(glob(["cache/**"]))',
'',
]
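        # Each cached file becomes a filegroup named "{design}_{target}", e.g.
        # "chip_earlgrey_cw310_bitstream", which tests reference through the
        # @bitstreams repository.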
def filegroup_lines(name, src):
return [
'filegroup(',
' name = "{}",'.format(name),
' srcs = ["{}"],'.format(src),
')',
'',
]
used_target_names: Set[str] = set()
cache_base_dir = os.path.join("cache", key)
for design_name in sorted(designs.keys()):
design = designs[design_name]
if "bitstream" not in design:
                error_msg_lines = [
                    "Could not find a bitstream while generating BUILD file " +
                    repr(build_file),
                    "for design " + design_name + ": " + repr(design),
                    "using key: " + repr(key),
                ]
logging.error('\n'.join(error_msg_lines))
sys.exit(1)
for target in sorted(design.keys()):
target_file = os.path.join(cache_base_dir, design[target])
target_name = "_".join([design_name, target])
if target_name in used_target_names:
logging.error(
"Target name {} for file {} would collide with another target"
.format(repr(target_name), repr(target_file)))
sys.exit(1)
used_target_names.add(target_name)
bazel_lines += filegroup_lines(target_name, target_file)
return '\n'.join(bazel_lines)
def WriteBuildFile(self, build_file: Path, key: str) -> str:
"""Write a BUILD file for the requested bitstream files.
Args:
            build_file: path; Filename of the BUILD file to write.
key: str; A git hash or the string 'latest'.
Returns:
Either `key` or the corresponding commit hash if `key` is 'latest'.
"""
with open(build_file, 'wt') as f:
f.write(self._ConstructBazelString(build_file, key))
if key != 'latest':
return key
return self.available[key]
def main(argv):
# The user can override some basic behaviors with the BITSTREAM
# environment variable.
env = os.environ.get('BITSTREAM')
if env:
        argv.extend(env.split())
args = parser.parse_args(args=argv[1:])
desired_bitstream = args.bitstream
# We need to know the location of the main git repo, since this script
# will have its CWD in a bazel-managed 'external' directory.
# If not provided, we assume this script itself is located in the main
# git repository.
if args.repo:
if os.path.isdir(args.repo):
repo = args.repo
else:
repo = os.path.dirname(args.repo)
else:
repo = os.path.dirname(argv[0])
cache = BitstreamCache(args.bucket_url, args.cache, args.latest_update,
args.offline)
cache.InitRepository()
# Do we need a refresh?
need_refresh = (args.refresh or desired_bitstream != 'latest' or
cache.NeedRefresh(args.refresh_time))
cache.GetBitstreamsAvailable(need_refresh and not args.offline)
# If commanded to print bitstream availability, do so.
if args.list:
for k, v in cache.available.items():
print('{}: {}'.format(k, v))
# If we aren't getting the latest bitstream, resolve the hash to the closest
# bitstream we can find.
if desired_bitstream != 'latest':
closest_bitstream = cache.GetClosest(repo, desired_bitstream)
if closest_bitstream is None:
logging.error('Cannot find a bitstream close to {}'.format(
desired_bitstream))
return 1
if closest_bitstream != desired_bitstream:
logging.warning('Closest bitstream to {} is {}.'.format(
desired_bitstream, closest_bitstream))
desired_bitstream = closest_bitstream
# Write a build file which allows tests to reference the bitstreams with
# the labels:
# @bitstreams//:{design}_bitstream
    configured_bitstream = cache.WriteBuildFile(args.build_file,
                                                desired_bitstream)
    if desired_bitstream != 'latest' and configured_bitstream != desired_bitstream:
        logging.error(
            'Configured bitstream {} does not match desired bitstream {}.'.
            format(configured_bitstream, desired_bitstream))
        return 1
    logging.info(
        'Configured bitstream as {}.'.format(configured_bitstream))
return 0
if __name__ == '__main__':
sys.exit(main(sys.argv))