blob: be825c0b926695dfc4f7542d61f3a3270bdb764a [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Run a live camera stream from Nexus board
"""
import argparse
from datetime import datetime
from functools import reduce
import re
import serial
import subprocess
from PIL import Image, ImageDraw, ImageFont
def main():
"""Args:
smc_uart_port: SMC uart port
baudrate: Baud rate of smc uart
input_shape: Input image shape
input_mode: Input image mode
"""
parser = argparse.ArgumentParser(
description='Generate inputs for ML models.')
parser.add_argument("--smc_uart_port",
"-p",
dest='port',
required=True,
help="SMC uart port")
parser.add_argument("--baudrate",
"-b",
default=115200,
type=int,
help='Baud rate of smc uart (example: 115200)')
parser.add_argument("--input_shape",
"-s",
default=[320, 240],
nargs='+',
type=int,
help='Input image shape (example: "320 240")')
parser.add_argument("--input_mode",
"-m",
default="L",
help='Input image mode (example: "RGB")')
parser.add_argument("--opentitantool_bin", type=str, required=True)
parser.add_argument("--opentitantool_conf", type=str, required=True)
args = parser.parse_args()
ser = serial.Serial(port=args.port, baudrate=args.baudrate)
while True:
line = ser.readline()
try:
if "Start of a frame" not in line.decode():
# Drop the current frame and process the next available frame.
continue
except:
continue
address = re.search(r"\[SMC\] Start of a frame, address (\d+)\.",
line.decode())
if address:
address = address.groups(1)[0]
else:
continue
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print("Start loading the frame")
# Load the frame from uart
byte_size = str(reduce(lambda x, y: x * y, args.input_shape))
byte_str = subprocess.check_output([
args.opentitantool_bin, '--conf', args.opentitantool_conf,
'--interface', 'nexus', 'spi', 'read-memory', '--length',
byte_size, '--start', address
])
im_obj = Image.frombytes(mode=args.input_mode,
size=tuple(args.input_shape),
data=byte_str)
im_width = args.input_shape[0]
im_height = args.input_shape[1]
# Fetch the scores
while (line := ser.readline()) and ("Score" not in line.decode()):
continue
scores = re.search(r"\[SMC\] Score 0: (-?\d+), Score 1: (-?\d+)\.",
line.decode())
if scores:
score_0 = scores.group(1)
score_1 = scores.group(2)
else:
score_0 = "Err"
score_1 = "Err"
# Draw the image
draw = ImageDraw.Draw(im_obj)
text = f"Score 0: {score_0:4}, Score 1: {score_1:4}"
font = ImageFont.load_default()
draw.text((im_width * 0.95 - font.getlength(text), im_height * 0.90),
text=text,
fill=255)
# Add timestamp
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
draw.text(
(im_width * 0.95 - font.getlength(current_time), im_height * 0.05),
text=current_time,
fill=255)
im_obj.save('/tmp/HPS_DEMO.png')
print("Finish the frame at", current_time)
print(text)
print('-' * 40)
if __name__ == "__main__":
main()