Source code for selfdrive.car.tesla.radar_interface

#!/usr/bin/env python3
from cereal import car
from opendbc.can.parser import CANParser
from openpilot.selfdrive.car.tesla.values import CAR, DBC, CANBUS
from openpilot.selfdrive.car.interfaces import RadarInterfaceBase


[docs] class RadarInterface(RadarInterfaceBase): def __init__(self, CP): super().__init__(CP) self.CP = CP if CP.carFingerprint == CAR.TESLA_MODELS_RAVEN: messages = [('RadarStatus', 16)] self.num_points = 40 self.trigger_msg = 1119 else: messages = [('TeslaRadarSguInfo', 10)] self.num_points = 32 self.trigger_msg = 878 for i in range(self.num_points): messages.extend([ (f'RadarPoint{i}_A', 16), (f'RadarPoint{i}_B', 16), ]) self.rcp = CANParser(DBC[CP.carFingerprint]['radar'], messages, CANBUS.radar) self.updated_messages = set() self.track_id = 0
[docs] def update(self, can_strings): if self.rcp is None: return super().update(None) values = self.rcp.update_strings(can_strings) self.updated_messages.update(values) if self.trigger_msg not in self.updated_messages: return None ret = car.RadarData.new_message() # Errors errors = [] if not self.rcp.can_valid: errors.append('canError') if self.CP.carFingerprint == CAR.TESLA_MODELS_RAVEN: radar_status = self.rcp.vl['RadarStatus'] if radar_status['sensorBlocked'] or radar_status['shortTermUnavailable'] or radar_status['vehDynamicsError']: errors.append('fault') else: radar_status = self.rcp.vl['TeslaRadarSguInfo'] if radar_status['RADC_HWFail'] or radar_status['RADC_SGUFail'] or radar_status['RADC_SensorDirty']: errors.append('fault') ret.errors = errors # Radar tracks for i in range(self.num_points): msg_a = self.rcp.vl[f'RadarPoint{i}_A'] msg_b = self.rcp.vl[f'RadarPoint{i}_B'] # Make sure msg A and B are together if msg_a['Index'] != msg_b['Index2']: continue # Check if it's a valid track if not msg_a['Tracked']: if i in self.pts: del self.pts[i] continue # New track! if i not in self.pts: self.pts[i] = car.RadarData.RadarPoint.new_message() self.pts[i].trackId = self.track_id self.track_id += 1 # Parse track data self.pts[i].dRel = msg_a['LongDist'] self.pts[i].yRel = msg_a['LatDist'] self.pts[i].vRel = msg_a['LongSpeed'] self.pts[i].aRel = msg_a['LongAccel'] self.pts[i].yvRel = msg_b['LatSpeed'] self.pts[i].measured = bool(msg_a['Meas']) ret.points = list(self.pts.values()) self.updated_messages.clear() return ret