tools.lib package

Subpackages

Submodules

tools.lib.api module

exception tools.lib.api.APIError[source]

Bases: Exception

class tools.lib.api.CommaApi(token=None)[source]

Bases: object

get(endpoint, **kwargs)[source]
post(endpoint, **kwargs)[source]
request(method, endpoint, **kwargs)[source]
exception tools.lib.api.UnauthorizedError[source]

Bases: Exception

tools.lib.auth module

Usage:

usage: auth.py [-h] [{google,apple,github,jwt}] [jwt]

Login to your comma account

positional arguments:
  {google,apple,github,jwt}
  jwt

optional arguments:
  -h, --help            show this help message and exit

Examples:

./auth.py  # Log in with Google account
./auth.py github  # Log in with GitHub account
./auth.py jwt ey......hw  # Log in with a JWT from https://jwt.comma.ai, for use in CI
class tools.lib.auth.ClientRedirectHandler(request, client_address, server)[source]

Bases: BaseHTTPRequestHandler

do_GET()[source]
log_message(*args)[source]

Log an arbitrary message.

This is used by all other logging functions. Override it if you have specific logging wishes.

The first argument, FORMAT, is a format string for the message to be logged. If the format string contains any % escapes requiring parameters, they should be specified as subsequent arguments (it’s just like printf!).

The client ip and current date/time are prefixed to every message.

Unicode control characters are replaced with escaped hex before writing the output to stderr.

class tools.lib.auth.ClientRedirectServer(server_address, RequestHandlerClass, bind_and_activate=True)[source]

Bases: HTTPServer

query_params: dict[str, Any] = {}
tools.lib.auth.login(method)[source]

tools.lib.auth_config module

exception tools.lib.auth_config.MissingAuthConfigError[source]

Bases: Exception

tools.lib.auth_config.clear_token()[source]
tools.lib.auth_config.get_token()[source]
tools.lib.auth_config.set_token(token)[source]

tools.lib.azure_container module

class tools.lib.azure_container.AzureContainer(account, container)[source]

Bases: object

property ACCOUNT_URL: str
property BASE_URL: str
get_client_and_key()[source]
get_url(route_name: str, segment_num, log_type='rlog') → str[source]
upload_bytes(data: bytes | IO, blob_name: str) → str[source]
upload_file(path: str | PathLike, blob_name: str) → str[source]
tools.lib.azure_container.get_azure_credential()[source]
tools.lib.azure_container.get_container_sas(account_name: str, container_name: str)[source]

tools.lib.bootlog module

class tools.lib.bootlog.Bootlog(url: str)[source]

Bases: object

property dongle_id: str
property id: str
property url: str
tools.lib.bootlog.get_bootlog_from_id(bootlog_id: str) → Bootlog | None[source]
tools.lib.bootlog.get_bootlogs(dongle_id: str) → list[Bootlog][source]

tools.lib.cache module

tools.lib.cache.cache_path_for_file_path(fn, cache_dir='/root/.commacache')[source]

tools.lib.comma_car_segments module

tools.lib.comma_car_segments.get_comma_car_segments_database()[source]
tools.lib.comma_car_segments.get_lfs_file_url(oid, size)[source]
tools.lib.comma_car_segments.get_repo_raw_url(path)[source]
tools.lib.comma_car_segments.get_repo_url(path)[source]
tools.lib.comma_car_segments.get_url(route, segment, file='rlog.bz2')[source]
tools.lib.comma_car_segments.parse_lfs_pointer(text)[source]

tools.lib.exceptions module

exception tools.lib.exceptions.DataUnreadableError[source]

Bases: Exception

tools.lib.filereader module

tools.lib.filereader.FileReader(fn, debug=False)[source]
tools.lib.filereader.file_exists(fn)[source]
tools.lib.filereader.internal_source_available()[source]
tools.lib.filereader.resolve_name(fn)[source]

tools.lib.framereader module

class tools.lib.framereader.BaseFrameReader[source]

Bases: object

close()[source]
get(num, count=1, pix_fmt='yuv420p')[source]
class tools.lib.framereader.DoNothingContextManager[source]

Bases: object

tools.lib.framereader.FrameIterator(fn, pix_fmt, **kwargs)[source]
tools.lib.framereader.FrameReader(fn, cache_dir='/root/.commacache', readahead=False, readbehind=False, index_data=None)[source]
class tools.lib.framereader.FrameType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntEnum

h265_stream = 2
raw = 1
tools.lib.framereader.GOPFrameIterator(gop_reader, pix_fmt)[source]
class tools.lib.framereader.GOPFrameReader(readahead=False, readbehind=False)[source]

