Source code for selfdrive.locationd.laikad

#!/usr/bin/env python3
import json
import math
import os
import time
from collections import defaultdict
from concurrent.futures import Future, ProcessPoolExecutor
from datetime import datetime
from enum import IntEnum
from typing import List, Optional

import numpy as np

from cereal import log, messaging
from common.params import Params, put_nonblocking
from laika import AstroDog
from laika.constants import SECS_IN_HR, SECS_IN_MIN
from laika.downloader import DownloadFailed
from laika.ephemeris import Ephemeris, EphemerisType, convert_ublox_ephem
from laika.gps_time import GPSTime
from laika.helpers import ConstellationId
from laika.raw_gnss import GNSSMeasurement, correct_measurements, process_measurements, read_raw_ublox, read_raw_qcom
from selfdrive.locationd.laikad_helpers import calc_pos_fix_gauss_newton, get_posfix_sympy_fun
from selfdrive.locationd.models.constants import GENERATED_DIR, ObservationKind
from selfdrive.locationd.models.gnss_kf import GNSSKalman
from selfdrive.locationd.models.gnss_kf import States as GStates
from system.swaglog import cloudlog

MAX_TIME_GAP = 10
EPHEMERIS_CACHE = 'LaikadEphemeris'
DOWNLOADS_CACHE_FOLDER = "/tmp/comma_download_cache/"
CACHE_VERSION = 0.1
POS_FIX_RESIDUAL_THRESHOLD = 100.0


