Source code for system.sensord.tests.test_sensord

#!/usr/bin/env python3
import os
import pytest
import time
import unittest
import numpy as np
from collections import namedtuple, defaultdict

import cereal.messaging as messaging
from cereal import log
from cereal.services import SERVICE_LIST
from openpilot.common.gpio import get_irqs_for_action
from openpilot.common.timeout import Timeout
from openpilot.selfdrive.manager.process_config import managed_processes

BMX = {
  ('bmx055', 'acceleration'),
  ('bmx055', 'gyroUncalibrated'),
  ('bmx055', 'magneticUncalibrated'),
  ('bmx055', 'temperature'),
}

LSM = {
  ('lsm6ds3', 'acceleration'),
  ('lsm6ds3', 'gyroUncalibrated'),
  ('lsm6ds3', 'temperature'),
}
LSM_C = {(x[0]+'trc', x[1]) for x in LSM}

MMC = {
  ('mmc5603nj', 'magneticUncalibrated'),
}

SENSOR_CONFIGURATIONS = (
  (BMX | LSM),
  (MMC | LSM),
  (BMX | LSM_C),
  (MMC| LSM_C),
)

Sensor = log.SensorEventData.SensorSource
SensorConfig = namedtuple('SensorConfig', ['type', 'sanity_min', 'sanity_max'])
ALL_SENSORS = {
  Sensor.lsm6ds3: {
    SensorConfig("acceleration", 5, 15),
    SensorConfig("gyroUncalibrated", 0, .2),
    SensorConfig("temperature", 0, 60),
  },

  Sensor.lsm6ds3trc: {
    SensorConfig("acceleration", 5, 15),
    SensorConfig("gyroUncalibrated", 0, .2),
    SensorConfig("temperature", 0, 60),
  },

  Sensor.bmx055: {
    SensorConfig("acceleration", 5, 15),
    SensorConfig("gyroUncalibrated", 0, .2),
    SensorConfig("magneticUncalibrated", 0, 300),
    SensorConfig("temperature", 0, 60),
  },

  Sensor.mmc5603nj: {
    SensorConfig("magneticUncalibrated", 0, 300),
  }
}


