common package

Subpackages

Submodules

common.basedir module

common.conversions module

class common.conversions.Conversions[source]

Bases: object

DEG_TO_RAD = 0.017453292519943295
KNOTS_TO_MS = 0.5144562197756971
KPH_TO_MPH = 0.621371192237334
KPH_TO_MS = 0.2777777777777778
LB_TO_KG = 0.453592
MPH_TO_KPH = 1.609344
MPH_TO_MS = 0.44704000000000005
MS_TO_KNOTS = 1.9438
MS_TO_KPH = 3.6
MS_TO_MPH = 2.2369362920544025
RAD_TO_DEG = 57.29577951308232

common.dict_helpers module

common.dict_helpers.strip_deprecated_keys(d)[source]

common.ffi_wrapper module

common.ffi_wrapper.suffix()[source]

common.file_helpers module

class common.file_helpers.CallbackReader(f, callback, *args)[source]

Bases: object

Wraps a file, but overrides the read method to also call a callback function with the number of bytes read so far.

read(*args, **kwargs)[source]
common.file_helpers.atomic_write_in_dir(path: str, mode: str = 'w', buffering: int = -1, encoding: str = None, newline: str = None, overwrite: bool = False)[source]

Write to a file atomically using a temporary file in the same directory as the destination file.

common.filter_simple module

class common.filter_simple.FirstOrderFilter(x0, rc, dt, initialized=True)[source]

Bases: object

update(x)[source]
update_alpha(rc)[source]

common.git module

common.git.get_branch(cwd: str = None) str[source]
common.git.get_commit(cwd: str = None, branch: str = 'HEAD') str[source]
common.git.get_commit_date(cwd: str = None, commit: str = 'HEAD') str[source]
common.git.get_normalized_origin(cwd: str = None) str[source]
common.git.get_origin(cwd: str = None) str[source]
common.git.get_short_branch(cwd: str = None) str[source]

common.gpio module

common.gpio.get_irq_action(irq: int) list[str][source]
common.gpio.get_irqs_for_action(action: str) list[str][source]
common.gpio.gpio_export(pin: int) None[source]
common.gpio.gpio_init(pin: int, output: bool) None[source]
common.gpio.gpio_read(pin: int) bool | None[source]
common.gpio.gpio_set(pin: int, high: bool) None[source]

common.logging_extra module

class common.logging_extra.NiceOrderedDict[source]

Bases: OrderedDict

class common.logging_extra.SwagErrorFilter(name='')[source]

Bases: Filter

filter(record)[source]

Determine if the specified record is to be logged.

Returns True if the record should be logged, or False otherwise. If deemed appropriate, the record may be modified in-place.

class common.logging_extra.SwagFormatter(swaglogger)[source]

Bases: Formatter

format(record)[source]

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

format_dict(record)[source]
class common.logging_extra.SwagLogFileFormatter(swaglogger)[source]

Bases: SwagFormatter

fix_kv(k, v)[source]
format(record)[source]

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

class common.logging_extra.SwagLogger[source]

Bases: Logger

bind(**kwargs)[source]
bind_global(**kwargs)[source]
ctx(**kwargs)[source]
event(event, *args, **kwargs)[source]
findCaller(stack_info=False, stacklevel=1)[source]

Find the stack frame of the caller so that we can note the source file name, line number and function name.

get_ctx()[source]
local_ctx()[source]
timestamp(event_name)[source]
common.logging_extra.json_handler(obj)[source]
common.logging_extra.json_robust_dumps(obj)[source]

common.numpy_fast module

common.numpy_fast.clip(x, lo, hi)[source]
common.numpy_fast.interp(x, xp, fp)[source]
common.numpy_fast.mean(x)[source]

common.params module

common.params_pyx module

class common.params_pyx.ParamKeyType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: IntFlag

ALL = 4294967295
CLEAR_ON_MANAGER_START = 4
CLEAR_ON_OFFROAD_TRANSITION = 16
CLEAR_ON_ONROAD_TRANSITION = 8
DEVELOPMENT_ONLY = 64
PERSISTENT = 2
class common.params_pyx.Params

Bases: object

all_keys()
check_key(key)
clear_all(tx_type=<ParamKeyType.ALL: 4294967295>)
get(key, block=False, encoding=None)
get_bool(key, block=False)
get_param_path(key='')
put(key, dat)

