Source code for selfdrive.ui.soundd

import math
import numpy as np
import time
import wave


from cereal import car, messaging
from openpilot.common.basedir import BASEDIR
from openpilot.common.filter_simple import FirstOrderFilter
from openpilot.common.realtime import Ratekeeper
from openpilot.common.retry import retry
from openpilot.common.swaglog import cloudlog

from openpilot.system import micd

SAMPLE_RATE = 48000 # output stream sample rate in Hz; load_sounds asserts every alert .wav uses it
SAMPLE_BUFFER = 4096 # frames per output block: 4096 / 48000 Hz ~= 85 ms (approx 100ms)
MAX_VOLUME = 1.0 # upper clip bound of the linear value in calculate_volume
MIN_VOLUME = 0.1 # lower clip bound of the linear value in calculate_volume; also the initial current_volume
CONTROLS_TIMEOUT = 5 # 5 seconds without a controlsState message before the timeout alert is considered
# Time step handed to the microphone loudness filter.
# NOTE(review): presumably the period between micd updates - confirm against micd.
FILTER_DT = 1. / (micd.SAMPLE_RATE / micd.FFT_SAMPLES)

AMBIENT_DB = 30 # DB where MIN_VOLUME is applied
DB_SCALE = 30 # AMBIENT_DB + DB_SCALE is where MAX_VOLUME is applied

# Short alias for the alert enum used throughout this module.
AudibleAlert = car.CarControl.HUDControl.AudibleAlert


# Table of every alert that has a sound. File names are resolved by load_sounds
# relative to BASEDIR + "/selfdrive/assets/sounds/".
# NOTE(review): the third field (volume) is not read by any code in this file's
# visible section - confirm whether it is still needed.
sound_list: dict[int, tuple[str, int | None, float]] = {
  # AudibleAlert: (file name, play count (None for infinite), volume)
  AudibleAlert.engage: ("engage.wav", 1, MAX_VOLUME),
  AudibleAlert.disengage: ("disengage.wav", 1, MAX_VOLUME),
  AudibleAlert.refuse: ("refuse.wav", 1, MAX_VOLUME),

  AudibleAlert.prompt: ("prompt.wav", 1, MAX_VOLUME),
  AudibleAlert.promptRepeat: ("prompt.wav", None, MAX_VOLUME),
  AudibleAlert.promptDistracted: ("prompt_distracted.wav", None, MAX_VOLUME),

  AudibleAlert.warningSoft: ("warning_soft.wav", None, MAX_VOLUME),
  AudibleAlert.warningImmediate: ("warning_immediate.wav", None, MAX_VOLUME),
}

def check_controls_timeout_alert(sm):
  """Tell whether the "controls went silent" alert should currently sound.

  Returns True only while all of the following hold:
    * the last controlsState message is older than CONTROLS_TIMEOUT seconds,
    * that last message reported `enabled`,
    * the timeout was exceeded by less than 10 seconds.
  Otherwise returns False.

  `sm` must expose `recv_time['controlsState']` (a time.monotonic() timestamp)
  and `sm['controlsState'].enabled`.
  """
  silence = time.monotonic() - sm.recv_time['controlsState']

  # Still within the grace period: nothing to report, and controlsState is not consulted.
  if not silence > CONTROLS_TIMEOUT:
    return False

  overdue = silence - CONTROLS_TIMEOUT
  return bool(sm['controlsState'].enabled and overdue < 10)
