selfdrive.test.process_replay package

Submodules

selfdrive.test.process_replay.capture module

class selfdrive.test.process_replay.capture.FdRedirect(file_prefix: str, fd: int)[source]

Bases: object

purge() None[source]
read() bytes[source]
class selfdrive.test.process_replay.capture.ProcessOutputCapture(proc_name: str, prefix: str)[source]

Bases: object

read_outerr() tuple[str, str][source]

selfdrive.test.process_replay.compare_logs module

selfdrive.test.process_replay.compare_logs.compare_logs(log1, log2, ignore_fields=None, ignore_msgs=None, tolerance=None)[source]
selfdrive.test.process_replay.compare_logs.format_diff(results, log_paths, ref_commit)[source]
selfdrive.test.process_replay.compare_logs.format_process_diff(diff)[source]
selfdrive.test.process_replay.compare_logs.remove_ignored_fields(msg, ignore)[source]

selfdrive.test.process_replay.migration module

selfdrive.test.process_replay.migration.migrate_all(lr, old_logtime=False, manager_states=False, panda_states=False, camera_states=False)[source]
selfdrive.test.process_replay.migration.migrate_cameraStates(lr)[source]
selfdrive.test.process_replay.migration.migrate_carParams(lr, old_logtime=False)[source]
selfdrive.test.process_replay.migration.migrate_deviceState(lr)[source]
selfdrive.test.process_replay.migration.migrate_gpsLocation(lr)[source]
selfdrive.test.process_replay.migration.migrate_managerState(lr)[source]
selfdrive.test.process_replay.migration.migrate_pandaStates(lr)[source]
selfdrive.test.process_replay.migration.migrate_peripheralState(lr)[source]
selfdrive.test.process_replay.migration.migrate_sensorEvents(lr, old_logtime=False)[source]

selfdrive.test.process_replay.model_replay module

selfdrive.test.process_replay.model_replay.get_log_fn(ref_commit, test_route)[source]
selfdrive.test.process_replay.model_replay.model_replay(lr, frs)[source]
selfdrive.test.process_replay.model_replay.trim_logs_to_max_frames(logs, max_frames, frs_types, include_all_types)[source]

selfdrive.test.process_replay.process_replay module

class selfdrive.test.process_replay.process_replay.DummySocket[source]

Bases: object

receive(non_blocking: bool = False) bytes | None[source]
send(data: bytes)[source]
class selfdrive.test.process_replay.process_replay.FrequencyBasedRcvCallback(trigger_msg_type)[source]

Bases: object

class selfdrive.test.process_replay.process_replay.LauncherWithCapture(capture: ProcessOutputCapture, launcher: Callable)[source]

Bases: object

class selfdrive.test.process_replay.process_replay.MessageBasedRcvCallback(trigger_msg_type)[source]

Bases: object

class selfdrive.test.process_replay.process_replay.ModeldCameraSyncRcvCallback[source]

Bases: object

class selfdrive.test.process_replay.process_replay.ProcessConfig(proc_name: str, pubs: list[str], subs: list[str], ignore: list[str], config_callback: collections.abc.Callable | None = None, init_callback: collections.abc.Callable | None = None, should_recv_callback: collections.abc.Callable | None = None, tolerance: float | None = None, processing_time: float = 0.001, timeout: int = 30, simulation: bool = True, main_pub: str | None = None, main_pub_drained: bool = True, vision_pubs: list[str] = <factory>, ignore_alive_pubs: list[str] = <factory>, unlocked_pubs: list[str] = <factory>)[source]

Bases: object

config_callback: Callable | None = None
ignore: list[str]
ignore_alive_pubs: list[str]
init_callback: Callable | None = None
main_pub: str | None = None
main_pub_drained: bool = True
proc_name: str
processing_time: float = 0.001
pubs: list[str]
should_recv_callback: Callable | None = None
simulation: bool = True
subs: list[str]
timeout: int = 30
tolerance: float | None = None
unlocked_pubs: list[str]
vision_pubs: list[str]
class selfdrive.test.process_replay.process_replay.ProcessContainer(cfg: ProcessConfig)[source]

Bases: object