[docs]class Laikad: def __init__(self, valid_const=("GPS", "GLONASS"), auto_fetch_orbits=True, auto_update=False, valid_ephem_types=(EphemerisType.ULTRA_RAPID_ORBIT, EphemerisType.NAV), save_ephemeris=False, use_qcom=False): """ valid_const: GNSS constellation which can be used auto_fetch_orbits: If true fetch orbits from internet when needed auto_update: If true download AstroDog will download all files needed. This can be ephemeris or correction data like ionosphere. valid_ephem_types: Valid ephemeris types to be used by AstroDog save_ephemeris: If true saves and loads nav and orbit ephemeris to cache. """ self.astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types, clear_old_ephemeris=True, cache_dir=DOWNLOADS_CACHE_FOLDER) self.gnss_kf = GNSSKalman(GENERATED_DIR, cython=True, erratic_clock=use_qcom) self.auto_fetch_orbits = auto_fetch_orbits self.orbit_fetch_executor: Optional[ProcessPoolExecutor] = None self.orbit_fetch_future: Optional[Future] = None self.last_fetch_orbits_t = None self.got_first_gnss_msg = False self.last_cached_t = None self.save_ephemeris = save_ephemeris self.load_cache() self.posfix_functions = {constellation: get_posfix_sympy_fun(constellation) for constellation in (ConstellationId.GPS, ConstellationId.GLONASS)} self.last_pos_fix = [] self.last_pos_residual = [] self.last_pos_fix_t = None self.use_qcom = use_qcom
[docs] def load_cache(self): if not self.save_ephemeris: return cache = Params().get(EPHEMERIS_CACHE) if not cache: return try: cache = json.loads(cache, object_hook=deserialize_hook) self.astro_dog.add_orbits(cache['orbits']) self.astro_dog.add_navs(cache['nav']) self.last_fetch_orbits_t = cache['last_fetch_orbits_t'] except json.decoder.JSONDecodeError: cloudlog.exception("Error parsing cache") timestamp = self.last_fetch_orbits_t.as_datetime() if self.last_fetch_orbits_t is not None else 'Nan' cloudlog.debug( f"Loaded nav ({sum([len(v) for v in cache['nav']])}) and orbits ({sum([len(v) for v in cache['orbits']])}) cache with timestamp: {timestamp}. Unique orbit and nav sats: {list(cache['orbits'].keys())} {list(cache['nav'].keys())} " + f"With time range: {[f'{start.as_datetime()}, {end.as_datetime()}' for (start,end) in self.astro_dog.orbit_fetched_times._ranges]}")
[docs] def cache_ephemeris(self, t: GPSTime): if self.save_ephemeris and (self.last_cached_t is None or t - self.last_cached_t > SECS_IN_MIN): put_nonblocking(EPHEMERIS_CACHE, json.dumps( {'version': CACHE_VERSION, 'last_fetch_orbits_t': self.last_fetch_orbits_t, 'orbits': self.astro_dog.orbits, 'nav': self.astro_dog.nav}, cls=CacheSerializer)) cloudlog.debug("Cache saved") self.last_cached_t = t
[docs] def get_est_pos(self, t, processed_measurements): if self.last_pos_fix_t is None or abs(self.last_pos_fix_t - t) >= 2: min_measurements = 6 if any(p.constellation_id == ConstellationId.GLONASS for p in processed_measurements) else 5 pos_fix, pos_fix_residual = calc_pos_fix_gauss_newton(processed_measurements, self.posfix_functions, min_measurements=min_measurements) if len(pos_fix) > 0: self.last_pos_fix_t = t residual_median = np.median(np.abs(pos_fix_residual)) if np.median(np.abs(pos_fix_residual)) < POS_FIX_RESIDUAL_THRESHOLD: cloudlog.debug(f"Pos fix is within threshold with median: {residual_median.round()}") self.last_pos_fix = pos_fix[:3] self.last_pos_residual = pos_fix_residual else: cloudlog.debug(f"Pos fix failed with median: {residual_median.round()}. All residuals: {np.round(pos_fix_residual)}") return self.last_pos_fix
[docs] def is_good_report(self, gnss_msg): if gnss_msg.which == 'drMeasurementReport' and self.use_qcom: constellation_id = ConstellationId.from_qcom_source(gnss_msg.drMeasurementReport.source) # TODO support GLONASS return constellation_id in [ConstellationId.GPS, ConstellationId.SBAS] elif gnss_msg.which == 'measurementReport' and not self.use_qcom: return True else: return False
[docs] def read_report(self, gnss_msg): if self.use_qcom: report = gnss_msg.drMeasurementReport week = report.gpsWeek tow = report.gpsMilliseconds / 1000.0 new_meas = read_raw_qcom(report) else: report = gnss_msg.measurementReport week = report.gpsWeek tow = report.rcvTow new_meas = read_raw_ublox(report) return week, tow, new_meas
[docs] def process_gnss_msg(self, gnss_msg, gnss_mono_time: int, block=False): if self.is_good_report(gnss_msg): week, tow, new_meas = self.read_report(gnss_msg) t = gnss_mono_time * 1e-9 if week > 0: self.got_first_gnss_msg = True latest_msg_t = GPSTime(week, tow) if self.auto_fetch_orbits: self.fetch_orbits(latest_msg_t, block) # Filter measurements with unexpected pseudoranges for GPS and GLONASS satellites new_meas = [m for m in new_meas if 1e7 < m.observables['C1C'] < 3e7] processed_measurements = process_measurements(new_meas, self.astro_dog) est_pos = self.get_est_pos(t, processed_measurements) corrected_measurements = correct_measurements(processed_measurements, est_pos, self.astro_dog) if len(est_pos) > 0 else [] if gnss_mono_time % 10 == 0: cloudlog.debug(f"Measurements Incoming/Processed/Corrected: {len(new_meas), len(processed_measurements), len(corrected_measurements)}") self.update_localizer(est_pos, t, corrected_measurements) kf_valid = all(self.kf_valid(t)) ecef_pos = self.gnss_kf.x[GStates.ECEF_POS] ecef_vel = self.gnss_kf.x[GStates.ECEF_VELOCITY] p = self.gnss_kf.P.diagonal() pos_std = np.sqrt(p[GStates.ECEF_POS]) vel_std = np.sqrt(p[GStates.ECEF_VELOCITY]) meas_msgs = [create_measurement_msg(m) for m in corrected_measurements] dat = messaging.new_message("gnssMeasurements") measurement_msg = log.LiveLocationKalman.Measurement.new_message dat.gnssMeasurements = { "gpsWeek": week, "gpsTimeOfWeek": tow, "positionECEF": measurement_msg(value=ecef_pos.tolist(), std=pos_std.tolist(), valid=kf_valid), "velocityECEF": measurement_msg(value=ecef_vel.tolist(), std=vel_std.tolist(), valid=kf_valid), "positionFixECEF": measurement_msg(value=self.last_pos_fix, std=self.last_pos_residual, valid=self.last_pos_fix_t == t), "ubloxMonoTime": gnss_mono_time, "correctedMeasurements": meas_msgs } return dat # TODO this only works on GLONASS, qcom needs live ephemeris parsing too elif gnss_msg.which == 'ephemeris': ephem = convert_ublox_ephem(gnss_msg.ephemeris) self.astro_dog.add_navs({ephem.prn: [ephem]}) self.cache_ephemeris(t=ephem.epoch)
#elif gnss_msg.which == 'ionoData': # todo add this. Needed to better correct messages offline. First fix ublox_msg.cc to sent them.
[docs] def update_localizer(self, est_pos, t: float, measurements: List[GNSSMeasurement]): # Check time and outputs are valid valid = self.kf_valid(t) if not all(valid): if not valid[0]: # Filter not initialized pass elif not valid[1]: cloudlog.error("Time gap of over 10s detected, gnss kalman reset") elif not valid[2]: cloudlog.error("Gnss kalman filter state is nan") if len(est_pos) > 0: cloudlog.info(f"Reset kalman filter with {est_pos}") self.init_gnss_localizer(est_pos) else: return if len(measurements) > 0: kf_add_observations(self.gnss_kf, t, measurements) else: # Ensure gnss filter is updated even with no new measurements self.gnss_kf.predict(t)
[docs] def kf_valid(self, t: float) -> List[bool]: filter_time = self.gnss_kf.filter.get_filter_time() return [not math.isnan(filter_time), abs(t - filter_time) < MAX_TIME_GAP, all(np.isfinite(self.gnss_kf.x[GStates.ECEF_POS]))]
[docs] def init_gnss_localizer(self, est_pos): x_initial, p_initial_diag = np.copy(GNSSKalman.x_initial), np.copy(np.diagonal(GNSSKalman.P_initial)) x_initial[GStates.ECEF_POS] = est_pos p_initial_diag[GStates.ECEF_POS] = 1000 ** 2 self.gnss_kf.init_state(x_initial, covs_diag=p_initial_diag)
[docs] def fetch_orbits(self, t: GPSTime, block): # Download new orbits if 1 hour of orbits data left if t + SECS_IN_HR not in self.astro_dog.orbit_fetched_times and (self.last_fetch_orbits_t is None or abs(t - self.last_fetch_orbits_t) > SECS_IN_MIN): astro_dog_vars = self.astro_dog.valid_const, self.astro_dog.auto_update, self.astro_dog.valid_ephem_types, self.astro_dog.cache_dir ret = None if block: # Used for testing purposes ret = get_orbit_data(t, *astro_dog_vars) elif self.orbit_fetch_future is None: self.orbit_fetch_executor = ProcessPoolExecutor(max_workers=1) self.orbit_fetch_future = self.orbit_fetch_executor.submit(get_orbit_data, t, *astro_dog_vars) elif self.orbit_fetch_future.done(): ret = self.orbit_fetch_future.result() self.orbit_fetch_executor = self.orbit_fetch_future = None if ret is not None: if ret[0] is None: self.last_fetch_orbits_t = ret[2] else: self.astro_dog.orbits, self.astro_dog.orbit_fetched_times, self.last_fetch_orbits_t = ret self.cache_ephemeris(t=t)
[docs]def get_orbit_data(t: GPSTime, valid_const, auto_update, valid_ephem_types, cache_dir): astro_dog = AstroDog(valid_const=valid_const, auto_update=auto_update, valid_ephem_types=valid_ephem_types, cache_dir=cache_dir) cloudlog.info(f"Start to download/parse orbits for time {t.as_datetime()}") start_time = time.monotonic() try: astro_dog.get_orbit_data(t, only_predictions=True) cloudlog.info(f"Done parsing orbits. Took {time.monotonic() - start_time:.1f}s") cloudlog.debug(f"Downloaded orbits ({sum([len(v) for v in astro_dog.orbits])}): {list(astro_dog.orbits.keys())}" + f"With time range: {[f'{start.as_datetime()}, {end.as_datetime()}' for (start,end) in astro_dog.orbit_fetched_times._ranges]}") return astro_dog.orbits, astro_dog.orbit_fetched_times, t except (DownloadFailed, RuntimeError, ValueError, IOError) as e: cloudlog.warning(f"No orbit data found or parsing failure: {e}") return None, None, t
[docs]def create_measurement_msg(meas: GNSSMeasurement): c = log.GnssMeasurements.CorrectedMeasurement.new_message() c.constellationId = meas.constellation_id.value c.svId = meas.sv_id c.glonassFrequency = meas.glonass_freq if meas.constellation_id == ConstellationId.GLONASS else 0 c.pseudorange = float(meas.observables_final['C1C']) c.pseudorangeStd = float(meas.observables_std['C1C']) c.pseudorangeRate = float(meas.observables_final['D1C']) c.pseudorangeRateStd = float(meas.observables_std['D1C']) c.satPos = meas.sat_pos_final.tolist() c.satVel = meas.sat_vel.tolist() c.satVel = meas.sat_vel.tolist() ephem = meas.sat_ephemeris assert ephem is not None if ephem.eph_type == EphemerisType.NAV: source_type = EphemerisSourceType.nav week, time_of_week = -1, -1 else: assert ephem.file_epoch is not None week = ephem.file_epoch.week time_of_week = ephem.file_epoch.tow file_src = ephem.file_source if file_src == 'igu': # example nasa: '2214/igu22144_00.sp3.Z' source_type = EphemerisSourceType.nasaUltraRapid elif file_src == 'Sta': # example nasa: '22166/ultra/Stark_1D_22061518.sp3' source_type = EphemerisSourceType.glonassIacUltraRapid else: raise Exception(f"Didn't expect file source {file_src}") c.ephemerisSource.type = source_type.value c.ephemerisSource.gpsWeek = week c.ephemerisSource.gpsTimeOfWeek = int(time_of_week) return c
[docs]def kf_add_observations(gnss_kf: GNSSKalman, t: float, measurements: List[GNSSMeasurement]): ekf_data = defaultdict(list) for m in measurements: m_arr = m.as_array() if m.constellation_id == ConstellationId.GPS: ekf_data[ObservationKind.PSEUDORANGE_GPS].append(m_arr) elif m.constellation_id == ConstellationId.GLONASS: ekf_data[ObservationKind.PSEUDORANGE_GLONASS].append(m_arr) ekf_data[ObservationKind.PSEUDORANGE_RATE_GPS] = ekf_data[ObservationKind.PSEUDORANGE_GPS] ekf_data[ObservationKind.PSEUDORANGE_RATE_GLONASS] = ekf_data[ObservationKind.PSEUDORANGE_GLONASS] for kind, data in ekf_data.items(): if len(data) > 0: gnss_kf.predict_and_observe(t, kind, data)
[docs]class CacheSerializer(json.JSONEncoder):
[docs] def default(self, o): if isinstance(o, Ephemeris): return o.to_json() if isinstance(o, GPSTime): return o.__dict__ if isinstance(o, np.ndarray): return o.tolist() return json.JSONEncoder.default(self, o)
[docs]def deserialize_hook(dct): if 'ephemeris' in dct: return Ephemeris.from_json(dct) if 'week' in dct: return GPSTime(dct['week'], dct['tow']) return dct
[docs]class EphemerisSourceType(IntEnum): nav = 0 nasaUltraRapid = 1 glonassIacUltraRapid = 2
[docs]def main(sm=None, pm=None): use_qcom = os.path.isfile("/persist/comma/use-quectel-rawgps") if use_qcom: raw_gnss_socket = "qcomGnss" else: raw_gnss_socket = "ubloxGnss" if sm is None: sm = messaging.SubMaster([raw_gnss_socket, 'clocks']) if pm is None: pm = messaging.PubMaster(['gnssMeasurements']) replay = "REPLAY" in os.environ use_internet = "LAIKAD_NO_INTERNET" not in os.environ laikad = Laikad(save_ephemeris=not replay, auto_fetch_orbits=use_internet, use_qcom=use_qcom) while True: sm.update() if sm.updated[raw_gnss_socket]: gnss_msg = sm[raw_gnss_socket] msg = laikad.process_gnss_msg(gnss_msg, sm.logMonoTime[raw_gnss_socket], block=replay) if msg is not None: pm.send('gnssMeasurements', msg) if not laikad.got_first_gnss_msg and sm.updated['clocks']: clocks_msg = sm['clocks'] t = GPSTime.from_datetime(datetime.utcfromtimestamp(clocks_msg.wallTimeNanos * 1E-9)) if laikad.auto_fetch_orbits: laikad.fetch_orbits(t, block=replay)
if __name__ == "__main__": main()