Source code for selfdrive.car.gm.radar_interface

#!/usr/bin/env python3
import math
from cereal import car
from openpilot.common.conversions import Conversions as CV
from opendbc.can.parser import CANParser
from openpilot.selfdrive.car.gm.values import DBC, CanBus
from openpilot.selfdrive.car.interfaces import RadarInterfaceBase

RADAR_HEADER_MSG = 1120
SLOT_1_MSG = RADAR_HEADER_MSG + 1
NUM_SLOTS = 20

# Actually it's 0x47f, but can parser only reports
# messages that are present in DBC
LAST_RADAR_MSG = RADAR_HEADER_MSG + NUM_SLOTS


[docs] def create_radar_can_parser(car_fingerprint): # C1A-ARS3-A by Continental radar_targets = list(range(SLOT_1_MSG, SLOT_1_MSG + NUM_SLOTS)) signals = list(zip(['FLRRNumValidTargets', 'FLRRSnsrBlckd', 'FLRRYawRtPlsblityFlt', 'FLRRHWFltPrsntInt', 'FLRRAntTngFltPrsnt', 'FLRRAlgnFltPrsnt', 'FLRRSnstvFltPrsntInt'] + ['TrkRange'] * NUM_SLOTS + ['TrkRangeRate'] * NUM_SLOTS + ['TrkRangeAccel'] * NUM_SLOTS + ['TrkAzimuth'] * NUM_SLOTS + ['TrkWidth'] * NUM_SLOTS + ['TrkObjectID'] * NUM_SLOTS, [RADAR_HEADER_MSG] * 7 + radar_targets * 6, strict=True)) messages = list({(s[1], 14) for s in signals}) return CANParser(DBC[car_fingerprint]['radar'], messages, CanBus.OBSTACLE)
[docs] class RadarInterface(RadarInterfaceBase): def __init__(self, CP): super().__init__(CP) self.rcp = None if CP.radarUnavailable else create_radar_can_parser(CP.carFingerprint) self.trigger_msg = LAST_RADAR_MSG self.updated_messages = set() self.radar_ts = CP.radarTimeStep
[docs] def update(self, can_strings): if self.rcp is None: return super().update(None) vls = self.rcp.update_strings(can_strings) self.updated_messages.update(vls) if self.trigger_msg not in self.updated_messages: return None ret = car.RadarData.new_message() header = self.rcp.vl[RADAR_HEADER_MSG] fault = header['FLRRSnsrBlckd'] or header['FLRRSnstvFltPrsntInt'] or \ header['FLRRYawRtPlsblityFlt'] or header['FLRRHWFltPrsntInt'] or \ header['FLRRAntTngFltPrsnt'] or header['FLRRAlgnFltPrsnt'] errors = [] if not self.rcp.can_valid: errors.append("canError") if fault: errors.append("fault") ret.errors = errors currentTargets = set() num_targets = header['FLRRNumValidTargets'] # Not all radar messages describe targets, # no need to monitor all of the self.rcp.msgs_upd for ii in self.updated_messages: if ii == RADAR_HEADER_MSG: continue if num_targets == 0: break cpt = self.rcp.vl[ii] # Zero distance means it's an empty target slot if cpt['TrkRange'] > 0.0: targetId = cpt['TrkObjectID'] currentTargets.add(targetId) if targetId not in self.pts: self.pts[targetId] = car.RadarData.RadarPoint.new_message() self.pts[targetId].trackId = targetId distance = cpt['TrkRange'] self.pts[targetId].dRel = distance # from front of car # From driver's pov, left is positive self.pts[targetId].yRel = math.sin(cpt['TrkAzimuth'] * CV.DEG_TO_RAD) * distance self.pts[targetId].vRel = cpt['TrkRangeRate'] self.pts[targetId].aRel = float('nan') self.pts[targetId].yvRel = float('nan') for oldTarget in list(self.pts.keys()): if oldTarget not in currentTargets: del self.pts[oldTarget] ret.points = list(self.pts.values()) self.updated_messages.clear() return ret