Source code for panda.tests.safety.test_ford

#!/usr/bin/env python3
import numpy as np
import random
import unittest

import panda.tests.safety.common as common

from panda import Panda
from panda.tests.libpanda import libpanda_py
from panda.tests.safety.common import CANPackerPanda

MSG_EngBrakeData = 0x165           # RX from PCM, for driver brake pedal and cruise state
MSG_EngVehicleSpThrottle = 0x204   # RX from PCM, for driver throttle input
MSG_BrakeSysFeatures = 0x415       # RX from ABS, for vehicle speed
MSG_EngVehicleSpThrottle2 = 0x202  # RX from PCM, for second vehicle speed
MSG_Yaw_Data_FD1 = 0x91            # RX from RCM, for yaw rate
MSG_Steering_Data_FD1 = 0x083      # TX by OP, various driver switches and LKAS/CC buttons
MSG_ACCDATA = 0x186                # TX by OP, ACC controls
MSG_ACCDATA_3 = 0x18A              # TX by OP, ACC/TJA user interface
MSG_Lane_Assist_Data1 = 0x3CA      # TX by OP, Lane Keep Assist
MSG_LateralMotionControl = 0x3D3   # TX by OP, Lateral Control message
MSG_LateralMotionControl2 = 0x3D6  # TX by OP, alternate Lateral Control message
MSG_IPMA_Data = 0x3D8              # TX by OP, IPMA and LKAS user interface


