selfdrive.manager package

Subpackages

Submodules

selfdrive.manager.build module

selfdrive.manager.build.build(spinner: Spinner, dirty: bool = False, minimal: bool = False) → None[source]

selfdrive.manager.helpers module

selfdrive.manager.helpers.save_bootlog()[source]
selfdrive.manager.helpers.unblock_stdout() → None[source]
selfdrive.manager.helpers.write_onroad_params(started, params)[source]

selfdrive.manager.manager module

selfdrive.manager.manager.main() → None[source]
selfdrive.manager.manager.manager_cleanup() → None[source]
selfdrive.manager.manager.manager_init() → None[source]
selfdrive.manager.manager.manager_thread() → None[source]

selfdrive.manager.process module

class selfdrive.manager.process.DaemonProcess(name, module, param_name, enabled=True)[source]

Bases: ManagerProcess

Python process that has to stay running across manager restarts. This is used for athena so that you don’t lose SSH access when restarting the manager.

prepare() → None[source]
static should_run(started, params, CP)[source]
start() → None[source]
stop(retry=True, block=True, sig=None) → None[source]
class selfdrive.manager.process.ManagerProcess[source]

Bases: ABC

check_watchdog(started: bool) → None[source]
daemon = False
enabled = True
get_process_state_msg()[source]
last_watchdog_time = 0
name = ''
abstract prepare() → None[source]
proc: Process | None = None
restart() → None[source]
should_run: Callable[[bool, Params, <capnp.lib.capnp._StructModule object at 0x7f1343b53f90>], bool]
shutting_down = False
sigkill = False
signal(sig: int) → None[source]
abstract start() → None[source]
stop(retry: bool = True, block: bool = True, sig: Signals = None) → int | None[source]
watchdog_max_dt: int | None = None
watchdog_seen = False
class selfdrive.manager.process.NativeProcess(name, cwd, cmdline, should_run, enabled=True, sigkill=False, watchdog_max_dt=None)[source]

Bases: ManagerProcess

prepare() → None[source]
start() → None[source]
class selfdrive.manager.process.PythonProcess(name, module, should_run, enabled=True, sigkill=False, watchdog_max_dt=None)[source]

Bases: ManagerProcess

prepare() → None[source]
start() → None[source]
selfdrive.manager.process.ensure_running(procs: ~collections.abc.ValuesView[~selfdrive.manager.process.ManagerProcess], started: bool, params=None, CP: <capnp.lib.capnp._StructModule object at 0x7f1343b53f90> = None, not_run: list[str] | None = None) → list[ManagerProcess][source]
selfdrive.manager.process.join_process(process: Process, timeout: float) → None[source]
selfdrive.manager.process.launcher(proc: str, name: str) → None[source]
selfdrive.manager.process.nativelauncher(pargs: list[str], cwd: str, name: str) → None[source]

selfdrive.manager.process_config module

selfdrive.manager.process_config.always_run(started, params, CP: <capnp.lib.capnp._StructModule object at 0x7f1343b53f90>) → bool[source]
selfdrive.manager.process_config.driverview(started: bool, params: ~common.params_pyx.Params, CP: <capnp.lib.capnp._StructModule object at 0x7f1343b53f90>) → bool[source]
selfdrive.manager.process_config.iscar(started: bool, params: ~common.params_pyx.Params, CP: <capnp.lib.capnp._StructModule object at 0x7f1343b53f90>) → bool[source]
selfdrive.manager.process_config.logging(started, params, CP: <capnp.lib.capnp._StructModule object at 0x7f1343b53f90>) → bool[source]
selfdrive.manager.process_config.notcar(started: bool, params: ~common.params_pyx.Params, CP: <capnp.lib.capnp._StructModule object at 0x7f1343b53f90>) → bool[source]
selfdrive.manager.process_config.only_offroad(started, params, CP: <capnp.lib.capnp._StructModule object at 0x7f1343b53f90>) → bool[source]
selfdrive.manager.process_config.only_onroad(started: bool, params, CP: <capnp.lib.capnp._StructModule object at 0x7f1343b53f90>) → bool[source]
selfdrive.manager.process_config.qcomgps(started, params, CP: <capnp.lib.capnp._StructModule object at 0x7f1343b53f90>) → bool[source]
selfdrive.manager.process_config.ublox(started, params, CP: <capnp.lib.capnp._StructModule object at 0x7f1343b53f90>) → bool[source]
selfdrive.manager.process_config.ublox_available() → bool[source]

Module contents