selfdrive.test.process_replay package
Submodules
selfdrive.test.process_replay.capture module
selfdrive.test.process_replay.compare_logs module
selfdrive.test.process_replay.migration module
selfdrive.test.process_replay.model_replay module
selfdrive.test.process_replay.process_replay module
- class selfdrive.test.process_replay.process_replay.FrequencyBasedRcvCallback(trigger_msg_type)[source]
Bases: object
- class selfdrive.test.process_replay.process_replay.LauncherWithCapture(capture: ProcessOutputCapture, launcher: Callable)[source]
Bases: object
- class selfdrive.test.process_replay.process_replay.MessageBasedRcvCallback(trigger_msg_type)[source]
Bases: object
- class selfdrive.test.process_replay.process_replay.ModeldCameraSyncRcvCallback[source]
Bases: object
- class selfdrive.test.process_replay.process_replay.ProcessConfig(proc_name: str, pubs: list[str], subs: list[str], ignore: list[str], config_callback: collections.abc.Callable | None = None, init_callback: collections.abc.Callable | None = None, should_recv_callback: collections.abc.Callable | None = None, tolerance: float | None = None, processing_time: float = 0.001, timeout: int = 30, simulation: bool = True, main_pub: str | None = None, main_pub_drained: bool = True, vision_pubs: list[str] = <factory>, ignore_alive_pubs: list[str] = <factory>, unlocked_pubs: list[str] = <factory>)[source]
Bases: object
- config_callback: Callable | None = None
- ignore: list[str]
- ignore_alive_pubs: list[str]
- init_callback: Callable | None = None
- main_pub: str | None = None
- main_pub_drained: bool = True
- proc_name: str
- processing_time: float = 0.001
- pubs: list[str]
- should_recv_callback: Callable | None = None
- simulation: bool = True
- subs: list[str]
- timeout: int = 30
- tolerance: float | None = None
- unlocked_pubs: list[str]
- vision_pubs: list[str]
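The `<factory>` defaults in the signature above are how a dataclass displays `field(default_factory=...)`, so the class is most likely a dataclass. The following is a minimal sketch under that assumption; `ProcessConfigSketch` is a hypothetical stand-in that mirrors the documented fields and defaults, not the actual class. The example values are copied from the `ubloxd` entry in the `regen_segment` default list.

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass, field


@dataclass
class ProcessConfigSketch:
    """Hypothetical stand-in mirroring the documented ProcessConfig fields."""
    # Required fields (no default in the documented signature).
    proc_name: str
    pubs: list[str]
    subs: list[str]
    ignore: list[str]
    # Optional hooks and tuning values, with the documented defaults.
    config_callback: Callable | None = None
    init_callback: Callable | None = None
    should_recv_callback: Callable | None = None
    tolerance: float | None = None
    processing_time: float = 0.001
    timeout: int = 30
    simulation: bool = True
    main_pub: str | None = None
    main_pub_drained: bool = True
    # Shown as "<factory>" in the signature: each instance gets its own list.
    vision_pubs: list[str] = field(default_factory=list)
    ignore_alive_pubs: list[str] = field(default_factory=list)
    unlocked_pubs: list[str] = field(default_factory=list)


# Values copied from the documented 'ubloxd' default entry.
ubloxd = ProcessConfigSketch(
    proc_name="ubloxd",
    pubs=["ubloxRaw"],
    subs=["ubloxGnss", "gpsLocationExternal"],
    ignore=["logMonoTime"],
)
# A second, purely illustrative instance to show that list defaults are not shared.
other = ProcessConfigSketch(proc_name="example", pubs=[], subs=[], ignore=[])
ubloxd.vision_pubs.append("roadCameraState")
print(ubloxd.timeout, ubloxd.vision_pubs, other.vision_pubs)
```

Using a factory rather than a literal `[]` default matters here: mutating one config's `vision_pubs` leaves every other config's list untouched.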
- class selfdrive.test.process_replay.process_replay.ProcessContainer(cfg: ProcessConfig)[source]
Bases: object
- property has_empty_queue: bool
- property pubs: list[str]
- run_step(msg: _DynamicStructReader, frs: dict[str, BaseFrameReader] | None) list[_DynamicStructReader] [source]
- start(params_config: dict[str, Any], environ_config: dict[str, Any], all_msgs: Iterable[type[_DynamicStructReader]], frs: dict[str, BaseFrameReader] | None, fingerprint: str | None, capture_output: bool)[source]
- property subs: list[str]
- class selfdrive.test.process_replay.process_replay.ReplayContext(cfg)[source]
Bases: object
- property all_recv_called_events
- property all_recv_ready_events
- selfdrive.test.process_replay.process_replay.check_openpilot_enabled(msgs: Iterable[type[_DynamicStructReader]]) bool [source]
- selfdrive.test.process_replay.process_replay.controlsd_fingerprint_callback(rc, pm, msgs, fingerprint)[source]
- selfdrive.test.process_replay.process_replay.dmonitoringmodeld_rcv_callback(msg, cfg, frame)[source]
- selfdrive.test.process_replay.process_replay.generate_environ_config(CP=None, fingerprint=None, log_dir=None) dict[str, Any] [source]
- selfdrive.test.process_replay.process_replay.generate_params_config(lr=None, CP=None, fingerprint=None, custom_params=None) dict[str, Any] [source]
- selfdrive.test.process_replay.process_replay.get_car_params_callback(rc, pm, msgs, fingerprint)[source]
- selfdrive.test.process_replay.process_replay.get_custom_params_from_lr(lr: Iterable[type[_DynamicStructReader]], initial_state: str = 'first') dict[str, Any] [source]
Use this to get a custom params dict based on the provided logs. Useful when replaying the following processes: calibrationd, paramsd, torqued. The params may be based on the first or last message of a given type (carParams, liveCalibration, liveParameters, liveTorqueParameters) in the logs.
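The "first or last message of a given type" choice can be illustrated with a small standalone sketch. This is not the function's implementation: `FakeMsg` and `pick_param_msgs` are hypothetical names, the messages are plain tuples rather than capnp readers, and the real function's mapping from messages to param keys is not shown in this reference.

```python
from __future__ import annotations

from typing import Any, Iterable, NamedTuple


class FakeMsg(NamedTuple):
    """Hypothetical stand-in for a log message: a type name plus a payload."""
    which: str
    payload: Any


# Message types named in the docstring above.
PARAM_MSG_TYPES = ("carParams", "liveCalibration", "liveParameters", "liveTorqueParameters")


def pick_param_msgs(lr: Iterable[FakeMsg], initial_state: str = "first") -> dict[str, Any]:
    """Return one payload per message type, from its first or last occurrence."""
    if initial_state not in ("first", "last"):
        raise ValueError("initial_state must be 'first' or 'last'")
    picked: dict[str, Any] = {}
    for msg in lr:
        if msg.which not in PARAM_MSG_TYPES:
            continue
        # "first": keep the earliest payload; "last": keep overwriting.
        if initial_state == "last" or msg.which not in picked:
            picked[msg.which] = msg.payload
    return picked


log = [
    FakeMsg("liveCalibration", "calib@0"),
    FakeMsg("carState", "ignored"),
    FakeMsg("liveParameters", "params@1"),
    FakeMsg("liveCalibration", "calib@2"),
]
first = pick_param_msgs(log, "first")
last = pick_param_msgs(log, "last")
print(first)
print(last)
```

With `"first"`, a replayed process starts from the state recorded at the beginning of the log; with `"last"`, it starts from the most recent recorded state.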
- selfdrive.test.process_replay.process_replay.get_process_config(name: str) ProcessConfig [source]
- selfdrive.test.process_replay.process_replay.locationd_config_pubsub_callback(params, cfg, lr)[source]
- selfdrive.test.process_replay.process_replay.replay_process(cfg: ProcessConfig | Iterable[ProcessConfig], lr: Iterable[type[_DynamicStructReader]], frs: dict[str, BaseFrameReader] = None, fingerprint: str = None, return_all_logs: bool = False, custom_params: dict[str, Any] = None, captured_output_store: dict[str, dict[str, str]] = None, disable_progress: bool = False) list[_DynamicStructReader] [source]
selfdrive.test.process_replay.regen module
- class selfdrive.test.process_replay.regen.DummyFrameReader(w: int, h: int, frame_count: int, pix_val: int)[source]
Bases: BaseFrameReader
- selfdrive.test.process_replay.regen.regen_and_save(route: str, sidx: int, processes: str | Iterable[str] = 'all', outdir: str = '/tmp/openpilot/openpilot/selfdrive/test/process_replay/fakedata/', upload: bool = False, use_route_meta: bool = False, disable_tqdm: bool = False, dummy_driver_cam: bool = False) str [source]
- selfdrive.test.process_replay.regen.regen_segment(lr: Iterable[type[_DynamicStructReader]], frs: dict[str, Any] = None, processes: Iterable[ProcessConfig] = [
ProcessConfig(proc_name='controlsd', pubs=['can', 'deviceState', 'pandaStates', 'peripheralState', 'liveCalibration', 'driverMonitoringState', 'longitudinalPlan', 'liveLocationKalman', 'liveParameters', 'radarState', 'modelV2', 'driverCameraState', 'roadCameraState', 'wideRoadCameraState', 'managerState', 'testJoystick', 'liveTorqueParameters', 'accelerometer', 'gyroscope'], subs=['controlsState', 'carState', 'carControl', 'sendcan', 'onroadEvents', 'carParams'], ignore=['logMonoTime', 'controlsState.startMonoTime', 'controlsState.cumLagMs'], config_callback=<function controlsd_config_callback>, init_callback=<function controlsd_fingerprint_callback>, should_recv_callback=<function controlsd_rcv_callback>, tolerance=1e-07, processing_time=0.004, timeout=30, simulation=True, main_pub='can', main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]),
ProcessConfig(proc_name='radard', pubs=['can', 'carState', 'modelV2'], subs=['radarState', 'liveTracks'], ignore=['logMonoTime', 'radarState.cumLagMs'], config_callback=None, init_callback=<function get_car_params_callback>, should_recv_callback=<openpilot.selfdrive.test.process_replay.process_replay.MessageBasedRcvCallback object>, tolerance=None, processing_time=0.001, timeout=30, simulation=True, main_pub='can', main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]),
ProcessConfig(proc_name='plannerd', pubs=['modelV2', 'carControl', 'carState', 'controlsState', 'radarState'], subs=['longitudinalPlan', 'uiPlan'], ignore=['logMonoTime', 'longitudinalPlan.processingDelay', 'longitudinalPlan.solverExecutionTime'], config_callback=None, init_callback=<function get_car_params_callback>, should_recv_callback=<openpilot.selfdrive.test.process_replay.process_replay.FrequencyBasedRcvCallback object>, tolerance=1e-07, processing_time=0.001, timeout=30, simulation=True, main_pub=None, main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]),
ProcessConfig(proc_name='calibrationd', pubs=['carState', 'cameraOdometry', 'carParams'], subs=['liveCalibration'], ignore=['logMonoTime'], config_callback=None, init_callback=None, should_recv_callback=<function calibration_rcv_callback>, tolerance=None, processing_time=0.001, timeout=30, simulation=True, main_pub=None, main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]),
ProcessConfig(proc_name='dmonitoringd', pubs=['driverStateV2', 'liveCalibration', 'carState', 'modelV2', 'controlsState'], subs=['driverMonitoringState'], ignore=['logMonoTime'], config_callback=None, init_callback=None, should_recv_callback=<openpilot.selfdrive.test.process_replay.process_replay.FrequencyBasedRcvCallback object>, tolerance=1e-07, processing_time=0.001, timeout=30, simulation=True, main_pub=None, main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]),
ProcessConfig(proc_name='locationd', pubs=['cameraOdometry', 'accelerometer', 'gyroscope', 'gpsLocationExternal', 'liveCalibration', 'carState', 'gpsLocation'], subs=['liveLocationKalman'], ignore=['logMonoTime'], config_callback=<function locationd_config_pubsub_callback>, init_callback=None, should_recv_callback=None, tolerance=1e-07, processing_time=0.001, timeout=30, simulation=True, main_pub=None, main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]),
ProcessConfig(proc_name='paramsd', pubs=['liveLocationKalman', 'carState'], subs=['liveParameters'], ignore=['logMonoTime'], config_callback=None, init_callback=<function get_car_params_callback>, should_recv_callback=<openpilot.selfdrive.test.process_replay.process_replay.FrequencyBasedRcvCallback object>, tolerance=1e-07, processing_time=0.004, timeout=30, simulation=True, main_pub=None, main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]),
ProcessConfig(proc_name='ubloxd', pubs=['ubloxRaw'], subs=['ubloxGnss', 'gpsLocationExternal'], ignore=['logMonoTime'], config_callback=None, init_callback=None, should_recv_callback=None, tolerance=None, processing_time=0.001, timeout=30, simulation=True, main_pub=None, main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]),
ProcessConfig(proc_name='torqued', pubs=['liveLocationKalman', 'carState', 'carControl'], subs=['liveTorqueParameters'], ignore=['logMonoTime'], config_callback=None, init_callback=<function get_car_params_callback>, should_recv_callback=<function torqued_rcv_callback>, tolerance=1e-07, processing_time=0.001, timeout=30, simulation=True, main_pub=None, main_pub_drained=True, vision_pubs=[], ignore_alive_pubs=[], unlocked_pubs=[]),
ProcessConfig(proc_name='modeld', pubs=['deviceState', 'roadCameraState', 'wideRoadCameraState', 'liveCalibration', 'driverMonitoringState'], subs=['modelV2', 'cameraOdometry'], ignore=['logMonoTime', 'modelV2.frameDropPerc', 'modelV2.modelExecutionTime'], config_callback=None, init_callback=<function get_car_params_callback>, should_recv_callback=<openpilot.selfdrive.test.process_replay.process_replay.ModeldCameraSyncRcvCallback object>, tolerance=1e-07, processing_time=0.02, timeout=30, simulation=True, main_pub='visionipc_camerad_0', main_pub_drained=False, vision_pubs=['roadCameraState', 'wideRoadCameraState'], ignore_alive_pubs=['wideRoadCameraState'], unlocked_pubs=[]),
ProcessConfig(proc_name='dmonitoringmodeld', pubs=['liveCalibration', 'driverCameraState'], subs=['driverStateV2'], ignore=['logMonoTime', 'driverStateV2.modelExecutionTime', 'driverStateV2.dspExecutionTime'], config_callback=None, init_callback=None, should_recv_callback=<function dmonitoringmodeld_rcv_callback>, tolerance=1e-07, processing_time=0.02, timeout=30, simulation=True, main_pub='visionipc_camerad_1', main_pub_drained=False, vision_pubs=['driverCameraState'], ignore_alive_pubs=['driverCameraState'], unlocked_pubs=[])
], disable_tqdm: bool = False) list[_DynamicStructReader] [source]
selfdrive.test.process_replay.regen_all module
selfdrive.test.process_replay.test_fuzzy module
selfdrive.test.process_replay.test_imgproc module
selfdrive.test.process_replay.test_processes module
selfdrive.test.process_replay.test_regen module
selfdrive.test.process_replay.vision_meta module
- class selfdrive.test.process_replay.vision_meta.VideoStreamMeta(camera_state, encode_index, stream, dt, frame_sizes)
Bases: tuple
- camera_state
Alias for field number 0
- dt
Alias for field number 3
- encode_index
Alias for field number 1
- frame_sizes
Alias for field number 4
- stream
Alias for field number 2
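The "Alias for field number N" entries are the docstrings that `collections.namedtuple` generates for its fields, so each attribute is also reachable by position. A minimal sketch, assuming the class is a plain namedtuple: `VideoStreamMetaSketch` is a hypothetical stand-in, and every field value below is a placeholder rather than real stream metadata.

```python
from collections import namedtuple

# Hypothetical stand-in with the documented field order.
VideoStreamMetaSketch = namedtuple(
    "VideoStreamMetaSketch",
    ["camera_state", "encode_index", "stream", "dt", "frame_sizes"],
)

# Placeholder values for illustration only.
meta = VideoStreamMetaSketch(
    camera_state="roadCameraState",
    encode_index="example_encode_index",
    stream="example_stream",
    dt=0.05,
    frame_sizes=[(640, 480)],
)

# Named access and positional access refer to the same slots.
print(meta.camera_state == meta[0], meta.dt == meta[3])
print(VideoStreamMetaSketch._fields)
```

Because the field order is part of the tuple's identity, positional unpacking such as `camera_state, encode_index, stream, dt, frame_sizes = meta` follows the numbering listed above.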