selfdrive.athena package
Subpackages
- selfdrive.athena.tests package
- Submodules
- selfdrive.athena.tests.helpers module
- selfdrive.athena.tests.test_athenad module
TestAthenadMethods
TestAthenadMethods.setUp()
TestAthenadMethods.setUpClass()
TestAthenadMethods.test_cancelExpiry()
TestAthenadMethods.test_cancelUpload()
TestAthenadMethods.test_do_upload
TestAthenadMethods.test_do_upload_0()
TestAthenadMethods.test_do_upload_1()
TestAthenadMethods.test_echo()
TestAthenadMethods.test_getGithubUsername()
TestAthenadMethods.test_getMessage()
TestAthenadMethods.test_getSshAuthorizedKeys()
TestAthenadMethods.test_getVersion()
TestAthenadMethods.test_get_logs_to_send_sorted()
TestAthenadMethods.test_jsonrpc_handler()
TestAthenadMethods.test_listDataDirectory()
TestAthenadMethods.test_listUploadQueue()
TestAthenadMethods.test_listUploadQueueCurrent()
TestAthenadMethods.test_listUploadQueueEmpty()
TestAthenadMethods.test_startLocalProxy()
TestAthenadMethods.test_strip_bz2_extension()
TestAthenadMethods.test_uploadFileToUrl()
TestAthenadMethods.test_uploadFileToUrl_does_not_exist()
TestAthenadMethods.test_uploadFileToUrl_duplicate()
TestAthenadMethods.test_upload_handler()
TestAthenadMethods.test_upload_handler_retry
TestAthenadMethods.test_upload_handler_retry_0()
TestAthenadMethods.test_upload_handler_retry_1()
TestAthenadMethods.test_upload_handler_timeout()
TestAthenadMethods.test_upload_queue_persistence()
seed_athena_server()
with_upload_handler()
- selfdrive.athena.tests.test_athenad_ping module
- selfdrive.athena.tests.test_registration module
- Module contents
Submodules
selfdrive.athena.athenad module
- class selfdrive.athena.athenad.UploadFile(fn: str, url: str, headers: dict[str, str], allow_cellular: bool)
Bases: object
- allow_cellular: bool
- fn: str
- classmethod from_dict(d: dict) -> UploadFile
- headers: dict[str, str]
- url: str
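The field list and `from_dict` signature above suggest a dataclass built from a plain dictionary. The following is a minimal sketch of that pattern, not the module's actual code: `UploadFileSketch` is a hypothetical stand-in, the fallback defaults for missing keys are an assumption, and the file name and URL are placeholders.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class UploadFileSketch:
    """Hypothetical stand-in with the same fields as the documented UploadFile."""
    fn: str
    url: str
    headers: dict[str, str]
    allow_cellular: bool

    @classmethod
    def from_dict(cls, d: dict) -> UploadFileSketch:
        # Assumption: keys missing from the input fall back to neutral defaults.
        return cls(
            fn=d.get("fn", ""),
            url=d.get("url", ""),
            headers=d.get("headers", {}),
            allow_cellular=d.get("allow_cellular", False),
        )


# Placeholder payload; "allow_cellular" is deliberately left out.
payload = {"fn": "qlog.bz2", "url": "https://example.invalid/upload", "headers": {}}
upload_file = UploadFileSketch.from_dict(payload)
print(upload_file)
```

Whether the real `from_dict` tolerates missing keys is not stated in this reference, so treat the defaulting behaviour as illustrative only.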
- class selfdrive.athena.athenad.UploadItem(path: str, url: str, headers: dict[str, str], created_at: int, id: str | None, retry_count: int = 0, current: bool = False, progress: float = 0, allow_cellular: bool = False)
Bases: object
- allow_cellular: bool = False
- created_at: int
- current: bool = False
- classmethod from_dict(d: dict) -> UploadItem
- headers: dict[str, str]
- id: str | None
- path: str
- progress: float = 0
- retry_count: int = 0
- url: str
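`UploadItem` has five required fields followed by four with defaults. As a sketch of how such an item might round-trip through a dictionary, the hypothetical `UploadItemSketch` below copies the documented fields and defaults; the body of `from_dict` and all sample values are assumptions made for illustration.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass


@dataclass
class UploadItemSketch:
    """Hypothetical stand-in with the documented UploadItem fields and defaults."""
    path: str
    url: str
    headers: dict[str, str]
    created_at: int
    id: str | None
    retry_count: int = 0
    current: bool = False
    progress: float = 0
    allow_cellular: bool = False

    @classmethod
    def from_dict(cls, d: dict) -> UploadItemSketch:
        # Assumption: required keys must be present, optional keys
        # fall back to the defaults listed in the signature.
        return cls(
            d["path"], d["url"], d["headers"], d["created_at"], d["id"],
            d.get("retry_count", 0), d.get("current", False),
            d.get("progress", 0), d.get("allow_cellular", False),
        )


# Placeholder values; only the required fields are supplied.
item = UploadItemSketch(
    path="/tmp/qlog.bz2",
    url="https://example.invalid/upload",
    headers={},
    created_at=1700000000000,
    id="abc123",
)
as_dict = asdict(item)                        # plain dict, suitable for JSON
restored = UploadItemSketch.from_dict(as_dict)
```

Converting with `asdict` yields only JSON-friendly types here, which fits the dictionary-shaped return annotation of `listUploadQueue()`.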
- class selfdrive.athena.athenad.UploadQueueCache
Bases: object
- static cache(upload_queue: Queue[UploadItem]) -> None
- static initialize(upload_queue: Queue[UploadItem]) -> None
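The two static methods read as a save/restore pair for the upload queue. This reference does not say where the cache is stored, so the sketch below assumes a JSON file in a temporary directory. `UploadQueueCacheSketch`, `QueuedUpload`, and `CACHE_PATH` are hypothetical names introduced for illustration.

```python
from __future__ import annotations

import json
import os
import tempfile
from dataclasses import asdict, dataclass
from queue import Queue

# Hypothetical cache location; the real storage backend is not documented here.
CACHE_PATH = os.path.join(tempfile.mkdtemp(), "upload_queue.json")


@dataclass
class QueuedUpload:
    """Reduced, hypothetical stand-in for an upload item."""
    path: str
    url: str
    id: str | None = None


class UploadQueueCacheSketch:
    @staticmethod
    def cache(upload_queue: Queue[QueuedUpload]) -> None:
        # Snapshot the underlying deque so the queue itself is not drained.
        items = [asdict(item) for item in list(upload_queue.queue)]
        with open(CACHE_PATH, "w") as f:
            json.dump(items, f)

    @staticmethod
    def initialize(upload_queue: Queue[QueuedUpload]) -> None:
        # Nothing to restore if no snapshot was ever written.
        if not os.path.exists(CACHE_PATH):
            return
        with open(CACHE_PATH) as f:
            for d in json.load(f):
                upload_queue.put(QueuedUpload(**d))


pending: Queue = Queue()
pending.put(QueuedUpload("/tmp/a.bz2", "https://example.invalid/a", "1"))
UploadQueueCacheSketch.cache(pending)

# Simulate a restart: a fresh queue is refilled from the snapshot.
restored_queue: Queue = Queue()
UploadQueueCacheSketch.initialize(restored_queue)
```

Keeping both methods static mirrors the documented signatures: the queue is passed in explicitly, so the cache class holds no state of its own.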
- selfdrive.athena.athenad.listUploadQueue() -> list[dict[str, str | bool | int | float | dict[str, str]]]
- selfdrive.athena.athenad.retry_upload(tid: int, end_event: Event, increase_count: bool = True) -> None
- selfdrive.athena.athenad.startLocalProxy(global_end_event: Event, remote_ws_uri: str, local_port: int) -> dict[str, int]
- selfdrive.athena.athenad.uploadFileToUrl(fn: str, url: str, headers: dict[str, str]) -> dict[str, int | list[dict[str, str | bool | int | float | dict[str, str]]] | list[str]]
- selfdrive.athena.athenad.uploadFilesToUrls(files_data: list[dict[str, str | int | float | bool]]) -> dict[str, int | list[dict[str, str | bool | int | float | dict[str, str]]] | list[str]]
- selfdrive.athena.athenad.ws_proxy_recv(ws: WebSocket, local_sock: socket, ssock: socket, end_event: Event, global_end_event: Event) -> None