# arbitrary 16-byte UUID identifying openpilot frame-timing SEI messages
TIMING_SEI_UUID = bytes([
  0xa5, 0xe0, 0xc4, 0xa4, 0x5b, 0x6e, 0x4e, 0x1e,
  0x9c, 0x7e, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc,
])
# 4-byte start code that precedes the NAL unit in the byte stream
_ANNEXB_START_CODE = b'\x00\x00\x00\x01'
# start code + bytes 0x06 0x05 0x30 + UUID; 0x30 == 48 == 16 (UUID) + 32 (four doubles).
# Presumably 0x06 is the H.264 SEI NAL header and 0x05 the user_data_unregistered
# payload type -- confirm against the H.264 spec.
_SEI_PREFIX = _ANNEXB_START_CODE + b'\x06\x05\x30' + TIMING_SEI_UUID


def _escape_emulation_prevention(nal_body: bytes) -> bytes:
  """Insert emulation-prevention bytes (0x03) into a NAL unit body.

  Whenever two consecutive zero bytes are followed by a byte <= 0x03, a 0x03
  is inserted before that byte so the body can never contain a sequence that
  looks like a start code (00 00 00 / 00 00 01).
  """
  escaped = bytearray()
  zero_run = 0
  for byte in nal_body:
    if zero_run >= 2 and byte <= 3:
      escaped.append(3)
      zero_run = 0
    escaped.append(byte)
    zero_run = zero_run + 1 if byte == 0 else 0
  return bytes(escaped)


def _build_frame_data(self, msg) -> bytes:
  """Return the bytes for one encoded frame, optionally with a timing SEI NAL.

  Method of LiveStreamVideoStreamTrack. When ``self.timing_sei_enabled`` is
  false the result is simply ``header + data`` of the encode message. When it
  is true, an SEI NAL carrying four big-endian doubles is inserted between
  header and data:

    1. ``idx.timestampEof - idx.timestampSof``, divided by 1e6
    2. ``msg.logMonoTime - idx.timestampEof``, divided by 1e6
    3. ``time.monotonic_ns() - msg.logMonoTime``, divided by 1e6
    4. wall-clock time in milliseconds (``time.time() * 1000``)

  The packed doubles routinely contain runs of zero bytes (e.g. 0.0 is eight
  zero bytes), so the NAL body is passed through emulation prevention;
  otherwise a start-code scanner could split the SEI in the middle. Readers
  must strip the 0x03 emulation-prevention bytes before unpacking.

  NOTE(review): entries 1-3 are presumably nanosecond timestamps converted to
  milliseconds, and entry 3 assumes msg.logMonoTime uses the same clock as
  time.monotonic_ns() -- confirm.
  """
  encode_data = getattr(msg, msg.which())
  if not self.timing_sei_enabled:
    return encode_data.header + encode_data.data

  idx = encode_data.idx
  timings = struct.pack('>4d',
    (idx.timestampEof - idx.timestampSof) / 1e6,
    (msg.logMonoTime - idx.timestampEof) / 1e6,
    (time.monotonic_ns() - msg.logMonoTime) / 1e6,
    time.time() * 1000,  # noqa: TID251
  )
  # Only the NAL body is escaped; the start code itself must stay intact.
  nal_body = _SEI_PREFIX[len(_ANNEXB_START_CODE):] + timings + b'\x80'
  sei_nal = _ANNEXB_START_CODE + _escape_emulation_prevention(nal_body)
  return encode_data.header + sei_nal + encode_data.data
def message_handler(self, message: bytes):
  """Handle one message received on the data channel.

  The message is parsed as JSON. Two control message types are consumed here
  and never forwarded:

  * ``clockSync``: replies on the messaging channel with a ``pong`` that
    echoes ``browserSendTime`` and adds ``deviceTime`` (wall clock, ms).
  * ``enableTimingSei``: sets ``timing_sei_enabled`` on every video track
    that has that attribute.

  Anything else is passed unchanged to ``self.incoming_bridge``. Every
  failure (invalid JSON, missing bridge, send errors) is logged rather than
  raised.
  """
  try:
    payload = json.loads(message) if isinstance(message, (bytes, str)) else None
    if isinstance(payload, dict):
      msg_type = payload.get("type")
      # "data" may be absent, null or a non-object; treat all of those as
      # empty so a sloppy client cannot break control-message handling.
      data = payload.get("data")
      if not isinstance(data, dict):
        data = {}

      if msg_type == "clockSync":
        pong = json.dumps({"type": "clockSync", "data": {
          "action": "pong", "browserSendTime": data.get("browserSendTime"), "deviceTime": time.time() * 1000,  # noqa: TID251
        }})
        self.stream.get_messaging_channel().send(pong)
        return

      if msg_type == "enableTimingSei":
        enabled = bool(data.get("enabled"))
        for track in self.video_tracks:
          if hasattr(track, 'timing_sei_enabled'):
            track.timing_sei_enabled = enabled
        return

    self.incoming_bridge.send(message)
  except Exception:
    self.logger.exception("Cereal incoming proxy failure")