selfdrive.athena package

Subpackages

Submodules

selfdrive.athena.athenad module

exception selfdrive.athena.athenad.AbortTransferException[source]

Bases: Exception

class selfdrive.athena.athenad.UploadFile(fn: 'str', url: 'str', headers: 'dict[str, str]', allow_cellular: 'bool')[source]

Bases: object

allow_cellular: bool
fn: str
classmethod from_dict(d: dict) → UploadFile[source]
headers: dict[str, str]
url: str
class selfdrive.athena.athenad.UploadItem(path: 'str', url: 'str', headers: 'dict[str, str]', created_at: 'int', id: 'str | None', retry_count: 'int' = 0, current: 'bool' = False, progress: 'float' = 0, allow_cellular: 'bool' = False)[source]

Bases: object

allow_cellular: bool = False
created_at: int
current: bool = False
classmethod from_dict(d: dict) → UploadItem[source]
headers: dict[str, str]
id: str | None
path: str
progress: float = 0
retry_count: int = 0
url: str
class selfdrive.athena.athenad.UploadQueueCache[source]

Bases: object

static cache(upload_queue: Queue[UploadItem]) → None[source]
static initialize(upload_queue: Queue[UploadItem]) → None[source]
selfdrive.athena.athenad.backoff(retries: int) → int[source]
selfdrive.athena.athenad.cancelUpload(upload_id: str | list[str]) → dict[str, int | str][source]
selfdrive.athena.athenad.cb(sm, item, tid, end_event: Event, sz: int, cur: int) → None[source]
selfdrive.athena.athenad.getGithubUsername() → str[source]
selfdrive.athena.athenad.getMessage(service: str, timeout: int = 1000) → dict[source]
selfdrive.athena.athenad.getNetworkMetered() → bool[source]
selfdrive.athena.athenad.getNetworkType()[source]
selfdrive.athena.athenad.getNetworks()[source]
selfdrive.athena.athenad.getPublicKey() → str | None[source]
selfdrive.athena.athenad.getSimInfo()[source]
selfdrive.athena.athenad.getSshAuthorizedKeys() → str[source]
selfdrive.athena.athenad.getVersion() → dict[str, str][source]
selfdrive.athena.athenad.get_logs_to_send_sorted() → list[str][source]
selfdrive.athena.athenad.handle_long_poll(ws: WebSocket, exit_event: Event | None) → None[source]
selfdrive.athena.athenad.jsonrpc_handler(end_event: Event) → None[source]
selfdrive.athena.athenad.listDataDirectory(prefix='') → list[str][source]
selfdrive.athena.athenad.listUploadQueue() → list[dict[str, str | bool | int | float | dict[str, str]]][source]
selfdrive.athena.athenad.log_handler(end_event: Event) → None[source]
selfdrive.athena.athenad.main(exit_event: Event = None)[source]
selfdrive.athena.athenad.retry_upload(tid: int, end_event: Event, increase_count: bool = True) → None[source]
selfdrive.athena.athenad.scan_dir(path: str, prefix: str) → list[str][source]
selfdrive.athena.athenad.setNavDestination(latitude: int = 0, longitude: int = 0, place_name: str = None, place_details: str = None) → dict[str, int][source]
selfdrive.athena.athenad.setRouteViewed(route: str) → dict[str, int | str][source]
selfdrive.athena.athenad.startLocalProxy(global_end_event: Event, remote_ws_uri: str, local_port: int) → dict[str, int][source]
selfdrive.athena.athenad.stat_handler(end_event: Event) → None[source]
selfdrive.athena.athenad.strip_bz2_extension(fn: str) → str[source]
selfdrive.athena.athenad.takeSnapshot() → str | dict[str, str] | None[source]
selfdrive.athena.athenad.uploadFileToUrl(fn: str, url: str, headers: dict[str, str]) → dict[str, int | list[dict[str, str | bool | int | float | dict[str, str]]] | list[str]][source]
selfdrive.athena.athenad.uploadFilesToUrls(files_data: list[dict[str, str | int | float | bool]]) → dict[str, int | list[dict[str, str | bool | int | float | dict[str, str]]] | list[str]][source]
selfdrive.athena.athenad.upload_handler(end_event: Event) → None[source]
selfdrive.athena.athenad.ws_manage(ws: WebSocket, end_event: Event) → None[source]
selfdrive.athena.athenad.ws_proxy_recv(ws: WebSocket, local_sock: socket, ssock: socket, end_event: Event, global_end_event: Event) → None[source]
selfdrive.athena.athenad.ws_proxy_send(ws: WebSocket, local_sock: socket, signal_sock: socket, end_event: Event) → None[source]
selfdrive.athena.athenad.ws_recv(ws: WebSocket, end_event: Event) → None[source]
selfdrive.athena.athenad.ws_send(ws: WebSocket, end_event: Event) → None[source]

selfdrive.athena.manage_athenad module

selfdrive.athena.manage_athenad.main()[source]

selfdrive.athena.registration module

selfdrive.athena.registration.is_registered_device() → bool[source]
selfdrive.athena.registration.register(show_spinner=False) → str | None[source]

Module contents