Source code for selfdrive.car.hyundai.radar_interface

import math

from cereal import car
from opendbc.can.parser import CANParser
from openpilot.selfdrive.car.interfaces import RadarInterfaceBase
from openpilot.selfdrive.car.hyundai.values import DBC

RADAR_START_ADDR = 0x500
RADAR_MSG_COUNT = 32

# POC for parsing corner radars: https://github.com/commaai/openpilot/pull/24221/

[docs] def get_radar_can_parser(CP): if DBC[CP.carFingerprint]['radar'] is None: return None messages = [(f"RADAR_TRACK_{addr:x}", 50) for addr in range(RADAR_START_ADDR, RADAR_START_ADDR + RADAR_MSG_COUNT)] return CANParser(DBC[CP.carFingerprint]['radar'], messages, 1)
[docs] class RadarInterface(RadarInterfaceBase): def __init__(self, CP): super().__init__(CP) self.updated_messages = set() self.trigger_msg = RADAR_START_ADDR + RADAR_MSG_COUNT - 1 self.track_id = 0 self.radar_off_can = CP.radarUnavailable self.rcp = get_radar_can_parser(CP)
[docs] def update(self, can_strings): if self.radar_off_can or (self.rcp is None): return super().update(None) vls = self.rcp.update_strings(can_strings) self.updated_messages.update(vls) if self.trigger_msg not in self.updated_messages: return None rr = self._update(self.updated_messages) self.updated_messages.clear() return rr
def _update(self, updated_messages): ret = car.RadarData.new_message() if self.rcp is None: return ret errors = [] if not self.rcp.can_valid: errors.append("canError") ret.errors = errors for addr in range(RADAR_START_ADDR, RADAR_START_ADDR + RADAR_MSG_COUNT): msg = self.rcp.vl[f"RADAR_TRACK_{addr:x}"] if addr not in self.pts: self.pts[addr] = car.RadarData.RadarPoint.new_message() self.pts[addr].trackId = self.track_id self.track_id += 1 valid = msg['STATE'] in (3, 4) if valid: azimuth = math.radians(msg['AZIMUTH']) self.pts[addr].measured = True self.pts[addr].dRel = math.cos(azimuth) * msg['LONG_DIST'] self.pts[addr].yRel = 0.5 * -math.sin(azimuth) * msg['LONG_DIST'] self.pts[addr].vRel = msg['REL_SPEED'] self.pts[addr].aRel = msg['REL_ACCEL'] self.pts[addr].yvRel = float('nan') else: del self.pts[addr] ret.points = list(self.pts.values()) return ret