common package

Subpackages

Submodules

common.basedir module

common.clock module

common.clock.monotonic_time()
common.clock.sec_since_boot()

common.conversions module

class common.conversions.Conversions[source]

Bases: object

DEG_TO_RAD = 0.017453292519943295
KNOTS_TO_MS = 0.5144562197756971
KPH_TO_MPH = 0.621371192237334
KPH_TO_MS = 0.2777777777777778
LB_TO_KG = 0.453592
MPH_TO_KPH = 1.609344
MPH_TO_MS = 0.44704000000000005
MS_TO_KNOTS = 1.9438
MS_TO_KPH = 3.6
MS_TO_MPH = 2.2369362920544025
RAD_TO_DEG = 57.29577951308232

common.dict_helpers module

common.dict_helpers.strip_deprecated_keys(d)[source]

common.ffi_wrapper module

common.ffi_wrapper.compile_code(name, c_code, c_header, directory, cflags='', libraries=None)[source]
common.ffi_wrapper.ffi_wrap(name, c_code, c_header, tmpdir='/tmp/ccache', cflags='', libraries=None)[source]
common.ffi_wrapper.suffix()[source]
common.ffi_wrapper.wrap_compiled(name, directory)[source]

common.file_helpers module

class common.file_helpers.CallbackReader(f, callback, *args)[source]

Bases: object

Wraps a file, but overrides the read method to also call a callback function with the number of bytes read so far.

read(*args, **kwargs)[source]
class common.file_helpers.NamedTemporaryDir(temp_dir=None)[source]

Bases: object

close()[source]
property name
common.file_helpers.atomic_write_in_dir(path, **kwargs)[source]

Creates an atomic writer using a temporary file in the same directory as the destination file.

common.file_helpers.atomic_write_on_fs_tmp(path, **kwargs)[source]

Creates an atomic writer using a temporary file in a temporary directory on the same filesystem as path.

common.file_helpers.get_tmpdir_on_same_filesystem(path)[source]
common.file_helpers.mkdirs_exists_ok(path)[source]
common.file_helpers.rm_not_exists_ok(path)[source]

common.filter_simple module

class common.filter_simple.FirstOrderFilter(x0, rc, dt, initialized=True)[source]

Bases: object

update(x)[source]
update_alpha(rc)[source]

common.gpio module

common.gpio.gpio_init(pin: int, output: bool) None[source]
common.gpio.gpio_read(pin: int) Optional[bool][source]
common.gpio.gpio_set(pin: int, high: bool) None[source]

common.lazy_property module

class common.lazy_property.lazy_property(func)[source]

Bases: object

Defines a property whose value will be computed only once and as needed.

This can only be used on instance methods.

common.logging_extra module

class common.logging_extra.NiceOrderedDict[source]

Bases: OrderedDict

class common.logging_extra.SwagErrorFilter(name='')[source]

Bases: Filter

filter(record)[source]

Determine if the specified record is to be logged.

Returns True if the record should be logged, or False otherwise. If deemed appropriate, the record may be modified in-place.

class common.logging_extra.SwagFormatter(swaglogger)[source]

Bases: Formatter

format(record)[source]

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

format_dict(record)[source]
class common.logging_extra.SwagLogFileFormatter(swaglogger)[source]

Bases: SwagFormatter

fix_kv(k, v)[source]
format(record)[source]

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

class common.logging_extra.SwagLogger[source]

Bases: Logger

bind(**kwargs)[source]
bind_global(**kwargs)[source]
ctx(**kwargs)[source]
event(event_name, *args, **kwargs)[source]
findCaller(stack_info=False, stacklevel=1)[source]

Find the stack frame of the caller so that we can note the source file name, line number and function name.

get_ctx()[source]
local_ctx()[source]
timestamp(event_name)[source]
common.logging_extra.json_handler(obj)[source]
common.logging_extra.json_robust_dumps(obj)[source]

common.numpy_fast module

common.numpy_fast.clip(x, lo, hi)[source]
common.numpy_fast.interp(x, xp, fp)[source]
common.numpy_fast.mean(x)[source]

common.numpy_helpers module

common.numpy_helpers.deep_interp_np(x, xp, fp, axis=None)[source]

common.params module

common.params_pyx module

class common.params_pyx.Params

Bases: object

all_keys()
check_key()
clear_all()
get()
get_bool()
get_param_path()
put()

Warning: This function blocks until the param is written to disk! In very rare cases this can take over a second, and your code will hang. Use the put_nonblocking helper function in time sensitive code, but in general try to avoid writing params as much as possible.

put_bool()
remove()
exception common.params_pyx.UnknownKeyName

Bases: Exception

common.params_pyx.ensure_bytes()
common.params_pyx.put_bool_nonblocking()
common.params_pyx.put_nonblocking()

common.profiler module

class common.profiler.Profiler(enabled=False)[source]

Bases: object

checkpoint(name, ignore=False)[source]
display()[source]
reset(enabled=False)[source]

common.realtime module

Utilities for reading real time clocks and keeping soft real time constraints.

class common.realtime.Priority[source]

Bases: object

CTRL_HIGH = 53
CTRL_LOW = 51
class common.realtime.Ratekeeper(rate: float, print_delay_threshold: Optional[float] = 0.0)[source]

Bases: object

property frame: int
keep_time() bool[source]
property lagging: bool
monitor_time() bool[source]
property remaining: float
common.realtime.config_realtime_process(cores: Union[int, List[int]], priority: int) None[source]
common.realtime.set_core_affinity(cores: List[int]) None[source]
common.realtime.set_realtime_priority(level: int) None[source]

common.spinner module

class common.spinner.Spinner[source]

Bases: object

close()[source]
update(spinner_text: str)[source]
update_progress(cur: float, total: float)[source]

common.stat_live module

class common.stat_live.RunningStat(priors=None, max_trackable=-1)[source]

Bases: object

mean()[source]
params_to_save()[source]
push_data(new_data)[source]
reset()[source]
std()[source]
variance()[source]
class common.stat_live.RunningStatFilter(raw_priors=None, filtered_priors=None, max_trackable=-1)[source]

Bases: object

push_and_update(new_data)[source]
reset()[source]

common.text_window module

class common.text_window.TextWindow(text)[source]

Bases: object

close()[source]
get_status()[source]
wait_for_exit()[source]

common.timeout module

class common.timeout.Timeout(seconds, error_msg=None)[source]

Bases: object

Timeout context manager. For example this code will raise a TimeoutException: with Timeout(seconds=5, error_msg=”Sleep was too long”):

time.sleep(10)

handle_timeout(signume, frame)[source]
exception common.timeout.TimeoutException[source]

Bases: Exception

common.window module

common.xattr module

common.xattr.getxattr(path, name, size=128)[source]
common.xattr.listxattr(path, size=128) List[Any][source]
common.xattr.removexattr(path, name) None[source]
common.xattr.setxattr(path, name, value, flags=0) None[source]

Module contents