property has_empty_queue: bool
property pubs: list[str]
run_step(msg: _DynamicStructReader, frs: dict[str, BaseFrameReader] | None) list[_DynamicStructReader][source]
start(params_config: dict[str, Any], environ_config: dict[str, Any], all_msgs: Iterable[type[_DynamicStructReader]], frs: dict[str, BaseFrameReader] | None, fingerprint: str | None, capture_output: bool)[source]
stop()[source]
property subs: list[str]
class selfdrive.test.process_replay.process_replay.ReplayContext(cfg)[source]

Bases: object

property all_recv_called_events
property all_recv_ready_events
close_context()[source]
open_context()[source]
send_sync(pm, endpoint, dat)[source]
unlock_sockets()[source]
wait_for_next_recv(trigger_empty_recv)[source]
wait_for_recv_called()[source]
selfdrive.test.process_replay.process_replay.calibration_rcv_callback(msg, cfg, frame)[source]
selfdrive.test.process_replay.process_replay.check_openpilot_enabled(msgs: Iterable[type[_DynamicStructReader]]) bool[source]
selfdrive.test.process_replay.process_replay.controlsd_config_callback(params, cfg, lr)[source]
selfdrive.test.process_replay.process_replay.controlsd_fingerprint_callback(rc, pm, msgs, fingerprint)[source]
selfdrive.test.process_replay.process_replay.controlsd_rcv_callback(msg, cfg, frame)[source]
selfdrive.test.process_replay.process_replay.dmonitoringmodeld_rcv_callback(msg, cfg, frame)[source]
selfdrive.test.process_replay.process_replay.generate_environ_config(CP=None, fingerprint=None, log_dir=None) dict[str, Any][source]
selfdrive.test.process_replay.process_replay.generate_params_config(lr=None, CP=None, fingerprint=None, custom_params=None) dict[str, Any][source]
selfdrive.test.process_replay.process_replay.get_car_params_callback(rc, pm, msgs, fingerprint)[source]
selfdrive.test.process_replay.process_replay.get_custom_params_from_lr(lr: Iterable[type[_DynamicStructReader]], initial_state: str = 'first') dict[str, Any][source]

Use this to get custom params dict based on provided logs. Useful when replaying following processes: calibrationd, paramsd, torqued The params may be based on first or last message of given type (carParams, liveCalibration, liveParameters, liveTorqueParameters) in the logs.

selfdrive.test.process_replay.process_replay.get_process_config(name: str) ProcessConfig[source]
selfdrive.test.process_replay.process_replay.locationd_config_pubsub_callback(params, cfg, lr)[source]
selfdrive.test.process_replay.process_replay.replay_process(cfg: ProcessConfig | Iterable[ProcessConfig], lr: Iterable[type[_DynamicStructReader]], frs: dict[str, BaseFrameReader] = None, fingerprint: str = None, return_all_logs: bool = False, custom_params: dict[str, Any] = None, captured_output_store: dict[str, dict[str, str]] = None, disable_progress: bool = False) list[_DynamicStructReader][source]
selfdrive.test.process_replay.process_replay.replay_process_with_name(name: str | Iterable[str], lr: Iterable[type[_DynamicStructReader]], *args, **kwargs) list[_DynamicStructReader][source]
selfdrive.test.process_replay.process_replay.torqued_rcv_callback(msg, cfg, frame)[source]

selfdrive.test.process_replay.regen module

class selfdrive.test.process_replay.regen.DummyFrameReader(w: int, h: int, frame_count: int, pix_val: int)[source]

Bases: BaseFrameReader

