pw_tokenizer: Python tokenization domain support
- When reading ELF files, a domain from which to read strings may be
specified by appending #DOMAIN_NAME to the path (e.g.
path/to/file.elf#test_domain).
- Add tests for the database.py command line interface.
- Improve type annotations in a few places.
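
  For example, the #DOMAIN_NAME syntax lets database.py read a single domain:

    ./database.py create --database my_db.csv path/to/file.elf#test_domain
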
Change-Id: I70140c04d1e504f6880af34a2e48df647ffd9738
diff --git a/pw_tokenizer/docs.rst b/pw_tokenizer/docs.rst
index 6d57d49..95e8209 100644
--- a/pw_tokenizer/docs.rst
+++ b/pw_tokenizer/docs.rst
@@ -371,6 +371,18 @@
// Tokenizes this string to the "my_custom_domain" domain.
PW_TOKENIZE_STRING_DOMAIN("my_custom_domain", "Hello, world!");
+The database and detokenization command line tools default to reading from the
+default domain. The domain may be specified for ELF files by appending
+``#DOMAIN_NAME`` to the file path. Use ``#.*`` to read from all domains. For
+example, the following reads strings in ``some_domain`` from ``my_image.elf``.
+
+.. code-block:: sh
+
+ ./database.py create --database my_db.csv path/to/my_image.elf#some_domain
+
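+For example, the following reads strings from all domains in the same file:
+
+.. code-block:: sh
+
+  ./database.py create --database my_db.csv "path/to/my_image.elf#.*"
+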
+See `Managing token databases`_ for information about the ``database.py``
+command line tool.
+
Token databases
===============
Token databases store a mapping of tokens to the strings they represent. An ELF
@@ -440,7 +452,7 @@
Invoke ``database.py`` with ``-h`` for full usage information.
An example ELF file with tokenized logs is provided at
-``pw_tokenizer/py/example_binary_with_tokenized_logs.elf``. You can use that
+``pw_tokenizer/py/example_binary_with_tokenized_strings.elf``. You can use that
file to experiment with the ``database.py`` commands.
Create a database
diff --git a/pw_tokenizer/py/database_test.py b/pw_tokenizer/py/database_test.py
new file mode 100755
index 0000000..cba994f
--- /dev/null
+++ b/pw_tokenizer/py/database_test.py
@@ -0,0 +1,198 @@
+#!/usr/bin/env python3
+# Copyright 2020 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""Tests for the database module."""
+
+import io
+from pathlib import Path
+import shutil
+import sys
+import tempfile
+import unittest
+from unittest import mock
+
+from pw_tokenizer import database
+
+ELF = Path(__file__).parent / 'example_binary_with_tokenized_strings.elf'
+
+CSV_DEFAULT_DOMAIN = '''\
+00000000, ,""
+141c35d5, ,"The answer: ""%s"""
+2b78825f, ,"[:-)"
+2e668cd6, ,"Jello, world!"
+31631781, ,"%d"
+61fd1e26, ,"%ld"
+68ab92da, ,"%s there are %x (%.2f) of them%c"
+7b940e2a, ,"Hello %s! %hd %e"
+7da55d52, ,">:-[]"
+851beeb6, ,"%u %d"
+881436a0, ,"The answer is: %s"
+88808930, ,"%u%d%02x%X%hu%hhd%d%ld%lu%lld%llu%c%c%c"
+ad002c97, ,"%llx"
+b3653e13, ,"Jello!"
+cc6d3131, ,"Jello?"
+e13b0f94, ,"%llu"
+e65aefef, ,"Won't fit : %s%d"
+'''
+
+CSV_TEST_DOMAIN = '''\
+00000000, ,""
+59b2701c, ,"The answer was: %s"
+881436a0, ,"The answer is: %s"
+'''
+
+CSV_ALL_DOMAINS = '''\
+00000000, ,""
+141c35d5, ,"The answer: ""%s"""
+2b78825f, ,"[:-)"
+2e668cd6, ,"Jello, world!"
+31631781, ,"%d"
+59b2701c, ,"The answer was: %s"
+61fd1e26, ,"%ld"
+68ab92da, ,"%s there are %x (%.2f) of them%c"
+7b940e2a, ,"Hello %s! %hd %e"
+7da55d52, ,">:-[]"
+851beeb6, ,"%u %d"
+881436a0, ,"The answer is: %s"
+88808930, ,"%u%d%02x%X%hu%hhd%d%ld%lu%lld%llu%c%c%c"
+ad002c97, ,"%llx"
+b3653e13, ,"Jello!"
+cc6d3131, ,"Jello?"
+e13b0f94, ,"%llu"
+e65aefef, ,"Won't fit : %s%d"
+'''
+
+
+def run_cli(*args):
+ original_argv = sys.argv
+ sys.argv = ['database.py', *(str(a) for a in args)]
+ # pylint: disable=protected-access
+ try:
+ database._main(*database._parse_args())
+ finally:
+ # Remove the log handler added by _main to avoid duplicate logs.
+ if database._LOG.handlers:
+ database._LOG.handlers.pop()
+        # pylint: enable=protected-access
+
+        sys.argv = original_argv
+
+
+def _mock_output():
+ output = io.BytesIO()
+ output.name = '<fake stdout>'
+ return io.TextIOWrapper(output, write_through=True)
+
+
+REPORT_DEFAULT_DOMAIN = b'''\
+example_binary_with_tokenized_strings.elf]
+ Domain: default
+ Entries present: 17
+ Size of strings: 205 B
+ Total entries: 17
+ Total size of strings: 205 B
+ Collisions: 0 tokens
+'''
+
+REPORT_TEST_DOMAIN = b'''\
+example_binary_with_tokenized_strings.elf]
+ Domain: TEST_DOMAIN
+ Entries present: 3
+ Size of strings: 38 B
+ Total entries: 3
+ Total size of strings: 38 B
+ Collisions: 0 tokens
+'''
+
+
+class DatabaseCommandLineTest(unittest.TestCase):
+ """Tests the database.py command line interface."""
+ def setUp(self):
+ self._dir = Path(tempfile.mkdtemp('_pw_tokenizer_test'))
+ self._csv = self._dir / 'db.csv'
+
+ def tearDown(self):
+ shutil.rmtree(self._dir)
+
+ def test_create_csv(self):
+ run_cli('create', '--database', self._csv, ELF)
+
+ self.assertEqual(CSV_DEFAULT_DOMAIN, self._csv.read_text())
+
+ def test_create_csv_test_domain(self):
+ run_cli('create', '--database', self._csv, f'{ELF}#TEST_DOMAIN')
+
+ self.assertEqual(CSV_TEST_DOMAIN, self._csv.read_text())
+
+ def test_create_csv_all_domains(self):
+ run_cli('create', '--database', self._csv, f'{ELF}#.*')
+
+ self.assertEqual(CSV_ALL_DOMAINS, self._csv.read_text())
+
+ def test_create_force(self):
+ self._csv.write_text(CSV_ALL_DOMAINS)
+
+ with self.assertRaises(FileExistsError):
+ run_cli('create', '--database', self._csv, ELF)
+
+ run_cli('create', '--force', '--database', self._csv, ELF)
+
+ def test_create_binary(self):
+ binary = self._dir / 'db.bin'
+ run_cli('create', '--type', 'binary', '--database', binary, ELF)
+
+ # Write the binary database as CSV to verify its contents.
+ run_cli('create', '--database', self._csv, binary)
+
+ self.assertEqual(CSV_DEFAULT_DOMAIN, self._csv.read_text())
+
+ def test_add(self):
+ self._csv.write_text(CSV_ALL_DOMAINS)
+
+ run_cli('add', '--database', self._csv, f'{ELF}#TEST_DOMAIN')
+ self.assertEqual(CSV_ALL_DOMAINS, self._csv.read_text())
+
+ def test_mark_removals(self):
+ self._csv.write_text(CSV_ALL_DOMAINS)
+
+ run_cli('mark_removals', '--database', self._csv, '--date',
+ '1998-09-04', f'{ELF}#default')
+
+        # Add the removal date to the token that is not in the default domain.
+ new_csv = CSV_ALL_DOMAINS.replace('59b2701c, ,',
+ '59b2701c,1998-09-04,')
+ self.assertNotEqual(CSV_ALL_DOMAINS, new_csv)
+
+ self.assertEqual(new_csv, self._csv.read_text())
+
+ def test_purge(self):
+ self._csv.write_text(CSV_ALL_DOMAINS)
+
+ # Mark everything not in TEST_DOMAIN as removed.
+ run_cli('mark_removals', '--database', self._csv, f'{ELF}#TEST_DOMAIN')
+
+ # Delete all entries except those in TEST_DOMAIN.
+ run_cli('purge', '--database', self._csv)
+
+ self.assertEqual(CSV_TEST_DOMAIN, self._csv.read_text())
+
+ @mock.patch('sys.stdout', new_callable=_mock_output)
+ def test_report(self, mock_stdout):
+ run_cli('report', ELF)
+ self.assertIn(REPORT_DEFAULT_DOMAIN, mock_stdout.buffer.getvalue())
+ self.assertIn(REPORT_TEST_DOMAIN, mock_stdout.buffer.getvalue())
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/pw_tokenizer/py/detokenize_test.py b/pw_tokenizer/py/detokenize_test.py
index 770484d..300e782 100755
--- a/pw_tokenizer/py/detokenize_test.py
+++ b/pw_tokenizer/py/detokenize_test.py
@@ -82,48 +82,45 @@
b'\x00\x0b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00'
b'\x00\x00\x00')
-# This is an ELF file with only .tokenized and .tokenizer_info sections.
+# This is an ELF file with only .pw_tokenized and .pw_tokenizer_info sections.
# It was created from the ELF file for tokenize_test.cc with the command:
#
-# arm-none-eabi-objcopy -S --only-section ".tokenize*" <ELF> <OUTPUT>
+# arm-none-eabi-objcopy -S --only-section ".pw_tokenize*" <ELF> <OUTPUT>
#
# The resulting ELF was converted to a Python binary string using
# path_to_byte_string function above. The file is also included in the repo as
-# example_binary_with_tokenized_logs.elf.
+# example_binary_with_tokenized_strings.elf.
ELF_WITH_TOKENIZER_SECTIONS = (
b'\x7fELF\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00(\x00\x01'
- b'\x00\x00\x00\xd1\x83\x00\x084\x00\x00\x00\x04\x03\x00\x00\x00\x04\x00\x05'
- b'4\x00 \x00\x05\x00(\x00\x04\x00\x03\x00\x01\x00\x00\x00\xd4\x00\x00\x00'
- b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\x00'
- b'\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00'
- b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00'
- b'\x01\x00\x01\x00\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- b'\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00'
- b'\x00\x00\xd4\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- b'\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00\xd4\x00'
- b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- b'\x06\x00\x00\x00\x00\x00\x01\x00%llu\x00\x00\x00\x00%llx\x00\x00\x00\x00'
- b'%u %d\x00\x00\x00The answer: "%s"\x00\x00\x00\x00Jello, world!\x00\x00'
- b'\x00Jello!\x00\x00Jello?\x00\x00%s there are %x (%.2f) of them%c\x00\x00'
- b'\x00\x00The answer is: %s\x00\x00\x00%x%lld%1.2f%s\x00\x00\x00The answ'
- b'er is: %s\x00\x00\x00%ld\x00%d\x00\x00%ld\x00The answer is: %s\x00\x00'
- b'\x00The answer is: %s\x00\x00\x00The answer is: %s\x00\x00\x00The answ'
- b'er is: %s\x00\x00\x00The answer is: %s\x00\x00\x00Hello %s! %hd %e\x00'
- b'\x00\x00\x00%u%d%02x%X%hu%hhu%d%ld%lu%lld%llu%c%c%c\x00%u%d%02x%X%hu%h'
- b'hu%d%ld%lu%lld%llu%c%c%c\x00%u%d%02x%X%hu%hhu%d%ld%lu%lld%llu%c%c%c\x00'
- b'Won\'t fit : %s%d\x00\x00\x00\x00hash_length\x00`\x00\x00\x00sizeof_l\x00'
- b'\x00\x00\x00\x04\x00\x00\x00sizeof_j\x00\x00\x00\x00\x08\x00\x00\x00si'
- b'zeof_z\x00\x00\x00\x00\x04\x00\x00\x00sizeof_t\x00\x00\x00\x00\x04\x00'
- b'\x00\x00\x00.shstrtab\x00.tokenized\x00.tokenizer_info\x00\x00\x00\x00'
+ b'\x00\x00\x00!G\x00\x084\x00\x00\x00\xd4\x02\x00\x00\x00\x04\x00\x054\x00'
+ b' \x00\x04\x00(\x00\x04\x00\x03\x00\x01\x00\x00\x00\xb4\x00\x00\x00\x00'
+ b'\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00'
+ b'\x00\x00\x00\x01\x00\x01\x00\x00\x00\xb4\x00\x00\x00\x00\x02\x00\x08\x00'
+ b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\x00\x00\x00\x00\x00\x01'
+ b'\x00\x01\x00\x00\x00\xb4\x00\x00\x00\x00\x00\x00 \x00\x00\x00\x00\x00\x00'
+ b'\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x01\x00\x01\x00\x00\x00'
+ b'\xb4\x00\x00\x00\x18D\x00 \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+ b'\x00\x06\x00\x00\x00\x00\x00\x01\x00Hello %s! %hd %e\x00\x00\x00\x00%u'
+ b'%d%02x%X%hu%hhd%d%ld%lu%lld%llu%c%c%c\x00%u%d%02x%X%hu%hhd%d%ld%lu%lld'
+ b'%llu%c%c%c\x00Won\'t fit : %s%d\x00\x00\x00\x00%llx\x00\x00\x00\x00%ld'
+ b'\x00%d\x00\x00%ld\x00The answer is: %s\x00\x00\x00The answer is: %s\x00'
+ b'\x00\x00The answer is: %s\x00\x00\x00The answer is: %s\x00\x00\x00The '
+ b'answer is: %s\x00\x00\x00The answer is: %s\x00\x00\x00The answer is: %'
+ b's\x00\x00\x00The answer is: %s\x00\x00\x00%u %d\x00\x00\x00The answer:'
+ b' "%s"\x00\x00\x00\x00Jello, world!\x00\x00\x00Jello!\x00\x00Jello?\x00'
+ b'\x00%s there are %x (%.2f) of them%c\x00\x00\x00\x00The answer is: %s\x00'
+ b'\x00\x00\x00\x00\x00\x00[:-)\x00\x00\x00\x00>:-[]\x00\x00\x00%llu\x00\x00'
+ b'\x00\x00The answer was: %s\x00\x00The answer is: %s\x00\x00.shstrtab\x00'
+ b'.pw_tokenized.default\x00.pw_tokenized.TEST_DOMAIN\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
b'\x00\x00\x00\x0b\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- b'\x00\xd4\x00\x00\x00\xb5\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04'
- b'\x00\x00\x00\x00\x00\x00\x00\x16\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00'
- b'\x00\x00\x00\x00\x00\x8c\x02\x00\x00P\x00\x00\x00\x00\x00\x00\x00\x00\x00'
- b'\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x03\x00\x00\x00'
- b'\x00\x00\x00\x00\x00\x00\x00\x00\xdc\x02\x00\x00&\x00\x00\x00\x00\x00\x00'
- b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00')
+ b'\x00\xb4\x00\x00\x00\xb9\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04'
+ b'\x00\x00\x00\x00\x00\x00\x00!\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00'
+ b'\x00\x00\x00\x00p\x02\x00\x00&\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
+ b'\x00\x04\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x03\x00\x00\x00\x00'
+ b'\x00\x00\x00\x00\x00\x00\x00\x96\x02\x00\x00;\x00\x00\x00\x00\x00\x00\x00'
+ b'\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00')
# 0x2e668cd6 is 'Jello, world!' (which is also used in database_test.py).
JELLO_WORLD_TOKEN = b'\xd6\x8c\x66\x2e'
@@ -286,7 +283,7 @@
expected_tokens = frozenset(detok.database.token_to_entries.keys())
csv_database = str(detok.database)
- self.assertEqual(len(csv_database.splitlines()), 16)
+ self.assertEqual(len(csv_database.splitlines()), 17)
with tempfile.NamedTemporaryFile('r+') as csv_file:
csv_file.write(csv_database)
@@ -390,7 +387,7 @@
def test_update(self, mock_getmtime):
db = database.load_token_database(
io.BytesIO(ELF_WITH_TOKENIZER_SECTIONS))
- self.assertEqual(len(db), 16)
+ self.assertEqual(len(db), 17)
the_time = [100]
diff --git a/pw_tokenizer/py/example_binary_with_tokenized_logs.elf b/pw_tokenizer/py/example_binary_with_tokenized_logs.elf
deleted file mode 100644
index 719661b..0000000
--- a/pw_tokenizer/py/example_binary_with_tokenized_logs.elf
+++ /dev/null
Binary files differ
diff --git a/pw_tokenizer/py/example_binary_with_tokenized_strings.elf b/pw_tokenizer/py/example_binary_with_tokenized_strings.elf
new file mode 100644
index 0000000..7313906
--- /dev/null
+++ b/pw_tokenizer/py/example_binary_with_tokenized_strings.elf
Binary files differ
diff --git a/pw_tokenizer/py/pw_tokenizer/database.py b/pw_tokenizer/py/pw_tokenizer/database.py
index 52d102f..5078baf 100755
--- a/pw_tokenizer/py/pw_tokenizer/database.py
+++ b/pw_tokenizer/py/pw_tokenizer/database.py
@@ -23,10 +23,11 @@
import glob
import logging
import os
+from pathlib import Path
import re
import struct
import sys
-from typing import Dict, Iterable
+from typing import Callable, Dict, Iterable, List
try:
from pw_tokenizer import elf_reader, tokens
@@ -39,22 +40,36 @@
_LOG = logging.getLogger('pw_tokenizer')
+DEFAULT_DOMAIN = 'default'
+
def _elf_reader(elf) -> elf_reader.Elf:
return elf if isinstance(elf, elf_reader.Elf) else elf_reader.Elf(elf)
-def _read_strings_from_elf(elf) -> Iterable[str]:
+def _read_strings_from_elf(elf, domain: str) -> Iterable[str]:
"""Reads the tokenized strings from an elf_reader.Elf or ELF file object."""
- sections = _elf_reader(elf).dump_sections(r'\.tokenized(\.\d+)?')
+ _LOG.debug('Reading tokenized strings in domain "%s" from %s', domain, elf)
+
+ sections = _elf_reader(elf).dump_sections(
+ rf'^\.pw_tokenized\.{domain}(?:\.\d+)?$')
if sections is not None:
for string in sections.split(b'\0'):
yield string.decode()
+def tokenization_domains(elf) -> Iterable[str]:
+ """Lists all tokenization domains in an ELF file."""
+ tokenized_section = re.compile(r'\.pw_tokenized\.(?P<domain>.+)(?:\.\d+)?')
+ for section in _elf_reader(elf).sections:
+ match = tokenized_section.match(section.name)
+ if match:
+ yield match.group('domain')
+
+
def read_tokenizer_metadata(elf) -> Dict[str, int]:
"""Reads the metadata entries from an ELF."""
- sections = _elf_reader(elf).dump_sections(r'\.tokenized\.meta')
+ sections = _elf_reader(elf).dump_sections(r'\.pw_tokenizer_info')
metadata: Dict[str, int] = {}
if sections is not None:
@@ -68,7 +83,7 @@
return metadata
-def _load_token_database(db) -> tokens.Database:
+def _load_token_database(db, domain: str) -> tokens.Database:
"""Loads a Database from a database object, ELF, CSV, or binary database."""
if db is None:
return tokens.Database()
@@ -77,25 +92,26 @@
return db
if isinstance(db, elf_reader.Elf):
- return tokens.Database.from_strings(_read_strings_from_elf(db))
+ return tokens.Database.from_strings(_read_strings_from_elf(db, domain))
# If it's a str, it might be a path. Check if it's an ELF or CSV.
- if isinstance(db, str):
+ if isinstance(db, (str, Path)):
if not os.path.exists(db):
raise FileNotFoundError(
- '"{}" is not a path to a token database'.format(db))
+ f'"{db}" is not a path to a token database')
# Read the path as an ELF file.
with open(db, 'rb') as fd:
if elf_reader.compatible_file(fd):
- return tokens.Database.from_strings(_read_strings_from_elf(fd))
+ return tokens.Database.from_strings(
+ _read_strings_from_elf(fd, domain))
# Read the path as a packed binary or CSV file.
return tokens.DatabaseFile(db)
# Assume that it's a file object and check if it's an ELF.
if elf_reader.compatible_file(db):
- return tokens.Database.from_strings(_read_strings_from_elf(db))
+ return tokens.Database.from_strings(_read_strings_from_elf(db, domain))
# Read the database as CSV or packed binary from a file object's path.
if hasattr(db, 'name') and os.path.exists(db.name):
@@ -105,9 +121,10 @@
return tokens.Database(tokens.parse_csv(db))
-def load_token_database(*databases) -> tokens.Database:
+def load_token_database(*databases,
+ domain: str = DEFAULT_DOMAIN) -> tokens.Database:
"""Loads a Database from database objects, ELFs, CSVs, or binary files."""
- return tokens.Database.merged(*(_load_token_database(db)
+ return tokens.Database.merged(*(_load_token_database(db, domain)
for db in databases))
@@ -126,8 +143,7 @@
}
-def _handle_create(elf_or_token_database, database, force, output_type,
- include, exclude):
+def _handle_create(databases, database, force, output_type, include, exclude):
"""Creates a token database file from one or more ELF files."""
if database == '-':
@@ -135,12 +151,11 @@
fd = sys.stdout.buffer
elif not force and os.path.exists(database):
raise FileExistsError(
- 'The file {} already exists! Use --force to overwrite.'.format(
- database))
+ f'The file {database} already exists! Use --force to overwrite.')
else:
fd = open(database, 'wb')
- database = tokens.Database.merged(*elf_or_token_database)
+ database = tokens.Database.merged(*databases)
database.filter(include, exclude)
with fd:
@@ -149,16 +164,16 @@
elif output_type == 'binary':
tokens.write_binary(database, fd)
else:
- raise ValueError('Unknown database type "{}"'.format(output_type))
+ raise ValueError(f'Unknown database type "{output_type}"')
_LOG.info('Wrote database with %d entries to %s as %s', len(database),
fd.name, output_type)
-def _handle_add(token_database, elf_or_token_database):
+def _handle_add(token_database, databases):
initial = len(token_database)
- for source in elf_or_token_database:
+ for source in databases:
token_database.add((entry.string for entry in source.entries()))
token_database.write_to_file()
@@ -167,10 +182,10 @@
len(token_database) - initial, token_database.path)
-def _handle_mark_removals(token_database, elf_or_token_database, date):
+def _handle_mark_removals(token_database, databases, date):
marked_removed = token_database.mark_removals(
(entry.string
- for entry in tokens.Database.merged(*elf_or_token_database).entries()
+ for entry in tokens.Database.merged(*databases).entries()
if not entry.date_removed), date)
token_database.write_to_file()
@@ -186,57 +201,98 @@
_LOG.info('Removed %d entries from %s', len(purged), token_database.path)
-def _handle_report(database, output):
- for path, db in database:
- output.write('{name}\n'
- ' Entries present: {present_entries}\n'
- ' Size of strings: {present_size_bytes} B\n'
- ' Total entries: {total_entries}\n'
- ' Total size of strings: {total_size_bytes} B\n'
- ' Collisions: {collisions} tokens\n'.format(
- name=path, **generate_report(db)))
+def _handle_report(token_database_or_elf, output):
+ for path in token_database_or_elf:
+ with path.open('rb') as file:
+ if elf_reader.compatible_file(file):
+ domains = list(tokenization_domains(file))
+ else:
+ domains = [path.name]
+
+ for domain in domains:
+ output.write(
+ '[{name}]\n'
+ ' Domain: {domain}\n'
+ ' Entries present: {present_entries}\n'
+ ' Size of strings: {present_size_bytes} B\n'
+ ' Total entries: {total_entries}\n'
+ ' Total size of strings: {total_size_bytes} B\n'
+ ' Collisions: {collisions} tokens\n'.format(
+ name=path,
+ domain=domain,
+ **generate_report(load_token_database(path,
+ domain=domain))))
-def expand_paths_or_globs(paths_or_globs: Iterable[str]) -> Iterable[str]:
+def expand_paths_or_globs(*paths_or_globs: str) -> Iterable[Path]:
"""Expands any globs in a list of paths; raises FileNotFoundError."""
for path_or_glob in paths_or_globs:
if os.path.exists(path_or_glob):
# This is a valid path; yield it without evaluating it as a glob.
- yield path_or_glob
+ yield Path(path_or_glob)
else:
paths = glob.glob(path_or_glob)
if not paths:
- raise FileNotFoundError(
- '{} is not a valid path'.format(path_or_glob))
+ raise FileNotFoundError(f'{path_or_glob} is not a valid path')
for path in paths:
- yield path
+ yield Path(path)
-class LoadTokenDatabase(argparse.Action):
- """Argparse action that reads tokenized logs from paths or glob patterns."""
- def __init__(self, option_strings, dest, include_paths=False, **kwargs):
- """Accepts arguments passed in add_argument.
+class ExpandGlobs(argparse.Action):
+    """Argparse action that expands glob patterns into a list of paths."""
+ def __call__(self, parser, namespace, values, unused_option_string=None):
+ setattr(namespace, self.dest, list(expand_paths_or_globs(*values)))
- Args:
- option_strings: Forwarded to base argparse.Action.
- dest: The name of the argument to set; forwarded to base argparse.Action.
- include_paths: Whether to include the paths to the files for each database
- in addition to the database itself; if True, a list of (path, database)
- tuples is produced.
- **kwargs: Any other arguments to add_argument.
- """
- super(LoadTokenDatabase, self).__init__(option_strings, dest, **kwargs)
- if include_paths: # Make a (path, tokens.Database) tuple for each path.
- self._load_db = lambda path: (path, load_token_database(path))
- else:
- self._load_db = load_token_database
+def _read_elf_with_domain(elf: str, domain: str) -> Iterable[tokens.Database]:
+ for path in expand_paths_or_globs(elf):
+ with path.open('rb') as file:
+ if not elf_reader.compatible_file(file):
+ raise ValueError(f'{elf} is not an ELF file, '
+ f'but the "{domain}" domain was specified')
+ yield tokens.Database.from_strings(
+ _read_strings_from_elf(file, domain))
+
+
+class _LoadTokenDatabases(argparse.Action):
+    """Argparse action that reads token databases from paths or globs."""
def __call__(self, parser, namespace, values, option_string=None):
- setattr(
- namespace, self.dest,
- [self._load_db(path) for path in expand_paths_or_globs(values)])
+ databases: List[tokens.Database] = []
+ paths: List[Path] = []
+
+ try:
+ for value in values:
+ if value.count('#') == 1:
+ databases.extend(_read_elf_with_domain(*value.split('#')))
+ else:
+ paths.extend(expand_paths_or_globs(value))
+
+ databases += (load_token_database(path) for path in paths)
+ except (FileNotFoundError, ValueError) as err:
+ parser.error(f'argument elf_or_token_database: {err}')
+
+ setattr(namespace, self.dest, databases)
+
+
+def token_databases_parser() -> argparse.ArgumentParser:
+ """Returns an argument parser for reading token databases.
+
+ These arguments can be added to another parser using the parents arg.
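+    For example, detokenize.py passes parents=[token_databases_parser()] to
+    its base64 subparser.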
+ """
+ parser = argparse.ArgumentParser(add_help=False)
+ parser.add_argument(
+ 'databases',
+ metavar='elf_or_token_database',
+ nargs='+',
+ action=_LoadTokenDatabases,
+ help=('ELF or token database files from which to read strings and '
+ 'tokens. For ELF files, the tokenization domain to read from '
+              'may be specified after the path as #domain_name (e.g. '
+ 'foo.elf#TEST_DOMAIN). Unless specified, only the default '
+ 'domain is read from ELF files; .* reads all domains.'))
+ return parser
def _parse_args():
@@ -258,14 +314,7 @@
required=True,
help='The database file to update.')
- option_tokens = argparse.ArgumentParser(add_help=False)
- option_tokens.add_argument(
- 'elf_or_token_database',
- nargs='+',
- action=LoadTokenDatabase,
- help=(
- 'ELF files or token database files from which to read strings and '
- 'tokens.'))
+ option_tokens = token_databases_parser()
# Top-level argument parser.
parser = argparse.ArgumentParser(
@@ -358,10 +407,9 @@
help='Prints a report about a database.')
subparser.set_defaults(handler=_handle_report)
subparser.add_argument(
- 'database',
+ 'token_database_or_elf',
nargs='+',
- action=LoadTokenDatabase,
- include_paths=True,
+ action=ExpandGlobs,
help='The ELF files or token databases about which to generate reports.'
)
subparser.add_argument(
@@ -371,7 +419,12 @@
default=sys.stdout,
help='The file to which to write the output; use - for stdout.')
- return parser.parse_args()
+ args = parser.parse_args()
+
+ handler = args.handler
+ del args.handler
+
+ return handler, args
def _init_logging(level: int) -> None:
@@ -386,15 +439,11 @@
_LOG.addHandler(log_to_stderr)
-def _main(args: argparse.Namespace) -> int:
+def _main(handler: Callable, args: argparse.Namespace) -> int:
_init_logging(logging.INFO)
-
- handler = args.handler
- del args.handler
-
handler(**vars(args))
return 0
if __name__ == '__main__':
- sys.exit(_main(_parse_args()))
+ sys.exit(_main(*_parse_args()))
diff --git a/pw_tokenizer/py/pw_tokenizer/detokenize.py b/pw_tokenizer/py/pw_tokenizer/detokenize.py
index e725481..2ec0369 100755
--- a/pw_tokenizer/py/pw_tokenizer/detokenize.py
+++ b/pw_tokenizer/py/pw_tokenizer/detokenize.py
@@ -438,16 +438,13 @@
subparsers = parser.add_subparsers(help='Encoding of the input.')
base64_help = 'Detokenize Base64-encoded data from a file or stdin.'
- subparser = subparsers.add_parser('base64',
- description=base64_help,
- help=base64_help)
+ subparser = subparsers.add_parser(
+ 'base64',
+ description=base64_help,
+ parents=[database.token_databases_parser()],
+ help=base64_help)
subparser.set_defaults(handler=_handle_base64)
subparser.add_argument(
- 'databases',
- nargs='+',
- action=database.LoadTokenDatabase,
- help='Databases (ELF, binary, or CSV) to use to lookup tokens.')
- subparser.add_argument(
'-i',
'--input',
dest='input_file',
diff --git a/pw_tokenizer/py/pw_tokenizer/tokens.py b/pw_tokenizer/py/pw_tokenizer/tokens.py
index e75261b..4a416bf 100644
--- a/pw_tokenizer/py/pw_tokenizer/tokens.py
+++ b/pw_tokenizer/py/pw_tokenizer/tokens.py
@@ -18,6 +18,7 @@
from datetime import datetime
import io
import logging
+from pathlib import Path
import re
import struct
from typing import (BinaryIO, Callable, Dict, Iterable, List, NamedTuple,
                    Optional, Tuple, Union)
@@ -52,6 +53,9 @@
return pw_tokenizer_65599_fixed_length_hash(string, DEFAULT_HASH_LENGTH)
+_EntryKey = Tuple[int, str] # Key for uniquely referring to an entry
+
+
class TokenizedStringEntry:
"""A tokenized string with its metadata."""
def __init__(self,
@@ -62,7 +66,7 @@
self.string = string
self.date_removed = date_removed
- def key(self) -> Tuple[int, str]:
+ def key(self) -> _EntryKey:
"""The key determines uniqueness for a tokenized string."""
return self.token, self.string
@@ -103,7 +107,10 @@
tokenize: Callable[[str], int] = default_hash):
"""Creates a token database."""
# The database dict stores each unique (token, string) entry.
- self._database: dict = {entry.key(): entry for entry in entries}
+ self._database: Dict[_EntryKey, TokenizedStringEntry] = {
+ entry.key(): entry
+ for entry in entries
+ }
self.tokenize = tokenize
# This is a cache for fast token lookup that is built as needed.
@@ -211,12 +218,12 @@
date_removed_cutoff = datetime.max
to_delete = [
- key for key, entry in self._database.items()
+            entry for entry in self._database.values()
if entry.date_removed and entry.date_removed <= date_removed_cutoff
]
- for key in to_delete:
- del self._database[key]
+ for entry in to_delete:
+ del self._database[entry.key()]
return to_delete
@@ -242,7 +249,7 @@
"""
self._cache = None
- to_delete: List[Tuple] = []
+ to_delete: List[_EntryKey] = []
if include:
include_re = [re.compile(pattern) for pattern in include]
@@ -393,22 +400,22 @@
This class adds the write_to_file() method that writes to file from which it
was created in the correct format (CSV or binary).
"""
- def __init__(self, path: str):
- self.path = path
+ def __init__(self, path: Union[Path, str]):
+ self.path = Path(path)
# Read the path as a packed binary file.
- with open(self.path, 'rb') as fd:
+ with self.path.open('rb') as fd:
if file_is_binary_database(fd):
super().__init__(parse_binary(fd))
self._export = write_binary
return
# Read the path as a CSV file.
- with open(self.path, 'r', newline='') as file:
+ with self.path.open('r', newline='') as file:
super().__init__(parse_csv(file))
self._export = write_csv
- def write_to_file(self, path: Optional[str] = None) -> None:
+ def write_to_file(self, path: Optional[Union[Path, str]] = None) -> None:
"""Exports in the original format to the original or provided path."""
with open(self.path if path is None else path, 'wb') as fd:
self._export(self, fd)