Source code for common.spinner

import os
import subprocess
from openpilot.common.basedir import BASEDIR


[docs] class Spinner(): def __init__(self): try: self.spinner_proc = subprocess.Popen(["./spinner"], stdin=subprocess.PIPE, cwd=os.path.join(BASEDIR, "selfdrive", "ui"), close_fds=True) except OSError: self.spinner_proc = None def __enter__(self): return self
[docs] def update(self, spinner_text: str): if self.spinner_proc is not None: self.spinner_proc.stdin.write(spinner_text.encode('utf8') + b"\n") try: self.spinner_proc.stdin.flush() except BrokenPipeError: pass
[docs] def update_progress(self, cur: float, total: float): self.update(str(round(100 * cur / total)))
[docs] def close(self): if self.spinner_proc is not None: self.spinner_proc.kill() try: self.spinner_proc.communicate(timeout=2.) except subprocess.TimeoutExpired: print("WARNING: failed to kill spinner") self.spinner_proc = None
def __del__(self): self.close() def __exit__(self, exc_type, exc_value, traceback): self.close()
if __name__ == "__main__": import time with Spinner() as s: s.update("Spinner text") time.sleep(5.0) print("gone") time.sleep(5.0)