Bases: BaseFrameReader

close()[source]
get(num, count=1, pix_fmt='yuv420p')[source]
class tools.lib.framereader.GOPReader[source]

Bases: object

get_gop(num)[source]
class tools.lib.framereader.RawData(f)[source]

Bases: object

read(i)[source]
class tools.lib.framereader.RawFrameReader(fn)[source]

Bases: BaseFrameReader

get(num, count=1, pix_fmt='yuv420p')[source]
load_and_debayer(img)[source]
class tools.lib.framereader.StreamFrameReader(fn, frame_type, index_data, readahead=False, readbehind=False)[source]

Bases: StreamGOPReader, GOPFrameReader

class tools.lib.framereader.StreamGOPReader(fn, frame_type, index_data)[source]

Bases: GOPReader

get_gop(num)[source]
class tools.lib.framereader.VideoStreamDecompressor(fn, vid_fmt, w, h, pix_fmt)[source]

Bases: object

read()[source]
write_thread()[source]
tools.lib.framereader.cache_fn(func)[source]
tools.lib.framereader.decompress_video_data(rawdat, vid_fmt, w, h, pix_fmt)[source]
tools.lib.framereader.ffprobe(fn, fmt=None)[source]
tools.lib.framereader.fingerprint_video(fn)[source]
tools.lib.framereader.get_video_index(fn, frame_type, cache_dir='/root/.commacache')[source]
tools.lib.framereader.index_stream(fn, ft)[source]
tools.lib.framereader.read_file_check_size(f, sz, cookie)[source]
tools.lib.framereader.rgb24tonv12(rgb)[source]
tools.lib.framereader.rgb24toyuv(rgb)[source]
tools.lib.framereader.rgb24toyuv420(rgb)[source]

tools.lib.helpers module

class tools.lib.helpers.RE[source]

Bases: object

BOOTLOG_NAME = '(?P<route_name>(?P<dongle_id>[a-f0-9]{16})[|_/](?P<log_id>(?:(?P<timestamp>[0-9]{4}-[0-9]{2}-[0-9]{2}--[0-9]{2}-[0-9]{2}-[0-9]{2})|(?P<count>[a-f0-9]{8})--(?P<uid>[a-z0-9]{10}))))'
DONGLE_ID = '(?P<dongle_id>[a-f0-9]{16})'
EXPLORER_FILE = '^(?P<segment_name>(?P<route_name>(?P<dongle_id>[a-f0-9]{16})[|_/](?P<log_id>(?:(?P<timestamp>[0-9]{4}-[0-9]{2}-[0-9]{2}--[0-9]{2}-[0-9]{2}-[0-9]{2})|(?P<count>[a-f0-9]{8})--(?P<uid>[a-z0-9]{10}))))(?:--|/)(?P<segment_num>[0-9]+))--(?P<file_name>[a-z]+\\.[a-z0-9]+)$'
INDEX = '-?[0-9]+'
LOG_ID = '(?P<log_id>(?:(?P<timestamp>[0-9]{4}-[0-9]{2}-[0-9]{2}--[0-9]{2}-[0-9]{2}-[0-9]{2})|(?P<count>[a-f0-9]{8})--(?P<uid>[a-z0-9]{10})))'
LOG_ID_V2 = '(?P<count>[a-f0-9]{8})--(?P<uid>[a-z0-9]{10})'
OP_SEGMENT_DIR = '^(?P<segment_name>(?P<route_name>(?P<dongle_id>[a-f0-9]{16})[|_/](?P<log_id>(?:(?P<timestamp>[0-9]{4}-[0-9]{2}-[0-9]{2}--[0-9]{2}-[0-9]{2}-[0-9]{2})|(?P<count>[a-f0-9]{8})--(?P<uid>[a-z0-9]{10}))))(?:--|/)(?P<segment_num>[0-9]+))$'
ROUTE_NAME = '(?P<route_name>(?P<dongle_id>[a-f0-9]{16})[|_/](?P<log_id>(?:(?P<timestamp>[0-9]{4}-[0-9]{2}-[0-9]{2}--[0-9]{2}-[0-9]{2}-[0-9]{2})|(?P<count>[a-f0-9]{8})--(?P<uid>[a-z0-9]{10}))))'
SEGMENT_NAME = '(?P<route_name>(?P<dongle_id>[a-f0-9]{16})[|_/](?P<log_id>(?:(?P<timestamp>[0-9]{4}-[0-9]{2}-[0-9]{2}--[0-9]{2}-[0-9]{2}-[0-9]{2})|(?P<count>[a-f0-9]{8})--(?P<uid>[a-z0-9]{10}))))(?:--|/)(?P<segment_num>[0-9]+)'
SEGMENT_RANGE = '(?P<route_name>(?P<dongle_id>[a-f0-9]{16})[|_/](?P<log_id>(?:(?P<timestamp>[0-9]{4}-[0-9]{2}-[0-9]{2}--[0-9]{2}-[0-9]{2}-[0-9]{2})|(?P<count>[a-f0-9]{8})--(?P<uid>[a-z0-9]{10}))))(?:(--|/)(?P<slice>((?P<start>-?[0-9]+)?:?(?P<end>-?[0-9]+)?:?(?P<step>-?[0-9]+)?)))?(?:/(?P<selector>([qras])))?'
SLICE = '(?P<start>-?[0-9]+)?:?(?P<end>-?[0-9]+)?:?(?P<step>-?[0-9]+)?'
TIMESTAMP = '(?P<timestamp>[0-9]{4}-[0-9]{2}-[0-9]{2}--[0-9]{2}-[0-9]{2}-[0-9]{2})'
tools.lib.helpers.save_log(dest, log_msgs, compress=True)[source]
tools.lib.helpers.timestamp_to_datetime(t: str) → datetime[source]

