[test] Add functional verilator tests.
|functional_verilator_test.py| takes a flag pointing to an executable,
which is then executed under a Verilator build. The test listens
over UART and passes or fails depending on whether the executable
prints PASS! or FAIL!; see the file comment for details.
diff --git a/test/systemtest/test_utils.py b/test/systemtest/test_utils.py
index 87f0565..8640bf2 100644
--- a/test/systemtest/test_utils.py
+++ b/test/systemtest/test_utils.py
@@ -9,6 +9,7 @@
import logging
import os
import re
+import select
import shlex
import signal
import subprocess
@@ -17,7 +18,6 @@
class Process:
"""Utility class used to spawn an interact with processs.s"""
-
def __init__(self,
cmd,
logdir,
@@ -80,14 +80,13 @@
self._f_stdout = open(logfile_stdout, 'w')
self._f_stderr = open(logfile_stderr, 'w')
- self.proc = subprocess.Popen(
- cmd,
- cwd=self.cwd,
- universal_newlines=True,
- bufsize=1,
- stdin=subprocess.PIPE,
- stdout=self._f_stdout,
- stderr=self._f_stderr)
+ self.proc = subprocess.Popen(cmd,
+ cwd=self.cwd,
+ universal_newlines=True,
+ bufsize=1,
+ stdin=subprocess.PIPE,
+ stdout=self._f_stdout,
+ stderr=self._f_stderr)
self._f_stdout_r = open(logfile_stdout, 'r')
self._f_stderr_r = open(logfile_stderr, 'r')
@@ -98,8 +97,8 @@
# check if the string indicating a successful startup appears in the
# the program output (STDOUT or STDERR)
- init_done = self._find_in_output(
- pattern=self.startup_done_expect, timeout=self.startup_timeout)
+ init_done = self.find_in_output(pattern=self.startup_done_expect,
+ timeout=self.startup_timeout)
if init_done == None:
raise subprocess.TimeoutExpired
@@ -144,9 +143,9 @@
if pattern == None:
return True
- return self._find_in_output(pattern, timeout) != None
+ return self.find_in_output(pattern, timeout) != None
- def _find_in_output(self, pattern, timeout):
+ def find_in_output(self, pattern, timeout):
"""Read STDOUT and STDERR to find an expected pattern.
Both streams are reset to the start of the stream before searching.
@@ -169,7 +168,6 @@
if timeout != None:
t_end = time.time() + timeout
-
# reset streams
self._f_stdout_r.seek(0)
self._f_stderr_r.seek(0)
@@ -217,3 +215,51 @@
pass
return None
+
+
+def stream_fd_to_log(fd, logger, pattern, timeout=None):
+    """Stream lines from the given fd to the log until pattern matches.
+
+    Data is read from ``fd`` in chunks and reassembled into complete,
+    newline-terminated lines. Every complete line is logged at debug
+    level and then tested with ``pattern.match()``.
+
+    Args:
+        fd: File descriptor to read from. It is switched to non-blocking
+            mode and is left in that mode when this function returns.
+        logger: Object whose ``debug()`` method is called for each line.
+        pattern: Compiled regular expression; ``pattern.match()`` is
+            applied to each decoded line (trailing newline included).
+        timeout: Maximum number of seconds to wait, or None to wait
+            without a time limit.
+
+    Returns:
+        The match object from ``pattern.match()`` for the first matching
+        line, or None if the timeout expires or the fd reaches
+        end-of-file before any line matches.
+
+    Note:
+        Bytes that were already read past the matching line are
+        discarded.
+    """
+    deadline = None
+    if timeout is not None:
+        deadline = time.monotonic() + timeout
+
+    os.set_blocking(fd, False)
+    pending = b''
+    while True:
+        # Compute the remaining time exactly once per iteration so that
+        # select() can never be handed a negative timeout (ValueError).
+        remaining = None
+        if deadline is not None:
+            remaining = deadline - time.monotonic()
+            if remaining < 0:
+                return None
+
+        # select() on the fd so that we don't waste time reading when
+        # we wouldn't get anything out of it.
+        readable, _, _ = select.select([fd], [], [], remaining)
+        if not readable:
+            continue
+
+        try:
+            chunk = os.read(fd, 1024)
+        except BlockingIOError:
+            # Spurious readiness on a non-blocking fd; wait again.
+            continue
+
+        if not chunk:
+            # End-of-file: nothing more will ever arrive, so looping would
+            # only spin. Give a final unterminated line one chance.
+            if pending:
+                logger.debug('fd#%d: %s', fd, pending)
+                return pattern.match(pending.decode('utf-8', 'replace'))
+            return None
+
+        # Split on b'\n' only: bytes.splitlines() would also split on a
+        # bare b'\r' and cause the rest of the chunk to be dropped.
+        pending += chunk
+        *complete_lines, pending = pending.split(b'\n')
+
+        for raw_line in complete_lines:
+            line = raw_line + b'\n'
+            logger.debug('fd#%d: %s', fd, line)
+            # 'replace' keeps stray non-UTF-8 bytes from aborting the scan.
+            match = pattern.match(line.decode('utf-8', 'replace'))
+            if match is not None:
+                return match