get(idx, count=1, pix_fmt='yuv420p')[source]
static zero_dcamera()[source]
selfdrive.test.process_replay.regen.regen_and_save(route: str, sidx: int, processes: str | Iterable[str] = 'all', outdir: str = '/tmp/openpilot/openpilot/selfdrive/test/process_replay/fakedata/', upload: bool = False, use_route_meta: bool = False, disable_tqdm: bool = False, dummy_driver_cam: bool = False) str[source]
selfdrive.test.process_replay.regen.regen_segment(lr: ~collections.abc.Iterable[type[~capnp.lib.capnp._DynamicStructReader]], frs: dict[str, ~typing.Any] = None, processes: ~collections.abc.Iterable[~openpilot.selfdrive.test.process_replay.process_replay.ProcessConfig] = [ProcessConfig(proc_name='controlsd', pubs=['can', 'deviceState', 'pandaStates', 'peripheralState', 'liveCalibration', 'driverMonitoringState', 'longitudinalPlan', 'liveLocationKalman', 'liveParameters', 'radarState', 'modelV2', 'driverCameraState', 'roadCameraState', 'wideRoadCameraState', 'managerState', 'testJoystick', 'liveTorqueParameters', 'accelerometer', 'gyroscope'], subs=['controlsState', 'carState', 'carControl', 'sendcan', 'onroadEvents', 'carParams'], ignore=['logMonoTime', 'controlsState.startMonoTime', 'controlsState.cumLagMs'], config_callback=<function controlsd_config_callback>, init_callback=<function controlsd_fingerprint_callback>, should_recv_callback=<function controlsd_rcv_callback>, tolerance=1e-07, processing_time=0.004, timeout=30, simulation=True, main_pub='can', main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]), ProcessConfig(proc_name='radard', pubs=['can', 'carState', 'modelV2'], subs=['radarState', 'liveTracks'], ignore=['logMonoTime', 'radarState.cumLagMs'], config_callback=None, init_callback=<function get_car_params_callback>, should_recv_callback=<openpilot.selfdrive.test.process_replay.process_replay.MessageBasedRcvCallback object>, tolerance=None, processing_time=0.001, timeout=30, simulation=True, main_pub='can', main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]), ProcessConfig(proc_name='plannerd', pubs=['modelV2', 'carControl', 'carState', 'controlsState', 'radarState'], subs=['longitudinalPlan', 'uiPlan'], ignore=['logMonoTime', 'longitudinalPlan.processingDelay', 'longitudinalPlan.solverExecutionTime'], config_callback=None, init_callback=<function get_car_params_callback>, should_recv_callback=<openpilot.selfdrive.test.process_replay.process_replay.FrequencyBasedRcvCallback object>, tolerance=1e-07, processing_time=0.001, timeout=30, simulation=True, main_pub=None, main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]), ProcessConfig(proc_name='calibrationd', pubs=['carState', 'cameraOdometry', 'carParams'], subs=['liveCalibration'], ignore=['logMonoTime'], config_callback=None, init_callback=None, should_recv_callback=<function calibration_rcv_callback>, tolerance=None, processing_time=0.001, timeout=30, simulation=True, main_pub=None, main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]), ProcessConfig(proc_name='dmonitoringd', pubs=['driverStateV2', 'liveCalibration', 'carState', 'modelV2', 'controlsState'], subs=['driverMonitoringState'], ignore=['logMonoTime'], config_callback=None, init_callback=None, should_recv_callback=<openpilot.selfdrive.test.process_replay.process_replay.FrequencyBasedRcvCallback object>, tolerance=1e-07, processing_time=0.001, timeout=30, simulation=True, main_pub=None, main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]), ProcessConfig(proc_name='locationd', pubs=['cameraOdometry', 'accelerometer', 'gyroscope', 'gpsLocationExternal', 'liveCalibration', 'carState', 'gpsLocation'], subs=['liveLocationKalman'], ignore=['logMonoTime'], config_callback=<function locationd_config_pubsub_callback>, init_callback=None, should_recv_callback=None, tolerance=1e-07, processing_time=0.001, timeout=30, simulation=True, main_pub=None, main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]), ProcessConfig(proc_name='paramsd', pubs=['liveLocationKalman', 'carState'], subs=['liveParameters'], ignore=['logMonoTime'], config_callback=None, init_callback=<function get_car_params_callback>, should_recv_callback=<openpilot.selfdrive.test.process_replay.process_replay.FrequencyBasedRcvCallback object>, tolerance=1e-07, processing_time=0.004, timeout=30, simulation=True, main_pub=None, main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]), ProcessConfig(proc_name='ubloxd', pubs=['ubloxRaw'], subs=['ubloxGnss', 'gpsLocationExternal'], ignore=['logMonoTime'], config_callback=None, init_callback=None, should_recv_callback=None, tolerance=None, processing_time=0.001, timeout=30, simulation=True, main_pub=None, main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]), ProcessConfig(proc_name='torqued', pubs=['liveLocationKalman', 'carState', 'carControl'], subs=['liveTorqueParameters'], ignore=['logMonoTime'], config_callback=None, init_callback=<function get_car_params_callback>, should_recv_callback=<function torqued_rcv_callback>, tolerance=1e-07, processing_time=0.001, timeout=30, simulation=True, main_pub=None, main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]), ProcessConfig(proc_name='modeld', pubs=['deviceState', 'roadCameraState', 'wideRoadCameraState', 'liveCalibration', 'driverMonitoringState'], subs=['modelV2', 'cameraOdometry'], ignore=['logMonoTime', 'modelV2.frameDropPerc', 'modelV2.modelExecutionTime'], config_callback=None, init_callback=<function get_car_params_callback>, should_recv_callback=<openpilot.selfdrive.test.process_replay.process_replay.ModeldCameraSyncRcvCallback object>, tolerance=1e-07, processing_time=0.02, timeout=30, simulation=True, main_pub='visionipc_camerad_0', main_pub_drained=False, vision_pubs=['roadCameraState', 'wideRoadCameraState'], ignore_alive_pubs=['wideRoadCameraState'], unlocked_pubs=[]), ProcessConfig(proc_name='dmonitoringmodeld', pubs=['liveCalibration', 'driverCameraState'], subs=['driverStateV2'], ignore=['logMonoTime', 'driverStateV2.modelExecutionTime', 'driverStateV2.dspExecutionTime'], config_callback=None, init_callback=None, should_recv_callback=<function dmonitoringmodeld_rcv_callback>, tolerance=1e-07, processing_time=0.02, timeout=30, simulation=True, main_pub='visionipc_camerad_1', main_pub_drained=False, vision_pubs=['driverCameraState'], ignore_alive_pubs=['driverCameraState'], unlocked_pubs=[])], disable_tqdm: bool = False) list[_DynamicStructReader][source]
selfdrive.test.process_replay.regen.setup_data_readers(route: str, sidx: int, use_route_meta: bool, needs_driver_cam: bool = True, needs_road_cam: bool = True, dummy_driver_cam: bool = False) tuple[LogReader, dict[str, Any]][source]