Convert an openpilot route timestamp to a Python datetime.

tools.lib.kbhit module

class tools.lib.kbhit.KBHit[source]

Bases: object

static getarrow() → int[source]

Returns an arrow-key code after kbhit() has been called. Codes are: 0 = up, 1 = right, 2 = down, 3 = left. Should not be called in the same program as getch().

static getch() → str[source]

Returns a keyboard character after kbhit() has been called. Should not be called in the same program as getarrow().

static kbhit()[source]

Returns True if keyboard character was hit, False otherwise.

set_kbhit_terminal() → None[source]

Save old terminal settings for closure, remove ICANON & ECHO flags.

set_normal_term() → None[source]

Resets to normal terminal. On Windows this is a no-op.

tools.lib.live_logreader module

tools.lib.live_logreader.live_logreader(services: list[str] = ['gyroscope', 'gyroscope2', 'accelerometer', 'accelerometer2', 'magnetometer', 'lightSensor', 'temperatureSensor', 'temperatureSensor2', 'gpsNMEA', 'deviceState', 'can', 'controlsState', 'pandaStates', 'peripheralState', 'radarState', 'roadEncodeIdx', 'liveTracks', 'sendcan', 'logMessage', 'errorLogMessage', 'liveCalibration', 'liveTorqueParameters', 'androidLog', 'carState', 'carControl', 'longitudinalPlan', 'procLog', 'gpsLocationExternal', 'gpsLocation', 'ubloxGnss', 'qcomGnss', 'gnssMeasurements', 'clocks', 'ubloxRaw', 'liveLocationKalman', 'liveParameters', 'cameraOdometry', 'thumbnail', 'onroadEvents', 'carParams', 'roadCameraState', 'driverCameraState', 'driverEncodeIdx', 'driverStateV2', 'driverMonitoringState', 'wideRoadEncodeIdx', 'wideRoadCameraState', 'modelV2', 'managerState', 'uploaderState', 'navInstruction', 'navRoute', 'navThumbnail', 'navModel', 'mapRenderState', 'uiPlan', 'qRoadEncodeIdx', 'userFlag', 'microphone', 'uiDebug', 'testJoystick', 'roadEncodeData', 'driverEncodeData', 'wideRoadEncodeData', 'qRoadEncodeData', 'livestreamWideRoadEncodeIdx', 'livestreamRoadEncodeIdx', 'livestreamDriverEncodeIdx', 'livestreamWideRoadEncodeData', 'livestreamRoadEncodeData', 'livestreamDriverEncodeData', 'customReservedRawData0', 'customReservedRawData1', 'customReservedRawData2'], addr: str = '127.0.0.1') Iterable[type[_DynamicStructReader]][source]
tools.lib.live_logreader.raw_live_logreader(services: list[str] = ['gyroscope', 'gyroscope2', 'accelerometer', 'accelerometer2', 'magnetometer', 'lightSensor', 'temperatureSensor', 'temperatureSensor2', 'gpsNMEA', 'deviceState', 'can', 'controlsState', 'pandaStates', 'peripheralState', 'radarState', 'roadEncodeIdx', 'liveTracks', 'sendcan', 'logMessage', 'errorLogMessage', 'liveCalibration', 'liveTorqueParameters', 'androidLog', 'carState', 'carControl', 'longitudinalPlan', 'procLog', 'gpsLocationExternal', 'gpsLocation', 'ubloxGnss', 'qcomGnss', 'gnssMeasurements', 'clocks', 'ubloxRaw', 'liveLocationKalman', 'liveParameters', 'cameraOdometry', 'thumbnail', 'onroadEvents', 'carParams', 'roadCameraState', 'driverCameraState', 'driverEncodeIdx', 'driverStateV2', 'driverMonitoringState', 'wideRoadEncodeIdx', 'wideRoadCameraState', 'modelV2', 'managerState', 'uploaderState', 'navInstruction', 'navRoute', 'navThumbnail', 'navModel', 'mapRenderState', 'uiPlan', 'qRoadEncodeIdx', 'userFlag', 'microphone', 'uiDebug', 'testJoystick', 'roadEncodeData', 'driverEncodeData', 'wideRoadEncodeData', 'qRoadEncodeData', 'livestreamWideRoadEncodeIdx', 'livestreamRoadEncodeIdx', 'livestreamDriverEncodeIdx', 'livestreamWideRoadEncodeData', 'livestreamRoadEncodeData', 'livestreamDriverEncodeData', 'customReservedRawData0', 'customReservedRawData1', 'customReservedRawData2'], addr: str = '127.0.0.1') Iterable[bytes][source]

