pw_tokenizer: Support archive files in elf_reader

- Support archive files by reading all ELFs contained within them.
- Make minor changes to support reading an ELF from within another file.
- Add tests for archive files.

Change-Id: I478c443c4f780a7399022543aef0d5f0aae6e2b8
diff --git a/pw_tokenizer/py/detokenize_test.py b/pw_tokenizer/py/detokenize_test.py index be1a4b7..ef079a5 100755 --- a/pw_tokenizer/py/detokenize_test.py +++ b/pw_tokenizer/py/detokenize_test.py
@@ -275,6 +275,7 @@ frozenset(detok.database.token_to_entries.keys())) # Open ELF by elf_reader.Elf + elf.seek(0) detok = detokenize.Detokenizer(elf_reader.Elf(elf)) self.assertEqual(expected_tokens, frozenset(detok.database.token_to_entries.keys()))
diff --git a/pw_tokenizer/py/elf_reader_test.py b/pw_tokenizer/py/elf_reader_test.py index 050c7fc..e576598 100755 --- a/pw_tokenizer/py/elf_reader_test.py +++ b/pw_tokenizer/py/elf_reader_test.py
@@ -70,20 +70,24 @@ l (large), p (processor specific) """) +TEST_ELF_PATH = os.path.join(os.path.dirname(__file__), + 'elf_reader_test_binary.elf') + class ElfReaderTest(unittest.TestCase): """Tests the elf_reader.Elf class.""" def setUp(self): super().setUp() - elf_path = os.path.join(os.path.dirname(__file__), - 'elf_reader_test_binary.elf') - self._elf_file = open(elf_path, 'rb') + self._elf_file = open(TEST_ELF_PATH, 'rb') self._elf = elf_reader.Elf(self._elf_file) def tearDown(self): super().tearDown() self._elf_file.close() + def _section(self, name): + return next(self._elf.sections_with_name(name)) + def test_readelf_comparison_using_the_readelf_binary(self): """Compares elf_reader to readelf's output.""" @@ -120,15 +124,15 @@ self.assertEqual(section.offset, offset) self.assertEqual(section.size, size) - def test_dump_section(self): - self.assertEqual(self._elf.dump_section('.test_section_1'), + def test_dump_single_section(self): + self.assertEqual(self._elf.dump_sections(r'\.test_section_1'), b'You cannot pass\0') - self.assertEqual(self._elf.dump_section('.test_section_2'), + self.assertEqual(self._elf.dump_sections(r'\.test_section_2'), b'\xef\xbe\xed\xfe') - def test_dump_sections(self): - if (self._elf.sections_by_name['.test_section_1'].address < - self._elf.sections_by_name['.test_section_2'].address): + def test_dump_multiple_sections(self): + if (self._section('.test_section_1').address < + self._section('.test_section_2').address): contents = b'You cannot pass\0\xef\xbe\xed\xfe' else: contents = b'\xef\xbe\xed\xfeYou cannot pass\0' @@ -136,11 +140,10 @@ self.assertIn(self._elf.dump_sections(r'.test_section_\d'), contents) def test_read_values(self): - string_address = self._elf.sections_by_name['.test_section_1'].address - self.assertEqual(self._elf.read_value(string_address), - b'You cannot pass') + address = self._section('.test_section_1').address + self.assertEqual(self._elf.read_value(address), b'You cannot pass') - int32_address = 
self._elf.sections_by_name['.test_section_2'].address + int32_address = self._section('.test_section_2').address self.assertEqual(self._elf.read_value(int32_address, 4), b'\xef\xbe\xed\xfe') @@ -152,6 +155,94 @@ self.assertEqual(elf_reader.read_c_string(bytes_io), b'No terminator!') self.assertEqual(elf_reader.read_c_string(bytes_io), b'') + def test_compatible_file_for_elf(self): + self.assertTrue(elf_reader.compatible_file(self._elf_file)) + self.assertTrue(elf_reader.compatible_file(io.BytesIO(b'\x7fELF'))) + + def test_compatible_file_for_elf_start_at_offset(self): + self._elf_file.seek(13) # Seek ahead to get out of sync + self.assertTrue(elf_reader.compatible_file(self._elf_file)) + self.assertEqual(13, self._elf_file.tell()) + + def test_compatible_file_for_invalid_elf(self): + self.assertFalse(elf_reader.compatible_file(io.BytesIO(b'\x7fELVESF'))) + + +def _archive_file(data: bytes) -> bytes: + return ('FILE ID 90123456' + 'MODIFIED 012' + 'OWNER ' + 'GROUP ' + 'MODE 678' + f'{len(data):10}' # File size -- the only part that's needed. 
+ '`\n'.encode() + data) + + +class ArchiveTest(unittest.TestCase): + """Tests reading from archive files.""" + def setUp(self): + super().setUp() + + with open(TEST_ELF_PATH, 'rb') as fd: + self._elf_data = fd.read() + + self._archive_entries = b'blah', b'hello', self._elf_data + + self._archive_data = elf_reader.ARCHIVE_MAGIC + b''.join( + _archive_file(f) for f in self._archive_entries) + self._archive = io.BytesIO(self._archive_data) + + def test_compatible_file_for_archive(self): + self.assertTrue(elf_reader.compatible_file(io.BytesIO(b'!<arch>\n'))) + self.assertTrue(elf_reader.compatible_file(self._archive)) + + def test_compatible_file_for_invalid_archive(self): + self.assertFalse(elf_reader.compatible_file(io.BytesIO(b'!<arch>'))) + + def test_iterate_over_files(self): + for expected, size in zip(self._archive_entries, + elf_reader.files_in_archive(self._archive)): + self.assertEqual(expected, self._archive.read(size)) + + def test_iterate_over_empty_archive(self): + with self.assertRaises(StopIteration): + next(iter(elf_reader.files_in_archive(io.BytesIO(b'!<arch>\n')))) + + def test_iterate_over_invalid_archive(self): + with self.assertRaises(elf_reader.FileDecodeError): + for _ in elf_reader.files_in_archive( + io.BytesIO(b'!<arch>blah blahblah')): + pass + + def test_iterate_over_archive_with_invalid_size(self): + data = elf_reader.ARCHIVE_MAGIC + _archive_file(b'$' * 3210) + file = io.BytesIO(data) + + # Iterate over the file normally. + for size in elf_reader.files_in_archive(file): + self.assertEqual(b'$' * 3210, file.read(size)) + + # Replace the size with a hex number, which is not valid. 
+ with self.assertRaises(elf_reader.FileDecodeError): + for _ in elf_reader.files_in_archive( + io.BytesIO(data.replace(b'3210', b'0x99'))): + pass + + def test_elf_reader_dump_single_section(self): + elf = elf_reader.Elf(self._archive) + self.assertEqual(elf.dump_sections(r'\.test_section_1'), + b'You cannot pass\0') + self.assertEqual(elf.dump_sections(r'\.test_section_2'), + b'\xef\xbe\xed\xfe') + + def test_elf_reader_read_values(self): + elf = elf_reader.Elf(self._archive) + address = next(elf.sections_with_name('.test_section_1')).address + self.assertEqual(elf.read_value(address), b'You cannot pass') + + int32_address = next(elf.sections_with_name('.test_section_2')).address + self.assertEqual(elf.read_value(int32_address, 4), b'\xef\xbe\xed\xfe') + if __name__ == '__main__': unittest.main()
diff --git a/pw_tokenizer/py/pw_tokenizer/database.py b/pw_tokenizer/py/pw_tokenizer/database.py index a717321..cf9319d 100755 --- a/pw_tokenizer/py/pw_tokenizer/database.py +++ b/pw_tokenizer/py/pw_tokenizer/database.py
@@ -54,7 +54,7 @@ def read_tokenizer_metadata(elf) -> Dict[str, int]: """Reads the metadata entries from an ELF.""" - sections = _elf_reader(elf).dump_section('.tokenized.meta') + sections = _elf_reader(elf).dump_sections(r'\.tokenized\.meta') metadata: Dict[str, int] = {} if sections is not None: @@ -87,14 +87,14 @@ # Read the path as an ELF file. with open(db, 'rb') as fd: - if elf_reader.file_is_elf(fd): + if elf_reader.compatible_file(fd): return tokens.Database.from_strings(_read_strings_from_elf(fd)) # Read the path as a packed binary or CSV file. return tokens.DatabaseFile(db) # Assume that it's a file object and check if it's an ELF. - if elf_reader.file_is_elf(db): + if elf_reader.compatible_file(db): return tokens.Database.from_strings(_read_strings_from_elf(db)) # Read the database as CSV or packed binary from a file object's path.
diff --git a/pw_tokenizer/py/pw_tokenizer/elf_reader.py b/pw_tokenizer/py/pw_tokenizer/elf_reader.py index 3788b05..079e808 100755 --- a/pw_tokenizer/py/pw_tokenizer/elf_reader.py +++ b/pw_tokenizer/py/pw_tokenizer/elf_reader.py
@@ -17,14 +17,73 @@ This module provides tools for dumping the contents of an ELF section. It can also be used to read values at a particular address. A command line interface for both of these features is provided. + +This module supports any ELF-format file, including .o and .so files. This +module also has basic support for archive (.a) files. All ELF files in an +archive are read as one unit. """ import argparse -import collections import re import struct import sys -from typing import BinaryIO, Dict, Iterable, NamedTuple, Optional, Tuple, Union +from typing import BinaryIO, Dict, Iterable, NamedTuple, Optional +from typing import Pattern, Tuple, Union + +ARCHIVE_MAGIC = b'!<arch>\n' +ELF_MAGIC = b'\x7fELF' + + +def _check_next_bytes(fd: BinaryIO, expected: bytes, what: str) -> None: + actual = fd.read(len(expected)) + if expected != actual: + raise FileDecodeError( + f'Invalid {what}: expected {expected!r}, found {actual!r}') + + +def files_in_archive(fd: BinaryIO) -> Iterable[int]: + """Seeks to each file in an archive and yields its size.""" + + _check_next_bytes(fd, ARCHIVE_MAGIC, 'archive magic number') + + while True: + # Each file in an archive is prefixed with an ASCII header: + # + # 16 B - file identifier (text) + # 12 B - file modification timestamp (decimal) + # 6 B - owner ID (decimal) + # 6 B - group ID (decimal) + # 8 B - file mode (octal) + # 10 B - file size in bytes (decimal) + # 2 B - ending characters (`\n) + # + # Skip the unused portions of the file header, then read the size. + fd.seek(16 + 12 + 6 + 6 + 8, 1) + size_str = fd.read(10) + if not size_str: + return + + try: + size = int(size_str, 10) + except ValueError as exc: + raise FileDecodeError( + 'Archive file sizes must be decimal integers') from exc + + _check_next_bytes(fd, b'`\n', 'archive file header ending') + offset = fd.tell() # Store offset in case the caller reads the file. 
+ + yield size + + fd.seek(offset + size) + + +def _elf_files_in_archive(fd: BinaryIO): + if _bytes_match(fd, ELF_MAGIC): + yield # The value isn't used, so just yield None. + else: + for _ in files_in_archive(fd): + if _bytes_match(fd, ELF_MAGIC): + yield class Field(NamedTuple): @@ -76,35 +135,41 @@ string += byte -def file_is_elf(fd: BinaryIO) -> bool: - """Returns true if the provided file starts with the ELF magic number.""" +def _bytes_match(fd: BinaryIO, expected: bytes) -> bool: + """Peeks at the next bytes to see if they match the expected.""" try: - fd.seek(0) - magic_number = fd.read(4) - fd.seek(0) - return magic_number == b'\x7fELF' + offset = fd.tell() + data = fd.read(len(expected)) + fd.seek(offset) + return data == expected except IOError: return False -class ElfDecodeError(Exception): +def compatible_file(fd: BinaryIO) -> bool: + """True if the file type is supported (ELF or archive).""" + offset = fd.tell() + fd.seek(0) + result = _bytes_match(fd, ELF_MAGIC) or _bytes_match(fd, ARCHIVE_MAGIC) + fd.seek(offset) + return result + + +class FileDecodeError(Exception): """Invalid data was read from an ELF file.""" class FieldReader: """Reads ELF fields defined with a Field tuple from an ELF file.""" def __init__(self, elf: BinaryIO): - if not file_is_elf(elf): - raise ElfDecodeError(r"ELF files must start with b'\x7fELF'") - self._elf = elf + self.file_offset = self._elf.tell() + + _check_next_bytes(self._elf, ELF_MAGIC, 'ELF file header') + size_field = self._elf.read(1) # e_ident[EI_CLASS] (address size) int_unpacker = self._determine_integer_format() - # Set up decoding based on the address size - self._elf.seek(0x04) # e_ident[EI_CLASS] (address size) - size_field = self._elf.read(1) - if size_field == b'\x01': self.offset = lambda field: field.offset_32 self._size = lambda field: field.size_32 @@ -114,18 +179,17 @@ self._size = lambda field: field.size_64 self._decode = lambda f, d: int_unpacker[f.size_64].unpack(d)[0] else: - raise 
ElfDecodeError('Unknown size {!r}'.format(size_field)) + raise FileDecodeError('Unknown size {!r}'.format(size_field)) def _determine_integer_format(self) -> Dict[int, struct.Struct]: """Returns a dict of structs used for converting bytes to integers.""" - self._elf.seek(0x05) # e_ident[EI_DATA] (endianness) - endianness_byte = self._elf.read(1) + endianness_byte = self._elf.read(1) # e_ident[EI_DATA] (endianness) if endianness_byte == b'\x01': endianness = '<' elif endianness_byte == b'\x02': endianness = '>' else: - raise ElfDecodeError( + raise FileDecodeError( 'Unknown endianness {!r}'.format(endianness_byte)) return { @@ -136,24 +200,25 @@ } def read(self, field: Field, base: int = 0) -> int: - self._elf.seek(base + self.offset(field)) + self._elf.seek(self.file_offset + base + self.offset(field)) data = self._elf.read(self._size(field)) return self._decode(field, data) - def read_string(self, address: int) -> str: - self._elf.seek(address) + def read_string(self, offset: int) -> str: + self._elf.seek(self.file_offset + offset) return read_c_string(self._elf).decode() class Elf: """Represents an ELF file and the sections in it.""" - class Section: + class Section(NamedTuple): """Info about a section in an ELF file.""" - def __init__(self, name: str, address: int, offset: int, size: int): - self.name = name - self.address = address - self.offset = offset - self.size = size + name: str + address: int + offset: int + size: int + + file_offset: int # Starting place in the file; 0 unless in an archive. 
def range(self) -> range: return range(self.address, self.address + self.size) @@ -161,45 +226,38 @@ def __lt__(self, other) -> bool: return self.address < other.address - def __str__(self) -> str: - return ('Section(name={self.name}, address=0x{self.address:08x} ' - 'offset=0x{self.offset:x} size=0x{self.size:x})').format( - self=self) - - def __repr__(self) -> str: - return str(self) - def __init__(self, elf: BinaryIO): self._elf = elf self.sections: Tuple[Elf.Section, ...] = tuple(self._list_sections()) - self.sections_by_name: Dict[str, - Elf.Section] = collections.OrderedDict( - (section.name, section) - for section in self.sections) def _list_sections(self) -> Iterable['Elf.Section']: """Reads the section headers to enumerate all ELF sections.""" - reader = FieldReader(self._elf) - base = reader.read(FILE_HEADER.section_header_offset) - section_header_size = reader.offset(SECTION_HEADER.section_header_end) + for _ in _elf_files_in_archive(self._elf): + reader = FieldReader(self._elf) + base = reader.read(FILE_HEADER.section_header_offset) + section_header_size = reader.offset( + SECTION_HEADER.section_header_end) - # Find the section with the section names in it - names_section_header_base = base + section_header_size * reader.read( - FILE_HEADER.section_names_index) - names_table_base = reader.read(SECTION_HEADER.section_offset, - names_section_header_base) + # Find the section with the section names in it. 
+ names_section_header_base = ( + base + section_header_size * + reader.read(FILE_HEADER.section_names_index)) + names_table_base = reader.read(SECTION_HEADER.section_offset, + names_section_header_base) - base = reader.read(FILE_HEADER.section_header_offset) - for _ in range(reader.read(FILE_HEADER.section_count)): - name_offset = reader.read(SECTION_HEADER.section_name_offset, base) + base = reader.read(FILE_HEADER.section_header_offset) + for _ in range(reader.read(FILE_HEADER.section_count)): + name_offset = reader.read(SECTION_HEADER.section_name_offset, + base) - yield self.Section( - reader.read_string(names_table_base + name_offset), - reader.read(SECTION_HEADER.section_address, base), - reader.read(SECTION_HEADER.section_offset, base), - reader.read(SECTION_HEADER.section_size, base)) + yield self.Section( + reader.read_string(names_table_base + name_offset), + reader.read(SECTION_HEADER.section_address, base), + reader.read(SECTION_HEADER.section_offset, base), + reader.read(SECTION_HEADER.section_size, base), + reader.file_offset) - base += section_header_size + base += section_header_size def section_by_address(self, address: int) -> Optional['Elf.Section']: """Returns the section that contains the provided address, if any.""" @@ -210,6 +268,11 @@ return None + def sections_with_name(self, name: str) -> Iterable['Elf.Section']: + for section in self.sections: + if section.name == name: + yield section + def read_value(self, address: int, size: Optional[int] = None) -> Union[None, bytes, int]: @@ -219,31 +282,22 @@ return None assert section.address <= address - self._elf.seek(section.offset + address - section.address) + self._elf.seek(section.file_offset + section.offset + address - + section.address) if size is None: return read_c_string(self._elf) return self._elf.read(size) - def dump_section(self, name: str) -> Optional[bytes]: - """Dumps section contents as a byte string; None if no match.""" - try: - section = self.sections_by_name[name] - except 
KeyError: - return None - - self._elf.seek(section.offset) - return self._elf.read(section.size) - - def dump_sections(self, name_regex) -> Optional[bytes]: - """Dumps a binary string containing the sections matching name_regex.""" - name_regex = re.compile(name_regex) + def dump_sections(self, name: Union[str, Pattern[str]]) -> Optional[bytes]: + """Dumps a binary string containing the sections matching the regex.""" + name_regex = re.compile(name) sections = [] for section in self.sections: if name_regex.match(section.name): - self._elf.seek(section.offset) + self._elf.seek(section.file_offset + section.offset) sections.append(self._elf.read(section.size)) return b''.join(sections) if sections else None @@ -268,15 +322,12 @@ output(value) -def _dump_sections(elf: Elf, output, name: str, regex) -> None: - if not name and not regex: +def _dump_sections(elf: Elf, output, sections: Iterable[Pattern[str]]) -> None: + if not sections: output(elf.summary().encode()) return - for section in name: - output(elf.dump_section(section)) - - for section_pattern in regex: + for section_pattern in sections: output(elf.dump_sections(section_pattern)) @@ -307,8 +358,11 @@ section_parser = subparsers.add_parser('section') section_parser.set_defaults(handler=_dump_sections) - section_parser.add_argument('-n', '--name', default=[], action='append') - section_parser.add_argument('-r', '--regex', default=[], action='append') + section_parser.add_argument('sections', + metavar='section_regex', + nargs='*', + type=re.compile, + help='section name regular expression') address_parser = subparsers.add_parser('address') address_parser.set_defaults(handler=_read_addresses)