From e0739d4922d1a555a7ac07c5b09e80f7287cf2d0 Mon Sep 17 00:00:00 2001 From: yousongyang Date: Tue, 9 Jun 2026 11:20:27 +0800 Subject: [PATCH 01/12] =?UTF-8?q?ETCD=20=E4=B8=8E=20Discovery=20=E5=88=86?= =?UTF-8?q?=E7=A6=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/atframe/atapp.h | 16 +- include/atframe/etcdcli/etcd_discovery.h | 7 +- include/atframe/modules/etcd_module.h | 272 +-- .../modules/service_discovery_module.h | 309 ++++ src/CMakeLists.txt | 2 + src/atframe/atapp.cpp | 215 ++- .../connectors/atapp_connector_atbus.cpp | 6 +- src/atframe/etcdcli/etcd_discovery.cpp | 9 +- src/atframe/modules/etcd_module.cpp | 1601 ++--------------- .../modules/service_discovery_module.cpp | 1556 ++++++++++++++++ test/case/atapp_direct_connect_test.cpp | 31 +- test/case/atapp_discovery_reconnect_test.cpp | 17 +- test/case/atapp_discovery_test.cpp | 24 +- test/case/atapp_downstream_send_test.cpp | 29 +- test/case/atapp_error_recovery_test.cpp | 17 +- test/case/atapp_etcd_cluster_test.cpp | 145 +- test/case/atapp_etcd_module_test.cpp | 545 +++--- test/case/atapp_etcd_module_unit_test.cpp | 49 +- test/case/atapp_message_test.cpp | 15 +- test/case/atapp_topology_change_test.cpp | 57 +- test/case/atapp_upstream_forward_test.cpp | 59 +- 21 files changed, 2633 insertions(+), 2348 deletions(-) create mode 100644 include/atframe/modules/service_discovery_module.h create mode 100644 src/atframe/modules/service_discovery_module.cpp diff --git a/include/atframe/atapp.h b/include/atframe/atapp.h index d4d31f5..e128876 100644 --- a/include/atframe/atapp.h +++ b/include/atframe/atapp.h @@ -50,9 +50,11 @@ ATBUS_MACRO_NAMESPACE_END LIBATAPP_MACRO_NAMESPACE_BEGIN class etcd_module; +class service_discovery_module; class worker_pool_module; class atapp_connector_atbus; class atapp_connector_loopback; +struct curl_multi_guard_type; class app { public: @@ -394,7 +396,8 @@ class app { LIBATAPP_MACRO_API void pack(atapp::protocol::atapp_discovery &out) const; - LIBATAPP_MACRO_API const std::shared_ptr<::atframework::atapp::etcd_module> &get_etcd_module() const noexcept; + LIBATAPP_MACRO_API const std::shared_ptr<::atframework::atapp::service_discovery_module> & + get_service_discovery_module() const noexcept; LIBATAPP_MACRO_API const std::shared_ptr<::atframework::atapp::worker_pool_module> &get_worker_pool_module() const noexcept; @@ -601,6 +604,8 @@ class app { int setup_atbus(); + int setup_curl_multi(); + static void close_timer(timer_ptr_t &t); int setup_tick_timer(); @@ -663,8 +668,8 @@ class app { int command_handler_stop(atfw::util::cli::callback_param params); int command_handler_reload(atfw::util::cli::callback_param params); int command_handler_invalid(atfw::util::cli::callback_param params); - int command_handler_disable_etcd(atfw::util::cli::callback_param params); - int command_handler_enable_etcd(atfw::util::cli::callback_param params); + int command_handler_disable_discovery(atfw::util::cli::callback_param params); + int command_handler_enable_discovery(atfw::util::cli::callback_param params); int command_handler_list_discovery(atfw::util::cli::callback_param params); private: @@ -786,9 +791,12 @@ class app { }; stats_data_t stats_; + std::shared_ptr curl_multi_guard_; + atfw::util::network::http_request::curl_m_bind_ptr_t curl_multi_; + // inner modules std::shared_ptr<::atframework::atapp::worker_pool_module> internal_module_worker_pool_; - std::shared_ptr<::atframework::atapp::etcd_module> internal_module_etcd_; + std::shared_ptr<::atframework::atapp::service_discovery_module> internal_module_service_discovery_; etcd_discovery_set internal_empty_discovery_set_; // inner endpoints diff --git a/include/atframe/etcdcli/etcd_discovery.h b/include/atframe/etcdcli/etcd_discovery.h index a633867..f98f8e7 100644 --- a/include/atframe/etcdcli/etcd_discovery.h +++ b/include/atframe/etcdcli/etcd_discovery.h @@ -49,10 +49,12 @@ class etcd_discovery_node { ATFW_UTIL_FORCEINLINE const atapp::protocol::atapp_discovery &get_discovery_info() const { return node_info_; } ATFW_UTIL_FORCEINLINE const node_version &get_version() const noexcept { return node_version_; } - LIBATAPP_MACRO_API void copy_from(const atapp::protocol::atapp_discovery &input, const node_version &version); - LIBATAPP_MACRO_API void update_version(const node_version &version, bool upgrade); + LIBATAPP_MACRO_API void copy_from(const atapp::protocol::atapp_discovery &input, const node_version &version, + uintptr_t context_ptr); + LIBATAPP_MACRO_API void update_version(const node_version &version, bool upgrade, uintptr_t context_ptr); LIBATAPP_MACRO_API void copy_to(atapp::protocol::atapp_discovery &output) const; LIBATAPP_MACRO_API void copy_key_to(atapp::protocol::atapp_discovery &output) const; + LIBATAPP_MACRO_API uintptr_t get_context_ptr() const noexcept { return context_ptr_; } ATFW_UTIL_FORCEINLINE const std::pair &get_name_hash() const noexcept { return name_hash_; } @@ -78,6 +80,7 @@ class etcd_discovery_node { private: atapp::protocol::atapp_discovery node_info_; node_version node_version_; + uintptr_t context_ptr_; std::pair name_hash_; union { diff --git a/include/atframe/modules/etcd_module.h b/include/atframe/modules/etcd_module.h index 9cfcf15..30fbdd9 100644 --- a/include/atframe/modules/etcd_module.h +++ b/include/atframe/modules/etcd_module.h @@ -6,293 +6,65 @@ #include -#include - -#include - -#include - #include #include -#include #include