tools.lib.logreader module

class tools.lib.logreader.LogReader(identifier: str | list[str], default_mode: ReadMode = ReadMode.RLOG, default_source=<function auto_source>, sort_by_time=False, only_union_types=False)[source]

Bases: object

filter(msg_type: str)[source]
first(msg_type: str)[source]
static from_bytes(dat)[source]
reset()[source]
run_across_segments(num_processes, func)[source]
class tools.lib.logreader.ReadMode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: StrEnum

AUTO = 'a'
AUTO_INTERACTIVE = 'i'
QLOG = 'q'
RLOG = 'r'
SANITIZED = 's'
tools.lib.logreader.apply_strategy(mode: ReadMode, rlog_paths: list[str | None], qlog_paths: list[str | None], valid_file: Callable[[str | None], bool] = <function default_valid_file>) → list[str | None][source]
tools.lib.logreader.auto_source(sr: SegmentRange, mode=ReadMode.RLOG) → list[str | None][source]
tools.lib.logreader.auto_strategy(rlog_paths: list[str | None], qlog_paths: list[str | None], interactive: bool, valid_file: Callable[[str | None], bool]) → list[str | None][source]
tools.lib.logreader.check_source(source: Callable[[SegmentRange, ReadMode], list[str | None]], *args) → list[str | None][source]
tools.lib.logreader.comma_api_source(sr: SegmentRange, mode: ReadMode) → list[str | None][source]
tools.lib.logreader.comma_car_segments_source(sr: SegmentRange, mode=ReadMode.RLOG) → list[str | None][source]
tools.lib.logreader.default_valid_file(fn: str | None) → bool[source]
tools.lib.logreader.direct_source(file_or_url: str) → list[str | None][source]
tools.lib.logreader.get_invalid_files(files)[source]
tools.lib.logreader.internal_source(sr: SegmentRange, mode: ReadMode) → list[str | None][source]
tools.lib.logreader.openpilotci_source(sr: SegmentRange, mode: ReadMode) → list[str | None][source]
tools.lib.logreader.parse_cabana(identifier: str)[source]
tools.lib.logreader.parse_direct(identifier: str)[source]
tools.lib.logreader.parse_indirect(identifier: str)[source]
tools.lib.logreader.parse_useradmin(identifier: str)[source]

tools.lib.openpilotci module

tools.lib.openpilotci.get_url(*args, **kwargs)[source]
tools.lib.openpilotci.upload_bytes(*args, **kwargs)[source]
tools.lib.openpilotci.upload_file(*args, **kwargs)[source]

tools.lib.openpilotcontainers module

tools.lib.route module

class tools.lib.route.Route(name, data_dir=None)[source]

Bases: object

camera_paths()[source]
dcamera_paths()[source]
ecamera_paths()[source]
log_paths()[source]
property name
qcamera_paths()[source]
qlog_paths()[source]
property segments
class tools.lib.route.RouteName(name_str: str)[source]

Bases: object

property canonical_name: str
property dongle_id: str
property time_str: str
class tools.lib.route.Segment(name, log_path, qlog_path, camera_path, dcamera_path, ecamera_path, qcamera_path)[source]

Bases: object