Warning: This function blocks until the param is written to disk! In very rare cases this can take over a second, and your code will hang. Use the put_nonblocking, put_bool_nonblocking in time sensitive code, but in general try to avoid writing params as much as possible.

put_bool(key, val)
put_bool_nonblocking(key, val)
put_nonblocking(key, dat)
remove(key)
exception common.params_pyx.UnknownKeyName

Bases: Exception

common.params_pyx.ensure_bytes(v)

common.prefix module

class common.prefix.OpenpilotPrefix(prefix: str = None, clean_dirs_on_exit: bool = True, shared_download_cache: bool = False)[source]

Bases: object

clean_dirs()[source]

common.realtime module

Utilities for reading real time clocks and keeping soft real time constraints.

class common.realtime.Priority[source]

Bases: object

CTRL_HIGH = 53
CTRL_LOW = 51
class common.realtime.Ratekeeper(rate: float, print_delay_threshold: float | None = 0.0)[source]

Bases: object

property frame: int
keep_time() bool[source]
property lagging: bool
monitor_time() bool[source]
property remaining: float
common.realtime.config_realtime_process(cores: int | list[int], priority: int) None[source]
common.realtime.set_core_affinity(cores: list[int]) None[source]
common.realtime.set_realtime_priority(level: int) None[source]

common.retry module

common.retry.retry(attempts=3, delay=1.0, ignore_failure=False)[source]

common.run module

common.run.run_cmd(cmd: list[str], cwd=None, env=None) str[source]
common.run.run_cmd_default(cmd: list[str], default: str = '', cwd=None, env=None) str[source]

common.simple_kalman module

class common.simple_kalman.KF1D(x0, A, C, K)[source]

Bases: object

set_x(x)[source]
update(meas)[source]
property x
common.simple_kalman.get_kalman_gain(dt, A, C, Q, R, iterations=100)[source]

common.spinner module

class common.spinner.Spinner[source]

Bases: object

close()[source]
update(spinner_text: str)[source]
update_progress(cur: float, total: float)[source]

common.stat_live module

class common.stat_live.RunningStat(priors=None, max_trackable=-1)[source]

Bases: object

mean()[source]
params_to_save()[source]
push_data(new_data)[source]
reset()[source]
std()[source]
variance()[source]
class common.stat_live.RunningStatFilter(raw_priors=None, filtered_priors=None, max_trackable=-1)[source]

Bases: object

push_and_update(new_data)[source]
reset()[source]

common.swaglog module

class common.swaglog.SwaglogRotatingFileHandler(base_filename, interval=60, max_bytes=262144, backup_count=2500, encoding=None)[source]

Bases: BaseRotatingHandler

doRollover()[source]
get_existing_logfiles()[source]
shouldRollover(record)[source]
class common.swaglog.UnixDomainSocketHandler(formatter)[source]

Bases: Handler

close()[source]

Tidy up any resources used by the handler.

This version removes the handler from an internal map of handlers, _handlers, which is used for handler lookup by name. Subclasses should ensure that this gets called from overridden close() methods.

connect()[source]
emit(record)[source]

Do whatever it takes to actually log the specified logging record.

This version is intended to be implemented by subclasses and so raises a NotImplementedError.

common.swaglog.add_file_handler(log)[source]

Function to add the file log handler to swaglog. This can be used to store logs when logmessaged is not running.

common.swaglog.get_file_handler()[source]

common.text_window module

class common.text_window.TextWindow(text)[source]

Bases: object

close()[source]
get_status()[source]
wait_for_exit()[source]

common.time module

common.time.min_date()[source]
common.time.system_time_valid()[source]

common.timeout module

class common.timeout.Timeout(seconds, error_msg=None)[source]

Bases: object

Timeout context manager. For example this code will raise a TimeoutException: with Timeout(seconds=5, error_msg=”Sleep was too long”):

time.sleep(10)

handle_timeout(signume, frame)[source]
exception common.timeout.TimeoutException[source]

Bases: Exception

common.utils module

class common.utils.Freezable[source]

Bases: object

freeze()[source]
common.utils.cache(user_function: Callable[[...], _RT], /) Callable[[...], _RT][source]

Module contents