Initial commit of bootrom shell Change-Id: If41fa2e40274b19f66297e979b177e869828e85c
diff --git a/bootshell.py b/bootshell.py new file mode 100755 index 0000000..f8a18f6 --- /dev/null +++ b/bootshell.py
@@ -0,0 +1,342 @@ +#!/usr/bin/env python3 + +import base64 +import cmd +import curses.ascii +import select +import socket +import sys +import time + +from elftools.elf.elffile import ELFFile + + +class BootromShell(cmd.Cmd): + intro = "Welcome to Bootrom Shell" + prompt = "BOOTROM> " + socket = None + poller = None + pty_in = None + pty_out = None + connected = False + use_pty = False + + ############################################################################ + # Network stuff here + + # Try and connect to {host_addr} once per second for {max_retry} seconds + def connect(self, host_name, host_addr, max_retry): + if self.use_pty: + # pylint: disable=R1732 + self.pty_in = open("/tmp/uart", "rb", buffering=0) + self.pty_out = open("/tmp/uart", "wb", buffering=0) + self.poller = select.poll() + self.poller.register(self.pty_in, select.POLLIN) + return True + if self.socket is None: + print("Opening socket") + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + print(f"Connecting to {host_name}", end="") + for _ in range(max_retry): + try: + self.socket.connect(host_addr) + self.connected = True + self.poller = select.poll() + self.poller.register(self.socket, select.POLLIN) + print("Connected!") + return True + except ConnectionError: + print(".", end="") + sys.stdout.flush() + time.sleep(1) + print("Connection timed out!") + return False + + def disconnect(self): + if self.pty_in is not None: + print("Closing pty_in") + self.pty_in.close() + if self.pty_out is not None: + print("Closing pty_in") + self.pty_out.close() + if self.socket is not None: + print("Closing socket") + self.socket.close() + self.connected = False + self.pty_in = None + self.pty_out = None + self.socket = None + self.poller = None + + def getpeername(self): + if self.use_pty: + return "/tmp/uart" + return self.socket.getpeername() + + def send(self, data): + if self.use_pty: + self.pty_out.write(data) + self.pty_out.flush() + else: + self.socket.send(data) + + def recv(self, size): + if self.use_pty: + print(self.pty_in) + result = self.pty_in.read(size) + return result + result = self.socket.recv(size) + if len(result) == 0: + self.disconnect() + return result + + def poll(self, timeout): + if self.connected: + poll_result = self.poller.poll(timeout) + return len(poll_result) != 0 + return False + + ############################################################################ + + # Dumps incoming packet to the console, returns true if it contained a sync + # symbol + def print_packet(self, echo=False): + saw_sync = False + received = self.recv(4096) + for c in received: + if c == curses.ascii.ETX: + saw_sync = True + elif echo: + sys.stdout.write(chr(c)) + sys.stdout.flush() + return saw_sync + + # Waits until we see a packet containing a sync symbol or the line has been + # idle for {timeout} + def wait_for_sync(self, timeout, echo=True): + while True: + if not self.connected: + return False + if self.poll(timeout): + if self.print_packet(echo): + return True + else: + return False + + # Collects bytes in a string until we see a sync symbol or we timeout + def wait_for_response(self, timeout): + result = bytearray() + while True: + if self.poll(timeout): + received = self.recv(1) + if received[0] == curses.ascii.ETX: + return result + result.append(received[0]) + else: + print("Timeout while waiting for command response") + return b"" + + # Waits until the line has been idle for {timeout} + def wait_for_idle(self, timeout, echo=True): + while True: + if self.poll(timeout): + self.print_packet(echo) + else: + break + + def run_command(self, line): + if self.connected: + self.send((line + "\n").encode()) + self.wait_for_sync(100000) + else: + print("Not connected!") + + ############################################################################ + + def load_blob_at(self, blob, address): + chunks = [blob[i:i + 65536] for i in range(0, len(blob), 65536)] + for chunk in chunks: + self.send(f"write {hex(address)}\n".encode()) + self.wait_for_response(100000) + self.send(base64.b64encode(chunk)) + self.send("\n".encode()) + self.wait_for_sync(100000) + address = address + 65536 + + def load_file_at(self, filename, address): + try: + with open(filename, "rb") as file: + print("file opened") + blob = file.read() + print("blob read") + self.load_blob_at(blob, address) + except OSError as e: + print(f"Could not load {filename}") + print(f"Exception {e} ") + + def load_elf(self, filename): + try: + # pylint: disable=R1732 + elf_file = ELFFile(open(filename, "rb")) + print(f"Entry point at {hex(elf_file.header.e_entry)}") + return elf_file + except OSError: + print(f"Could not open '{filename}' as an ELF file") + return None + + def upload_elf(self, elf_file): + if elf_file is None: + print("No elf file") + return False + for segment in elf_file.iter_segments(): + header = segment.header + if header.p_type == "PT_LOAD": + start = header.p_paddr + size = header.p_filesz + end = start + size + print( + f"Loading seg: {hex(start)}:{hex(end)} ({size} bytes)...") + sys.stdout.flush() + self.load_blob_at(segment.data(), start) + + return True + + ############################################################################ + # cmd.Cmd callbacks here + + def do_connect(self, _=""): + """Connects to the Renode server, localhost@31415 by default.""" + host_name = "Renode" + host_addr = ("localhost", 31415) + max_retry = 60 + + if self.connect(host_name, host_addr, max_retry): + print(f"Connected to {host_name} @ {self.getpeername()}") + else: + print( + f"Failed to connect to {host_name} after {max_retry} seconds") + self.disconnect() + return + + # Ping the server with a newline once per second until we see a sync + # symbol and the line goes idle for 100 msec. + print("Waiting for prompt...") + for _ in range(180): + if not self.connected: + break + self.send("\n".encode()) + if self.wait_for_sync(1000): + # Sync seen, turn remote echo off and mute the ack + self.send("echo off\n".encode()) + self.wait_for_idle(100, False) + return + + print("Did not see command prompt from server") + self.disconnect() + return + + def do_disconnect(self, _=""): + """Disconnect from the Renode server""" + self.disconnect() + + def do_reconnect(self, _=""): + """Reconnect to the Renode server""" + self.do_disconnect() + self.do_connect() + + ##---------------------------------------- + + def do_boot_elf(self, line=""): + """Load local ELF file to remote device and boot it""" + try: + e = self.load_elf(line) + if e is None: + print(f"Could not open '{line}' as an ELF file") + return False + + self.upload_elf(e) + self.send(f"boot {hex(e.header.e_entry)}\n".encode()) + self.wait_for_sync(100000) + except OSError: + print(f"Failed to boot {line}") + return True + + def do_load_elf(self, line=""): + """Load local ELF file to remote device""" + e = self.load_elf(line) + if e is None: + print(f"Could not open '{line}' as an ELF file") + return False + + self.upload_elf(e) + return False + + def do_load_file_at(self, line): + """Uploads a binary file to a fixed address""" + args = line.split() + self.load_file_at(args[0], int(args[1], 16)) + + def do_load_xflash(self, line): + """Uploads a binary file to external flash""" + self.load_file_at(line, "0x44000000") + + def do_boot_sec(self, line): + """Boots an app on SEC at the given entry point. + This will kill the active bootrom console session.""" + self.send(f"boot {line}\n".encode()) + self.wait_for_idle(100) + + def do_boot_smc(self, line): + """Boots an app on SMC at the given entry point. + If entry == 0, will stop SMC.""" + self.send(f"poked 0x54020000 {line}") + self.wait_for_idle(100) + + ##---------------------------------------- + + def do_help(self, arg=""): + """Displays local and remote commands""" + super().do_help(arg) + if not arg: + print("Remote commands") + print("================") + print() + self.run_command("help") + + def do_exit(self, _=""): + """Exits BootromShell""" + self.disconnect() + return True + + ##---------------------------------------- + + def default(self, line): + self.run_command(line) + + def emptyline(self): + pass + + ##---------------------------------------- + + # Command completeion callback (todo) + # pylint: disable=W0221 + def completenames(self, text, line, begidx, endidx): + result = super().completenames(text, line, begidx, endidx) + return result + + # Argument completion callback (todo) + # pylint: disable=W0221 + def completedefault(self, text, line, begidx, endidx): + result = super().completedefault(text, line, begidx, endidx) + return result + + +################################################################################ + +if __name__ == '__main__': + print("<<shell starting>>") + shell = BootromShell() + shell.do_connect() + shell.cmdloop() + print("<<shell closed>>")