[docs] def checksum(msg): addr, t, dat, bus = msg ret = bytearray(dat) if addr == MSG_Yaw_Data_FD1: chksum = dat[0] + dat[1] # VehRol_W_Actl chksum += dat[2] + dat[3] # VehYaw_W_Actl chksum += dat[5] # VehRollYaw_No_Cnt chksum += dat[6] >> 6 # VehRolWActl_D_Qf chksum += (dat[6] >> 4) & 0x3 # VehYawWActl_D_Qf chksum = 0xff - (chksum & 0xff) ret[4] = chksum elif addr == MSG_BrakeSysFeatures: chksum = dat[0] + dat[1] # Veh_V_ActlBrk chksum += (dat[2] >> 2) & 0xf # VehVActlBrk_No_Cnt chksum += dat[2] >> 6 # VehVActlBrk_D_Qf chksum = 0xff - (chksum & 0xff) ret[3] = chksum elif addr == MSG_EngVehicleSpThrottle2: chksum = (dat[2] >> 3) & 0xf # VehVActlEng_No_Cnt chksum += (dat[4] >> 5) & 0x3 # VehVActlEng_D_Qf chksum += dat[6] + dat[7] # Veh_V_ActlEng chksum = 0xff - (chksum & 0xff) ret[1] = chksum return addr, t, ret, bus
[docs] class Buttons: CANCEL = 0 RESUME = 1 TJA_TOGGLE = 2
# Ford safety has four different configurations tested here: # * CAN with stock longitudinal # * CAN with openpilot longitudinal # * CAN FD with stock longitudinal # * CAN FD with openpilot longitudinal
[docs] class TestFordSafetyBase(common.PandaCarSafetyTest): STANDSTILL_THRESHOLD = 1 RELAY_MALFUNCTION_ADDRS = {0: (MSG_ACCDATA_3, MSG_Lane_Assist_Data1, MSG_LateralMotionControl, MSG_LateralMotionControl2, MSG_IPMA_Data)} FWD_BLACKLISTED_ADDRS = {2: [MSG_ACCDATA_3, MSG_Lane_Assist_Data1, MSG_LateralMotionControl, MSG_LateralMotionControl2, MSG_IPMA_Data]} FWD_BUS_LOOKUP = {0: 2, 2: 0} # Max allowed delta between car speeds MAX_SPEED_DELTA = 2.0 # m/s STEER_MESSAGE = 0 # Curvature control limits DEG_TO_CAN = 50000 # 1 / (2e-5) rad to can MAX_CURVATURE = 0.02 MAX_CURVATURE_ERROR = 0.002 CURVATURE_ERROR_MIN_SPEED = 10.0 # m/s ANGLE_RATE_BP = [5., 25., 25.] ANGLE_RATE_UP = [0.0002, 0.0001, 0.0001] # windup limit ANGLE_RATE_DOWN = [0.000225, 0.00015, 0.00015] # unwind limit cnt_speed = 0 cnt_speed_2 = 0 cnt_yaw_rate = 0 packer: CANPackerPanda safety: libpanda_py.Panda
[docs] @classmethod def setUpClass(cls): if cls.__name__ == "TestFordSafetyBase": raise unittest.SkipTest
def _set_prev_desired_angle(self, t): t = round(t * self.DEG_TO_CAN) self.safety.set_desired_angle_last(t) def _reset_curvature_measurement(self, curvature, speed): for _ in range(6): self._rx(self._speed_msg(speed)) self._rx(self._yaw_rate_msg(curvature, speed)) # Driver brake pedal def _user_brake_msg(self, brake: bool): # brake pedal and cruise state share same message, so we have to send # the other signal too enable = self.safety.get_controls_allowed() values = { "BpedDrvAppl_D_Actl": 2 if brake else 1, "CcStat_D_Actl": 5 if enable else 0, } return self.packer.make_can_msg_panda("EngBrakeData", 0, values) # ABS vehicle speed def _speed_msg(self, speed: float, quality_flag=True): values = {"Veh_V_ActlBrk": speed * 3.6, "VehVActlBrk_D_Qf": 3 if quality_flag else 0, "VehVActlBrk_No_Cnt": self.cnt_speed % 16} self.__class__.cnt_speed += 1 return self.packer.make_can_msg_panda("BrakeSysFeatures", 0, values, fix_checksum=checksum) # PCM vehicle speed def _speed_msg_2(self, speed: float, quality_flag=True): values = {"Veh_V_ActlEng": speed * 3.6, "VehVActlEng_D_Qf": 3 if quality_flag else 0, "VehVActlEng_No_Cnt": self.cnt_speed_2 % 16} self.__class__.cnt_speed_2 += 1 return self.packer.make_can_msg_panda("EngVehicleSpThrottle2", 0, values, fix_checksum=checksum) # Standstill state def _vehicle_moving_msg(self, speed: float): values = {"VehStop_D_Stat": 1 if speed <= self.STANDSTILL_THRESHOLD else random.choice((0, 2, 3))} return self.packer.make_can_msg_panda("DesiredTorqBrk", 0, values) # Current curvature def _yaw_rate_msg(self, curvature: float, speed: float, quality_flag=True): values = {"VehYaw_W_Actl": curvature * speed, "VehYawWActl_D_Qf": 3 if quality_flag else 0, "VehRollYaw_No_Cnt": self.cnt_yaw_rate % 256} self.__class__.cnt_yaw_rate += 1 return self.packer.make_can_msg_panda("Yaw_Data_FD1", 0, values, fix_checksum=checksum) # Drive throttle input def _user_gas_msg(self, gas: float): values = {"ApedPos_Pc_ActlArb": gas} return self.packer.make_can_msg_panda("EngVehicleSpThrottle", 0, values) # Cruise status def _pcm_status_msg(self, enable: bool): # brake pedal and cruise state share same message, so we have to send # the other signal too brake = self.safety.get_brake_pressed_prev() values = { "BpedDrvAppl_D_Actl": 2 if brake else 1, "CcStat_D_Actl": 5 if enable else 0, } return self.packer.make_can_msg_panda("EngBrakeData", 0, values) # LKAS command def _lkas_command_msg(self, action: int): values = { "LkaActvStats_D2_Req": action, } return self.packer.make_can_msg_panda("Lane_Assist_Data1", 0, values) # LCA command def _lat_ctl_msg(self, enabled: bool, path_offset: float, path_angle: float, curvature: float, curvature_rate: float): if self.STEER_MESSAGE == MSG_LateralMotionControl: values = { "LatCtl_D_Rq": 1 if enabled else 0, "LatCtlPathOffst_L_Actl": path_offset, # Path offset [-5.12|5.11] meter "LatCtlPath_An_Actl": path_angle, # Path angle [-0.5|0.5235] radians "LatCtlCurv_NoRate_Actl": curvature_rate, # Curvature rate [-0.001024|0.00102375] 1/meter^2 "LatCtlCurv_No_Actl": curvature, # Curvature [-0.02|0.02094] 1/meter } return self.packer.make_can_msg_panda("LateralMotionControl", 0, values) elif self.STEER_MESSAGE == MSG_LateralMotionControl2: values = { "LatCtl_D2_Rq": 1 if enabled else 0, "LatCtlPathOffst_L_Actl": path_offset, # Path offset [-5.12|5.11] meter "LatCtlPath_An_Actl": path_angle, # Path angle [-0.5|0.5235] radians "LatCtlCrv_NoRate2_Actl": curvature_rate, # Curvature rate [-0.001024|0.001023] 1/meter^2 "LatCtlCurv_No_Actl": curvature, # Curvature [-0.02|0.02094] 1/meter } return self.packer.make_can_msg_panda("LateralMotionControl2", 0, values) # Cruise control buttons def _acc_button_msg(self, button: int, bus: int): values = { "CcAslButtnCnclPress": 1 if button == Buttons.CANCEL else 0, "CcAsllButtnResPress": 1 if button == Buttons.RESUME else 0, "TjaButtnOnOffPress": 1 if button == Buttons.TJA_TOGGLE else 0, } return self.packer.make_can_msg_panda("Steering_Data_FD1", bus, values)
[docs] def test_rx_hook(self): # checksum, counter, and quality flag checks for quality_flag in [True, False]: for msg in ["speed", "speed_2", "yaw"]: self.safety.set_controls_allowed(True) # send multiple times to verify counter checks for _ in range(10): if msg == "speed": to_push = self._speed_msg(0, quality_flag=quality_flag) elif msg == "speed_2": to_push = self._speed_msg_2(0, quality_flag=quality_flag) elif msg == "yaw": to_push = self._yaw_rate_msg(0, 0, quality_flag=quality_flag) self.assertEqual(quality_flag, self._rx(to_push)) self.assertEqual(quality_flag, self.safety.get_controls_allowed()) # Mess with checksum to make it fail, checksum is not checked for 2nd speed to_push[0].data[3] = 0 # Speed checksum & half of yaw signal should_rx = msg == "speed_2" and quality_flag self.assertEqual(should_rx, self._rx(to_push)) self.assertEqual(should_rx, self.safety.get_controls_allowed())
[docs] def test_rx_hook_speed_mismatch(self): # Ford relies on speed for driver curvature limiting, so it checks two sources for speed in np.arange(0, 40, 0.5): for speed_delta in np.arange(-5, 5, 0.1): speed_2 = round(max(speed + speed_delta, 0), 1) # Set controls allowed in between rx since first message can reset it self._rx(self._speed_msg(speed)) self.safety.set_controls_allowed(True) self._rx(self._speed_msg_2(speed_2)) within_delta = abs(speed - speed_2) <= self.MAX_SPEED_DELTA self.assertEqual(self.safety.get_controls_allowed(), within_delta)
[docs] def test_angle_measurements(self): """Tests rx hook correctly parses the curvature measurement from the vehicle speed and yaw rate""" for speed in np.arange(0.5, 40, 0.5): for curvature in np.arange(0, self.MAX_CURVATURE * 2, 2e-3): self._rx(self._speed_msg(speed)) for c in (curvature, -curvature, 0, 0, 0, 0): self._rx(self._yaw_rate_msg(c, speed)) self.assertEqual(self.safety.get_angle_meas_min(), round(-curvature * self.DEG_TO_CAN)) self.assertEqual(self.safety.get_angle_meas_max(), round(curvature * self.DEG_TO_CAN)) self._rx(self._yaw_rate_msg(0, speed)) self.assertEqual(self.safety.get_angle_meas_min(), round(-curvature * self.DEG_TO_CAN)) self.assertEqual(self.safety.get_angle_meas_max(), 0) self._rx(self._yaw_rate_msg(0, speed)) self.assertEqual(self.safety.get_angle_meas_min(), 0) self.assertEqual(self.safety.get_angle_meas_max(), 0)
[docs] def test_steer_allowed(self): path_offsets = np.arange(-5.12, 5.11, 1).round() path_angles = np.arange(-0.5, 0.5235, 0.1).round(1) curvature_rates = np.arange(-0.001024, 0.00102375, 0.001).round(3) curvatures = np.arange(-0.02, 0.02094, 0.01).round(2) for speed in (self.CURVATURE_ERROR_MIN_SPEED - 1, self.CURVATURE_ERROR_MIN_SPEED + 1): for controls_allowed in (True, False): for steer_control_enabled in (True, False): for path_offset in path_offsets: for path_angle in path_angles: for curvature_rate in curvature_rates: for curvature in curvatures: self.safety.set_controls_allowed(controls_allowed) self._set_prev_desired_angle(curvature) self._reset_curvature_measurement(curvature, speed) should_tx = path_offset == 0 and path_angle == 0 and curvature_rate == 0 # when request bit is 0, only allow curvature of 0 since the signal range # is not large enough to enforce it tracking measured should_tx = should_tx and (controls_allowed if steer_control_enabled else curvature == 0) with self.subTest(controls_allowed=controls_allowed, steer_control_enabled=steer_control_enabled, path_offset=path_offset, path_angle=path_angle, curvature_rate=curvature_rate, curvature=curvature): self.assertEqual(should_tx, self._tx(self._lat_ctl_msg(steer_control_enabled, path_offset, path_angle, curvature, curvature_rate)))
[docs] def test_curvature_rate_limit_up(self): """ When the curvature error is exceeded, commanded curvature must start moving towards meas respecting rate limits. Since panda allows higher rate limits to avoid false positives, we need to allow a lower rate to move towards meas. """ self.safety.set_controls_allowed(True) small_curvature = 2 / self.DEG_TO_CAN # significant small amount of curvature to cross boundary for speed in np.arange(0, 40, 0.5): limit_command = speed > self.CURVATURE_ERROR_MIN_SPEED max_delta_up = np.interp(speed, self.ANGLE_RATE_BP, self.ANGLE_RATE_UP) max_delta_up_lower = np.interp(speed + 1, self.ANGLE_RATE_BP, self.ANGLE_RATE_UP) cases = [ (not limit_command, 0), (not limit_command, max_delta_up_lower - small_curvature), (True, max_delta_up_lower), (True, max_delta_up), (False, max_delta_up + small_curvature), ] for sign in (-1, 1): self._reset_curvature_measurement(sign * (self.MAX_CURVATURE_ERROR + 1e-3), speed) for should_tx, curvature in cases: self._set_prev_desired_angle(sign * small_curvature) self.assertEqual(should_tx, self._tx(self._lat_ctl_msg(True, 0, 0, sign * (small_curvature + curvature), 0)))
[docs] def test_curvature_rate_limit_down(self): self.safety.set_controls_allowed(True) small_curvature = 2 / self.DEG_TO_CAN # significant small amount of curvature to cross boundary for speed in np.arange(0, 40, 0.5): limit_command = speed > self.CURVATURE_ERROR_MIN_SPEED max_delta_down = np.interp(speed, self.ANGLE_RATE_BP, self.ANGLE_RATE_DOWN) max_delta_down_lower = np.interp(speed + 1, self.ANGLE_RATE_BP, self.ANGLE_RATE_DOWN) cases = [ (not limit_command, self.MAX_CURVATURE), (not limit_command, self.MAX_CURVATURE - max_delta_down_lower + small_curvature), (True, self.MAX_CURVATURE - max_delta_down_lower), (True, self.MAX_CURVATURE - max_delta_down), (False, self.MAX_CURVATURE - max_delta_down - small_curvature), ] for sign in (-1, 1): self._reset_curvature_measurement(sign * (self.MAX_CURVATURE - self.MAX_CURVATURE_ERROR - 1e-3), speed) for should_tx, curvature in cases: self._set_prev_desired_angle(sign * self.MAX_CURVATURE) self.assertEqual(should_tx, self._tx(self._lat_ctl_msg(True, 0, 0, sign * curvature, 0)))
[docs] def test_prevent_lkas_action(self): self.safety.set_controls_allowed(1) self.assertFalse(self._tx(self._lkas_command_msg(1))) self.safety.set_controls_allowed(0) self.assertFalse(self._tx(self._lkas_command_msg(1)))
[docs] def test_acc_buttons(self): for allowed in (0, 1): self.safety.set_controls_allowed(allowed) for enabled in (True, False): self._rx(self._pcm_status_msg(enabled)) self.assertTrue(self._tx(self._acc_button_msg(Buttons.TJA_TOGGLE, 2))) for allowed in (0, 1): self.safety.set_controls_allowed(allowed) for bus in (0, 2): self.assertEqual(allowed, self._tx(self._acc_button_msg(Buttons.RESUME, bus))) for enabled in (True, False): self._rx(self._pcm_status_msg(enabled)) for bus in (0, 2): self.assertEqual(enabled, self._tx(self._acc_button_msg(Buttons.CANCEL, bus)))
[docs] class TestFordStockSafety(TestFordSafetyBase): STEER_MESSAGE = MSG_LateralMotionControl TX_MSGS = [ [MSG_Steering_Data_FD1, 0], [MSG_Steering_Data_FD1, 2], [MSG_ACCDATA_3, 0], [MSG_Lane_Assist_Data1, 0], [MSG_LateralMotionControl, 0], [MSG_IPMA_Data, 0], ]
[docs] def setUp(self): self.packer = CANPackerPanda("ford_lincoln_base_pt") self.safety = libpanda_py.libpanda self.safety.set_safety_hooks(Panda.SAFETY_FORD, 0) self.safety.init_tests()
[docs] class TestFordCANFDStockSafety(TestFordSafetyBase): STEER_MESSAGE = MSG_LateralMotionControl2 TX_MSGS = [ [MSG_Steering_Data_FD1, 0], [MSG_Steering_Data_FD1, 2], [MSG_ACCDATA_3, 0], [MSG_Lane_Assist_Data1, 0], [MSG_LateralMotionControl2, 0], [MSG_IPMA_Data, 0], ]
[docs] def setUp(self): self.packer = CANPackerPanda("ford_lincoln_base_pt") self.safety = libpanda_py.libpanda self.safety.set_safety_hooks(Panda.SAFETY_FORD, Panda.FLAG_FORD_CANFD) self.safety.init_tests()
[docs] class TestFordLongitudinalSafetyBase(TestFordSafetyBase): RELAY_MALFUNCTION_ADDRS = {0: (MSG_ACCDATA, MSG_ACCDATA_3, MSG_Lane_Assist_Data1, MSG_LateralMotionControl, MSG_LateralMotionControl2, MSG_IPMA_Data)} FWD_BLACKLISTED_ADDRS = {2: [MSG_ACCDATA, MSG_ACCDATA_3, MSG_Lane_Assist_Data1, MSG_LateralMotionControl, MSG_LateralMotionControl2, MSG_IPMA_Data]} MAX_ACCEL = 2.0 # accel is used for brakes, but openpilot can set positive values MIN_ACCEL = -3.5 INACTIVE_ACCEL = 0.0 MAX_GAS = 2.0 MIN_GAS = -0.5 INACTIVE_GAS = -5.0
[docs] @classmethod def setUpClass(cls): if cls.__name__ == "TestFordLongitudinalSafetyBase": raise unittest.SkipTest
# ACC command def _acc_command_msg(self, gas: float, brake: float, cmbb_deny: bool = False): values = { "AccPrpl_A_Rq": gas, # [-5|5.23] m/s^2 "AccPrpl_A_Pred": gas, # [-5|5.23] m/s^2 "AccBrkTot_A_Rq": brake, # [-20|11.9449] m/s^2 "CmbbDeny_B_Actl": 1 if cmbb_deny else 0, # [0|1] deny AEB actuation } return self.packer.make_can_msg_panda("ACCDATA", 0, values)
[docs] def test_stock_aeb(self): # Test that CmbbDeny_B_Actl is never 1, it prevents the ABS module from actuating AEB requests from ACCDATA_2 for controls_allowed in (True, False): self.safety.set_controls_allowed(controls_allowed) for cmbb_deny in (True, False): should_tx = not cmbb_deny self.assertEqual(should_tx, self._tx(self._acc_command_msg(self.INACTIVE_GAS, self.INACTIVE_ACCEL, cmbb_deny))) should_tx = controls_allowed and not cmbb_deny self.assertEqual(should_tx, self._tx(self._acc_command_msg(self.MAX_GAS, self.MAX_ACCEL, cmbb_deny)))
[docs] def test_gas_safety_check(self): for controls_allowed in (True, False): self.safety.set_controls_allowed(controls_allowed) for gas in np.concatenate((np.arange(self.MIN_GAS - 2, self.MAX_GAS + 2, 0.05), [self.INACTIVE_GAS])): gas = round(gas, 2) # floats might not hit exact boundary conditions without rounding should_tx = (controls_allowed and self.MIN_GAS <= gas <= self.MAX_GAS) or gas == self.INACTIVE_GAS self.assertEqual(should_tx, self._tx(self._acc_command_msg(gas, self.INACTIVE_ACCEL)))
[docs] def test_brake_safety_check(self): for controls_allowed in (True, False): self.safety.set_controls_allowed(controls_allowed) for brake in np.arange(self.MIN_ACCEL - 2, self.MAX_ACCEL + 2, 0.05): brake = round(brake, 2) # floats might not hit exact boundary conditions without rounding should_tx = (controls_allowed and self.MIN_ACCEL <= brake <= self.MAX_ACCEL) or brake == self.INACTIVE_ACCEL self.assertEqual(should_tx, self._tx(self._acc_command_msg(self.INACTIVE_GAS, brake)))
[docs] class TestFordLongitudinalSafety(TestFordLongitudinalSafetyBase): STEER_MESSAGE = MSG_LateralMotionControl TX_MSGS = [ [MSG_Steering_Data_FD1, 0], [MSG_Steering_Data_FD1, 2], [MSG_ACCDATA, 0], [MSG_ACCDATA_3, 0], [MSG_Lane_Assist_Data1, 0], [MSG_LateralMotionControl, 0], [MSG_IPMA_Data, 0], ]
[docs] def setUp(self): self.packer = CANPackerPanda("ford_lincoln_base_pt") self.safety = libpanda_py.libpanda self.safety.set_safety_hooks(Panda.SAFETY_FORD, Panda.FLAG_FORD_LONG_CONTROL) self.safety.init_tests()
[docs] class TestFordCANFDLongitudinalSafety(TestFordLongitudinalSafetyBase): STEER_MESSAGE = MSG_LateralMotionControl2 TX_MSGS = [ [MSG_Steering_Data_FD1, 0], [MSG_Steering_Data_FD1, 2], [MSG_ACCDATA, 0], [MSG_ACCDATA_3, 0], [MSG_Lane_Assist_Data1, 0], [MSG_LateralMotionControl2, 0], [MSG_IPMA_Data, 0], ]
[docs] def setUp(self): self.packer = CANPackerPanda("ford_lincoln_base_pt") self.safety = libpanda_py.libpanda self.safety.set_safety_hooks(Panda.SAFETY_FORD, Panda.FLAG_FORD_LONG_CONTROL | Panda.FLAG_FORD_CANFD) self.safety.init_tests()
if __name__ == "__main__": unittest.main()