Source code for system.timed

#!/usr/bin/env python3
import datetime
import os
import subprocess
import time
from typing import NoReturn

from timezonefinder import TimezoneFinder

import cereal.messaging as messaging
from openpilot.common.time import system_time_valid
from openpilot.common.params import Params
from openpilot.common.swaglog import cloudlog
from openpilot.system.hardware import AGNOS


[docs] def set_timezone(timezone): valid_timezones = subprocess.check_output('timedatectl list-timezones', shell=True, encoding='utf8').strip().split('\n') if timezone not in valid_timezones: cloudlog.error(f"Timezone not supported {timezone}") return cloudlog.debug(f"Setting timezone to {timezone}") try: if AGNOS: tzpath = os.path.join("/usr/share/zoneinfo/", timezone) subprocess.check_call(f'sudo su -c "ln -snf {tzpath} /data/etc/tmptime && \ mv /data/etc/tmptime /data/etc/localtime"', shell=True) subprocess.check_call(f'sudo su -c "echo \"{timezone}\" > /data/etc/timezone"', shell=True) else: subprocess.check_call(f'sudo timedatectl set-timezone {timezone}', shell=True) except subprocess.CalledProcessError: cloudlog.exception(f"Error setting timezone to {timezone}")
[docs] def set_time(new_time): diff = datetime.datetime.now() - new_time if diff < datetime.timedelta(seconds=10): cloudlog.debug(f"Time diff too small: {diff}") return cloudlog.debug(f"Setting time to {new_time}") try: subprocess.run(f"TZ=UTC date -s '{new_time}'", shell=True, check=True) except subprocess.CalledProcessError: cloudlog.exception("timed.failed_setting_time")
[docs] def main() -> NoReturn: """ timed has two responsibilities: - getting the current time - getting the current timezone GPS directly gives time, and timezone is looked up from GPS position. AGNOS will also use NTP to update the time. """ params = Params() # Restore timezone from param tz = params.get("Timezone", encoding='utf8') tf = TimezoneFinder() if tz is not None: cloudlog.debug("Restoring timezone from param") set_timezone(tz) pm = messaging.PubMaster(['clocks']) sm = messaging.SubMaster(['liveLocationKalman']) while True: sm.update(1000) msg = messaging.new_message('clocks') msg.valid = system_time_valid() msg.clocks.wallTimeNanos = time.time_ns() pm.send('clocks', msg) llk = sm['liveLocationKalman'] if not llk.gpsOK or (time.monotonic() - sm.logMonoTime['liveLocationKalman']/1e9) > 0.2: continue # set time # TODO: account for unixTimesatmpMillis being a (usually short) time in the past gps_time = datetime.datetime.fromtimestamp(llk.unixTimestampMillis / 1000.) set_time(gps_time) # set timezone pos = llk.positionGeodetic.value if len(pos) == 3: gps_timezone = tf.timezone_at(lat=pos[0], lng=pos[1]) if gps_timezone is None: cloudlog.critical(f"No timezone found based on {pos=}") else: set_timezone(gps_timezone) params.put_nonblocking("Timezone", gps_timezone) time.sleep(10)
if __name__ == "__main__": main()