import contextlib
import http.server
import os
import threading
import time
from functools import wraps
import cereal.messaging as messaging
from openpilot.common.params import Params
from openpilot.selfdrive.manager.process_config import managed_processes
from openpilot.system.hardware import PC
from openpilot.system.version import training_version, terms_version
[docs]
def set_params_enabled():
os.environ['FINGERPRINT'] = "TOYOTA_COROLLA_TSS2"
os.environ['LOGPRINT'] = "debug"
params = Params()
params.put("HasAcceptedTerms", terms_version)
params.put("CompletedTrainingVersion", training_version)
params.put_bool("OpenpilotEnabledToggle", True)
# valid calib
msg = messaging.new_message('liveCalibration')
msg.liveCalibration.validBlocks = 20
msg.liveCalibration.rpyCalib = [0.0, 0.0, 0.0]
params.put("CalibrationParams", msg.to_bytes())
[docs]
def phone_only(f):
@wraps(f)
def wrap(self, *args, **kwargs):
if PC:
self.skipTest("This test is not meant to run on PC")
f(self, *args, **kwargs)
return wrap
[docs]
def release_only(f):
@wraps(f)
def wrap(self, *args, **kwargs):
if "RELEASE" not in os.environ:
self.skipTest("This test is only for release branches")
f(self, *args, **kwargs)
return wrap
[docs]
@contextlib.contextmanager
def processes_context(processes, init_time=0, ignore_stopped=None):
ignore_stopped = [] if ignore_stopped is None else ignore_stopped
# start and assert started
for n, p in enumerate(processes):
managed_processes[p].start()
if n < len(processes) - 1:
time.sleep(init_time)
assert all(managed_processes[name].proc.exitcode is None for name in processes)
try:
yield [managed_processes[name] for name in processes]
# assert processes are still started
assert all(managed_processes[name].proc.exitcode is None for name in processes if name not in ignore_stopped)
finally:
for p in processes:
managed_processes[p].stop()
[docs]
def with_processes(processes, init_time=0, ignore_stopped=None):
def wrapper(func):
@wraps(func)
def wrap(*args, **kwargs):
with processes_context(processes, init_time, ignore_stopped):
return func(*args, **kwargs)
return wrap
return wrapper
[docs]
def noop(*args, **kwargs):
pass
[docs]
def read_segment_list(segment_list_path):
with open(segment_list_path) as f:
seg_list = f.read().splitlines()
return [(platform[2:], segment) for platform, segment in zip(seg_list[::2], seg_list[1::2], strict=True)]
[docs]
@contextlib.contextmanager
def http_server_context(handler, setup=None):
host = '127.0.0.1'
server = http.server.HTTPServer((host, 0), handler)
port = server.server_port
t = threading.Thread(target=server.serve_forever)
t.start()
if setup is not None:
setup(host, port)
try:
yield (host, port)
finally:
server.shutdown()
server.server_close()
t.join()
[docs]
def with_http_server(func, handler=http.server.BaseHTTPRequestHandler, setup=None):
@wraps(func)
def inner(*args, **kwargs):
with http_server_context(handler, setup) as (host, port):
return func(*args, f"http://{host}:{port}", **kwargs)
return inner
[docs]
def DirectoryHttpServer(directory) -> type[http.server.SimpleHTTPRequestHandler]:
# creates an http server that serves files from directory
class Handler(http.server.SimpleHTTPRequestHandler):
API_NO_RESPONSE = False
API_BAD_RESPONSE = False
def do_GET(self):
if self.API_NO_RESPONSE:
return
if self.API_BAD_RESPONSE:
self.send_response(500, "")
return
super().do_GET()
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=str(directory), **kwargs)
return Handler