| #!/usr/bin/env python3 |
| # Copyright 2023 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| """ |
| Run a live camera stream from Nexus board |
| """ |
| |
| import argparse |
| from datetime import datetime |
| from functools import reduce |
| import re |
| import serial |
| import subprocess |
| from PIL import Image, ImageDraw, ImageFont |
| |
| |
| def main(): |
| """Args: |
| smc_uart_port: SMC uart port |
| baudrate: Baud rate of smc uart |
| input_shape: Input image shape |
| input_mode: Input image mode |
| """ |
| parser = argparse.ArgumentParser( |
| description='Generate inputs for ML models.') |
| parser.add_argument("--smc_uart_port", |
| "-p", |
| dest='port', |
| required=True, |
| help="SMC uart port") |
| parser.add_argument("--baudrate", |
| "-b", |
| default=115200, |
| type=int, |
| help='Baud rate of smc uart (example: 115200)') |
| parser.add_argument("--input_shape", |
| "-s", |
| default=[320, 240], |
| nargs='+', |
| type=int, |
| help='Input image shape (example: "320 240")') |
| parser.add_argument("--input_mode", |
| "-m", |
| default="L", |
| help='Input image mode (example: "RGB")') |
| parser.add_argument("--opentitantool_bin", type=str, required=True) |
| parser.add_argument("--opentitantool_conf", type=str, required=True) |
| args = parser.parse_args() |
| |
| ser = serial.Serial(port=args.port, baudrate=args.baudrate) |
| |
| while True: |
| line = ser.readline() |
| try: |
| if "Start of a frame" not in line.decode(): |
| # Drop the current frame and process the next available frame. |
| continue |
| except: |
| continue |
| address = re.search(r"\[SMC\] Start of a frame, address (\d+)\.", |
| line.decode()) |
| if address: |
| address = address.groups(1)[0] |
| else: |
| continue |
| current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
| print("Start loading the frame") |
| # Load the frame from uart |
| byte_size = str(reduce(lambda x, y: x * y, args.input_shape)) |
| byte_str = subprocess.check_output([ |
| args.opentitantool_bin, '--conf', args.opentitantool_conf, |
| '--interface', 'nexus', 'spi', 'read-memory', '--length', |
| byte_size, '--start', address |
| ]) |
| |
| im_obj = Image.frombytes(mode=args.input_mode, |
| size=tuple(args.input_shape), |
| data=byte_str) |
| im_width = args.input_shape[0] |
| im_height = args.input_shape[1] |
| |
| # Fetch the scores |
| while (line := ser.readline()) and ("Score" not in line.decode()): |
| continue |
| scores = re.search(r"\[SMC\] Score 0: (-?\d+), Score 1: (-?\d+)\.", |
| line.decode()) |
| if scores: |
| score_0 = scores.group(1) |
| score_1 = scores.group(2) |
| else: |
| score_0 = "Err" |
| score_1 = "Err" |
| |
| # Draw the image |
| draw = ImageDraw.Draw(im_obj) |
| text = f"Score 0: {score_0:4}, Score 1: {score_1:4}" |
| font = ImageFont.load_default() |
| draw.text((im_width * 0.95 - font.getlength(text), im_height * 0.90), |
| text=text, |
| fill=255) |
| # Add timestamp |
| current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
| draw.text( |
| (im_width * 0.95 - font.getlength(current_time), im_height * 0.05), |
| text=current_time, |
| fill=255) |
| |
| im_obj.save('/tmp/HPS_DEMO.png') |
| print("Finish the frame at", current_time) |
| print(text) |
| print('-' * 40) |
| |
| |
| if __name__ == "__main__": |
| main() |