Source code for selfdrive.test.process_replay.capture

import os
import sys

from typing import no_type_check

[docs] class FdRedirect: def __init__(self, file_prefix: str, fd: int): fname = os.path.join("/tmp", f"{file_prefix}.{fd}") if os.path.exists(fname): os.unlink(fname) self.dest_fd = os.open(fname, os.O_WRONLY | os.O_CREAT) self.dest_fname = fname self.source_fd = fd os.set_inheritable(self.dest_fd, True) def __del__(self): os.close(self.dest_fd)
[docs] def purge(self) -> None: os.unlink(self.dest_fname)
[docs] def read(self) -> bytes: with open(self.dest_fname, "rb") as f: return f.read() or b""
[docs] class ProcessOutputCapture: def __init__(self, proc_name: str, prefix: str): prefix = f"{proc_name}_{prefix}" self.stdout_redirect = FdRedirect(prefix, 1) self.stderr_redirect = FdRedirect(prefix, 2) def __del__(self): self.stdout_redirect.purge() self.stderr_redirect.purge()
[docs] def read_outerr(self) -> tuple[str, str]: out_str = self.stdout_redirect.read().decode() err_str = self.stderr_redirect.read().decode() return out_str, err_str