blob: a0fe4a21134e01b1d839fc2945ee31911f3e4f2e [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import cmd
import curses.ascii
import select
import socket
import sys
import time
from elftools.elf.elffile import ELFFile
class BootromShell(cmd.Cmd):
intro = "Welcome to Bootrom Shell"
# The bootrom will display its own prompt, no need for one here.
prompt = ""
socket = None
poller = None
pty_in = None
pty_out = None
connected = False
use_pty = False
default_host_name = "Renode"
default_host_addr = ("localhost", 31415)
default_max_retry = 60
############################################################################
# Network stuff here
# Try and connect to {host_addr} once per second for {max_retry} seconds
def connect(self, host_name, host_addr, max_retry):
if self.use_pty:
# pylint: disable=R1732
self.pty_in = open("/tmp/uart", "rb", buffering=0)
self.pty_out = open("/tmp/uart", "wb", buffering=0)
self.poller = select.poll()
self.poller.register(self.pty_in, select.POLLIN)
return True
if self.socket is None:
print("Opening socket")
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print(f"Connecting to {host_name}", end="")
for _ in range(max_retry):
try:
self.socket.connect(host_addr)
self.connected = True
self.poller = select.poll()
self.poller.register(self.socket, select.POLLIN)
print("Connected!")
return True
except ConnectionError:
print(".", end="")
sys.stdout.flush()
time.sleep(1)
print("Connection timed out!")
return False
def disconnect(self):
if self.pty_in is not None:
print("Closing pty_in")
self.pty_in.close()
if self.pty_out is not None:
print("Closing pty_in")
self.pty_out.close()
if self.socket is not None:
print("Closing socket")
self.socket.close()
self.connected = False
self.pty_in = None
self.pty_out = None
self.socket = None
self.poller = None
def getpeername(self):
if self.use_pty:
return "/tmp/uart"
return self.socket.getpeername()
def send(self, data):
if self.use_pty:
self.pty_out.write(data)
self.pty_out.flush()
else:
self.socket.send(data)
def recv(self, size):
if self.use_pty:
print(self.pty_in)
result = self.pty_in.read(size)
else:
result = self.socket.recv(size)
if len(result) == 0:
self.disconnect()
return result
def poll(self, timeout):
if self.connected:
poll_result = self.poller.poll(timeout)
return len(poll_result) != 0
return False
############################################################################
# Dumps incoming packet to the console, returns true if it contained a sync
# symbol
def print_packet(self, echo=False):
saw_sync = False
received = self.recv(4096)
for c in received:
if c == curses.ascii.ETX:
saw_sync = True
elif echo:
sys.stdout.write(chr(c))
sys.stdout.flush()
return saw_sync
# Waits until we see a packet containing a sync symbol or the line has been
# idle for {timeout}
def wait_for_sync(self, timeout, echo=True):
while True:
if not self.connected:
return False
if self.poll(timeout):
if self.print_packet(echo):
return True
else:
return False
# Collects bytes in a string until we see a sync symbol or we timeout
def wait_for_response(self, timeout):
result = bytearray()
while True:
if self.poll(timeout):
received = self.recv(1)
if received[0] == curses.ascii.ETX:
return result
result.append(received[0])
else:
print("Timeout while waiting for command response")
return b""
# Waits until the line has been idle for {timeout}
def wait_for_idle(self, timeout, echo=True):
while True:
if self.poll(timeout):
self.print_packet(echo)
else:
break
def run_command(self, line):
if self.connected:
self.send((line + "\n").encode())
self.wait_for_sync(100000)
else:
print("Not connected!")
############################################################################
def load_blob_at(self, blob, address):
chunks = [blob[i:i + 65536] for i in range(0, len(blob), 65536)]
for chunk in chunks:
self.send(f"write {hex(address)}\n".encode())
self.wait_for_response(100000)
self.send(base64.b64encode(chunk))
self.send("\n".encode())
self.wait_for_sync(100000)
address = address + 65536
def load_file_at(self, filename, address):
try:
with open(filename, "rb") as file:
print("file opened")
blob = file.read()
print("blob read")
self.load_blob_at(blob, address)
except OSError as e:
print(f"Could not load {filename}")
print(f"Exception {e} ")
def load_elf(self, filename):
try:
# pylint: disable=R1732
elf_file = ELFFile(open(filename, "rb"))
print(f"Entry point at {hex(elf_file.header.e_entry)}")
return elf_file
except OSError:
print(f"Could not open '{filename}' as an ELF file")
return None
def upload_elf(self, elf_file):
if elf_file is None:
print("No elf file")
return False
for segment in elf_file.iter_segments():
header = segment.header
if header.p_type == "PT_LOAD":
start = header.p_paddr
size = header.p_filesz
end = start + size
print(
f"Loading seg: {hex(start)}:{hex(end)} ({size} bytes)...")
sys.stdout.flush()
self.load_blob_at(segment.data(), start)
return True
############################################################################
# cmd.Cmd callbacks here
def do_connect(self, _=""):
"""Connects to the Renode server, localhost@31415 by default."""
host_name = self.default_host_name
host_addr = self.default_host_addr
max_retry = self.default_max_retry
if self.connect(host_name, host_addr, max_retry):
print(f"Connected to {host_name} @ {self.getpeername()}")
else:
print(
f"Failed to connect to {host_name} after {max_retry} seconds")
self.disconnect()
return
# Ping the server with a newline once per second until we see a sync
# symbol and the line goes idle for 100 msec.
print("Waiting for prompt...")
for _ in range(180):
if not self.connected:
break
self.send("\n".encode())
if self.wait_for_sync(1000):
# Sync seen, turn remote echo off and mute the ack
self.send("echo off\n".encode())
self.wait_for_idle(1000, False)
return
print("Did not see command prompt from server")
self.disconnect()
return
def do_disconnect(self, _=""):
"""Disconnect from the Renode server"""
self.disconnect()
def do_reconnect(self, _=""):
"""Reconnect to the Renode server"""
self.do_disconnect()
self.do_connect()
##----------------------------------------
def do_boot_elf(self, line=""):
"""Load local ELF file to remote device and boot it"""
try:
e = self.load_elf(line)
if e is None:
print(f"Could not open '{line}' as an ELF file")
return False
self.upload_elf(e)
self.send(f"boot {hex(e.header.e_entry)}\n".encode())
self.wait_for_sync(100000)
except OSError:
print(f"Failed to boot {line}")
return True
def do_load_elf(self, line=""):
"""Load local ELF file to remote device"""
e = self.load_elf(line)
if e is None:
print(f"Could not open '{line}' as an ELF file")
return False
self.upload_elf(e)
return False
def do_load_file_at(self, line):
"""Uploads a binary file to a fixed address"""
args = line.split()
self.load_file_at(args[0], int(args[1], 16))
def do_load_xflash(self, line):
"""Uploads a binary file to external flash"""
self.load_file_at(line, "0x44000000")
def do_boot_sec(self, line):
"""Boots an app on SEC at the given entry point.
This will kill the active bootrom console session."""
self.send(f"boot {line}\n".encode())
self.wait_for_idle(100)
def do_boot_smc(self, line):
"""Boots an app on SMC at the given entry point.
If entry == 0, will stop SMC."""
self.send(f"poked 0x54020000 {line}")
self.wait_for_idle(100)
##----------------------------------------
def do_help(self, arg=""):
"""Displays local and remote commands"""
super().do_help(arg)
if not arg:
print("Remote commands")
print("================")
print()
self.run_command("help")
def do_exit(self, _=""):
"""Exits BootromShell"""
self.disconnect()
return True
##----------------------------------------
def default(self, line):
self.run_command(line)
def emptyline(self):
pass
##----------------------------------------
# Command completeion callback (todo)
# pylint: disable=W0221
def completenames(self, text, line, begidx, endidx):
result = super().completenames(text, line, begidx, endidx)
return result
# Argument completion callback (todo)
# pylint: disable=W0221
def completedefault(self, text, line, begidx, endidx):
result = super().completedefault(text, line, begidx, endidx)
return result
################################################################################
if __name__ == '__main__':
print("<<shell starting>>")
shell = BootromShell()
shell.do_connect()
shell.cmdloop()
print("<<shell closed>>")