# Source code for tools.sim.tests.test_sim_bridge

import os
import subprocess
import time
import unittest

from multiprocessing import Queue

from cereal import messaging
from openpilot.common.basedir import BASEDIR

# Path to "tools/sim" under BASEDIR; test_engage uses it as the working
# directory of the manager subprocess it launches.
SIM_DIR = os.path.join(BASEDIR, "tools/sim")

class TestSimBridgeBase(unittest.TestCase):
    """Shared end-to-end test that launches the manager and a simulator bridge.

    This class is only a base: it is skipped when collected directly.
    Subclasses are expected to provide ``create_bridge()``, which is called
    without arguments by ``test_engage``. The object it returns must expose
    a ``started`` attribute with a ``.value`` flag.

    NOTE(review): this block was recovered from a rendered documentation
    page that had dropped several tokens. The manager script name, the
    ``bridge.run(q, ...)`` call, the ``.name`` element expressions and the
    file name in the skip message were reconstructed -- confirm each one
    against the repository before relying on it.
    """

    @classmethod
    def setUpClass(cls):
        """Skip the class when it is the base itself rather than a subclass."""
        if cls is TestSimBridgeBase:
            # NOTE(review): the file name in this message was missing from the
            # recovered text; verify that it names the concrete test module.
            raise unittest.SkipTest("Don't run this base class, run test_metadrive_bridge.py instead")

    def setUp(self):
        """Start every test with an empty list of processes to clean up."""
        self.processes = []

    def test_engage(self):
        """Start manager and bridge, wait for a healthy state, then verify engagement.

        The test runs in three phases, each bounded by ``max_time_per_step``
        seconds:

        1. wait for the bridge to report that it has started;
        2. wait for one update in which all subscribed services are alive, no
           onroad event carries noEntry/softDisable/immediateDisable, and no
           process that should be running is stopped;
        3. count updates in which ``controlsState.active`` is set until
           ``min_counts_control_active`` is reached.
        """
        # Start the manager from the simulator directory.
        # NOTE(review): the script name was truncated to "./" in the recovered
        # text; "launch_openpilot.sh" is a reconstruction -- confirm.
        p_manager = subprocess.Popen("./launch_openpilot.sh", cwd=SIM_DIR)
        self.processes.append(p_manager)

        sm = messaging.SubMaster(['controlsState', 'onroadEvents', 'managerState'])
        q = Queue()
        bridge = self.create_bridge()
        # NOTE(review): the call target and first argument were missing in the
        # recovered text; `bridge.run(q, ...)` is inferred from the otherwise
        # unused `q` and from `p_bridge` being used like a process handle.
        p_bridge = bridge.run(q, retries=10)
        self.processes.append(p_bridge)

        max_time_per_step = 60

        # Wait for bridge to startup
        start_waiting = time.monotonic()
        while not bridge.started.value and time.monotonic() < start_waiting + max_time_per_step:
            time.sleep(0.1)
        self.assertEqual(p_bridge.exitcode, None,
                         f"Bridge process should be running, but exited with code {p_bridge.exitcode}")

        start_time = time.monotonic()
        no_car_events_issues_once = False
        car_event_issues = []
        not_running = []
        while time.monotonic() < start_time + max_time_per_step:
            sm.update()

            # NOTE(review): the element expressions of both comprehensions were
            # missing in the recovered text; `.name` is a reconstruction.
            not_running = [p.name for p in sm['managerState'].processes
                           if not p.running and p.shouldBeRunning]
            car_event_issues = [event.name for event in sm['onroadEvents']
                                if any((event.noEntry, event.softDisable, event.immediateDisable))]

            # One clean snapshot is enough to move on to the engagement phase.
            if sm.all_alive() and not car_event_issues and not not_running:
                no_car_events_issues_once = True
                break

        self.assertTrue(no_car_events_issues_once,
                        f"Failed because no messages received, or CarEvents '{car_event_issues}' or processes not running '{not_running}'")

        start_time = time.monotonic()
        min_counts_control_active = 100
        control_active = 0

        while time.monotonic() < start_time + max_time_per_step:
            sm.update()

            if sm.all_alive() and sm['controlsState'].active:
                control_active += 1

                if control_active == min_counts_control_active:
                    break

        self.assertEqual(min_counts_control_active, control_active,
                         f"Simulator did not engage a minimal of {min_counts_control_active} steps was {control_active}")

    def tearDown(self):
        """Terminate every recorded process, newest first, then wait for each one."""
        print("Test shutting down. CommIssues are acceptable")
        # Signal all processes before waiting on any of them so they can shut
        # down concurrently.
        for p in reversed(self.processes):
            p.terminate()

        for p in reversed(self.processes):
            if isinstance(p, subprocess.Popen):
                # Popen.wait raises subprocess.TimeoutExpired if the child is
                # still running after 15 seconds.
                p.wait(15)
            else:
                # Presumably a multiprocessing.Process-like handle (it exposes
                # `exitcode` above); join() returns after the timeout either way.
                p.join(15)
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()