Source code for common.tests.test_params

import os
import threading
import time
import uuid
import unittest

from openpilot.common.params import Params, ParamKeyType, UnknownKeyName

[docs] class TestParams(unittest.TestCase):
[docs] def setUp(self): self.params = Params()
[docs] def test_params_put_and_get(self): self.params.put("DongleId", "cb38263377b873ee") assert self.params.get("DongleId") == b"cb38263377b873ee"
[docs] def test_params_non_ascii(self): st = b"\xe1\x90\xff" self.params.put("CarParams", st) assert self.params.get("CarParams") == st
[docs] def test_params_get_cleared_manager_start(self): self.params.put("CarParams", "test") self.params.put("DongleId", "cb38263377b873ee") assert self.params.get("CarParams") == b"test" undefined_param = self.params.get_param_path(uuid.uuid4().hex) with open(undefined_param, "w") as f: f.write("test") assert os.path.isfile(undefined_param) self.params.clear_all(ParamKeyType.CLEAR_ON_MANAGER_START) assert self.params.get("CarParams") is None assert self.params.get("DongleId") is not None assert not os.path.isfile(undefined_param)
[docs] def test_params_two_things(self): self.params.put("DongleId", "bob") self.params.put("AthenadPid", "123") assert self.params.get("DongleId") == b"bob" assert self.params.get("AthenadPid") == b"123"
[docs] def test_params_get_block(self): def _delayed_writer(): time.sleep(0.1) self.params.put("CarParams", "test") threading.Thread(target=_delayed_writer).start() assert self.params.get("CarParams") is None assert self.params.get("CarParams", True) == b"test"
[docs] def test_params_unknown_key_fails(self): with self.assertRaises(UnknownKeyName): self.params.get("swag") with self.assertRaises(UnknownKeyName): self.params.get_bool("swag") with self.assertRaises(UnknownKeyName): self.params.put("swag", "abc") with self.assertRaises(UnknownKeyName): self.params.put_bool("swag", True)
[docs] def test_remove_not_there(self): assert self.params.get("CarParams") is None self.params.remove("CarParams") assert self.params.get("CarParams") is None
[docs] def test_get_bool(self): self.params.remove("IsMetric") self.assertFalse(self.params.get_bool("IsMetric")) self.params.put_bool("IsMetric", True) self.assertTrue(self.params.get_bool("IsMetric")) self.params.put_bool("IsMetric", False) self.assertFalse(self.params.get_bool("IsMetric")) self.params.put("IsMetric", "1") self.assertTrue(self.params.get_bool("IsMetric")) self.params.put("IsMetric", "0") self.assertFalse(self.params.get_bool("IsMetric"))
[docs] def test_put_non_blocking_with_get_block(self): q = Params() def _delayed_writer(): time.sleep(0.1) Params().put_nonblocking("CarParams", "test") threading.Thread(target=_delayed_writer).start() assert q.get("CarParams") is None assert q.get("CarParams", True) == b"test"
[docs] def test_put_bool_non_blocking_with_get_block(self): q = Params() def _delayed_writer(): time.sleep(0.1) Params().put_bool_nonblocking("CarParams", True) threading.Thread(target=_delayed_writer).start() assert q.get("CarParams") is None assert q.get("CarParams", True) == b"1"
[docs] def test_params_all_keys(self): keys = Params().all_keys() # sanity checks assert len(keys) > 20 assert len(keys) == len(set(keys)) assert b"CarParams" in keys
if __name__ == "__main__": unittest.main()