Source code for selfdrive.controls.tests.test_cruise_speed

#!/usr/bin/env python3
import itertools
import numpy as np
import unittest

from parameterized import parameterized_class
from cereal import log
from openpilot.selfdrive.controls.lib.drive_helpers import VCruiseHelper, V_CRUISE_MIN, V_CRUISE_MAX, V_CRUISE_INITIAL, IMPERIAL_INCREMENT
from cereal import car
from openpilot.common.conversions import Conversions as CV
from openpilot.selfdrive.test.longitudinal_maneuvers.maneuver import Maneuver

ButtonEvent = car.CarState.ButtonEvent
ButtonType = car.CarState.ButtonEvent.Type


[docs] def run_cruise_simulation(cruise, e2e, personality, t_end=20.): man = Maneuver( '', duration=t_end, initial_speed=max(cruise - 1., 0.0), lead_relevancy=True, initial_distance_lead=100, cruise_values=[cruise], prob_lead_values=[0.0], breakpoints=[0.], e2e=e2e, personality=personality, ) valid, output = man.evaluate() assert valid return output[-1, 3]
[docs] @parameterized_class(("e2e", "personality", "speed"), itertools.product( [True, False], # e2e log.LongitudinalPersonality.schema.enumerants, # personality [5,35])) # speed class TestCruiseSpeed(unittest.TestCase): def test_cruise_speed(self): print(f'Testing {self.speed} m/s') cruise_speed = float(self.speed) simulation_steady_state = run_cruise_simulation(cruise_speed, self.e2e, self.personality) self.assertAlmostEqual(simulation_steady_state, cruise_speed, delta=.01, msg=f'Did not reach {self.speed} m/s')
# TODO: test pcmCruise
[docs] @parameterized_class(('pcm_cruise',), [(False,)]) class TestVCruiseHelper(unittest.TestCase):
[docs] def setUp(self): self.CP = car.CarParams(pcmCruise=self.pcm_cruise) self.v_cruise_helper = VCruiseHelper(self.CP) self.reset_cruise_speed_state()
[docs] def reset_cruise_speed_state(self): # Two resets previous cruise speed for _ in range(2): self.v_cruise_helper.update_v_cruise(car.CarState(cruiseState={"available": False}), enabled=False, is_metric=False)
[docs] def enable(self, v_ego, experimental_mode): # Simulates user pressing set with a current speed self.v_cruise_helper.initialize_v_cruise(car.CarState(vEgo=v_ego), experimental_mode)
def test_adjust_speed(self): """ Asserts speed changes on falling edges of buttons. """ self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False) for btn in (ButtonType.accelCruise, ButtonType.decelCruise): for pressed in (True, False): CS = car.CarState(cruiseState={"available": True}) CS.buttonEvents = [ButtonEvent(type=btn, pressed=pressed)] self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False) self.assertEqual(pressed, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last) def test_rising_edge_enable(self): """ Some car interfaces may enable on rising edge of a button, ensure we don't adjust speed if enabled changes mid-press. """ # NOTE: enabled is always one frame behind the result from button press in controlsd for enabled, pressed in ((False, False), (False, True), (True, False)): CS = car.CarState(cruiseState={"available": True}) CS.buttonEvents = [ButtonEvent(type=ButtonType.decelCruise, pressed=pressed)] self.v_cruise_helper.update_v_cruise(CS, enabled=enabled, is_metric=False) if pressed: self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False) # Expected diff on enabling. Speed should not change on falling edge of pressed self.assertEqual(not pressed, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last) def test_resume_in_standstill(self): """ Asserts we don't increment set speed if user presses resume/accel to exit cruise standstill. """ self.enable(0, False) for standstill in (True, False): for pressed in (True, False): CS = car.CarState(cruiseState={"available": True, "standstill": standstill}) CS.buttonEvents = [ButtonEvent(type=ButtonType.accelCruise, pressed=pressed)] self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False) # speed should only update if not at standstill and button falling edge should_equal = standstill or pressed self.assertEqual(should_equal, self.v_cruise_helper.v_cruise_kph == self.v_cruise_helper.v_cruise_kph_last) def test_set_gas_pressed(self): """ Asserts pressing set while enabled with gas pressed sets the speed to the maximum of vEgo and current cruise speed. """ for v_ego in np.linspace(0, 100, 101): self.reset_cruise_speed_state() self.enable(V_CRUISE_INITIAL * CV.KPH_TO_MS, False) # first decrement speed, then perform gas pressed logic expected_v_cruise_kph = self.v_cruise_helper.v_cruise_kph - IMPERIAL_INCREMENT expected_v_cruise_kph = max(expected_v_cruise_kph, v_ego * CV.MS_TO_KPH) # clip to min of vEgo expected_v_cruise_kph = float(np.clip(round(expected_v_cruise_kph, 1), V_CRUISE_MIN, V_CRUISE_MAX)) CS = car.CarState(vEgo=float(v_ego), gasPressed=True, cruiseState={"available": True}) CS.buttonEvents = [ButtonEvent(type=ButtonType.decelCruise, pressed=False)] self.v_cruise_helper.update_v_cruise(CS, enabled=True, is_metric=False) # TODO: fix skipping first run due to enabled on rising edge exception if v_ego == 0.0: continue self.assertEqual(expected_v_cruise_kph, self.v_cruise_helper.v_cruise_kph) def test_initialize_v_cruise(self): """ Asserts allowed cruise speeds on enabling with SET. """ for experimental_mode in (True, False): for v_ego in np.linspace(0, 100, 101): self.reset_cruise_speed_state() self.assertFalse(self.v_cruise_helper.v_cruise_initialized) self.enable(float(v_ego), experimental_mode) self.assertTrue(V_CRUISE_INITIAL <= self.v_cruise_helper.v_cruise_kph <= V_CRUISE_MAX) self.assertTrue(self.v_cruise_helper.v_cruise_initialized)
if __name__ == "__main__": unittest.main()