class Soundd:
  """Plays the alert sound requested by controlsState on an audio output stream.

  The main loop (soundd_thread) picks the alert to play and adapts the playback
  volume from the microphone loudness; the stream callback pulls samples for the
  current alert through get_sound_data.

  State:
    loaded_sounds        - alert -> mono float32 samples scaled into [-1, 1)
    current_alert        - alert being played (AudibleAlert.none when silent)
    current_sound_frame  - frames of current_alert emitted so far, counted across loops
    current_volume       - gain applied to every emitted block
    controls_timeout_alert - True while the alert was raised by the controls timeout
    spl_filter_weighted  - filter over the microphone's weighted sound pressure level
  """

  def __init__(self):
    self.load_sounds()

    self.current_alert = AudibleAlert.none
    self.current_volume = MIN_VOLUME
    self.current_sound_frame = 0

    self.controls_timeout_alert = False

    self.spl_filter_weighted = FirstOrderFilter(0, 2.5, FILTER_DT, initialized=False)

  def load_sounds(self):
    """Read every file listed in sound_list into self.loaded_sounds.

    Each file must be mono, 16-bit, and sampled at SAMPLE_RATE (asserted).
    Samples are converted to float32 and divided by 2**15.
    """
    self.loaded_sounds: dict[int, np.ndarray] = {}

    # Load all sounds
    for sound, (filename, _play_count, _volume) in sound_list.items():
      # The context manager closes the file handle even if an assertion fails.
      with wave.open(BASEDIR + "/selfdrive/assets/sounds/" + filename, 'r') as wavefile:
        assert wavefile.getnchannels() == 1
        assert wavefile.getsampwidth() == 2
        assert wavefile.getframerate() == SAMPLE_RATE

        length = wavefile.getnframes()
        raw_frames = wavefile.readframes(length)

      self.loaded_sounds[sound] = np.frombuffer(raw_frames, dtype=np.int16).astype(np.float32) / (2**16/2)

  def get_sound_data(self, frames):
    """Return the next `frames` samples of the current alert, scaled by current_volume.

    The sound wraps around to its start until its play count from sound_list is
    reached (None means loop forever); whatever is left of the block stays zero.
    Advances self.current_sound_frame by the number of frames emitted.
    """
    ret = np.zeros(frames, dtype=np.float32)

    # Read once so the lookups below use the same alert that passed the check.
    alert = self.current_alert
    if alert != AudibleAlert.none:
      num_loops = sound_list[alert][1]
      sound_data = self.loaded_sounds[alert]
      sound_len = len(sound_data)

      written_frames = 0
      # sound_len > 0 guards the divmod below against an empty sound.
      while sound_len > 0 and written_frames < frames:
        # Recompute the position on every pass: after the end of the sound is
        # reached, playback must restart at frame 0 (or stop if the play count
        # is exhausted) rather than re-emit the same tail.
        loops, offset = divmod(self.current_sound_frame, sound_len)
        if num_loops is not None and loops >= num_loops:
          break

        frames_to_write = min(sound_len - offset, frames - written_frames)
        ret[written_frames:written_frames+frames_to_write] = sound_data[offset:offset+frames_to_write]
        written_frames += frames_to_write
        self.current_sound_frame += frames_to_write

    return ret * self.current_volume

  def callback(self, data_out: np.ndarray, frames: int, time, status) -> None:
    """Output-stream callback: fill channel 0 of `data_out` with the next block.

    A truthy `status` is logged as a stream over/underflow warning. The `time`
    argument is unused.
    """
    if status:
      cloudlog.warning(f"soundd stream over/underflow: {status}")
    data_out[:frames, 0] = self.get_sound_data(frames)

  def update_alert(self, new_alert):
    """Switch to `new_alert`, restarting playback from frame 0.

    A switch to AudibleAlert.none is ignored until the current sound has been
    emitted in full at least once; any other different alert replaces the
    current one immediately. Requesting the alert already playing is a no-op.
    """
    # ">=": a one-shot sound stops with current_sound_frame exactly equal to its length.
    current_alert_played_once = (
      self.current_alert == AudibleAlert.none
      or self.current_sound_frame >= len(self.loaded_sounds[self.current_alert])
    )
    if self.current_alert != new_alert and (new_alert != AudibleAlert.none or current_alert_played_once):
      self.current_alert = new_alert
      self.current_sound_frame = 0

  def get_audible_alert(self, sm):
    """Choose the alert to play from the latest state of `sm`.

    A fresh controlsState message wins. Otherwise, if controls have timed out
    (see check_controls_timeout_alert), warningImmediate is raised; once that
    condition ends, the timeout-raised alert is cleared again.
    """
    if sm.updated['controlsState']:
      new_alert = sm['controlsState'].alertSound.raw
      self.update_alert(new_alert)
    elif check_controls_timeout_alert(sm):
      self.update_alert(AudibleAlert.warningImmediate)
      self.controls_timeout_alert = True
    elif self.controls_timeout_alert:
      self.update_alert(AudibleAlert.none)
      self.controls_timeout_alert = False

  def calculate_volume(self, weighted_db):
    """Map a weighted loudness in dB to a playback gain.

    The level is mapped linearly so that AMBIENT_DB gives MIN_VOLUME and
    AMBIENT_DB + DB_SCALE gives MAX_VOLUME, clipped to that range, and then
    passed through 10 ** (value - 1).
    """
    volume = ((weighted_db - AMBIENT_DB) / DB_SCALE) * (MAX_VOLUME - MIN_VOLUME) + MIN_VOLUME
    return math.pow(10, (np.clip(volume, MIN_VOLUME, MAX_VOLUME) - 1))

  @retry(attempts=7, delay=3)
  def get_stream(self, sd):
    """Create the mono output stream on the `sd` (sounddevice) module.

    Wrapped in `retry`, so a failure here is attempted again by that decorator.
    """
    # reload sounddevice to reinitialize portaudio
    sd._terminate()
    sd._initialize()
    return sd.OutputStream(channels=1, samplerate=SAMPLE_RATE, callback=self.callback, blocksize=SAMPLE_BUFFER)

  def soundd_thread(self):
    """Run the main loop forever: track ambient loudness and select the alert.

    Asserts on every iteration that the output stream is still active.
    """
    # sounddevice must be imported after forking processes
    import sounddevice as sd

    sm = messaging.SubMaster(['controlsState', 'microphone'])

    with self.get_stream(sd) as stream:
      rk = Ratekeeper(20)

      cloudlog.info(f"soundd stream started: {stream.samplerate=} {stream.channels=} {stream.dtype=} {stream.device=}, {stream.blocksize=}")
      while True:
        sm.update(0)

        if sm.updated['microphone'] and self.current_alert == AudibleAlert.none: # only update volume filter when not playing alert
          self.spl_filter_weighted.update(sm["microphone"].soundPressureWeightedDb)
          self.current_volume = self.calculate_volume(float(self.spl_filter_weighted.x))

        self.get_audible_alert(sm)

        rk.keep_time()

        assert stream.active
def main():
  """Process entry point: build a Soundd and run its loop (does not return normally)."""
  Soundd().soundd_thread()


if __name__ == "__main__":
  main()