property name
class tools.lib.route.SegmentName(name_str: str, allow_route_name=False)[source]

Bases: object

property canonical_name: str
property data_dir: str | None
property dongle_id: str
property route_name: RouteName
property segment_num: int
property time_str: str
class tools.lib.route.SegmentRange(segment_range: str)[source]

Bases: object

property dongle_id: str
property route_name: str
property seg_idxs: list[int]
property selector: str | None
property slice: str
property timestamp: str
tools.lib.route.get_max_seg_number_cached(sr: SegmentRange) → int[source]

tools.lib.sanitizer module

tools.lib.sanitizer.sanitize(lr: Iterable[type[_DynamicStructReader]]) → Iterable[type[_DynamicStructReader]][source]
tools.lib.sanitizer.sanitize_msg(msg: type[_DynamicStructReader]) → type[_DynamicStructReader][source]
tools.lib.sanitizer.sanitize_vin(vin: str)[source]

tools.lib.url_file module

class tools.lib.url_file.URLFile(url: str, timeout: int = 10, debug: bool = False, cache: bool | None = None)[source]

Bases: object

get_length() → int[source]
get_length_online() → int[source]
property name: str
static pool_manager() → PoolManager[source]
read(ll: int | None = None) → bytes[source]
read_aux(ll: int | None = None) → bytes[source]
static reset() → None[source]
seek(pos: int) → None[source]
exception tools.lib.url_file.URLFileException[source]

Bases: Exception

tools.lib.url_file.hash_256(link: str) → str[source]

tools.lib.vidindex module

class tools.lib.vidindex.HevcNalUnitType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntEnum

AUD_NUT = 35
BLA_N_LP = 18
BLA_W_LP = 16
BLA_W_RADL = 17
CRA_NUT = 21
EOB_NUT = 37
EOS_NUT = 36
FD_NUT = 38
IDR_N_LP = 20
IDR_W_RADL = 19
PPS_NUT = 34
PREFIX_SEI_NUT = 39
RADL_N = 6
RADL_R = 7
RASL_N = 8
RASL_R = 9
RSV_IRAP_VCL22 = 22
RSV_IRAP_VCL23 = 23
RSV_NVCL41 = 41
RSV_NVCL42 = 42
RSV_NVCL43 = 43
RSV_NVCL44 = 44
RSV_NVCL45 = 45
RSV_NVCL46 = 46
RSV_NVCL47 = 47
RSV_VCL24 = 24
RSV_VCL25 = 25
RSV_VCL26 = 26
RSV_VCL27 = 27
RSV_VCL28 = 28
RSV_VCL29 = 29
RSV_VCL30 = 30
RSV_VCL31 = 31
RSV_VCL_N10 = 10
RSV_VCL_N12 = 12
RSV_VCL_N14 = 14
RSV_VCL_R11 = 11
RSV_VCL_R13 = 13
RSV_VCL_R15 = 15
SPS_NUT = 33
STSA_N = 4
STSA_R = 5
SUFFIX_SEI_NUT = 40
TRAIL_N = 0
TRAIL_R = 1
TSA_N = 2
TSA_R = 3
UNSPEC48 = 48
UNSPEC49 = 49
UNSPEC50 = 50
UNSPEC51 = 51
UNSPEC52 = 52
UNSPEC53 = 53
UNSPEC54 = 54
UNSPEC55 = 55
UNSPEC56 = 56
UNSPEC57 = 57
UNSPEC58 = 58
UNSPEC59 = 59
UNSPEC60 = 60
UNSPEC61 = 61
UNSPEC62 = 62
UNSPEC63 = 63
VPS_NUT = 32
exception tools.lib.vidindex.VideoFileInvalid[source]

Bases: Exception

tools.lib.vidindex.get_hevc_nal_unit_length(dat: bytes, nal_unit_start: int) → int[source]
tools.lib.vidindex.get_hevc_nal_unit_type(dat: bytes, nal_unit_start: int) → HevcNalUnitType[source]
tools.lib.vidindex.get_hevc_slice_type(dat: bytes, nal_unit_start: int, nal_unit_type: HevcNalUnitType) → tuple[int, bool][source]
tools.lib.vidindex.get_ue(dat: bytes, start_idx: int, skip_bits: int) → tuple[int, int][source]
tools.lib.vidindex.hevc_index(hevc_file_name: str, allow_corrupt: bool = False) → tuple[list, int, bytes][source]
tools.lib.vidindex.main() → None[source]
tools.lib.vidindex.require_nal_unit_start(dat: bytes, nal_unit_start: int) → None[source]

Module contents