[docs] def get_irq_count(irq: int): with open(f"/sys/kernel/irq/{irq}/per_cpu_count") as f: per_cpu = map(int, f.read().split(",")) return sum(per_cpu)
[docs] def read_sensor_events(duration_sec): sensor_types = ['accelerometer', 'gyroscope', 'magnetometer', 'accelerometer2', 'gyroscope2', 'temperatureSensor', 'temperatureSensor2'] socks = {} poller = messaging.Poller() events = defaultdict(list) for stype in sensor_types: socks[stype] = messaging.sub_sock(stype, poller=poller, timeout=100) # wait for sensors to come up with Timeout(int(os.environ.get("SENSOR_WAIT", "5")), "sensors didn't come up"): while len(poller.poll(250)) == 0: pass time.sleep(1) for s in socks.values(): messaging.drain_sock_raw(s) st = time.monotonic() while time.monotonic() - st < duration_sec: for s in socks: events[s] += messaging.drain_sock(socks[s]) time.sleep(0.1) assert sum(map(len, events.values())) != 0, "No sensor events collected!" return {k: v for k, v in events.items() if len(v) > 0}
[docs] @pytest.mark.tici class TestSensord(unittest.TestCase):
[docs] @classmethod def setUpClass(cls): # enable LSM self test os.environ["LSM_SELF_TEST"] = "1" # read initial sensor values every test case can use os.system("pkill -f \\\\./sensord") try: managed_processes["sensord"].start() cls.sample_secs = int(os.getenv("SAMPLE_SECS", "10")) cls.events = read_sensor_events(cls.sample_secs) # determine sensord's irq cls.sensord_irq = get_irqs_for_action("sensord")[0] finally: # teardown won't run if this doesn't succeed managed_processes["sensord"].stop()
[docs] @classmethod def tearDownClass(cls): managed_processes["sensord"].stop()
[docs] def tearDown(self): managed_processes["sensord"].stop()
[docs] def test_sensors_present(self): # verify correct sensors configuration seen = set() for etype in self.events: for measurement in self.events[etype]: m = getattr(measurement, measurement.which()) seen.add((str(m.source), m.which())) self.assertIn(seen, SENSOR_CONFIGURATIONS)
[docs] def test_lsm6ds3_timing(self): # verify measurements are sampled and published at 104Hz sensor_t = { 1: [], # accel 5: [], # gyro } for measurement in self.events['accelerometer']: m = getattr(measurement, measurement.which()) sensor_t[m.sensor].append(m.timestamp) for measurement in self.events['gyroscope']: m = getattr(measurement, measurement.which()) sensor_t[m.sensor].append(m.timestamp) for s, vals in sensor_t.items(): with self.subTest(sensor=s): assert len(vals) > 0 tdiffs = np.diff(vals) / 1e6 # millis high_delay_diffs = list(filter(lambda d: d >= 20., tdiffs)) assert len(high_delay_diffs) < 15, f"Too many large diffs: {high_delay_diffs}" avg_diff = sum(tdiffs)/len(tdiffs) avg_freq = 1. / (avg_diff * 1e-3) assert 92. < avg_freq < 114., f"avg freq {avg_freq}Hz wrong, expected 104Hz" stddev = np.std(tdiffs) assert stddev < 2.0, f"Standard-dev to big {stddev}"
[docs] def test_sensor_frequency(self): for s, msgs in self.events.items(): with self.subTest(sensor=s): freq = len(msgs) / self.sample_secs ef = SERVICE_LIST[s].frequency assert ef*0.85 <= freq <= ef*1.15
[docs] def test_logmonottime_timestamp_diff(self): # ensure diff between the message logMonotime and sample timestamp is small tdiffs = list() for etype in self.events: for measurement in self.events[etype]: m = getattr(measurement, measurement.which()) # check if gyro and accel timestamps are before logMonoTime if str(m.source).startswith("lsm6ds3") and m.which() != 'temperature': err_msg = f"Timestamp after logMonoTime: {m.timestamp} > {measurement.logMonoTime}" assert m.timestamp < measurement.logMonoTime, err_msg # negative values might occur, as non interrupt packages created # before the sensor is read tdiffs.append(abs(measurement.logMonoTime - m.timestamp) / 1e6) # some sensors have a read procedure that will introduce an expected diff on the order of 20ms high_delay_diffs = set(filter(lambda d: d >= 25., tdiffs)) assert len(high_delay_diffs) < 20, f"Too many measurements published: {high_delay_diffs}" avg_diff = round(sum(tdiffs)/len(tdiffs), 4) assert avg_diff < 4, f"Avg packet diff: {avg_diff:.1f}ms"
[docs] def test_sensor_values(self): sensor_values = dict() for etype in self.events: for measurement in self.events[etype]: m = getattr(measurement, measurement.which()) key = (m.source.raw, m.which()) values = getattr(m, m.which()) if hasattr(values, 'v'): values = values.v values = np.atleast_1d(values) if key in sensor_values: sensor_values[key].append(values) else: sensor_values[key] = [values] # Sanity check sensor values for sensor, stype in sensor_values: for s in ALL_SENSORS[sensor]: if s.type != stype: continue key = (sensor, s.type) mean_norm = np.mean(np.linalg.norm(sensor_values[key], axis=1)) err_msg = f"Sensor '{sensor} {s.type}' failed sanity checks {mean_norm} is not between {s.sanity_min} and {s.sanity_max}" assert s.sanity_min <= mean_norm <= s.sanity_max, err_msg
[docs] def test_sensor_verify_no_interrupts_after_stop(self): managed_processes["sensord"].start() time.sleep(3) # read /proc/interrupts to verify interrupts are received state_one = get_irq_count(self.sensord_irq) time.sleep(1) state_two = get_irq_count(self.sensord_irq) error_msg = f"no interrupts received after sensord start!\n{state_one} {state_two}" assert state_one != state_two, error_msg managed_processes["sensord"].stop() time.sleep(1) # read /proc/interrupts to verify no more interrupts are received state_one = get_irq_count(self.sensord_irq) time.sleep(1) state_two = get_irq_count(self.sensord_irq) assert state_one == state_two, "Interrupts received after sensord stop!"
if __name__ == "__main__": unittest.main()