selfdrive.test.process_replay.regen_all module

selfdrive.test.process_replay.regen_all.regen_job(segment, upload, disable_tqdm)[source]

selfdrive.test.process_replay.test_fuzzy module

class selfdrive.test.process_replay.test_fuzzy.TestFuzzProcesses(methodName='runTest')[source]

Bases: TestCase

test_fuzz_process = None
test_fuzz_process_0_radard() None
test_fuzz_process_1_locationd() None
test_fuzz_process_2_ubloxd() None
test_fuzz_process_3_torqued() None

selfdrive.test.process_replay.test_imgproc module

selfdrive.test.process_replay.test_imgproc.imgproc_replay(lr)[source]
selfdrive.test.process_replay.test_imgproc.init_kernels(frame_offset=0)[source]
selfdrive.test.process_replay.test_imgproc.proc_frame(ctx, imgproc_prg, data, rgb=False)[source]

selfdrive.test.process_replay.test_processes module

selfdrive.test.process_replay.test_processes.get_log_data(segment)[source]
selfdrive.test.process_replay.test_processes.run_test_process(data)[source]
selfdrive.test.process_replay.test_processes.test_process(cfg, lr, segment, ref_log_path, new_log_path, ignore_fields=None, ignore_msgs=None)[source]

selfdrive.test.process_replay.test_regen module

class selfdrive.test.process_replay.test_regen.TestRegen(methodName='runTest')[source]

Bases: TestCase

test_engaged = None
test_engaged_0_PRIUS_C2()
selfdrive.test.process_replay.test_regen.ci_setup_data_readers(route, sidx)[source]

selfdrive.test.process_replay.vision_meta module

class selfdrive.test.process_replay.vision_meta.VideoStreamMeta(camera_state, encode_index, stream, dt, frame_sizes)

Bases: tuple

camera_state

Alias for field number 0

dt

Alias for field number 3

encode_index

Alias for field number 1

frame_sizes

Alias for field number 4

stream

Alias for field number 2

selfdrive.test.process_replay.vision_meta.available_streams(lr=None)[source]
selfdrive.test.process_replay.vision_meta.meta_from_camera_state(state)[source]
selfdrive.test.process_replay.vision_meta.meta_from_encode_index(encode_index)[source]
selfdrive.test.process_replay.vision_meta.meta_from_stream_type(stream_type)[source]

Module contents