From 6a3db4698d3369334cc2dc1b6a86862edcb17ae8 Mon Sep 17 00:00:00 2001 From: Ilya Repin Date: Wed, 4 Feb 2026 18:32:06 +0000 Subject: [PATCH 1/3] add dynconfig and get partition --- configs/config_vars.testing.yaml | 5 +- configs/config_vars.yaml | 5 +- configs/static_config.yaml | 28 ++++- src/api/http/v1/admin/get_partition.cpp | 47 ++++++++ src/api/http/v1/admin/get_partition.hpp | 34 ++++++ src/app/dto/admin/get_partition.hpp | 19 ++++ src/app/dto/leader/coordination.hpp | 1 + src/app/services/admin/admin_service.cpp | 6 + src/app/services/admin/admin_service.hpp | 4 + .../admin/get_partition/get_partition.cpp | 49 +++++++++ .../admin/get_partition/get_partition.hpp | 30 +++++ .../get_partition_map/get_partition_map.cpp | 2 - .../leader/coordination/coordination.cpp | 6 + src/core/common/hub_params.hpp | 2 +- src/core/hash_ring/hash_ring.cpp | 27 ++++- src/core/hash_ring/hash_ring.hpp | 1 + src/core/hash_ring/hash_ring_ut.cpp | 18 +++ src/core/partition/partition_map.cpp | 26 +++++ src/core/partition/partition_map.hpp | 2 + src/core/partition/partition_map_ut.cpp | 104 ++++++++++++++++++ src/infra/components/components.cpp | 7 +- .../coordination_gateway_component.cpp | 4 + .../coordination_repository_component.cpp | 6 +- .../leader/leader_dist_lock_component.cpp | 41 ++----- .../leader/leader_dist_lock_component.hpp | 3 + .../kesus_coordination_gateway.cpp | 18 ++- .../kesus_coordination_gateway.hpp | 6 +- .../ydb_coordination_repository.cpp | 28 +++-- .../ydb_coordination_repository.hpp | 7 +- src/infra/dynconfig/leader/leader_config.cpp | 100 +++++++++++++++++ src/infra/dynconfig/leader/leader_config.hpp | 72 ++++++++++++ .../dynconfig/leader/repository_config.cpp | 49 +++++++++ .../dynconfig/leader/repository_config.hpp | 50 +++++++++ tests/test_get_partition.py | 7 ++ tests/test_get_partition_map.py | 18 ++- 35 files changed, 762 insertions(+), 70 deletions(-) create mode 100644 src/api/http/v1/admin/get_partition.cpp create mode 100644 src/api/http/v1/admin/get_partition.hpp create mode 100644 src/app/dto/admin/get_partition.hpp create mode 100644 src/app/use_cases/admin/get_partition/get_partition.cpp create mode 100644 src/app/use_cases/admin/get_partition/get_partition.hpp create mode 100644 src/core/partition/partition_map.cpp create mode 100644 src/core/partition/partition_map_ut.cpp create mode 100644 src/infra/dynconfig/leader/leader_config.cpp create mode 100644 src/infra/dynconfig/leader/leader_config.hpp create mode 100644 src/infra/dynconfig/leader/repository_config.cpp create mode 100644 src/infra/dynconfig/leader/repository_config.hpp create mode 100644 tests/test_get_partition.py diff --git a/configs/config_vars.testing.yaml b/configs/config_vars.testing.yaml index 3d6d194..22d06ac 100644 --- a/configs/config_vars.testing.yaml +++ b/configs/config_vars.testing.yaml @@ -4,4 +4,7 @@ logger-level: debug is-testing: true -server-port: 8080 \ No newline at end of file +server-port: 8080 + +config-cache: ~/cache/ +config-server-url: http://localhost:8083 \ No newline at end of file diff --git a/configs/config_vars.yaml b/configs/config_vars.yaml index 8fb9718..1e8ce1f 100644 --- a/configs/config_vars.yaml +++ b/configs/config_vars.yaml @@ -4,4 +4,7 @@ logger-level: info is-testing: false -server-port: 8080 \ No newline at end of file +server-port: 8080 + +config-cache: ~/cache/ +config-server-url: http://localhost:8083 diff --git a/configs/static_config.yaml b/configs/static_config.yaml index 63959cd..b319944 100644 --- a/configs/static_config.yaml +++ b/configs/static_config.yaml @@ -24,13 +24,23 @@ components_manager: level: $logger-level overflow_behavior: discard # Drop logs if the system is too busy to write them down. - # Dynamic config options. Cache is disabled, updates are disabled. dynamic-config: - # For most of userver dynamic configs, defaults are used, some are overridden here. - # See userver "dynamic config" docs for what configs exist. - defaults: - HTTP_CLIENT_CONNECTION_POOL_SIZE: 1000 - + updates-enabled: true + fs-cache-path: $config-cache + fs-task-processor: fs-task-processor + + dynamic-config-client: + config-url: $config-server-url + http-retries: 5 + http-timeout: 20s + service-name: chat + + dynamic-config-client-updater: + config-settings: false + first-update-fail-ok: true + full-update-interval: 1m + update-interval: 5s + default-secdist-provider: config: secure_data.json @@ -119,6 +129,12 @@ components_manager: method: GET task_processor: main-task-processor + handler-get-partition: + load-enabled: true + path: /admin/partition + method: GET + task_processor: main-task-processor + handler-get-hub-reports: load-enabled: true path: /admin/hub_reports diff --git a/src/api/http/v1/admin/get_partition.cpp b/src/api/http/v1/admin/get_partition.cpp new file mode 100644 index 0000000..1340f68 --- /dev/null +++ b/src/api/http/v1/admin/get_partition.cpp @@ -0,0 +1,47 @@ +#include "get_partition.hpp" + +#include + +#include +#include + +#include + +#include + +namespace NCoordinator::NApi::NHandlers { + +//////////////////////////////////////////////////////////////////////////////// + +TGetPartitionHandler::TGetPartitionHandler( + const userver::components::ComponentConfig& config, + const userver::components::ComponentContext& context) + : HttpHandlerJsonBase(config, context) + , AdminService_(context.FindComponent().GetService()) +{ } + +userver::formats::json::Value TGetPartitionHandler::HandleRequestJsonThrow( + const userver::server::http::HttpRequest& request, + const userver::formats::json::Value& /*request_json*/, + userver::server::request::RequestContext& /*request_context*/) const +{ + NApp::NDto::TGetPartitionResponse result; + + const auto& channel = request.GetPathArg("channel"); + + try { + result = AdminService_.GetPartition(NApp::NDto::TGetPartitionRequest{channel}); + } catch (const NApp::NUseCase::TGetPartitionTemporaryUnavailable& ex) { + LOG_ERROR() << "Get partition unavailable: " << ex.what(); + throw TServerException("Get partition temporary unavailable"); + } + + userver::formats::json::ValueBuilder builder; + builder["partition"] = result.Partition.GetUnderlying(); + + return builder.ExtractValue(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NChat::NApi::NHandlers diff --git a/src/api/http/v1/admin/get_partition.hpp b/src/api/http/v1/admin/get_partition.hpp new file mode 100644 index 0000000..ce9fe52 --- /dev/null +++ b/src/api/http/v1/admin/get_partition.hpp @@ -0,0 +1,34 @@ +#pragma once + +#include + +#include +#include +#include + +namespace NCoordinator::NApi::NHandlers { + +//////////////////////////////////////////////////////////////////////////////// + +class TGetPartitionHandler final + : public userver::server::handlers::HttpHandlerJsonBase +{ +public: + static constexpr std::string_view kName = "handler-get-partition"; + + TGetPartitionHandler( + const userver::components::ComponentConfig&, + const userver::components::ComponentContext&); + + userver::formats::json::Value HandleRequestJsonThrow( + const userver::server::http::HttpRequest& request, + const userver::formats::json::Value& request_json, + userver::server::request::RequestContext& context) const override; + +private: + NApp::NService::TAdminService& AdminService_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NChat::NApi::NHandlers diff --git a/src/app/dto/admin/get_partition.hpp b/src/app/dto/admin/get_partition.hpp new file mode 100644 index 0000000..036aff6 --- /dev/null +++ b/src/app/dto/admin/get_partition.hpp @@ -0,0 +1,19 @@ +#pragma once + +#include + +namespace NCoordinator::NApp::NDto { + +//////////////////////////////////////////////////////////////////////////////// + +struct TGetPartitionRequest { + std::string ChannelId; +}; + +struct TGetPartitionResponse { + NCore::NDomain::TPartitionId Partition; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NCoordinator::NApp::NDto diff --git a/src/app/dto/leader/coordination.hpp b/src/app/dto/leader/coordination.hpp index 0502f54..aa34cfd 100644 --- a/src/app/dto/leader/coordination.hpp +++ b/src/app/dto/leader/coordination.hpp @@ -8,6 +8,7 @@ namespace NCoordinator::NApp::NDto { //////////////////////////////////////////////////////////////////////////////// struct TCoordinationRequest { + std::size_t DefaultPartitionsAmount; NCore::NDomain::TStateBuildingSettings StateBuildingSettings; NCore::TBalancingSettings BalancingSettings; }; diff --git a/src/app/services/admin/admin_service.cpp b/src/app/services/admin/admin_service.cpp index c6fae23..06d16a6 100644 --- a/src/app/services/admin/admin_service.cpp +++ b/src/app/services/admin/admin_service.cpp @@ -10,6 +10,7 @@ TAdminService::TAdminService( NCore::NDomain::IHubGateway& hubGateway) : GetContextUseCase_(coordinationRepository) , GetPartitionMapUseCase_(coordinationGateway) + , GetPartitionUseCase_(coordinationGateway) , GetHubReportsUseCase_(coordinationGateway, hubGateway) { } @@ -23,6 +24,11 @@ NDto::TGetPartitionMapResponse TAdminService::GetPartitionMap() const return GetPartitionMapUseCase_.Execute(); } +NDto::TGetPartitionResponse TAdminService::GetPartition(const NDto::TGetPartitionRequest& request) const +{ + return GetPartitionUseCase_.Execute(request); +} + NDto::TGetHubReportsResponse TAdminService::GetHubReports() const { return GetHubReportsUseCase_.Execute(); diff --git a/src/app/services/admin/admin_service.hpp b/src/app/services/admin/admin_service.hpp index 0cd7831..d559168 100644 --- a/src/app/services/admin/admin_service.hpp +++ b/src/app/services/admin/admin_service.hpp @@ -3,8 +3,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -24,11 +26,13 @@ class TAdminService final NDto::TGetContextResponse GetCoordinationContext() const; NDto::TGetPartitionMapResponse GetPartitionMap() const; + NDto::TGetPartitionResponse GetPartition(const NDto::TGetPartitionRequest& request) const; NDto::TGetHubReportsResponse GetHubReports() const; private: NUseCase::TGetContextUseCase GetContextUseCase_; NUseCase::TGetPartitionMapUseCase GetPartitionMapUseCase_; + NUseCase::TGetPartitionUseCase GetPartitionUseCase_; NUseCase::TGetHubReportsUseCase GetHubReportsUseCase_; }; diff --git a/src/app/use_cases/admin/get_partition/get_partition.cpp b/src/app/use_cases/admin/get_partition/get_partition.cpp new file mode 100644 index 0000000..e34f160 --- /dev/null +++ b/src/app/use_cases/admin/get_partition/get_partition.cpp @@ -0,0 +1,49 @@ +#include "get_partition.hpp" + +#include +#include + +#include + +#include + +#include + +namespace NCoordinator::NApp::NUseCase { + +//////////////////////////////////////////////////////////////////////////////// + +TGetPartitionUseCase::TGetPartitionUseCase(NCore::NDomain::ICoordinationGateway& coordinationGateway) + : CoordinationGateway_(coordinationGateway) +{ } + +NDto::TGetPartitionResponse TGetPartitionUseCase::Execute(const NDto::TGetPartitionRequest& request) const +{ + NCore::NDomain::TPartitionMap partitionMap; + + try { + partitionMap = CoordinationGateway_.GetPartitionMap(); + } catch (std::exception& ex) { + throw TGetPartitionTemporaryUnavailable(fmt::format("Failed to get partition map: {}", ex.what())); + } + + std::vector partitions; + partitions.reserve(partitionMap.Partitions.size()); + + for (const auto& [partition, hub] : partitionMap.Partitions) { + partitions.push_back(partition); + } + + NCore::NDomain::THashRing hashRing(partitions); + auto partition = hashRing.GetPartition(request.ChannelId); + + NDto::TGetPartitionResponse response{ + .Partition = partition, + }; + + return response; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NCoordinator::NApp::NUseCase diff --git a/src/app/use_cases/admin/get_partition/get_partition.hpp b/src/app/use_cases/admin/get_partition/get_partition.hpp new file mode 100644 index 0000000..d97b348 --- /dev/null +++ b/src/app/use_cases/admin/get_partition/get_partition.hpp @@ -0,0 +1,30 @@ +#pragma once + +#include +#include +#include + +namespace NCoordinator::NApp::NUseCase { + +//////////////////////////////////////////////////////////////////////////////// + +class TGetPartitionTemporaryUnavailable + : public TApplicationException +{ + using TApplicationException::TApplicationException; +}; + +class TGetPartitionUseCase final +{ +public: + TGetPartitionUseCase(NCore::NDomain::ICoordinationGateway& coordinationGateway); + + NDto::TGetPartitionResponse Execute(const NDto::TGetPartitionRequest& request) const; + +private: + NCore::NDomain::ICoordinationGateway& CoordinationGateway_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NCoordinator::NApp::NUseCase diff --git a/src/app/use_cases/admin/get_partition_map/get_partition_map.cpp b/src/app/use_cases/admin/get_partition_map/get_partition_map.cpp index ee47eb7..7580230 100644 --- a/src/app/use_cases/admin/get_partition_map/get_partition_map.cpp +++ b/src/app/use_cases/admin/get_partition_map/get_partition_map.cpp @@ -1,7 +1,5 @@ #include "get_partition_map.hpp" -#include - #include #include diff --git a/src/app/use_cases/leader/coordination/coordination.cpp b/src/app/use_cases/leader/coordination/coordination.cpp index e7c3c64..a73da0d 100644 --- a/src/app/use_cases/leader/coordination/coordination.cpp +++ b/src/app/use_cases/leader/coordination/coordination.cpp @@ -1,6 +1,7 @@ #include "coordination.hpp" #include +#include #include @@ -33,6 +34,11 @@ void TCoordinationUseCase::Execute(const NDto::TCoordinationRequest& request) co return; } + if (partitionMap.Partitions.empty()) { + auto startingPartitionMap = NCore::NDomain::BuildStartingPartitionMap(request.DefaultPartitionsAmount); + std::swap(partitionMap.Partitions, startingPartitionMap.Partitions); + } + auto coordinationContext = CoordinationRepository_.GetCoordinationContext(); auto coordinationState = NCore::NDomain::TCoordinationState( diff --git a/src/core/common/hub_params.hpp b/src/core/common/hub_params.hpp index c6d2936..6318725 100644 --- a/src/core/common/hub_params.hpp +++ b/src/core/common/hub_params.hpp @@ -21,7 +21,7 @@ using THubDC = userver::utils::StrongTypedef< using TLoadFactor = userver::utils::StrongTypedef< struct TLoadFactorTag, - std::uint64_t, + std::uint32_t, userver::utils::StrongTypedefOps::kCompareTransparent>; enum class EHubStatus { diff --git a/src/core/hash_ring/hash_ring.cpp b/src/core/hash_ring/hash_ring.cpp index 8d44a72..2ace167 100644 --- a/src/core/hash_ring/hash_ring.cpp +++ b/src/core/hash_ring/hash_ring.cpp @@ -13,6 +13,31 @@ THashRing::THashRing(const std::vector& partitions, HashFunction h LoadPartitions(partitions); } +THashRing::THashRing(std::size_t partitionCount, HashFunction hasher) + : Hasher_(std::move(hasher)) +{ + if (partitionCount == 0) { + throw std::invalid_argument("Partition count must be at least 1"); + } + + Partitions_.reserve(partitionCount); + + const std::uint64_t maxVal = std::numeric_limits::max(); + const std::uint64_t step = maxVal / partitionCount; + + for (std::size_t i = 1; i <= partitionCount; ++i) { + std::uint64_t boundary; + + if (i == partitionCount) { + boundary = maxVal; + } else { + boundary = step * i; + } + + Partitions_.emplace_back(boundary); + } +} + void THashRing::LoadPartitions(const std::vector& partitions) { if (partitions.empty()) { @@ -25,7 +50,7 @@ void THashRing::LoadPartitions(const std::vector& partitions) TPartitionId THashRing::GetPartition(const std::string& key) const { - const uint64_t hash = Hasher_(key); + const std::uint64_t hash = Hasher_(key); auto it = std::lower_bound(Partitions_.begin(), Partitions_.end(), hash); diff --git a/src/core/hash_ring/hash_ring.hpp b/src/core/hash_ring/hash_ring.hpp index 23ec077..172fb06 100644 --- a/src/core/hash_ring/hash_ring.hpp +++ b/src/core/hash_ring/hash_ring.hpp @@ -16,6 +16,7 @@ class THashRing { using HashFunction = std::function; explicit THashRing(const std::vector& partitions, HashFunction hasher = std::hash{}); + explicit THashRing(std::size_t partitionCount, HashFunction hasher = std::hash{}); void LoadPartitions(const std::vector& partitions); diff --git a/src/core/hash_ring/hash_ring_ut.cpp b/src/core/hash_ring/hash_ring_ut.cpp index 7cc7984..d81c164 100644 --- a/src/core/hash_ring/hash_ring_ut.cpp +++ b/src/core/hash_ring/hash_ring_ut.cpp @@ -31,6 +31,24 @@ TEST(THashRing, ThrowsOnEmptyPartitions) }, std::invalid_argument); } +TEST(THashRing, DistributesPartitionsEvenly) +{ + THashRing ring(2, MockHasher); + const auto& partitions = ring.GetAllPartitions(); + + ASSERT_EQ(partitions.size(), 2); + + std::uint64_t maxVal = std::numeric_limits::max(); + std::uint64_t expectedMid = maxVal / 2; + + std::string keyLow = std::to_string(expectedMid - 100); + std::string keyHigh = std::to_string(expectedMid + 100); + + EXPECT_EQ(ring.GetPartition(keyLow), partitions[0]); + + EXPECT_EQ(ring.GetPartition(keyHigh), partitions[1]); +} + TEST(THashRing, SortsAndDeduplicatesPartitions) { std::vector raw = { diff --git a/src/core/partition/partition_map.cpp b/src/core/partition/partition_map.cpp new file mode 100644 index 0000000..c4c9d57 --- /dev/null +++ b/src/core/partition/partition_map.cpp @@ -0,0 +1,26 @@ +#include "partition_map.hpp" + +#include + +namespace NCoordinator::NCore::NDomain { + +//////////////////////////////////////////////////////////////////////////////// + +TPartitionMap BuildStartingPartitionMap(std::size_t partitionsAmount) +{ + NCore::NDomain::THashRing hashRing(partitionsAmount); + auto partitions = hashRing.GetAllPartitions(); + + TPartitionMap result; + result.Partitions.reserve(partitions.size()); + + for (const auto& partition : partitions) { + result.Partitions.emplace_back(partition, NCore::NDomain::THubEndpoint{}); + } + + return result; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NCoordinator::NCore::NDomain diff --git a/src/core/partition/partition_map.hpp b/src/core/partition/partition_map.hpp index 72c26d2..41687cb 100644 --- a/src/core/partition/partition_map.hpp +++ b/src/core/partition/partition_map.hpp @@ -17,6 +17,8 @@ struct TPartitionMap { TEpoch Epoch; }; +TPartitionMap BuildStartingPartitionMap(std::size_t partitionsAmount); + //////////////////////////////////////////////////////////////////////////////// } // namespace NCoordinator::NCore::NDomain diff --git a/src/core/partition/partition_map_ut.cpp b/src/core/partition/partition_map_ut.cpp new file mode 100644 index 0000000..d757225 --- /dev/null +++ b/src/core/partition/partition_map_ut.cpp @@ -0,0 +1,104 @@ +#include "partition_map.hpp" + +#include +#include + +#include + +#include +#include +#include + +namespace NCoordinator::NCore::NDomain { + +//////////////////////////////////////////////////////////////////////////////// + +TEST(TPartitionMap, ThrowsOnZeroPartitions) +{ + EXPECT_THROW({ + BuildStartingPartitionMap(0); + }, std::invalid_argument); +} + +TEST(TPartitionMap, OnePartition) +{ + const auto result = BuildStartingPartitionMap(1); + + ASSERT_EQ(result.Partitions.size(), 1); + ASSERT_EQ(result.Epoch.GetUnderlying(), 0); + + const auto& [partition, hub] = result.Partitions.front(); + + EXPECT_EQ(partition.GetUnderlying(), + std::numeric_limits::max()); + EXPECT_TRUE(hub.empty()); +} + +TEST(TPartitionMap, CreatesCorrectAmountOfPartitions) +{ + constexpr std::size_t partitionsAmount = 10; + + const auto result = BuildStartingPartitionMap(partitionsAmount); + + ASSERT_EQ(result.Partitions.size(), partitionsAmount); + ASSERT_EQ(result.Epoch.GetUnderlying(), 0); + + for (const auto& [partition, hub] : result.Partitions) { + EXPECT_TRUE(hub.empty()); + } +} + +TEST(TPartitionMap, PartitionsAreSortedByBoundary) +{ + constexpr std::size_t partitionsAmount = 8; + + const auto result = BuildStartingPartitionMap(partitionsAmount); + + ASSERT_EQ(result.Epoch.GetUnderlying(), 0); + + for (std::size_t i = 1; i < result.Partitions.size(); ++i) { + EXPECT_LT( + result.Partitions[i - 1].first.GetUnderlying(), + result.Partitions[i].first.GetUnderlying() + ); + } +} + +TEST(TPartitionMap, LastPartitionHasMaxBoundary) +{ + constexpr std::size_t partitionsAmount = 16; + + const auto result = BuildStartingPartitionMap(partitionsAmount); + + ASSERT_FALSE(result.Partitions.empty()); + ASSERT_EQ(result.Epoch.GetUnderlying(), 0); + + EXPECT_EQ( + result.Partitions.back().first.GetUnderlying(), + std::numeric_limits::max() + ); +} + +TEST(TPartitionMap, UsesHashRingPartitionsAsIs) +{ + constexpr std::size_t partitionsAmount = 5; + + NCore::NDomain::THashRing hashRing(partitionsAmount); + const auto expectedPartitions = hashRing.GetAllPartitions(); + + const auto result = BuildStartingPartitionMap(partitionsAmount); + + ASSERT_EQ(result.Epoch.GetUnderlying(), 0); + ASSERT_EQ(result.Partitions.size(), expectedPartitions.size()); + + for (std::size_t i = 0; i < result.Partitions.size(); ++i) { + EXPECT_EQ( + result.Partitions[i].first.GetUnderlying(), + expectedPartitions[i].GetUnderlying() + ); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NCoordinator::NCore::NDomain diff --git a/src/infra/components/components.cpp b/src/infra/components/components.cpp index f3e67f6..d5bf100 100644 --- a/src/infra/components/components.cpp +++ b/src/infra/components/components.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -32,6 +33,7 @@ void RegisterUserverComponents(userver::components::ComponentList& list) list.Append() .Append() .AppendComponentList(userver::clients::http::ComponentList()) + .AppendComponentList(userver::dynamic_config::updater::ComponentList()) .Append() .Append() .Append(); @@ -66,8 +68,9 @@ void RegisterServices(userver::components::ComponentList& list) void RegisterHandlers(userver::components::ComponentList& list) { list.Append() - .Append() - .Append(); + .Append() + .Append() + .Append(); } } // namespace NCoordinator::NInfra::NComponents diff --git a/src/infra/components/coordination/coordination_gateway_component.cpp b/src/infra/components/coordination/coordination_gateway_component.cpp index 7a91cbd..a135edd 100644 --- a/src/infra/components/coordination/coordination_gateway_component.cpp +++ b/src/infra/components/coordination/coordination_gateway_component.cpp @@ -4,6 +4,8 @@ #include #include +#include +#include #include #include @@ -23,9 +25,11 @@ TCoordinationGatewayComponent::TCoordinationGatewayComponent( auto initialSetup = config["initial-setup"].As(true); auto coordinationClient = context.FindComponent().GetCoordinationClient(dbname); + auto configSource = context.FindComponent().GetSource(); Gateway_ = std::make_unique( std::move(coordinationClient), + std::move(configSource), coordinationNode, partitionMapSemaphore, discoverySemaphore, diff --git a/src/infra/components/coordination/coordination_repository_component.cpp b/src/infra/components/coordination/coordination_repository_component.cpp index 2d31f9a..b19ca20 100644 --- a/src/infra/components/coordination/coordination_repository_component.cpp +++ b/src/infra/components/coordination/coordination_repository_component.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -19,8 +20,11 @@ TCoordinationRepositoryComponent::TCoordinationRepositoryComponent( auto dbname = config["dbname"].As(); auto tableClient = context.FindComponent().GetTableClient(dbname); + auto configSource = context.FindComponent().GetSource(); - Repository_ = std::make_unique(std::move(tableClient)); + Repository_ = std::make_unique( + std::move(tableClient), + std::move(configSource)); } NCore::NDomain::ICoordinationRepository& TCoordinationRepositoryComponent::GetRepository() diff --git a/src/infra/components/leader/leader_dist_lock_component.cpp b/src/infra/components/leader/leader_dist_lock_component.cpp index 880edf3..308b840 100644 --- a/src/infra/components/leader/leader_dist_lock_component.cpp +++ b/src/infra/components/leader/leader_dist_lock_component.cpp @@ -2,22 +2,15 @@ #include "leader_service_component.hpp" +#include + #include +#include #include #include #include -namespace { - -//////////////////////////////////////////////////////////////////////////////// - -const auto DEFAULT_COORDINATION_PAUSE_SECONDS = std::chrono::seconds(10); // TODO replace with dynamic config - -//////////////////////////////////////////////////////////////////////////////// - -} // anonymous namespace - namespace NCoordinator::NInfra::NComponents { //////////////////////////////////////////////////////////////////////////////// @@ -27,6 +20,7 @@ TLeaderDistLockComponent::TLeaderDistLockComponent( const userver::components::ComponentContext& context) : DistLockComponentBase(config, context) , Service_(context.FindComponent().GetService()) + , ConfigSource_(context.FindComponent().GetSource()) { Start(); } @@ -38,32 +32,17 @@ TLeaderDistLockComponent::~TLeaderDistLockComponent() void TLeaderDistLockComponent::DoWork() { while (!userver::engine::current_task::ShouldCancel()) { - - // TODO replace to settings from dynconfig - NCore::NDomain::TStateBuildingSettings stateBuildingSettings{ - .BlockedDCs = {}, - .BlockedHubs = {}, - .OverloadThreshold = NCore::NDomain::TLoadFactor{90}, - }; - - NCore::TBalancingSettings balancingSettings{ - .MaxRebalancePhases = 5, - .MigratingWeightLimit = NCore::NDomain::TPartitionWeight{1500}, - .MinLoadFactorDelta = NCore::NDomain::TLoadFactor{10}, - .MigrationBudgetThreshold = NCore::NDomain::TPartitionWeight{100}, - .BalancingThresholdCV = 25, - .BalancingTargetCV = 5, - .MinMigrationCooldown = NCore::NDomain::TEpoch{5}, - .MigrationWeightPenaltyCoeff = 0.5, - }; + const auto snapshot = ConfigSource_.GetSnapshot(); + const auto config = snapshot[LEADER_CONFIG]; NApp::NDto::TCoordinationRequest request{ - .StateBuildingSettings = std::move(stateBuildingSettings), - .BalancingSettings = std::move(balancingSettings), + .DefaultPartitionsAmount = config.DefaultPartitionsAmount, + .StateBuildingSettings = config.StateBuildingSettings, + .BalancingSettings = config.BalancingSettings, }; Service_.Coordinate(std::move(request)); - userver::engine::InterruptibleSleepFor(DEFAULT_COORDINATION_PAUSE_SECONDS); // TODO replace with dynamic config + userver::engine::InterruptibleSleepFor(config.CoordinationPeriod); } } diff --git a/src/infra/components/leader/leader_dist_lock_component.hpp b/src/infra/components/leader/leader_dist_lock_component.hpp index a76a22a..6b7ad34 100644 --- a/src/infra/components/leader/leader_dist_lock_component.hpp +++ b/src/infra/components/leader/leader_dist_lock_component.hpp @@ -3,6 +3,7 @@ #include #include +#include namespace NCoordinator::NInfra::NComponents { @@ -24,6 +25,8 @@ class TLeaderDistLockComponent private: NApp::NService::TLeaderService& Service_; + + userver::dynamic_config::Source ConfigSource_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/src/infra/coordination_gateway/kesus_coordination_gateway.cpp b/src/infra/coordination_gateway/kesus_coordination_gateway.cpp index e180ca2..53c6a7e 100644 --- a/src/infra/coordination_gateway/kesus_coordination_gateway.cpp +++ b/src/infra/coordination_gateway/kesus_coordination_gateway.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -18,7 +19,6 @@ namespace { //////////////////////////////////////////////////////////////////////////////// -constexpr inline std::string_view DEFAULT_PARTITION_MAP_DATA = "{\"partitions\": [],\"epoch\": 0}"; constexpr inline std::uint64_t PARTITION_MAP_SEMAPHORE_LIMIT = 1; constexpr inline std::uint64_t DISCOVERY_SEMAPHORE_LIMIT = std::numeric_limits::max(); @@ -32,13 +32,15 @@ namespace NCoordinator::NInfra::NGateway { TKesusCoordinationGateway::TKesusCoordinationGateway( std::shared_ptr coordinationClient, + userver::dynamic_config::Source configSource, const std::string& coordinationNode, const std::string& partitionMapSemaphore, const std::string& discoverySemaphore, const bool initialSetup) - : PartitionMapSemaphore_(partitionMapSemaphore) + : CoordinationSession_(nullptr) + , ConfigSource_(std::move(configSource)) + , PartitionMapSemaphore_(partitionMapSemaphore) , DiscoverySemaphore_(discoverySemaphore) - , CoordinationSession_(nullptr) { if (initialSetup) { InitialSetup(*coordinationClient, coordinationNode); @@ -109,11 +111,17 @@ void TKesusCoordinationGateway::InitialSetup( auto session = coordinationClient.StartSession(coordinationNode, {}); try { + const auto snapshot = ConfigSource_.GetSnapshot(); + const auto config = snapshot[LEADER_CONFIG]; + auto partitionMap = NCore::NDomain::BuildStartingPartitionMap(config.DefaultPartitionsAmount); + session.CreateSemaphore( PartitionMapSemaphore_, PARTITION_MAP_SEMAPHORE_LIMIT); - // std::string{DEFAULT_PARTITION_MAP_DATA}); - session.UpdateSemaphore(PartitionMapSemaphore_, std::string{DEFAULT_PARTITION_MAP_DATA}); + // userver::formats::json::ToStableString(SerializePartitionMap(partitionMap))); + session.UpdateSemaphore( + PartitionMapSemaphore_, + userver::formats::json::ToStableString(SerializePartitionMap(partitionMap))); } catch (const userver::ydb::YdbResponseError& ex) { LOG_WARNING() << "Could not create partition map semaphore: " << ex; } diff --git a/src/infra/coordination_gateway/kesus_coordination_gateway.hpp b/src/infra/coordination_gateway/kesus_coordination_gateway.hpp index eec6bf4..df9d813 100644 --- a/src/infra/coordination_gateway/kesus_coordination_gateway.hpp +++ b/src/infra/coordination_gateway/kesus_coordination_gateway.hpp @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -19,6 +20,7 @@ class TKesusCoordinationGateway public: TKesusCoordinationGateway( std::shared_ptr coordinationClient, + userver::dynamic_config::Source configSource, const std::string& coordinationNode, const std::string& partitionMapSemaphore, const std::string& discoverySemaphore, @@ -35,9 +37,11 @@ class TKesusCoordinationGateway const std::string& coordinationNode) const; private: + std::unique_ptr CoordinationSession_; + userver::dynamic_config::Source ConfigSource_; + const std::string PartitionMapSemaphore_; const std::string DiscoverySemaphore_; - std::unique_ptr CoordinationSession_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/src/infra/coordination_repository/ydb_coordination_repository.cpp b/src/infra/coordination_repository/ydb_coordination_repository.cpp index 725b757..5c1818c 100644 --- a/src/infra/coordination_repository/ydb_coordination_repository.cpp +++ b/src/infra/coordination_repository/ydb_coordination_repository.cpp @@ -2,6 +2,7 @@ #include #include +#include #include @@ -63,19 +64,19 @@ namespace NCoordinator::NInfra::NRepository { //////////////////////////////////////////////////////////////////////////////// -TYdbCoordinationRepository::TYdbCoordinationRepository(std::shared_ptr ydbClient) +TYdbCoordinationRepository::TYdbCoordinationRepository( + std::shared_ptr ydbClient, + userver::dynamic_config::Source configSource) : YdbClient_(std::move(ydbClient)) + , ConfigSource_(std::move(configSource)) { } NCore::NDomain::TCoordinationContext TYdbCoordinationRepository::GetCoordinationContext() const { - const userver::ydb::OperationSettings queryParams = { - 3, // retries - TODO take from dynconfig - std::chrono::milliseconds(1000), // operation_timeout - TODO take from dynconfig - std::chrono::milliseconds(1000), // cancel_after - TODO take from dynconfig - std::chrono::milliseconds(1100), // client_timeout - TODO take from dynconfig - userver::ydb::TransactionMode::kOnlineRO - }; + const auto snapshot = ConfigSource_.GetSnapshot(); + + userver::ydb::OperationSettings queryParams = snapshot[REPOSITORY_CONFIG].GetContextSettings; + queryParams.tx_mode = userver::ydb::TransactionMode::kOnlineRO; auto response = YdbClient_->ExecuteDataQuery( queryParams, @@ -140,13 +141,10 @@ void TYdbCoordinationRepository::SetCoordinationContext(const NCore::NDomain::TC activeIds.emplace_back(partitionId); } - const userver::ydb::OperationSettings queryParams = { - 3, // retries - TODO take from dynconfig - std::chrono::milliseconds(1000), // operation_timeout - TODO take from dynconfig - std::chrono::milliseconds(1000), // cancel_after - TODO take from dynconfig - std::chrono::milliseconds(1100), // client_timeout - TODO take from dynconfig - userver::ydb::TransactionMode::kSerializableRW - }; + const auto snapshot = ConfigSource_.GetSnapshot(); + + userver::ydb::OperationSettings queryParams = snapshot[REPOSITORY_CONFIG].SetContextSettings; + queryParams.tx_mode = userver::ydb::TransactionMode::kSerializableRW; auto response = YdbClient_->ExecuteDataQuery( queryParams, diff --git a/src/infra/coordination_repository/ydb_coordination_repository.hpp b/src/infra/coordination_repository/ydb_coordination_repository.hpp index 998c274..c02017a 100644 --- a/src/infra/coordination_repository/ydb_coordination_repository.hpp +++ b/src/infra/coordination_repository/ydb_coordination_repository.hpp @@ -3,6 +3,7 @@ #include #include +#include #include namespace NCoordinator::NInfra::NRepository { @@ -13,14 +14,16 @@ class TYdbCoordinationRepository : public NCore::NDomain::ICoordinationRepository { public: - TYdbCoordinationRepository(std::shared_ptr ydbClient); + TYdbCoordinationRepository( + std::shared_ptr ydbClient, + userver::dynamic_config::Source configSource); NCore::NDomain::TCoordinationContext GetCoordinationContext() const override; void SetCoordinationContext(const NCore::NDomain::TCoordinationContext& context) const override; private: std::shared_ptr YdbClient_; - // TODO addd dynconfig + userver::dynamic_config::Source ConfigSource_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/src/infra/dynconfig/leader/leader_config.cpp b/src/infra/dynconfig/leader/leader_config.cpp new file mode 100644 index 0000000..2f8f8dc --- /dev/null +++ b/src/infra/dynconfig/leader/leader_config.cpp @@ -0,0 +1,100 @@ +#include "leader_config.hpp" + +#include +#include + +namespace NCoordinator::NInfra { + +//////////////////////////////////////////////////////////////////////////////// + +NCore::NDomain::THubEndpoint Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To) +{ + return NCore::NDomain::THubEndpoint{ + value.As() + }; +} + +NCore::NDomain::THubDC Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To) +{ + return NCore::NDomain::THubDC{ + value.As() + }; +} + +NCore::NDomain::TEpoch Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To) +{ + return NCore::NDomain::TEpoch{ + value.As() + }; +} + +NCore::NDomain::TPartitionWeight Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To) +{ + return NCore::NDomain::TPartitionWeight{ + value.As() + }; +} + +NCore::NDomain::TLoadFactor Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To) +{ + return NCore::NDomain::TLoadFactor{ + value.As() + }; +} + +TLeaderSettings Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To) +{ + const auto& state = value["state_building_settings"]; + const auto& balancing = value["balancing_settings"]; + + NCore::NDomain::TStateBuildingSettings stateSettings{ + .BlockedDCs = state["blocked_dcs"] + .As>(), + .BlockedHubs = state["blocked_hubs"] + .As>(), + .OverloadThreshold = NCore::NDomain::TLoadFactor{state["overload_threshold"] + .As()}, + }; + + NCore::TBalancingSettings balancingSettings{ + .MaxRebalancePhases = balancing["max_rebalance_phases"] + .As(), + .MigratingWeightLimit = balancing["migrating_weight_limit"] + .As(), + .MinLoadFactorDelta = balancing["min_load_factor_delta"] + .As(), + .MigrationBudgetThreshold = balancing["migration_budget_threshold"] + .As(), + .BalancingThresholdCV = balancing["balancing_threshold_cv"] + .As(), + .BalancingTargetCV = balancing["balancing_target_cv"] + .As(), + .MinMigrationCooldown = balancing["min_migration_cooldown"] + .As(), + .MigrationWeightPenaltyCoeff = balancing["migration_weight_penalty_coeff"] + .As(), + }; + + return TLeaderSettings{ + std::chrono::seconds{value["coordination_period_seconds"].As()}, + value["default_partitions_amount"].As(), + std::move(stateSettings), + std::move(balancingSettings), + }; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NCoordinator::NInfra diff --git a/src/infra/dynconfig/leader/leader_config.hpp b/src/infra/dynconfig/leader/leader_config.hpp new file mode 100644 index 0000000..ed01dc3 --- /dev/null +++ b/src/infra/dynconfig/leader/leader_config.hpp @@ -0,0 +1,72 @@ +#pragma once + +#include +#include + +#include +#include + +#include + +namespace NCoordinator::NInfra { + +//////////////////////////////////////////////////////////////////////////////// + +struct TLeaderSettings { + std::chrono::seconds CoordinationPeriod{10}; + std::size_t DefaultPartitionsAmount{100}; + NCore::NDomain::TStateBuildingSettings StateBuildingSettings; + NCore::TBalancingSettings BalancingSettings; +}; + +inline const userver::dynamic_config::Key LEADER_CONFIG{ + "LEADER_CONFIG", + userver::dynamic_config::DefaultAsJsonString{R"( +{ + "coordination_period_seconds": 10, + "default_partitions_amount": 100, + "state_building_settings": { + "blocked_dcs": [], + "blocked_hubs": [], + "overload_threshold": 90 + }, + "balancing_settings": { + "max_rebalance_phases": 5, + "migrating_weight_limit": 1500, + "min_load_factor_delta": 10, + "migration_budget_threshold": 100, + "balancing_threshold_cv": 25, + "balancing_target_cv": 5, + "min_migration_cooldown": 5, + "migration_weight_penalty_coeff": 0.5 + } +} +)"}}; + +NCore::NDomain::THubEndpoint Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To); + +NCore::NDomain::THubDC Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To); + +NCore::NDomain::TEpoch Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To); + +NCore::NDomain::TPartitionWeight Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To); + +NCore::NDomain::TLoadFactor Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To); + +TLeaderSettings Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NCoordinator::NInfra diff --git a/src/infra/dynconfig/leader/repository_config.cpp b/src/infra/dynconfig/leader/repository_config.cpp new file mode 100644 index 0000000..840d97a --- /dev/null +++ b/src/infra/dynconfig/leader/repository_config.cpp @@ -0,0 +1,49 @@ +#include "repository_config.hpp" + +#include +#include +#include + +namespace userver::ydb { + +//////////////////////////////////////////////////////////////////////////////// + +OperationSettings Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To) +{ + OperationSettings result; + result.retries = value["retries"].As(); + result.operation_timeout_ms = value["operation-timeout"].As(); + result.cancel_after_ms = value["cancel-after"].As(); + result.client_timeout_ms = value["client-timeout"].As(); + result.get_session_timeout_ms = value["get-session-timeout"].As(); + result.tx_mode = userver::ydb::TransactionMode::kSerializableRW; + + return result; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace userver::ydb + +namespace NCoordinator::NInfra { + +//////////////////////////////////////////////////////////////////////////////// + +TRepositorySettings Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To) +{ + const auto getContextSettings = value["get_context_settings"].As(); + const auto setContextSettings = value["set_context_settings"].As(); + + return TRepositorySettings{ + .GetContextSettings = getContextSettings, + .SetContextSettings = setContextSettings, + }; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NCoordinator::NInfra diff --git a/src/infra/dynconfig/leader/repository_config.hpp b/src/infra/dynconfig/leader/repository_config.hpp new file mode 100644 index 0000000..a41f03e --- /dev/null +++ b/src/infra/dynconfig/leader/repository_config.hpp @@ -0,0 +1,50 @@ +#pragma once + +#include +#include + +#include +#include +#include + +#include + +namespace NCoordinator::NInfra { + +//////////////////////////////////////////////////////////////////////////////// + +struct TRepositorySettings { + userver::ydb::OperationSettings GetContextSettings; + userver::ydb::OperationSettings SetContextSettings; +}; + +inline const userver::dynamic_config::Key REPOSITORY_CONFIG{ + "REPOSITORY_CONFIG", + userver::dynamic_config::DefaultAsJsonString{R"( +{ + "get_context_settings": { + "operation_timeout_ms": 3, + "cancel_after_ms": 1000, + "client_timeout_ms": 1100, + "get_session_timeout_ms": 1000 + }, + "set_context_settings": { + "operation_timeout_ms": 3, + "cancel_after_ms": 1000, + "client_timeout_ms": 1100, + "get_session_timeout_ms": 1000 + } +} +)"}}; + +userver::ydb::OperationSettings Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To); + +TRepositorySettings Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NCoordinator::NInfra diff --git a/tests/test_get_partition.py b/tests/test_get_partition.py new file mode 100644 index 0000000..2999e8b --- /dev/null +++ b/tests/test_get_partition.py @@ -0,0 +1,7 @@ +import pytest + +async def test_get_partition(service_client): + response = await service_client.get('/admin/partition?channel=test') + assert response.status == 200 + assert 'application/json' in response.headers['Content-Type'] + assert response.text == '{"partition":6271892985061247544}' diff --git a/tests/test_get_partition_map.py b/tests/test_get_partition_map.py index b1fe4ca..25e935b 100644 --- a/tests/test_get_partition_map.py +++ b/tests/test_get_partition_map.py @@ -4,4 +4,20 @@ async def test_get_partition_map(service_client): response = await service_client.get('/admin/partition_map') assert response.status == 200 assert 'application/json' in response.headers['Content-Type'] - assert response.text == '{"partition_map":{"partitions":[],"epoch":0}}' + + data = response.json() + + assert 'partition_map' in data + assert 'partitions' in data['partition_map'] + assert 'epoch' in data['partition_map'] + + partitions = data['partition_map']['partitions'] + assert len(partitions) == 100 + + for partition in partitions: + assert 'id' in partition + assert isinstance(partition['id'], int) + assert 'hub' in partition + assert isinstance(partition['hub'], str) + + assert data['partition_map']['epoch'] == 0 \ No newline at end of file From 5294f0c6676a1b1acaa73661a8651d33f3094fab Mon Sep 17 00:00:00 2001 From: Ilya Repin Date: Thu, 5 Feb 2026 18:36:49 +0000 Subject: [PATCH 2/3] add predictor config --- .../load_factor_predictor_component.cpp | 4 +- .../ydb_coordination_repository.cpp | 2 +- .../dynconfig/predictor/predictor_config.cpp | 28 +++++++++ .../dynconfig/predictor/predictor_config.hpp | 30 ++++++++++ .../repository_config.cpp | 0 .../repository_config.hpp | 0 .../heuristic_predictor.cpp | 9 ++- .../heuristic_predictor.hpp | 6 +- .../heuristic_predictor_ut.cpp | 59 +++++++++++++++---- 9 files changed, 124 insertions(+), 14 deletions(-) create mode 100644 src/infra/dynconfig/predictor/predictor_config.cpp create mode 100644 src/infra/dynconfig/predictor/predictor_config.hpp rename src/infra/dynconfig/{leader => repository}/repository_config.cpp (100%) rename src/infra/dynconfig/{leader => repository}/repository_config.hpp (100%) diff --git a/src/infra/components/partition_balancing/load_factor_predictor_component.cpp b/src/infra/components/partition_balancing/load_factor_predictor_component.cpp index 612b198..c7af302 100644 --- a/src/infra/components/partition_balancing/load_factor_predictor_component.cpp +++ b/src/infra/components/partition_balancing/load_factor_predictor_component.cpp @@ -4,6 +4,7 @@ #include #include +#include #include namespace NCoordinator::NInfra::NComponents { @@ -17,8 +18,9 @@ TLoadFactorPredictorComponent::TLoadFactorPredictorComponent( { // TODO other types of predictors // auto type = config["type"].As(); + auto configSource = context.FindComponent().GetSource(); - Predictor_ = std::make_unique(); + Predictor_ = std::make_unique(std::move(configSource)); } NCore::ILoadFactorPredictor& TLoadFactorPredictorComponent::GetPredictor() diff --git a/src/infra/coordination_repository/ydb_coordination_repository.cpp b/src/infra/coordination_repository/ydb_coordination_repository.cpp index 5c1818c..d153110 100644 --- a/src/infra/coordination_repository/ydb_coordination_repository.cpp +++ b/src/infra/coordination_repository/ydb_coordination_repository.cpp @@ -2,7 +2,7 @@ #include #include -#include +#include #include diff --git a/src/infra/dynconfig/predictor/predictor_config.cpp b/src/infra/dynconfig/predictor/predictor_config.cpp new file mode 100644 index 0000000..e2830a7 --- /dev/null +++ b/src/infra/dynconfig/predictor/predictor_config.cpp @@ -0,0 +1,28 @@ +#include "predictor_config.hpp" + +#include + +#include +#include +#include + +namespace NCoordinator::NInfra { + +//////////////////////////////////////////////////////////////////////////////// + +TPredictorSettings Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To) +{ + const auto defaultFirstLoadFactor = NCore::NDomain::TLoadFactor{ + value["default_first_load_factor"].As() + }; + + return TPredictorSettings{ + .DefaultFirstLoadFactor = defaultFirstLoadFactor, + }; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NCoordinator::NInfra diff --git a/src/infra/dynconfig/predictor/predictor_config.hpp b/src/infra/dynconfig/predictor/predictor_config.hpp new file mode 100644 index 0000000..f0522c5 --- /dev/null +++ b/src/infra/dynconfig/predictor/predictor_config.hpp @@ -0,0 +1,30 @@ +#pragma once + +#include + +#include +#include + +namespace NCoordinator::NInfra { + +//////////////////////////////////////////////////////////////////////////////// + +struct TPredictorSettings { + NCore::NDomain::TLoadFactor DefaultFirstLoadFactor; +}; + +inline const userver::dynamic_config::Key PREDICTOR_CONFIG{ + "PREDICTOR_CONFIG", + userver::dynamic_config::DefaultAsJsonString{R"( +{ + "default_first_load_factor": 5 +} +)"}}; + +TPredictorSettings Parse( + const userver::formats::json::Value& value, + userver::formats::parse::To); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NCoordinator::NInfra diff --git a/src/infra/dynconfig/leader/repository_config.cpp b/src/infra/dynconfig/repository/repository_config.cpp similarity index 100% rename from src/infra/dynconfig/leader/repository_config.cpp rename to src/infra/dynconfig/repository/repository_config.cpp diff --git a/src/infra/dynconfig/leader/repository_config.hpp b/src/infra/dynconfig/repository/repository_config.hpp similarity index 100% rename from src/infra/dynconfig/leader/repository_config.hpp rename to src/infra/dynconfig/repository/repository_config.hpp diff --git a/src/infra/load_factor_predictor/heuristic_predictor.cpp b/src/infra/load_factor_predictor/heuristic_predictor.cpp index b8dfaab..6cfd161 100644 --- a/src/infra/load_factor_predictor/heuristic_predictor.cpp +++ b/src/infra/load_factor_predictor/heuristic_predictor.cpp @@ -1,6 +1,7 @@ #include "heuristic_predictor.hpp" #include +#include #include #include @@ -10,6 +11,10 @@ namespace NCoordinator::NInfra { //////////////////////////////////////////////////////////////////////////////// +THeuristicPredictor::THeuristicPredictor(userver::dynamic_config::Source configSource) + : ConfigSource_(configSource) +{ } + NCore::NDomain::TLoadFactor THeuristicPredictor::PredictLoadFactor( const NCore::TPredictionParams& params) const { @@ -17,7 +22,9 @@ NCore::NDomain::TLoadFactor THeuristicPredictor::PredictLoadFactor( if (currentTotalWeight <= std::numeric_limits::epsilon() || params.TotalPartitions == 0) { if (params.Increasing) { - return DefaultFirstLoadFactor; + const auto snapshot = ConfigSource_.GetSnapshot(); + auto loadFactor = snapshot[PREDICTOR_CONFIG].DefaultFirstLoadFactor; + return loadFactor; } else { return NCore::NDomain::TLoadFactor{0}; } diff --git a/src/infra/load_factor_predictor/heuristic_predictor.hpp b/src/infra/load_factor_predictor/heuristic_predictor.hpp index 12cd8ca..009d083 100644 --- a/src/infra/load_factor_predictor/heuristic_predictor.hpp +++ b/src/infra/load_factor_predictor/heuristic_predictor.hpp @@ -3,6 +3,8 @@ #include #include +#include + namespace NCoordinator::NInfra { //////////////////////////////////////////////////////////////////////////////// @@ -11,10 +13,12 @@ class THeuristicPredictor : public NCore::ILoadFactorPredictor { public: + explicit THeuristicPredictor(userver::dynamic_config::Source configSource); + NCore::NDomain::TLoadFactor PredictLoadFactor(const NCore::TPredictionParams& params) const override; private: - NCore::NDomain::TLoadFactor DefaultFirstLoadFactor{5}; // TODO replace with dynconfig + userver::dynamic_config::Source ConfigSource_; }; //////////////////////////////////////////////////////////////////////////////// diff --git a/src/infra/load_factor_predictor/heuristic_predictor_ut.cpp b/src/infra/load_factor_predictor/heuristic_predictor_ut.cpp index 749363e..543693b 100644 --- a/src/infra/load_factor_predictor/heuristic_predictor_ut.cpp +++ b/src/infra/load_factor_predictor/heuristic_predictor_ut.cpp @@ -1,5 +1,10 @@ #include "heuristic_predictor.hpp" +#include + +#include +#include + #include namespace { @@ -28,6 +33,8 @@ TPredictionParams MakeParams( return p; } +const auto DEFAULT_JSON = userver::formats::json::FromString(R"( {"default_first_load_factor": 5} )"); + //////////////////////////////////////////////////////////////////////////////// } // anonymous namespace @@ -38,7 +45,11 @@ namespace NCoordinator::NInfra { TEST(THeuristicPredictor, LinearGrowth) { - const THeuristicPredictor predictor; + userver::dynamic_config::StorageMock storage{ + {PREDICTOR_CONFIG, DEFAULT_JSON}, + }; + + THeuristicPredictor predictor{storage.GetSource()}; auto params = MakeParams(50, 100, 20, true); auto result = predictor.PredictLoadFactor(params); @@ -48,7 +59,11 @@ TEST(THeuristicPredictor, LinearGrowth) TEST(THeuristicPredictor, LinearDecline) { - const THeuristicPredictor predictor; + userver::dynamic_config::StorageMock storage{ + {PREDICTOR_CONFIG, DEFAULT_JSON}, + }; + + THeuristicPredictor predictor{storage.GetSource()}; auto params = MakeParams(50, 100, 20, false); auto result = predictor.PredictLoadFactor(params); @@ -58,7 +73,11 @@ TEST(THeuristicPredictor, LinearDecline) TEST(THeuristicPredictor, FirstPartitionInsertionUsesDefault) { - const THeuristicPredictor predictor; + userver::dynamic_config::StorageMock storage{ + {PREDICTOR_CONFIG, DEFAULT_JSON}, + }; + + THeuristicPredictor predictor{storage.GetSource()}; TPredictionParams params; params.LoadFactor = TLoadFactor{0}; @@ -74,7 +93,11 @@ TEST(THeuristicPredictor, FirstPartitionInsertionUsesDefault) TEST(THeuristicPredictor, ClampsToMaxLoadFactor) { - const THeuristicPredictor predictor; + userver::dynamic_config::StorageMock storage{ + {PREDICTOR_CONFIG, DEFAULT_JSON}, + }; + + THeuristicPredictor predictor{storage.GetSource()}; auto params = MakeParams(80, 100, 50, true); auto result = predictor.PredictLoadFactor(params); @@ -84,7 +107,11 @@ TEST(THeuristicPredictor, ClampsToMaxLoadFactor) TEST(THeuristicPredictor, ClampsToZeroLoadFactor) { - const THeuristicPredictor predictor; + userver::dynamic_config::StorageMock storage{ + {PREDICTOR_CONFIG, DEFAULT_JSON}, + }; + + THeuristicPredictor predictor{storage.GetSource()}; auto params = MakeParams(10, 100, 100, false); auto result = predictor.PredictLoadFactor(params); @@ -94,7 +121,11 @@ TEST(THeuristicPredictor, ClampsToZeroLoadFactor) TEST(THeuristicPredictor, RoundsUpPessimistically) { - const THeuristicPredictor predictor; + userver::dynamic_config::StorageMock storage{ + {PREDICTOR_CONFIG, DEFAULT_JSON}, + }; + + THeuristicPredictor predictor{storage.GetSource()}; auto params = MakeParams(10, 100, 1, true); auto result = predictor.PredictLoadFactor(params); @@ -103,9 +134,13 @@ TEST(THeuristicPredictor, RoundsUpPessimistically) } TEST(THeuristicPredictor, HandlesZeroDeltaWeight) -{ - const THeuristicPredictor predictor; - +{ + userver::dynamic_config::StorageMock storage{ + {PREDICTOR_CONFIG, DEFAULT_JSON}, + }; + + THeuristicPredictor predictor{storage.GetSource()}; + auto params = MakeParams(50, 100, 0, true); auto result = predictor.PredictLoadFactor(params); @@ -114,7 +149,11 @@ TEST(THeuristicPredictor, HandlesZeroDeltaWeight) TEST(THeuristicPredictor, HandlesRemovalFromEmptySafeGuard) { - const THeuristicPredictor predictor; + userver::dynamic_config::StorageMock storage{ + {PREDICTOR_CONFIG, DEFAULT_JSON}, + }; + + THeuristicPredictor predictor{storage.GetSource()}; TPredictionParams params; params.PartitionsWeight = TPartitionWeight{0}; From d8a2e0fed04678ab40188a53bcdd5c532289ed0a Mon Sep 17 00:00:00 2001 From: Ilya Repin Date: Thu, 5 Feb 2026 18:38:32 +0000 Subject: [PATCH 3/3] fix --- tests/test_get_partition_map.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_get_partition_map.py b/tests/test_get_partition_map.py index 25e935b..9210eba 100644 --- a/tests/test_get_partition_map.py +++ b/tests/test_get_partition_map.py @@ -20,4 +20,4 @@ async def test_get_partition_map(service_client): assert 'hub' in partition assert isinstance(partition['hub'], str) - assert data['partition_map']['epoch'] == 0 \ No newline at end of file + assert data['partition_map']['epoch'] == 0