# Source code for cereal.messaging.tests.test_pub_sub_master

#!/usr/bin/env python3
import random
import time
from typing import Sized, cast
import unittest

import cereal.messaging as messaging
from cereal.messaging.tests.test_messaging import events, random_sock, random_socks, \
                                                  random_bytes, random_carstate, assert_carstate, \
                                                  zmq_sleep


class TestSubMaster(unittest.TestCase):
    """Tests for ``messaging.SubMaster``.

    Covers construction, initial state, item access, ``update`` (with and
    without incoming messages), the average-frequency bookkeeping and
    conflation of queued messages.
    """

    def setUp(self):
        """Pause before each test so the previous test's sockets can go away."""
        # ZMQ pub socket takes too long to die
        # sleep to prevent multiple publishers error between tests
        zmq_sleep(3)

    def test_init(self):
        """Every per-service container has one entry per subscribed service."""
        sm = messaging.SubMaster(events)
        for p in [sm.updated, sm.recv_time, sm.recv_frame, sm.alive,
                  sm.sock, sm.data, sm.logMonoTime, sm.valid]:
            self.assertEqual(len(cast(Sized, p)), len(events))

    def test_init_state(self):
        """A fresh SubMaster starts at frame -1 with nothing updated or alive."""
        socks = random_socks()
        sm = messaging.SubMaster(socks)
        self.assertEqual(sm.frame, -1)
        self.assertFalse(any(sm.updated.values()))
        self.assertFalse(any(sm.alive.values()))
        self.assertTrue(all(t == 0. for t in sm.recv_time.values()))
        self.assertTrue(all(f == 0 for f in sm.recv_frame.values()))
        self.assertTrue(all(t == 0 for t in sm.logMonoTime.values()))

        for p in [sm.updated, sm.recv_time, sm.recv_frame, sm.alive,
                  sm.sock, sm.data, sm.logMonoTime, sm.valid]:
            self.assertEqual(len(cast(Sized, p)), len(socks))

    def test_getitem(self):
        """Indexing by service name returns the most recently received message."""
        sock = "carState"
        pub_sock = messaging.pub_sock(sock)
        sm = messaging.SubMaster([sock,])
        zmq_sleep()

        msg = random_carstate()
        pub_sock.send(msg.to_bytes())
        sm.update(1000)
        assert_carstate(msg.carState, sm[sock])

    # TODO: break this test up to individually test SubMaster.update and SubMaster.update_msgs
    def test_update(self):
        """Each ``update`` that receives a message bumps ``frame`` and marks it updated."""
        sock = "carState"
        pub_sock = messaging.pub_sock(sock)
        sm = messaging.SubMaster([sock,])
        zmq_sleep()

        for i in range(10):
            msg = messaging.new_message(sock)
            pub_sock.send(msg.to_bytes())
            sm.update(1000)
            # frame starts at -1 (see test_init_state), so the i-th update lands on i
            self.assertEqual(sm.frame, i)
            self.assertTrue(all(sm.updated.values()))

    def test_update_timeout(self):
        """With no publisher, ``update`` blocks for at least the requested timeout."""
        sock = random_sock()
        sm = messaging.SubMaster([sock,])
        for _ in range(5):
            timeout = random.randrange(1000, 5000)
            start_time = time.monotonic()
            sm.update(timeout)
            t = time.monotonic() - start_time
            self.assertGreaterEqual(t, timeout/1000.)
            # NOTE(review): timeout can be as large as 4999 ms, which leaves as
            # little as 1 ms of headroom under this fixed 5 s ceiling.
            self.assertLess(t, 5)
            self.assertFalse(any(sm.updated.values()))

    def test_avg_frequency_checks(self):
        """Check which services get frequency checks and their min/max bounds.

        Runs once polling on ``modelV2`` and once with a fixed ``frequency``
        of 20 and no poll service.
        """
        for poll in (True, False):
            sm = messaging.SubMaster(
                ["modelV2", "carParams", "carState", "cameraOdometry", "liveCalibration"],
                poll=("modelV2" if poll else None),
                frequency=(20. if not poll else None),
            )

            # service -> (expected max frequency, expected min frequency)
            # before the 1.2 / 0.8 tolerance factors asserted below;
            # (None, None) means no average-frequency check is expected.
            checks = {
                "carState": (20, 20),
                "modelV2": (20, 20 if poll else 10),
                "cameraOdometry": (20, 10),
                "liveCalibration": (4, 4),
                "carParams": (None, None),
            }

            for service, (max_freq, min_freq) in checks.items():
                # unittest assertions instead of bare ``assert`` so the checks
                # still run when Python is started with -O.
                if max_freq is not None:
                    self.assertTrue(sm._check_avg_freq(service), service)
                    self.assertEqual(sm.max_freq[service], max_freq*1.2, service)
                    self.assertEqual(sm.min_freq[service], min_freq*0.8, service)
                else:
                    self.assertFalse(sm._check_avg_freq(service), service)

    def test_alive(self):
        """Placeholder: not implemented yet."""
        pass

    def test_ignore_alive(self):
        """Placeholder: not implemented yet."""
        pass

    def test_valid(self):
        """Placeholder: not implemented yet."""
        pass

    # SubMaster should always conflate
    def test_conflate(self):
        """After several queued sends, a single ``update`` exposes only the last one."""
        sock = "carState"
        pub_sock = messaging.pub_sock(sock)
        sm = messaging.SubMaster([sock,])

        n = 10
        for i in range(n+1):
            msg = messaging.new_message(sock)
            msg.carState.vEgo = i
            pub_sock.send(msg.to_bytes())
            time.sleep(0.01)
        sm.update(1000)
        self.assertEqual(sm[sock].vEgo, n)


class TestPubMaster(unittest.TestCase):
    """Tests for ``messaging.PubMaster`` construction and ``send``."""

    def setUp(self):
        """Pause before each test so the previous test's sockets can go away."""
        # ZMQ pub socket takes too long to die
        # sleep to prevent multiple publishers error between tests
        zmq_sleep(3)

    def test_init(self):
        """A PubMaster can be constructed for every service in ``events``."""
        messaging.PubMaster(events)

    def test_send(self):
        """Messages sent through PubMaster arrive unchanged on matching sub sockets."""
        socks = random_socks()
        pm = messaging.PubMaster(socks)
        sub_socks = {s: messaging.sub_sock(s, conflate=True, timeout=1000) for s in socks}
        zmq_sleep()

        # PubMaster accepts either a capnp msg builder or bytes
        for capnp in [True, False]:
            for i in range(100):
                sock = socks[i % len(socks)]

                if capnp:
                    try:
                        msg = messaging.new_message(sock)
                    except Exception:
                        # presumably some services need an explicit size
                        # argument; retry with a random one - TODO confirm
                        msg = messaging.new_message(sock, random.randrange(50))
                else:
                    msg = random_bytes()

                pm.send(sock, msg)
                recvd = sub_socks[sock].receive()

                if capnp:
                    # compare against the serialized form of the builder
                    msg.clear_write_flag()
                    msg = msg.to_bytes()
                self.assertEqual(msg, recvd, i)


# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()