blob: f8a18f6508106b8b396979f721f1d223aca7ecd2 [file] [log] [blame]
Austin Appleby64c32212022-10-24 21:25:34 +00001#!/usr/bin/env python3
2
3import base64
4import cmd
5import curses.ascii
6import select
7import socket
8import sys
9import time
10
11from elftools.elf.elffile import ELFFile
12
13
class BootromShell(cmd.Cmd):
    """Interactive console for a remote bootrom command prompt.

    Commands typed at the prompt that are not handled locally are forwarded
    verbatim to the remote side. The transport is either a TCP socket or,
    when ``use_pty`` is set, the file at ``pty_path`` opened once for reading
    and once for writing. The remote side marks the end of a reply with an
    ETX byte (``curses.ascii.ETX``), called the "sync symbol" below.

    All timeouts are handed to ``select.poll().poll()`` and are therefore in
    milliseconds.
    """

    intro = "Welcome to Bootrom Shell"
    prompt = "BOOTROM> "
    socket = None
    poller = None
    pty_in = None
    pty_out = None
    connected = False
    use_pty = False
    # File opened for both directions when use_pty is set.
    pty_path = "/tmp/uart"
    # Number of raw bytes uploaded per remote "write" command.
    chunk_size = 65536

    ############################################################################
    # Network stuff here

    def connect(self, host_name, host_addr, max_retry):
        """Open the transport; return True on success, False on timeout.

        In socket mode, tries to connect to {host_addr} once per second for
        {max_retry} attempts. {host_name} is only used for messages. In pty
        mode the pty file is opened directly and errors from ``open``
        propagate to the caller.
        """
        if self.connected:
            # Connecting twice would leak the handles that are already open.
            return True

        if self.use_pty:
            # pylint: disable=R1732
            self.pty_in = open(self.pty_path, "rb", buffering=0)
            self.pty_out = open(self.pty_path, "wb", buffering=0)
            self.poller = select.poll()
            self.poller.register(self.pty_in, select.POLLIN)
            # poll() and run_command() are gated on this flag, so the pty
            # transport is unusable unless it is set here as well.
            self.connected = True
            return True

        if self.socket is None:
            print("Opening socket")

        print(f"Connecting to {host_name}", end="")
        for _ in range(max_retry):
            if self.socket is None:
                self.socket = socket.socket(socket.AF_INET,
                                            socket.SOCK_STREAM)
            try:
                self.socket.connect(host_addr)
            except OSError:
                # A socket whose connect() failed is not reliably reusable,
                # so drop it and start the next attempt with a fresh one.
                self.socket.close()
                self.socket = None
                print(".", end="")
                sys.stdout.flush()
                time.sleep(1)
                continue
            self.connected = True
            self.poller = select.poll()
            self.poller.register(self.socket, select.POLLIN)
            print("Connected!")
            return True
        print("Connection timed out!")
        return False

    def disconnect(self):
        """Close whatever is open and reset all transport state.

        Safe to call repeatedly; handles that are already None are skipped.
        """
        if self.pty_in is not None:
            print("Closing pty_in")
            self.pty_in.close()
        if self.pty_out is not None:
            print("Closing pty_out")
            self.pty_out.close()
        if self.socket is not None:
            print("Closing socket")
            self.socket.close()
        self.connected = False
        self.pty_in = None
        self.pty_out = None
        self.socket = None
        self.poller = None

    def getpeername(self):
        """Return the pty path in pty mode, else the socket's peer address."""
        if self.use_pty:
            return self.pty_path
        return self.socket.getpeername()

    def send(self, data):
        """Write the bytes in {data} to the active transport."""
        if self.use_pty:
            self.pty_out.write(data)
            self.pty_out.flush()
        else:
            # sendall() keeps writing until everything is out; plain send()
            # may transmit only part of a large upload chunk.
            self.socket.sendall(data)

    def recv(self, size):
        """Read up to {size} bytes from the active transport.

        An empty read is treated as the peer having closed the connection:
        the shell disconnects and b"" is returned.
        """
        if self.use_pty:
            result = self.pty_in.read(size)
        else:
            result = self.socket.recv(size)
        if not result:
            self.disconnect()
            return b""
        return result

    def poll(self, timeout):
        """Return True if data is readable within {timeout} milliseconds.

        Always returns False while disconnected.
        """
        if not self.connected:
            return False
        return bool(self.poller.poll(timeout))

    def _require_connection(self):
        """Return True if connected, else print a notice and return False."""
        if self.connected:
            return True
        print("Not connected!")
        return False

    ############################################################################

    def print_packet(self, echo=False):
        """Read one incoming packet and optionally echo it to the console.

        Returns True if the packet contained a sync symbol. The sync symbol
        itself is never echoed.
        """
        saw_sync = False
        for byte in self.recv(4096):
            if byte == curses.ascii.ETX:
                saw_sync = True
            elif echo:
                sys.stdout.write(chr(byte))
        sys.stdout.flush()
        return saw_sync

    def wait_for_sync(self, timeout, echo=True):
        """Wait for a packet containing a sync symbol.

        Returns True once a sync symbol is seen, False if the line has been
        idle for {timeout} or the connection is gone.
        """
        while self.connected and self.poll(timeout):
            if self.print_packet(echo):
                return True
        return False

    def wait_for_response(self, timeout):
        """Collect bytes until a sync symbol arrives.

        Returns the bytes received before the sync symbol, or b"" if the line
        stays idle for {timeout} or the connection closes first.
        """
        result = bytearray()
        while self.poll(timeout):
            received = self.recv(1)
            if not received:
                # recv() already disconnected; indexing the empty read would
                # raise IndexError.
                print("Connection closed while waiting for command response")
                return b""
            if received[0] == curses.ascii.ETX:
                return result
            result.append(received[0])
        print("Timeout while waiting for command response")
        return b""

    def wait_for_idle(self, timeout, echo=True):
        """Drain incoming packets until the line has been idle for {timeout}."""
        while self.poll(timeout):
            self.print_packet(echo)

    def run_command(self, line):
        """Send {line} to the remote prompt and echo the reply up to sync."""
        if not self._require_connection():
            return
        self.send((line + "\n").encode())
        self.wait_for_sync(100000)

    ############################################################################

    def load_blob_at(self, blob, address):
        """Upload {blob} to remote memory starting at integer {address}.

        The data goes out in chunk_size pieces. Each piece is announced with
        a "write <hex address>" command and then sent as one base64 line.
        """
        if not self._require_connection():
            return
        for offset in range(0, len(blob), self.chunk_size):
            chunk = blob[offset:offset + self.chunk_size]
            self.send(f"write {hex(address + offset)}\n".encode())
            self.wait_for_response(100000)
            if not self.connected:
                # The link dropped while waiting; nothing left to write to.
                print("Connection lost during upload")
                return
            self.send(base64.b64encode(chunk))
            self.send("\n".encode())
            self.wait_for_sync(100000)
            if not self.connected:
                print("Connection lost during upload")
                return

    def load_file_at(self, filename, address):
        """Read {filename} and upload its contents to integer {address}.

        I/O errors are reported on the console rather than raised.
        """
        try:
            with open(filename, "rb") as file:
                print("file opened")
                blob = file.read()
                print("blob read")
                self.load_blob_at(blob, address)
        except OSError as e:
            print(f"Could not load {filename}")
            print(f"Exception {e} ")

    def load_elf(self, filename):
        """Open {filename} as an ELF file.

        Returns the ELFFile, or None (after printing a message) if the file
        cannot be opened or parsed. The ELFFile keeps its underlying file
        open; the caller closes it through ``elf_file.stream``.
        """
        # Imported here so the module's import block stays as it is.
        # pylint: disable=C0415
        from elftools.common.exceptions import ELFError

        stream = None
        try:
            # pylint: disable=R1732
            stream = open(filename, "rb")
            elf_file = ELFFile(stream)
        except (OSError, ELFError):
            # Do not leak the handle when parsing fails.
            if stream is not None:
                stream.close()
            print(f"Could not open '{filename}' as an ELF file")
            return None
        print(f"Entry point at {hex(elf_file.header.e_entry)}")
        return elf_file

    def upload_elf(self, elf_file):
        """Upload every PT_LOAD segment of {elf_file} to its p_paddr.

        Returns False if {elf_file} is None, True otherwise.
        """
        if elf_file is None:
            print("No elf file")
            return False
        for segment in elf_file.iter_segments():
            header = segment.header
            if header.p_type != "PT_LOAD":
                continue
            start = header.p_paddr
            size = header.p_filesz
            end = start + size
            print(f"Loading seg: {hex(start)}:{hex(end)} ({size} bytes)...")
            sys.stdout.flush()
            self.load_blob_at(segment.data(), start)

        return True

    ############################################################################
    # cmd.Cmd callbacks here

    def do_connect(self, _=""):
        """Connects to the Renode server, localhost@31415 by default."""
        host_name = "Renode"
        host_addr = ("localhost", 31415)
        max_retry = 60

        if self.connected:
            print(f"Already connected to {host_name} @ {self.getpeername()}")
            return

        if self.connect(host_name, host_addr, max_retry):
            print(f"Connected to {host_name} @ {self.getpeername()}")
        else:
            print(
                f"Failed to connect to {host_name} after {max_retry} seconds")
            self.disconnect()
            return

        # Ping the server with a newline once per second until we see a sync
        # symbol and the line goes idle for 100 msec.
        print("Waiting for prompt...")
        for _ in range(180):
            if not self.connected:
                break
            self.send("\n".encode())
            if self.wait_for_sync(1000):
                # Sync seen, turn remote echo off and mute the ack
                self.send("echo off\n".encode())
                self.wait_for_idle(100, False)
                return

        print("Did not see command prompt from server")
        self.disconnect()

    def do_disconnect(self, _=""):
        """Disconnect from the Renode server"""
        self.disconnect()

    def do_reconnect(self, _=""):
        """Reconnect to the Renode server"""
        self.do_disconnect()
        self.do_connect()

    ##----------------------------------------

    def do_boot_elf(self, line=""):
        """Load local ELF file to remote device and boot it"""
        if not self._require_connection():
            return False
        elf_file = self.load_elf(line)
        if elf_file is None:
            # load_elf() has already reported the failure.
            return False

        try:
            self.upload_elf(elf_file)
            self.send(f"boot {hex(elf_file.header.e_entry)}\n".encode())
            self.wait_for_sync(100000)
        except OSError:
            print(f"Failed to boot {line}")
        finally:
            # The ELFFile keeps the file open for segment.data().
            elf_file.stream.close()
        # NOTE(review): a true return value makes cmd.Cmd leave its command
        # loop, so the shell exits after a boot attempt. Kept from the
        # original; confirm this is intended.
        return True

    def do_load_elf(self, line=""):
        """Load local ELF file to remote device"""
        if not self._require_connection():
            return False
        elf_file = self.load_elf(line)
        if elf_file is None:
            # load_elf() has already reported the failure.
            return False

        try:
            self.upload_elf(elf_file)
        finally:
            elf_file.stream.close()
        return False

    def do_load_file_at(self, line):
        """Uploads a binary file to a fixed address"""
        args = line.split()
        if len(args) < 2:
            print("Usage: load_file_at <filename> <hex address>")
            return
        try:
            address = int(args[1], 16)
        except ValueError:
            print(f"Invalid hex address '{args[1]}'")
            return
        self.load_file_at(args[0], address)

    def do_load_xflash(self, line):
        """Uploads a binary file to external flash"""
        # The address must be an int: load_blob_at() formats it with hex().
        self.load_file_at(line, 0x44000000)

    def do_boot_sec(self, line):
        """Boots an app on SEC at the given entry point.
        This will kill the active bootrom console session."""
        if not self._require_connection():
            return
        self.send(f"boot {line}\n".encode())
        self.wait_for_idle(100)

    def do_boot_smc(self, line):
        """Boots an app on SMC at the given entry point.
        If entry == 0, will stop SMC."""
        if not self._require_connection():
            return
        # send() needs bytes, and the remote prompt is line based, so the
        # command is encoded and newline-terminated like every other one.
        self.send(f"poked 0x54020000 {line}\n".encode())
        self.wait_for_idle(100)

    ##----------------------------------------

    def do_help(self, arg=""):
        """Displays local and remote commands"""
        super().do_help(arg)
        if not arg:
            print("Remote commands")
            print("================")
            print()
            self.run_command("help")

    def do_exit(self, _=""):
        """Exits BootromShell"""
        self.disconnect()
        return True

    # pylint: disable=C0103
    def do_EOF(self, _=""):
        """Exits BootromShell on end of input (Ctrl-D)"""
        # cmd.Cmd turns end-of-input into the command "EOF"; without this
        # handler it falls through to default() and is sent to the device.
        print()
        return self.do_exit()

    ##----------------------------------------

    def default(self, line):
        # Anything that is not a local command is forwarded to the device.
        self.run_command(line)

    def emptyline(self):
        # Override cmd.Cmd's default of repeating the previous command.
        pass

    ##----------------------------------------

    # Command completion callback (todo)
    # pylint: disable=W0221
    def completenames(self, text, line, begidx, endidx):
        result = super().completenames(text, line, begidx, endidx)
        return result

    # Argument completion callback (todo)
    # pylint: disable=W0221
    def completedefault(self, text, line, begidx, endidx):
        result = super().completedefault(text, line, begidx, endidx)
        return result
333
334
335################################################################################
336
# Script entry point: connect to the server, then run the interactive loop.
if __name__ == '__main__':
    print("<<shell starting>>")
    shell = BootromShell()
    try:
        shell.do_connect()
        shell.cmdloop()
    except KeyboardInterrupt:
        # Ctrl-C while connecting or at the prompt: finish the current console
        # line and leave quietly instead of dumping a traceback.
        print()
    finally:
        # disconnect() skips handles that are already closed, so this is safe
        # even after an "exit" command has already disconnected.
        shell.disconnect()
    print("<<shell closed>>")