Source code for selfdrive.test.test_valgrind_replay

#!/usr/bin/env python3
import os
import threading
import time
import unittest
import subprocess
import signal

if "CI" in os.environ:
  def tqdm(x):
    return x
  from tqdm import tqdm   # type: ignore

import cereal.messaging as messaging
from collections import namedtuple
from import LogReader
from import get_url
from openpilot.common.basedir import BASEDIR

ProcessConfig = namedtuple('ProcessConfig', ['proc_name', 'pub_sub', 'ignore', 'command', 'path', 'segment', 'wait_for_response'])

      "ubloxRaw": ["ubloxGnss", "gpsLocationExternal"],

[docs] class TestValgrind(unittest.TestCase):
[docs] def extract_leak_sizes(self, log): if "All heap blocks were freed -- no leaks are possible" in log: return (0,0,0) log = log.replace(",","") # fixes casting to int issue with large leaks err_lost1 = log.split("definitely lost: ")[1] err_lost2 = log.split("indirectly lost: ")[1] err_lost3 = log.split("possibly lost: ")[1] definitely_lost = int(err_lost1.split(" ")[0]) indirectly_lost = int(err_lost2.split(" ")[0]) possibly_lost = int(err_lost3.split(" ")[0]) return (definitely_lost, indirectly_lost, possibly_lost)
[docs] def valgrindlauncher(self, arg, cwd): os.chdir(os.path.join(BASEDIR, cwd)) # Run valgrind on a process command = "valgrind --leak-check=full " + arg p = subprocess.Popen(command, stderr=subprocess.PIPE, shell=True, preexec_fn=os.setsid) while not self.replay_done: time.sleep(0.1) # Kill valgrind and extract leak output os.killpg(os.getpgid(, signal.SIGINT) _, err = p.communicate() error_msg = str(err, encoding='utf-8') with open(os.path.join(BASEDIR, "selfdrive/test/valgrind_logs.txt"), "a") as f: f.write(error_msg) f.write(5 * "\n") definitely_lost, indirectly_lost, possibly_lost = self.extract_leak_sizes(error_msg) if max(definitely_lost, indirectly_lost, possibly_lost) > 0: self.leak = True print("LEAKS from", arg, "\nDefinitely lost:", definitely_lost, "\nIndirectly lost", indirectly_lost, "\nPossibly lost", possibly_lost) else: self.leak = False
[docs] def replay_process(self, config, logreader): pub_sockets = list(config.pub_sub.keys()) # We dump data from logs here sub_sockets = [s for _, sub in config.pub_sub.items() for s in sub] # We get responses here pm = messaging.PubMaster(pub_sockets) sm = messaging.SubMaster(sub_sockets) print("Sorting logs") all_msgs = sorted(logreader, key=lambda msg: msg.logMonoTime) pub_msgs = [msg for msg in all_msgs if msg.which() in list(config.pub_sub.keys())] thread = threading.Thread(target=self.valgrindlauncher, args=(config.command, config.path)) thread.daemon = True thread.start() while not all(pm.all_readers_updated(s) for s in config.pub_sub.keys()): time.sleep(0) for msg in tqdm(pub_msgs): pm.send(msg.which(), msg.as_builder()) if config.wait_for_response: sm.update(100) self.replay_done = True
[docs] def test_config(self): open(os.path.join(BASEDIR, "selfdrive/test/valgrind_logs.txt"), "w").close() for cfg in CONFIGS: self.leak = None self.replay_done = False r, n = cfg.segment.rsplit("--", 1) lr = LogReader(get_url(r, n)) self.replay_process(cfg, lr) while self.leak is None: time.sleep(0.1) # Wait for the valgrind to finish self.assertFalse(self.leak)
if __name__ == "__main__": unittest.main()