diff --git a/cereal/log.capnp b/cereal/log.capnp index 73a34a2ea8c6f1..78b037228182e6 100644 --- a/cereal/log.capnp +++ b/cereal/log.capnp @@ -2427,6 +2427,19 @@ struct AudioFeedback { blockNum @1 :UInt16; } +struct SoundRequest { + sound @0 :Car.CarControl.HUDControl.AudibleAlert; +} + +struct LiveStreamCamera { + camera @0 :CameraType; + + enum CameraType { + driver @0; + wideRoad @1; + } +} + struct Touch { sec @0 :Int64; usec @1 :Int64; @@ -2536,6 +2549,11 @@ struct Event { livestreamWideRoadEncodeData @121 :EncodeData; livestreamDriverEncodeData @122 :EncodeData; + livestreamCameraEncodeData @152 :EncodeData; + livestreamCameraSwitch @153 :LiveStreamCamera; + + soundRequest @154 :SoundRequest; + # *********** Custom: reserved for forks *********** # DO change the name of the field diff --git a/cereal/services.py b/cereal/services.py index c2d38d852db133..8b9821ddee45f6 100755 --- a/cereal/services.py +++ b/cereal/services.py @@ -77,6 +77,8 @@ def __init__(self, should_log: bool, frequency: float, decimation: Optional[int] "rawAudioData": (False, 20.), "bookmarkButton": (True, 0., 1), "audioFeedback": (True, 0., 1), + "soundRequest": (False, 0.), + "livestreamCameraSwitch": (False, 0.), "roadEncodeData": (False, 20., None, QueueSize.BIG), "driverEncodeData": (False, 20., None, QueueSize.BIG), "wideRoadEncodeData": (False, 20., None, QueueSize.BIG), @@ -92,6 +94,7 @@ def __init__(self, should_log: bool, frequency: float, decimation: Optional[int] "livestreamWideRoadEncodeData": (False, 20., None, QueueSize.MEDIUM), "livestreamRoadEncodeData": (False, 20., None, QueueSize.MEDIUM), "livestreamDriverEncodeData": (False, 20., None, QueueSize.MEDIUM), + "livestreamCameraEncodeData": (False, 20., None, QueueSize.MEDIUM), "customReservedRawData0": (True, 0.), } SERVICE_LIST = {name: Service(*vals) for diff --git a/selfdrive/ui/soundd.py b/selfdrive/ui/soundd.py index 8225efabf9af5a..86c36498ae155b 100644 --- a/selfdrive/ui/soundd.py +++ b/selfdrive/ui/soundd.py @@ -3,7 +3,6 @@ import time import wave - from cereal import car, messaging from openpilot.common.basedir import BASEDIR from openpilot.common.filter_simple import FirstOrderFilter @@ -128,7 +127,9 @@ def update_alert(self, new_alert): self.current_sound_frame = 0 def get_audible_alert(self, sm): - if sm.updated['selfdriveState']: + if sm.updated['soundRequest'] and sm['soundRequest'].sound.raw != AudibleAlert.none: + self.update_alert(sm['soundRequest'].sound.raw) + elif sm.updated['selfdriveState']: new_alert = sm['selfdriveState'].alertSound.raw self.update_alert(new_alert) elif check_selfdrive_timeout_alert(sm): @@ -153,7 +154,7 @@ def soundd_thread(self): # sounddevice must be imported after forking processes import sounddevice as sd - sm = messaging.SubMaster(['selfdriveState', 'soundPressure']) + sm = messaging.SubMaster(['selfdriveState', 'soundPressure', 'soundRequest']) with self.get_stream(sd) as stream: rk = Ratekeeper(20) diff --git a/system/athena/athenad.py b/system/athena/athenad.py index b52ef21ba63702..7dc7e8c24967cc 100755 --- a/system/athena/athenad.py +++ b/system/athena/athenad.py @@ -28,7 +28,7 @@ create_connection) import cereal.messaging as messaging -from cereal import log +from cereal import car, log from cereal.services import SERVICE_LIST from openpilot.common.api import Api, get_key_pair from openpilot.common.utils import CallbackReader, get_upload_stream @@ -44,6 +44,7 @@ ATHENA_HOST = os.getenv('ATHENA_HOST', 'wss://athena.comma.ai') HANDLER_THREADS = int(os.getenv('HANDLER_THREADS', "4")) LOCAL_PORT_WHITELIST = {22, } # SSH +WEBRTCD_PORT = 5001 LOG_ATTR_NAME = 'user.upload' LOG_ATTR_VALUE_MAX_UNIX_TIME = int.to_bytes(2147483647, 4, sys.byteorder) @@ -536,6 +537,16 @@ def getSshAuthorizedKeys() -> str: def getGithubUsername() -> str: return cast(str, Params().get("GithubUsername") or "") + +@dispatcher.add_method +def getNotCar() -> bool: + cp_bytes = Params().get("CarParamsPersistent") + if cp_bytes is not None: + with car.CarParams.from_bytes(cp_bytes) as CP: + return CP.notCar + return False + + @dispatcher.add_method def getSimInfo(): return HARDWARE.get_sim_info() @@ -557,6 +568,26 @@ def getNetworks(): return HARDWARE.get_networks() +@dispatcher.add_method +def startJoystickStream(sdp: str) -> dict: + from openpilot.system.webrtc.webrtcd import StreamRequestBody + body = StreamRequestBody(sdp, ["driver"], ["testJoystick", "soundRequest", "livestreamCameraSwitch"], ["carState"]) + try: + resp = requests.post(f"http://localhost:{WEBRTCD_PORT}/stream", + json=asdict(body), timeout=10) + if not resp.ok: + try: + error_body = resp.json() + raise Exception(error_body.get("message", f"webrtcd returned {resp.status_code}")) + except ValueError: + resp.raise_for_status() + return resp.json() + except requests.ConnectTimeout: + raise Exception("webrtc took too long to respond. is it on?") from None + except requests.ConnectionError: + raise Exception("webrtc is not running. turn on comma body ignition.") from None + + @dispatcher.add_method def takeSnapshot() -> str | dict[str, str] | None: from openpilot.system.camerad.snapshot import jpeg_write, snapshot diff --git a/system/loggerd/encoderd.cc b/system/loggerd/encoderd.cc index 9d4b81a3f90230..b00b7b66aed33a 100644 --- a/system/loggerd/encoderd.cc +++ b/system/loggerd/encoderd.cc @@ -151,6 +151,61 @@ void encoderd_thread(const LogCameraInfo (&cameras)[N]) { } } +template +void stream_encoderd_thread(const LogCameraInfo (&cameras)[N]) { + while (!do_exit) { + if (!VisionIpcClient::getAvailableStreams("camerad", false).empty()) break; + util::sleep_for(100); + } + + SubMaster sm({"livestreamCameraSwitch"}); + const LogCameraInfo *active_cam = &cameras[0]; + + while (!do_exit) { + VisionIpcClient vipc_client("camerad", active_cam->stream_type, false); + if (!vipc_client.connect(false)) { + util::sleep_for(5); + continue; + } + + // init encoder + const VisionBuf &buf_info = vipc_client.buffers[0]; + LOGW("stream encoder init %zux%zu", buf_info.width, buf_info.height); + assert(buf_info.width > 0 && buf_info.height > 0); + auto encoder = std::make_unique(active_cam->encoder_infos[0], buf_info.width, buf_info.height); + encoder->encoder_open(); + + while (!do_exit) { + sm.update(0); + + // Switch camera if the request differs from the current one + if (sm.updated("livestreamCameraSwitch")) { + auto requested = sm["livestreamCameraSwitch"].getLivestreamCameraSwitch().getCamera(); + VisionStreamType requested_stream = requested == cereal::LiveStreamCamera::CameraType::DRIVER + ? VISION_STREAM_DRIVER : VISION_STREAM_WIDE_ROAD; + if (requested_stream != active_cam->stream_type) { + LOGW("stream encoder switching camera"); + auto it = std::find_if(std::begin(cameras), std::end(cameras), + [requested_stream](const auto &cam) { return cam.stream_type == requested_stream; }); + if (it != std::end(cameras)) active_cam = &(*it); + break; // reinit encoder with new camera selection + } + } + + // encode frame + VisionIpcBufExtra extra; + VisionBuf *buf = vipc_client.recv(&extra); + if (buf == nullptr) continue; + if (buf->get_frame_id() != extra.frame_id) continue; + if (encoder->encode_frame(buf, &extra) == -1) { + LOGE("stream encoder: failed to encode frame. frame_id: %d", extra.frame_id); + } + } + + encoder->encoder_close(); + } +} + int main(int argc, char* argv[]) { if (!Hardware::PC()) { int ret; @@ -162,7 +217,7 @@ int main(int argc, char* argv[]) { if (argc > 1) { std::string arg1(argv[1]); if (arg1 == "--stream") { - encoderd_thread(stream_cameras_logged); + stream_encoderd_thread(stream_cameras_logged); } else { LOGE("Argument '%s' is not supported", arg1.c_str()); } diff --git a/system/loggerd/loggerd.h b/system/loggerd/loggerd.h index 6aa0c8be40b96f..804550d80c772c 100644 --- a/system/loggerd/loggerd.h +++ b/system/loggerd/loggerd.h @@ -47,8 +47,8 @@ struct EncoderSettings { } static EncoderSettings StreamEncoderSettings() { - int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 1'000'000; - return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 15}; + int _stream_bitrate = getenv("STREAM_BITRATE") ? atoi(getenv("STREAM_BITRATE")) : 4'000'000; + return EncoderSettings{.encode_type = cereal::EncodeIndex::Type::QCAMERA_H264, .bitrate = _stream_bitrate , .gop_size = 5}; } }; @@ -100,28 +100,13 @@ const EncoderInfo main_driver_encoder_info = { INIT_ENCODE_FUNCTIONS(DriverEncode), }; -const EncoderInfo stream_road_encoder_info = { - .publish_name = "livestreamRoadEncodeData", - //.thumbnail_name = "thumbnail", - .record = false, - .get_settings = [](int){return EncoderSettings::StreamEncoderSettings();}, - INIT_ENCODE_FUNCTIONS(LivestreamRoadEncode), -}; - -const EncoderInfo stream_wide_road_encoder_info = { - .publish_name = "livestreamWideRoadEncodeData", +const EncoderInfo stream_encoder_info = { + .publish_name = "livestreamCameraEncodeData", .record = false, .get_settings = [](int){return EncoderSettings::StreamEncoderSettings();}, INIT_ENCODE_FUNCTIONS(LivestreamWideRoadEncode), }; -const EncoderInfo stream_driver_encoder_info = { - .publish_name = "livestreamDriverEncodeData", - .record = false, - .get_settings = [](int){return EncoderSettings::StreamEncoderSettings();}, - INIT_ENCODE_FUNCTIONS(LivestreamDriverEncode), -}; - const EncoderInfo qcam_encoder_info = { .publish_name = "qRoadEncodeData", .filename = "qcamera.ts", @@ -153,20 +138,20 @@ const LogCameraInfo driver_camera_info{ const LogCameraInfo stream_road_camera_info{ .thread_name = "road_cam_encoder", .stream_type = VISION_STREAM_ROAD, - .encoder_infos = {stream_road_encoder_info} + .encoder_infos = {stream_encoder_info} }; const LogCameraInfo stream_wide_road_camera_info{ .thread_name = "wide_road_cam_encoder", .stream_type = VISION_STREAM_WIDE_ROAD, - .encoder_infos = {stream_wide_road_encoder_info} + .encoder_infos = {stream_encoder_info} }; const LogCameraInfo stream_driver_camera_info{ .thread_name = "driver_cam_encoder", .stream_type = VISION_STREAM_DRIVER, - .encoder_infos = {stream_driver_encoder_info} + .encoder_infos = {stream_encoder_info} }; const LogCameraInfo cameras_logged[] = {road_camera_info, wide_road_camera_info, driver_camera_info}; -const LogCameraInfo stream_cameras_logged[] = {stream_road_camera_info, stream_wide_road_camera_info, stream_driver_camera_info}; +const LogCameraInfo stream_cameras_logged[] = {stream_driver_camera_info, stream_wide_road_camera_info}; diff --git a/system/webrtc/device/video.py b/system/webrtc/device/video.py index 50feab4f4a910d..54f333b9689817 100644 --- a/system/webrtc/device/video.py +++ b/system/webrtc/device/video.py @@ -1,27 +1,42 @@ import asyncio +import struct import time - import av from teleoprtc.tracks import TiciVideoStreamTrack from cereal import messaging from openpilot.common.realtime import DT_MDL, DT_DMON +# arbitrary 16-byte UUID identifying openpilot frame-timing SEI messages +TIMING_SEI_UUID = bytes([ + 0xa5, 0xe0, 0xc4, 0xa4, 0x5b, 0x6e, 0x4e, 0x1e, + 0x9c, 0x7e, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, +]) +_SEI_PREFIX = b'\x00\x00\x00\x01\x06\x05\x30' + TIMING_SEI_UUID -class LiveStreamVideoStreamTrack(TiciVideoStreamTrack): - camera_to_sock_mapping = { - "driver": "livestreamDriverEncodeData", - "wideRoad": "livestreamWideRoadEncodeData", - "road": "livestreamRoadEncodeData", - } +class LiveStreamVideoStreamTrack(TiciVideoStreamTrack): def __init__(self, camera_type: str): dt = DT_DMON if camera_type == "driver" else DT_MDL super().__init__(camera_type, dt) - self._sock = messaging.sub_sock(self.camera_to_sock_mapping[camera_type], conflate=True) - self._pts = 0 + self._sock = messaging.sub_sock("livestreamCameraEncodeData", conflate=True) self._t0_ns = time.monotonic_ns() + self.timing_sei_enabled = False + + def _build_frame_data(self, msg) -> bytes: + encode_data = getattr(msg, msg.which()) + if not self.timing_sei_enabled: + return encode_data.header + encode_data.data + + idx = encode_data.idx + sei_nal = _SEI_PREFIX + struct.pack('>4d', + (idx.timestampEof - idx.timestampSof) / 1e6, + (msg.logMonoTime - idx.timestampEof) / 1e6, + (time.monotonic_ns() - msg.logMonoTime) / 1e6, + time.time() * 1000, # noqa: TID251 + ) + b'\x80' + return encode_data.header + sei_nal + encode_data.data async def recv(self): while True: @@ -30,15 +45,12 @@ async def recv(self): break await asyncio.sleep(0.005) - evta = getattr(msg, msg.which()) - - packet = av.Packet(evta.header + evta.data) + packet = av.Packet(self._build_frame_data(msg)) packet.time_base = self._time_base - self._pts = ((time.monotonic_ns() - self._t0_ns) * self._clock_rate) // 1_000_000_000 - packet.pts = self._pts - self.log_debug("track sending frame %d", self._pts) - + pts = ((time.monotonic_ns() - self._t0_ns) * self._clock_rate) // 1_000_000_000 + packet.pts = pts + self.log_debug("track sending frame %d", pts) return packet def codec_preference(self) -> str | None: diff --git a/system/webrtc/tests/test_stream_session.py b/system/webrtc/tests/test_stream_session.py index f44d217d58ced6..9730f9e16e3d07 100644 --- a/system/webrtc/tests/test_stream_session.py +++ b/system/webrtc/tests/test_stream_session.py @@ -67,7 +67,7 @@ def test_incoming_proxy(self, mocker): mocked_pubmaster.reset_mock() def test_livestream_track(self, mocker): - fake_msg = messaging.new_message("livestreamDriverEncodeData") + fake_msg = messaging.new_message("livestreamCameraEncodeData") config = {"receive.return_value": fake_msg.to_bytes()} mocker.patch("msgq.SubSocket", spec=True, **config) diff --git a/system/webrtc/webrtcd.py b/system/webrtc/webrtcd.py index d2c90cafb5b2e6..0b64d579384858 100755 --- a/system/webrtc/webrtcd.py +++ b/system/webrtc/webrtcd.py @@ -1,10 +1,11 @@ #!/usr/bin/env python3 - +import time import argparse import asyncio +import contextlib import json -import uuid import logging +import uuid from dataclasses import dataclass, field from typing import Any, TYPE_CHECKING @@ -21,6 +22,8 @@ from openpilot.system.webrtc.schema import generate_field from cereal import messaging, log +INITIAL_CAMERA = "driver" +REQUIRED_VIDEO_CODEC = "H264" class CerealOutgoingMessageProxy: def __init__(self, sm: messaging.SubMaster): @@ -83,11 +86,16 @@ def start(self): assert self.task is None self.task = asyncio.create_task(self.run()) - def stop(self): - if self.task is None or self.task.done(): + async def stop(self): + if self.task is None: return - self.task.cancel() + task = self.task self.task = None + if task.done(): + return + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task async def run(self): from aiortc.exceptions import InvalidStateError @@ -122,14 +130,11 @@ def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], o from aiortc.mediastreams import VideoStreamTrack from openpilot.system.webrtc.device.video import LiveStreamVideoStreamTrack from teleoprtc import WebRTCAnswerBuilder - from teleoprtc.info import parse_info_from_offer - config = parse_info_from_offer(sdp) builder = WebRTCAnswerBuilder(sdp) - assert len(cameras) == config.n_expected_camera_tracks, "Incoming stream has misconfigured number of video tracks" - for cam in cameras: - builder.add_video_stream(cam, LiveStreamVideoStreamTrack(cam) if not debug_mode else VideoStreamTrack()) + self.video_track = LiveStreamVideoStreamTrack(INITIAL_CAMERA) if not debug_mode else VideoStreamTrack() + builder.add_video_stream(INITIAL_CAMERA, self.video_track) self.stream = builder.stream() self.identifier = str(uuid.uuid4()) @@ -145,27 +150,53 @@ def __init__(self, sdp: str, cameras: list[str], incoming_services: list[str], o self.outgoing_bridge_runner = CerealProxyRunner(self.outgoing_bridge) self.run_task: asyncio.Task | None = None + self._cleanup_lock = asyncio.Lock() + self._cleanup_done = False self.logger = logging.getLogger("webrtcd") - self.logger.info("New stream session (%s), cameras %s, incoming services %s, outgoing services %s", - self.identifier, cameras, incoming_services, outgoing_services) + self.logger.info( + "New stream session (%s), cameras %s, incoming services %s, outgoing services %s", + self.identifier, cameras, incoming_services, outgoing_services, + ) def start(self): self.run_task = asyncio.create_task(self.run()) - def stop(self): - if self.run_task.done(): - return - self.run_task.cancel() + async def stop(self): + if self.run_task is not None and not self.run_task.done() and self.run_task is not asyncio.current_task(): + self.run_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self.run_task self.run_task = None - asyncio.run(self.post_run_cleanup()) + await self.post_run_cleanup() async def get_answer(self): return await self.stream.start() - async def message_handler(self, message: bytes): - assert self.incoming_bridge is not None + def message_handler(self, message: bytes): try: - self.incoming_bridge.send(message) + payload = json.loads(message) if isinstance(message, (bytes, str)) else None + if not isinstance(payload, dict): + raise ValueError + msg_type = payload.get("type") + + if msg_type == "clockSync": + data = payload.get("data", {}) + pong = json.dumps({"type": "clockSync", "data": { + "action": "pong", "browserSendTime": data.get("browserSendTime"), "deviceTime": time.time() * 1000, # noqa: TID251 + }}) + self.stream.get_messaging_channel().send(pong) + return + + if msg_type == "enableTimingSei": + enabled = bool(payload.get("data", {}).get("enabled")) + if hasattr(self.video_track, 'timing_sei_enabled'): + self.video_track.timing_sei_enabled = enabled + return + + if self.incoming_bridge is not None: + self.incoming_bridge.send(message) + except ValueError: + self.logger.warning("Ignoring malformed request: %s", payload) except Exception: self.logger.exception("Cereal incoming proxy failure") @@ -175,24 +206,32 @@ async def run(self): if self.stream.has_messaging_channel(): if self.incoming_bridge is not None: await self.shared_pub_master.add_services_if_needed(self.incoming_bridge_services) - self.stream.set_message_handler(self.message_handler) + # set camera to default + self.incoming_bridge.send(json.dumps({"type": "livestreamCameraSwitch", "data": {"camera": INITIAL_CAMERA}}).encode()) + self.stream.set_message_handler(self.message_handler) if self.outgoing_bridge_runner is not None: channel = self.stream.get_messaging_channel() self.outgoing_bridge_runner.proxy.add_channel(channel) self.outgoing_bridge_runner.start() + self.logger.info("Stream session (%s) connected", self.identifier) await self.stream.wait_for_disconnection() - await self.post_run_cleanup() self.logger.info("Stream session (%s) ended", self.identifier) except Exception: self.logger.exception("Stream session failure") + finally: + await self.post_run_cleanup() async def post_run_cleanup(self): - await self.stream.stop() - if self.outgoing_bridge is not None: - self.outgoing_bridge_runner.stop() + async with self._cleanup_lock: + if self._cleanup_done: + return + self._cleanup_done = True + if self.outgoing_bridge_runner is not None: + await self.outgoing_bridge_runner.stop() + await self.stream.stop() @dataclass @@ -202,19 +241,84 @@ class StreamRequestBody: bridge_services_in: list[str] = field(default_factory=list) bridge_services_out: list[str] = field(default_factory=list) +def _add_cors_headers(_, response: 'web.Response'): + response.headers["Access-Control-Allow-Origin"] = "*" + response.headers["Access-Control-Allow-Headers"] = "Content-Type" + response.headers["Access-Control-Allow-Methods"] = "POST, OPTIONS" + response.headers["Access-Control-Allow-Private-Network"] = "true" -async def get_stream(request: 'web.Request'): - stream_dict, debug_mode = request.app['streams'], request.app['debug'] - raw_body = await request.json() - body = StreamRequestBody(**raw_body) - session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode) - answer = await session.get_answer() - session.start() +@web.middleware +async def cors_middleware(request: 'web.Request', handler): + try: + response = await handler(request) + except web.HTTPException as ex: + _add_cors_headers(request, ex) + raise + _add_cors_headers(request, response) + return response + + +async def stream_options(request: 'web.Request'): + response = web.Response() + _add_cors_headers(request, response) + return response + + +def _validate_sdp_video_codecs(sdp: str): + import aiortc.sdp + desc = aiortc.sdp.SessionDescription.parse(sdp) + required_mime = f"video/{REQUIRED_VIDEO_CODEC}" + for m in desc.media: + if m.kind != "video": + continue + offered_mimes = {c.mimeType for c in m.rtp.codecs} + if required_mime not in offered_mimes: + raise web.HTTPBadRequest( + text=json.dumps({"error": "unsupported_codec", "message": f"Frontend must offer {REQUIRED_VIDEO_CODEC} via setCodecPreferences()"}), + content_type="application/json", + ) - stream_dict[session.identifier] = session - return web.json_response({"sdp": answer.sdp, "type": answer.type}) +async def get_stream(request: 'web.Request'): + logger = logging.getLogger("webrtcd") + try: + stream_dict, debug_mode = request.app['streams'], request.app['debug'] + + raw_body = await request.json() + body = StreamRequestBody(**raw_body) + _validate_sdp_video_codecs(body.sdp) + + async with request.app['stream_lock']: + # Fully disconnect any other active stream before starting the replacement. + for sid, s in list(stream_dict.items()): + if s.run_task and not s.run_task.done(): + try: + ch = s.stream.get_messaging_channel() + ch.send(json.dumps({"type": "connectionReplaced", "data": "Another device has connected, closing this session."})) + except Exception: + pass + await s.stop() + del stream_dict[sid] + + session = StreamSession(body.sdp, body.cameras, body.bridge_services_in, body.bridge_services_out, debug_mode) + try: + answer = await session.get_answer() + except Exception: + await session.stop() + raise + session.start() + + stream_dict[session.identifier] = session + + response = web.json_response({"sdp": answer.sdp, "type": answer.type}) + _add_cors_headers(request, response) + return response + except web.HTTPException: + raise + except Exception: + logger.exception("Error in /stream handler") + raise async def get_schema(request: 'web.Request'): @@ -224,6 +328,7 @@ async def get_schema(request: 'web.Request'): schema_dict = {s: generate_field(log.Event.schema.fields[s]) for s in services} return web.json_response(schema_dict) + async def post_notify(request: 'web.Request'): try: payload = await request.json() @@ -239,9 +344,10 @@ async def post_notify(request: 'web.Request'): return web.Response(status=200, text="OK") + async def on_shutdown(app: 'web.Application'): for session in app['streams'].values(): - session.stop() + await session.stop() del app['streams'] @@ -251,11 +357,13 @@ def webrtcd_thread(host: str, port: int, debug: bool): logging.getLogger("WebRTCStream").setLevel(logging_level) logging.getLogger("webrtcd").setLevel(logging_level) - app = web.Application() + app = web.Application(middlewares=[cors_middleware]) app['streams'] = dict() + app['stream_lock'] = asyncio.Lock() app['debug'] = debug app.on_shutdown.append(on_shutdown) + app.router.add_route("OPTIONS", "/stream", stream_options) app.router.add_post("/stream", get_stream) app.router.add_post("/notify", post_notify) app.router.add_get("/schema", get_schema)