From e622925051e4f9b029424e6597f6cdc9d827f6bb Mon Sep 17 00:00:00 2001 From: INCIRIOS Date: Mon, 18 May 2026 10:49:36 +0100 Subject: [PATCH 1/5] Add coroutine WebSocket controller API and integration tests --- lib/inc/drogon/WebSocketController.h | 127 ++++++++++++++++++ lib/tests/CMakeLists.txt | 6 +- .../client/WebSocketCoroTest.cc | 93 +++++++++++++ .../server/WebSocketCoroTest.cc | 53 ++++++++ .../server/WebSocketCoroTest.h | 31 +++++ 5 files changed, 309 insertions(+), 1 deletion(-) create mode 100644 lib/tests/integration_test/client/WebSocketCoroTest.cc create mode 100644 lib/tests/integration_test/server/WebSocketCoroTest.cc create mode 100644 lib/tests/integration_test/server/WebSocketCoroTest.h diff --git a/lib/inc/drogon/WebSocketController.h b/lib/inc/drogon/WebSocketController.h index 70fb77f5b4..4d15c9e20e 100644 --- a/lib/inc/drogon/WebSocketController.h +++ b/lib/inc/drogon/WebSocketController.h @@ -23,6 +23,9 @@ #include #include #include +#ifdef __cpp_impl_coroutine +#include +#endif #define WS_PATH_LIST_BEGIN \ static void initPathRouting() \ @@ -31,6 +34,11 @@ #define WS_ADD_PATH_VIA_REGEX(regExp, ...) \ registerSelfRegex__(regExp, {__VA_ARGS__}) #define WS_PATH_LIST_END } +#define WS_CORO_PATH_LIST_BEGIN WS_PATH_LIST_BEGIN +#define WS_CORO_PATH_ADD(path, ...) WS_PATH_ADD(path, __VA_ARGS__) +#define WS_CORO_ADD_PATH_VIA_REGEX(regExp, ...) \ + WS_ADD_PATH_VIA_REGEX(regExp, __VA_ARGS__) +#define WS_CORO_PATH_LIST_END WS_PATH_LIST_END namespace drogon { @@ -135,4 +143,123 @@ template typename WebSocketController::pathRegistrator WebSocketController::registrator_; +#ifdef __cpp_impl_coroutine +/** + * @brief The abstract base class for coroutine WebSocket controllers. + */ +class WebSocketCoroControllerBase : public WebSocketControllerBase +{ + public: + ~WebSocketCoroControllerBase() override = default; + + void handleNewMessage(const WebSocketConnectionPtr &conn, + std::string &&message, + const WebSocketMessageType &type) final + { + drogon::async_run([this, + conn, + message = std::move(message), + type]() mutable -> Task<> { + co_await handleNewMessageCoro(conn, std::move(message), type); + }); + } + + void handleNewConnection(const HttpRequestPtr &req, + const WebSocketConnectionPtr &conn) final + { + drogon::async_run([this, req, conn]() mutable -> Task<> { + co_await handleNewConnectionCoro(req, conn); + }); + } + + void handleConnectionClosed(const WebSocketConnectionPtr &conn) final + { + drogon::async_run([this, conn]() mutable -> Task<> { + co_await handleConnectionClosedCoro(conn); + }); + } + + virtual Task<> handleNewMessageCoro(const WebSocketConnectionPtr &, + std::string &&, + const WebSocketMessageType &) = 0; + + virtual Task<> handleNewConnectionCoro(const HttpRequestPtr &, + const WebSocketConnectionPtr &) = 0; + + virtual Task<> handleConnectionClosedCoro( + const WebSocketConnectionPtr &) = 0; +}; + +/** + * @brief Reflection base class template for coroutine WebSocket controllers. + */ +template +class WebSocketCoroController : public DrObject, + public WebSocketCoroControllerBase +{ + public: + static const bool isAutoCreation = AutoCreation; + + virtual ~WebSocketCoroController() + { + } + + protected: + WebSocketCoroController() + { + } + + static void registerSelf__( + const std::string &path, + const std::vector &constraints) + { + LOG_TRACE << "register websocket coro controller(" + << WebSocketCoroController::classTypeName() + << ") on path:" << path; + app().registerWebSocketController( + path, + WebSocketCoroController::classTypeName(), + constraints); + } + + static void registerSelfRegex__( + const std::string ®Exp, + const std::vector &constraints) + { + LOG_TRACE << "register websocket coro controller(" + << WebSocketCoroController::classTypeName() + << ") on regExp:" << regExp; + app().registerWebSocketControllerRegex( + regExp, + WebSocketCoroController::classTypeName(), + constraints); + } + + private: + class pathRegistrator + { + public: + pathRegistrator() + { + if (AutoCreation) + { + T::initPathRouting(); + } + } + }; + + friend pathRegistrator; + static pathRegistrator registrator_; + + virtual void *touch() + { + return ®istrator_; + } +}; + +template +typename WebSocketCoroController::pathRegistrator + WebSocketCoroController::registrator_; +#endif + } // namespace drogon diff --git a/lib/tests/CMakeLists.txt b/lib/tests/CMakeLists.txt index c8249b41ab..f8af556a88 100644 --- a/lib/tests/CMakeLists.txt +++ b/lib/tests/CMakeLists.txt @@ -85,7 +85,11 @@ if (BUILD_CTL) set(INTEGRATION_TEST_SERVER_SOURCES ${INTEGRATION_TEST_SERVER_SOURCES} integration_test/server/CoroFilter.cpp - integration_test/server/api_v1_CoroTest.cc) + integration_test/server/api_v1_CoroTest.cc + integration_test/server/WebSocketCoroTest.cc) + set(INTEGRATION_TEST_CLIENT_SOURCES + ${INTEGRATION_TEST_CLIENT_SOURCES} + integration_test/client/WebSocketCoroTest.cc) set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_STANDARD_REQUIRED TRUE) endif(DROGON_CXX_STANDARD GREATER_EQUAL 20 AND HAS_COROUTINE) diff --git a/lib/tests/integration_test/client/WebSocketCoroTest.cc b/lib/tests/integration_test/client/WebSocketCoroTest.cc new file mode 100644 index 0000000000..f0ee9b4237 --- /dev/null +++ b/lib/tests/integration_test/client/WebSocketCoroTest.cc @@ -0,0 +1,93 @@ +#include +#include + +#include +#include +#include + +using namespace drogon; +using namespace std::chrono_literals; + +DROGON_TEST(WebSocketCoroControllerTest) +{ +#if defined(__cpp_impl_coroutine) + auto promise = std::make_shared>(); + auto future = promise->get_future(); + + auto first = WebSocketClient::newWebSocketClient("127.0.0.1", 8848); + auto second = WebSocketClient::newWebSocketClient("127.0.0.1", 8848); + + first->setMessageHandler( + [first, second, promise, TEST_CTX](const std::string &message, + const WebSocketClientPtr &, + const WebSocketMessageType &type) { + CHECK(type == WebSocketMessageType::Text); + if (message == "opened") + { + first->getConnection()->send("hello-coro"); + } + else if (message == "coro:hello-coro") + { + first->getConnection()->shutdown(); + } + }); + + first->setConnectionClosedHandler([second, promise, TEST_CTX]( + const WebSocketClientPtr &) { + auto req2 = HttpRequest::newHttpRequest(); + req2->setPath("/coro-chat"); + + second->setMessageHandler( + [second, promise, TEST_CTX](const std::string &message, + const WebSocketClientPtr &, + const WebSocketMessageType &type) { + CHECK(type == WebSocketMessageType::Text); + if (message == "opened") + { + second->getConnection()->send("stats"); + return; + } + + if (message.rfind("stats:", 0) == 0) + { + int opened = 0; + int msg = 0; + int closed = 0; + auto parsed = sscanf(message.c_str(), + "stats:%d:%d:%d", + &opened, + &msg, + &closed); + REQUIRE(parsed == 3); + CHECK(opened >= 2); + CHECK(msg >= 2); + CHECK(closed >= 1); + second->stop(); + promise->set_value(); + } + }); + + second->connectToServer(req2, + [second, TEST_CTX](ReqResult r, + const HttpResponsePtr &resp, + const WebSocketClientPtr &) { + REQUIRE(r == ReqResult::Ok); + REQUIRE(resp != nullptr); + CHECK(second->getConnection()->connected()); + }); + }); + + auto req = HttpRequest::newHttpRequest(); + req->setPath("/coro-chat"); + first->connectToServer(req, + [first, TEST_CTX](ReqResult r, + const HttpResponsePtr &resp, + const WebSocketClientPtr &) { + REQUIRE(r == ReqResult::Ok); + REQUIRE(resp != nullptr); + CHECK(first->getConnection()->connected()); + }); + + REQUIRE(future.wait_for(5s) == std::future_status::ready); +#endif +} diff --git a/lib/tests/integration_test/server/WebSocketCoroTest.cc b/lib/tests/integration_test/server/WebSocketCoroTest.cc new file mode 100644 index 0000000000..48b9dcf835 --- /dev/null +++ b/lib/tests/integration_test/server/WebSocketCoroTest.cc @@ -0,0 +1,53 @@ +#include "WebSocketCoroTest.h" + +#include + +using namespace example; + +std::atomic WebSocketCoroTest::openedCount_{0}; +std::atomic WebSocketCoroTest::messageCount_{0}; +std::atomic WebSocketCoroTest::closedCount_{0}; + +drogon::Task<> WebSocketCoroTest::handleNewConnectionCoro( + const drogon::HttpRequestPtr &, + const drogon::WebSocketConnectionPtr &conn) +{ + ++openedCount_; + co_await drogon::sleepCoro(drogon::app().getLoop(), 0.001); + conn->send("opened"); + co_return; +} + +drogon::Task<> WebSocketCoroTest::handleNewMessageCoro( + const drogon::WebSocketConnectionPtr &conn, + std::string &&message, + const drogon::WebSocketMessageType &type) +{ + if (type != drogon::WebSocketMessageType::Text) + { + co_return; + } + + ++messageCount_; + co_await drogon::sleepCoro(drogon::app().getLoop(), 0.001); + + if (message == "stats") + { + conn->send("stats:" + std::to_string(openedCount_.load()) + ":" + + std::to_string(messageCount_.load()) + ":" + + std::to_string(closedCount_.load())); + } + else + { + conn->send("coro:" + message); + } + co_return; +} + +drogon::Task<> WebSocketCoroTest::handleConnectionClosedCoro( + const drogon::WebSocketConnectionPtr &) +{ + ++closedCount_; + co_await drogon::sleepCoro(drogon::app().getLoop(), 0.001); + co_return; +} diff --git a/lib/tests/integration_test/server/WebSocketCoroTest.h b/lib/tests/integration_test/server/WebSocketCoroTest.h new file mode 100644 index 0000000000..5b376e3d41 --- /dev/null +++ b/lib/tests/integration_test/server/WebSocketCoroTest.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include + +namespace example +{ +class WebSocketCoroTest + : public drogon::WebSocketCoroController +{ + public: + drogon::Task<> handleNewMessageCoro( + const drogon::WebSocketConnectionPtr &, + std::string &&, + const drogon::WebSocketMessageType &) override; + drogon::Task<> handleConnectionClosedCoro( + const drogon::WebSocketConnectionPtr &) override; + drogon::Task<> handleNewConnectionCoro( + const drogon::HttpRequestPtr &, + const drogon::WebSocketConnectionPtr &) override; + + WS_CORO_PATH_LIST_BEGIN + WS_CORO_PATH_ADD("/coro-chat", drogon::Get); + WS_CORO_PATH_LIST_END + + private: + static std::atomic openedCount_; + static std::atomic messageCount_; + static std::atomic closedCount_; +}; +} // namespace example From 039b9619bf56f5776248f5de65e053c96dbdf219 Mon Sep 17 00:00:00 2001 From: INCIRIOS Date: Mon, 18 May 2026 16:14:41 +0100 Subject: [PATCH 2/5] reshaping websocker controller to match http coro --- lib/inc/drogon/WebSocketController.h | 59 +++++++++++++++------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/lib/inc/drogon/WebSocketController.h b/lib/inc/drogon/WebSocketController.h index 4d15c9e20e..e265458704 100644 --- a/lib/inc/drogon/WebSocketController.h +++ b/lib/inc/drogon/WebSocketController.h @@ -152,33 +152,6 @@ class WebSocketCoroControllerBase : public WebSocketControllerBase public: ~WebSocketCoroControllerBase() override = default; - void handleNewMessage(const WebSocketConnectionPtr &conn, - std::string &&message, - const WebSocketMessageType &type) final - { - drogon::async_run([this, - conn, - message = std::move(message), - type]() mutable -> Task<> { - co_await handleNewMessageCoro(conn, std::move(message), type); - }); - } - - void handleNewConnection(const HttpRequestPtr &req, - const WebSocketConnectionPtr &conn) final - { - drogon::async_run([this, req, conn]() mutable -> Task<> { - co_await handleNewConnectionCoro(req, conn); - }); - } - - void handleConnectionClosed(const WebSocketConnectionPtr &conn) final - { - drogon::async_run([this, conn]() mutable -> Task<> { - co_await handleConnectionClosedCoro(conn); - }); - } - virtual Task<> handleNewMessageCoro(const WebSocketConnectionPtr &, std::string &&, const WebSocketMessageType &) = 0; @@ -204,6 +177,38 @@ class WebSocketCoroController : public DrObject, { } + void handleNewMessage(const WebSocketConnectionPtr &conn, + std::string &&message, + const WebSocketMessageType &type) final + { + auto objPtr = DrClassMap::getSingleInstance(); + drogon::async_run([objPtr, + conn, + message = std::move(message), + type]() mutable -> Task<> { + co_await objPtr->handleNewMessageCoro(conn, + std::move(message), + type); + }); + } + + void handleNewConnection(const HttpRequestPtr &req, + const WebSocketConnectionPtr &conn) final + { + auto objPtr = DrClassMap::getSingleInstance(); + drogon::async_run([objPtr, req, conn]() mutable -> Task<> { + co_await objPtr->handleNewConnectionCoro(req, conn); + }); + } + + void handleConnectionClosed(const WebSocketConnectionPtr &conn) final + { + auto objPtr = DrClassMap::getSingleInstance(); + drogon::async_run([objPtr, conn]() mutable -> Task<> { + co_await objPtr->handleConnectionClosedCoro(conn); + }); + } + protected: WebSocketCoroController() { From d9fa67363553071618306a2b32518a022481041f Mon Sep 17 00:00:00 2001 From: INCIRIOS Date: Mon, 18 May 2026 16:22:55 +0100 Subject: [PATCH 3/5] Fix websocket handshake binder lifetime --- lib/src/HttpServer.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/src/HttpServer.cc b/lib/src/HttpServer.cc index f6e4bff1be..f284cf7eae 100644 --- a/lib/src/HttpServer.cc +++ b/lib/src/HttpServer.cc @@ -758,7 +758,10 @@ void HttpServer::websocketRequestHandling( std::function &&callback, WebSocketConnectionImplPtr &&wsConnPtr) { - binderPtr->handleRequest( + auto binder = std::move(binderPtr); + auto wsConn = std::move(wsConnPtr); + + binder->handleRequest( req, [req, callback = std::move(callback)](const HttpResponsePtr &resp) { AopAdvice::instance().passPostHandlingAdvices(req, resp); @@ -766,8 +769,8 @@ void HttpServer::websocketRequestHandling( }); // TODO: more elegant? - static_cast(binderPtr.get()) - ->handleNewConnection(req, wsConnPtr); + static_cast(binder.get()) + ->handleNewConnection(req, wsConn); } void HttpServer::handleResponse( From 430e652963c7d378c86493bad45cf9cec3fb2b8b Mon Sep 17 00:00:00 2001 From: INCIRIOS Date: Mon, 18 May 2026 16:27:17 +0100 Subject: [PATCH 4/5] Stabilize websocket handshake request lifetime --- lib/src/HttpServer.cc | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/src/HttpServer.cc b/lib/src/HttpServer.cc index f284cf7eae..65602f6fd9 100644 --- a/lib/src/HttpServer.cc +++ b/lib/src/HttpServer.cc @@ -758,19 +758,21 @@ void HttpServer::websocketRequestHandling( std::function &&callback, WebSocketConnectionImplPtr &&wsConnPtr) { + auto request = req; auto binder = std::move(binderPtr); auto wsConn = std::move(wsConnPtr); binder->handleRequest( - req, - [req, callback = std::move(callback)](const HttpResponsePtr &resp) { - AopAdvice::instance().passPostHandlingAdvices(req, resp); + request, + [request, callback = std::move(callback)]( + const HttpResponsePtr &resp) { + AopAdvice::instance().passPostHandlingAdvices(request, resp); callback(resp); }); // TODO: more elegant? static_cast(binder.get()) - ->handleNewConnection(req, wsConn); + ->handleNewConnection(request, wsConn); } void HttpServer::handleResponse( From a92c682969a9c0b7cb588a8e3eab271cb433b64d Mon Sep 17 00:00:00 2001 From: INCIRIOS Date: Tue, 19 May 2026 09:18:11 +0100 Subject: [PATCH 5/5] Apply clang-format to websocket coroutine changes --- lib/inc/drogon/WebSocketController.h | 3 ++- lib/src/HttpServer.cc | 3 +-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/inc/drogon/WebSocketController.h b/lib/inc/drogon/WebSocketController.h index e265458704..003d800a5b 100644 --- a/lib/inc/drogon/WebSocketController.h +++ b/lib/inc/drogon/WebSocketController.h @@ -164,7 +164,8 @@ class WebSocketCoroControllerBase : public WebSocketControllerBase }; /** - * @brief Reflection base class template for coroutine WebSocket controllers. + * @brief Reflection base class template for coroutine WebSocket + * controllers. */ template class WebSocketCoroController : public DrObject, diff --git a/lib/src/HttpServer.cc b/lib/src/HttpServer.cc index 65602f6fd9..1daef46809 100644 --- a/lib/src/HttpServer.cc +++ b/lib/src/HttpServer.cc @@ -764,8 +764,7 @@ void HttpServer::websocketRequestHandling( binder->handleRequest( request, - [request, callback = std::move(callback)]( - const HttpResponsePtr &resp) { + [request, callback = std::move(callback)](const HttpResponsePtr &resp) { AopAdvice::instance().passPostHandlingAdvices(request, resp); callback(resp); });