diff --git a/.github/import_generation.txt b/.github/import_generation.txt index 81b5c5d06c..e522732c77 100644 --- a/.github/import_generation.txt +++ b/.github/import_generation.txt @@ -1 +1 @@ -37 +38 diff --git a/.github/last_commit.txt b/.github/last_commit.txt index 4e27be1413..7b3760d4b2 100644 --- a/.github/last_commit.txt +++ b/.github/last_commit.txt @@ -1 +1 @@ -abde8e11540c2ccb2c8b2d9d15b2bed3d3fbaf7e +688edef85e4e4e55828630ef3943a91ca9306799 diff --git a/CHANGELOG.md b/CHANGELOG.md index 599b3e6c04..49353fbbda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added support for the new inverted index type: JSON, intended to speed-up queries on Json or JsonDocument columns. + * EXPERIMENTAL! Added `IProducer` interface to the SDK. This interface is used to write messages to a topic. Each message can be associated with a partitioning key, which is used to determine the partition to which the message will be written. diff --git a/examples/topic_writer/producer/basic_write/main.cpp b/examples/topic_writer/producer/basic_write/main.cpp index 83af470330..b5f8e9ddfb 100644 --- a/examples/topic_writer/producer/basic_write/main.cpp +++ b/examples/topic_writer/producer/basic_write/main.cpp @@ -11,7 +11,7 @@ std::shared_ptr CreateProducer(const std::string& topic producerSettings.ProducerIdPrefix("producer_basic"); producerSettings.PartitionChooserStrategy(NYdb::NTopic::TProducerSettings::EPartitionChooserStrategy::Bound); producerSettings.SubSessionIdleTimeout(TDuration::Seconds(30)); - producerSettings.MaxBlock(TDuration::Seconds(30)); + producerSettings.MaxBlockTimeout(TDuration::Seconds(30)); producerSettings.MaxMemoryUsage(100_MB); return topicClient.CreateProducer(producerSettings); } @@ -66,7 +66,7 @@ void WriteWithHandlingResult(std::shared_ptr producer, for (size_t retries = 0; retries < MAX_RETRIES; retries++) { auto writeResult = producer->Write(std::move(writeMessage)); - if (writeResult.IsSuccess()) { + if (writeResult.IsQueued()) { // if 
write was successful, we can continue writing messages continue; } @@ -115,8 +115,7 @@ int main() { auto messageData = std::string(1_KB, 'a'); for (int i = 0; i < 10; i++) { - NYdb::NTopic::TWriteMessage writeMessage(messageData); - writeMessage.Key("key" + ToString(i)); + NYdb::NTopic::TWriteMessage writeMessage("key" + ToString(i), messageData); WriteWithHandlingResult(producer, std::move(writeMessage)); } return 0; diff --git a/include/ydb-cpp-sdk/client/table/table.h b/include/ydb-cpp-sdk/client/table/table.h index ab3466100d..26aca1647a 100644 --- a/include/ydb-cpp-sdk/client/table/table.h +++ b/include/ydb-cpp-sdk/client/table/table.h @@ -815,6 +815,9 @@ class TTableDescription { // fulltext void AddFulltextIndex(const std::string& indexName, EIndexType type, const std::vector& indexColumns, const TFulltextIndexSettings& indexSettings); void AddFulltextIndex(const std::string& indexName, EIndexType type, const std::vector& indexColumns, const std::vector& dataColumns, const TFulltextIndexSettings& indexSettings); + // json + void AddJsonIndex(const std::string& indexName, const std::vector& indexColumns); + void AddJsonIndex(const std::string& indexName, const std::vector& indexColumns, const std::vector& dataColumns); // default void AddSecondaryIndex(const std::string& indexName, const std::vector& indexColumns); diff --git a/include/ydb-cpp-sdk/client/table/table_enum.h b/include/ydb-cpp-sdk/client/table/table_enum.h index a53e9e1e86..e9ed2d0112 100644 --- a/include/ydb-cpp-sdk/client/table/table_enum.h +++ b/include/ydb-cpp-sdk/client/table/table_enum.h @@ -37,6 +37,7 @@ enum class EIndexType { GlobalVectorKMeansTree, GlobalFulltextPlain, GlobalFulltextRelevance, + GlobalJson, Unknown = std::numeric_limits::max() }; diff --git a/include/ydb-cpp-sdk/client/topic/producer.h b/include/ydb-cpp-sdk/client/topic/producer.h index de48becd82..3efb9c6aa2 100644 --- a/include/ydb-cpp-sdk/client/topic/producer.h +++ b/include/ydb-cpp-sdk/client/topic/producer.h @@ 
-12,7 +12,7 @@ struct TProducerSettings : public TWriteSessionSettings { enum class EPartitionChooserStrategy { Bound, - Hash, + KafkaHash, }; TProducerSettings() = default; @@ -39,14 +39,9 @@ struct TProducerSettings : public TWriteSessionSettings { //! ProducerId is generated as ProducerIdPrefix + partition id. FLUENT_SETTING(std::string, ProducerIdPrefix); - //! SessionID to use. - FLUENT_SETTING_DEFAULT(std::string, SessionId, ""); - - //! Maximum block time for write. If set, write will block for up to MaxBlockMs when the buffer is overloaded. - FLUENT_SETTING_DEFAULT(TDuration, MaxBlock, TDuration::Zero()); - - //! Key producer function. - FLUENT_SETTING_OPTIONAL(std::function, KeyProducer); + //! Maximum block timeout for write. If set, write will block for up to MaxBlockTimeout when the buffer is overloaded. + //! If not set, Write will block until the message is written to the buffer. + FLUENT_SETTING_DEFAULT(TDuration, MaxBlockTimeout, TDuration::Max()); private: using TWriteSessionSettings::ProducerId; @@ -88,7 +83,7 @@ struct TWriteResult { //! Value is std::nullopt if the session is not closed. std::optional ClosedDescription; - bool IsSuccess() const { + bool IsQueued() const { return Status == EWriteStatus::Queued; } diff --git a/include/ydb-cpp-sdk/client/topic/write_session.h b/include/ydb-cpp-sdk/client/topic/write_session.h index 89752c6feb..d5b6b16d70 100644 --- a/include/ydb-cpp-sdk/client/topic/write_session.h +++ b/include/ydb-cpp-sdk/client/topic/write_session.h @@ -203,6 +203,16 @@ struct TWriteMessage { : Data(data) {} + TWriteMessage(const std::string& key, std::string_view data) + : Data(data) + , Key(key) + {} + + TWriteMessage(uint32_t partition, std::string_view data) + : Data(data) + , Partition(partition) + {} + TWriteMessage(const TWriteMessage& other) : DataHolder(other.DataHolder) , Data(other.DataHolder ? 
std::string_view(*DataHolder) : other.Data) @@ -211,9 +221,9 @@ struct TWriteMessage { , SeqNo_(other.SeqNo_) , CreateTimestamp_(other.CreateTimestamp_) , MessageMeta_(other.MessageMeta_) - , Key_(other.Key_) - , Partition_(other.Partition_) , Tx_(other.Tx_) + , Key(other.Key) + , Partition(other.Partition) {} TWriteMessage(TWriteMessage&& other) noexcept @@ -224,9 +234,9 @@ struct TWriteMessage { , SeqNo_(std::move(other.SeqNo_)) , CreateTimestamp_(std::move(other.CreateTimestamp_)) , MessageMeta_(std::move(other.MessageMeta_)) - , Key_(std::move(other.Key_)) - , Partition_(std::move(other.Partition_)) , Tx_(std::move(other.Tx_)) + , Key(std::move(other.Key)) + , Partition(std::move(other.Partition)) {} TWriteMessage& operator=(const TWriteMessage& other) { @@ -241,8 +251,8 @@ struct TWriteMessage { SeqNo_ = other.SeqNo_; CreateTimestamp_ = other.CreateTimestamp_; MessageMeta_ = other.MessageMeta_; - Key_ = other.Key_; - Partition_ = other.Partition_; + Key = other.Key; + Partition = other.Partition; Tx_ = other.Tx_; return *this; @@ -260,8 +270,8 @@ struct TWriteMessage { SeqNo_ = std::move(other.SeqNo_); CreateTimestamp_ = std::move(other.CreateTimestamp_); MessageMeta_ = std::move(other.MessageMeta_); - Key_ = std::move(other.Key_); - Partition_ = std::move(other.Partition_); + Key = std::move(other.Key); + Partition = std::move(other.Partition); Tx_ = std::move(other.Tx_); return *this; @@ -304,12 +314,6 @@ struct TWriteMessage { //! Message metadata. Limited to 4096 characters overall (all keys and values combined). FLUENT_SETTING(TMessageMeta, MessageMeta); - //! Message key. It will be used to route message to the partition. - FLUENT_SETTING_OPTIONAL(std::string, Key); - - //! Partition to write to. It is not recommended to use this option, use Key instead. - FLUENT_SETTING_OPTIONAL(std::uint32_t, Partition); - //! Transaction id FLUENT_SETTING_OPTIONAL(std::reference_wrapper, Tx); @@ -317,6 +321,18 @@ struct TWriteMessage { { return Tx_ ? 
&Tx_->get() : nullptr; } + + const std::optional& GetKey() const { + return Key; + } + + const std::optional& GetPartition() const { + return Partition; + } + +private: + std::optional Key; + std::optional Partition; }; //! Simple write session. Does not need event handlers. Does not provide Events, ContinuationTokens, write Acks. diff --git a/src/api/protos/draft/ydb_nbs.proto b/src/api/protos/draft/ydb_nbs.proto index 411c656768..3352ed432d 100644 --- a/src/api/protos/draft/ydb_nbs.proto +++ b/src/api/protos/draft/ydb_nbs.proto @@ -71,8 +71,8 @@ message DeletePartitionResult { message GetLoadActorAdapterActorIdRequest { Ydb.Operations.OperationParams operation_params = 1; - // Partition tablet id (actor id string). - string TabletId = 2; + // Disk id. + string DiskId = 2; } message GetLoadActorAdapterActorIdResponse { diff --git a/src/api/protos/draft/ydb_replication.proto b/src/api/protos/draft/ydb_replication.proto index 62106c4d02..51d03d9b5a 100644 --- a/src/api/protos/draft/ydb_replication.proto +++ b/src/api/protos/draft/ydb_replication.proto @@ -7,6 +7,7 @@ import "src/api/protos/ydb_operation.proto"; import "src/api/protos/ydb_scheme.proto"; import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; package Ydb.Replication; option java_package = "com.yandex.ydb.replication"; @@ -101,6 +102,8 @@ message DescribeTransferRequest { Ydb.Operations.OperationParams operation_params = 1; // Replication path. string path = 2 [(required) = true]; + // Include detailed statistics for each worker. + optional bool include_stats = 3; } message DescribeTransferResponse { @@ -108,6 +111,14 @@ message DescribeTransferResponse { Ydb.Operations.Operation operation = 1; } +// Message representing sliding window statistics by several windows. +message MultipleWindowsStat { + // Average per minute. + google.protobuf.Duration avg_per_minute = 1; + // Average per hour. 
+ google.protobuf.Duration avg_per_hour = 2; +} + message DescribeTransferResult { message RunningState { } @@ -122,6 +133,74 @@ message DescribeTransferResult { message PausedState { } + message Stats { + enum WorkState { + // No reported state. + STATE_UNSPECIFIED = 0; + // Reading data from topic. + STATE_READ = 1; + // Decompressing read data. + STATE_DECOMPRESS = 2; + // Processing data with lambda. + STATE_PROCESS = 3; + // Writing data to table. + STATE_WRITE = 4; + } + + message WorkerStats { + // unique worker identifier + string worker_id = 1; + // last reported work state + WorkState state = 2; + // timestamp of last state change + google.protobuf.Timestamp last_state_change = 3; + // partition of the topic that worker is reading + int64 partition_id = 4; + // current read offset in partition + int64 read_offset = 5; + // how long this worker is being run for + google.protobuf.Duration uptime = 6; + // total cumulative number of worker restarts + int64 restart_count = 7; + // worker restarts per time, sliding window + MultipleWindowsStat restarts = 8; + // topic read speed in sliding window, bytes + MultipleWindowsStat read_bytes = 9; + // topic read speed in sliding window, messages + MultipleWindowsStat read_messages = 10; + // table write speed in sliding window, bytes + MultipleWindowsStat write_bytes = 11; + // table write speed in sliding window, rows + MultipleWindowsStat write_rows = 12; + + // cpu usage time on decompression, microseconds + MultipleWindowsStat decompression_cpu_time = 13; + // cpu usage time on processing, microseconds + MultipleWindowsStat processing_cpu_time = 14; + } + + // detailed stats of every worker, only included if DescribeTransferRequest.include_stats flag = true in request + repeated WorkerStats workers_stats = 1; + // minimal uptime of all current workers + google.protobuf.Duration min_worker_uptime = 2; + + // topic read speed in sliding window, overall for transfer, bytes + MultipleWindowsStat read_bytes = 3; + // 
topic read speed in sliding window, overall for transfer, messages + MultipleWindowsStat read_messages = 4; + // table write speed in sliding window, overall for transfer, bytes + MultipleWindowsStat write_bytes = 5; + // table write speed in sliding window, overall for transfer, rows + MultipleWindowsStat write_rows = 6; + + // cpu usage time on decompression, overall for transfer, microseconds + MultipleWindowsStat decompression_cpu_time = 7; + // cpu usage time on processing, overall for transfer, microseconds + MultipleWindowsStat processing_cpu_time = 8; + // moment when stats collection was last started (or reset) + google.protobuf.Timestamp stats_collection_start = 9; + } + // Description of scheme object. Ydb.Scheme.Entry self = 1; @@ -145,4 +224,5 @@ message DescribeTransferResult { } optional BatchSettings batch_settings = 11; + optional Stats stats = 12; } diff --git a/src/api/protos/ydb_query.proto b/src/api/protos/ydb_query.proto index afc60032e6..c795eeaafa 100644 --- a/src/api/protos/ydb_query.proto +++ b/src/api/protos/ydb_query.proto @@ -45,9 +45,21 @@ message AttachSessionRequest { string session_id = 1 [(Ydb.length).le = 1024]; } +message SessionShutdownHint { +} + +message NodeShutdownHint { +} + message SessionState { StatusIds.StatusCode status = 1; repeated Ydb.Issue.IssueMessage issues = 2; + + // The reason the session is ending, for SDK-side handling + oneof session_hint { + SessionShutdownHint session_shutdown = 3; + NodeShutdownHint node_shutdown = 4; + } } message SerializableModeSettings { diff --git a/src/api/protos/ydb_table.proto b/src/api/protos/ydb_table.proto index 86f806de75..05102c9e7b 100644 --- a/src/api/protos/ydb_table.proto +++ b/src/api/protos/ydb_table.proto @@ -328,6 +328,10 @@ message LocalBloomNgramFilterIndex { optional bool case_sensitive = 5; } +message GlobalJsonIndex { + GlobalIndexSettings settings = 1; +} + // Represent table index message TableIndex { // Name of index @@ -344,6 +348,7 @@ message TableIndex { 
GlobalFulltextRelevanceIndex global_fulltext_relevance_index = 9; LocalBloomFilterIndex local_bloom_filter_index = 10; LocalBloomNgramFilterIndex local_bloom_ngram_filter_index = 11; + GlobalJsonIndex global_json_index = 12; } // list of columns content to be copied in to index table repeated string data_columns = 5; @@ -372,6 +377,7 @@ message TableIndexDescription { GlobalFulltextRelevanceIndex global_fulltext_relevance_index = 11; LocalBloomFilterIndex local_bloom_filter_index = 12; LocalBloomNgramFilterIndex local_bloom_ngram_filter_index = 13; + GlobalJsonIndex global_json_index = 14; } Status status = 4; // list of columns content to be copied in to index table diff --git a/src/client/federated_topic/ut/basic_usage_ut.cpp b/src/client/federated_topic/ut/basic_usage_ut.cpp index ce87d140bd..5d46c29299 100644 --- a/src/client/federated_topic/ut/basic_usage_ut.cpp +++ b/src/client/federated_topic/ut/basic_usage_ut.cpp @@ -20,6 +20,38 @@ namespace NYdb::NFederatedTopic::NTests { +void WriteMessages(std::shared_ptr setup, const TString& path, const TString& messageBase, size_t count) { + NPersQueue::TWriteSessionSettings writeSettings; + writeSettings.Path(path).MessageGroupId("src_id"); + writeSettings.Codec(NPersQueue::ECodec::RAW); + IExecutor::TPtr executor = NPersQueue::CreateSyncExecutor(); + writeSettings.CompressionExecutor(executor); + + auto& pqClient = setup->GetPersQueueClient(); + auto writeSession = pqClient.CreateSimpleBlockingWriteSession(writeSettings); + + for (size_t i = 0; i < count; ++i) { + UNIT_ASSERT(writeSession->Write(messageBase + ToString(i))); + } + writeSession->Close(); +} + +void WriteMessages(std::shared_ptr setup, const TString& messageBase, size_t count) { + NPersQueue::TWriteSessionSettings writeSettings; + writeSettings.Path(setup->GetTestTopic()).MessageGroupId("src_id"); + writeSettings.Codec(NPersQueue::ECodec::RAW); + IExecutor::TPtr executor = NPersQueue::CreateSyncExecutor(); + writeSettings.CompressionExecutor(executor); 
+ + auto& pqClient = setup->GetPersQueueClient(); + auto writeSession = pqClient.CreateSimpleBlockingWriteSession(writeSettings); + + for (size_t i = 0; i < count; ++i) { + UNIT_ASSERT(writeSession->Write(messageBase + ToString(i))); + } + writeSession->Close(); +} + Y_UNIT_TEST_SUITE(BasicUsage) { Y_UNIT_TEST(GetAllStartPartitionSessions) { @@ -403,6 +435,52 @@ Y_UNIT_TEST_SUITE(BasicUsage) { ReadSession->Close(TDuration::MilliSeconds(10)); } + Y_UNIT_TEST(SimpleDataHandlersAndGetEvent) { + auto setup = std::make_shared(TEST_CASE_NAME, false); + setup->Start(true, true); + + const TString topic1 = setup->GetTestTopic(); + const TString topic2 = setup->GetTestTopic() + "-second"; + setup->CreateTopic(topic2, setup->GetLocalCluster()); + + auto driverConfig = NYdb::TDriverConfig() + .SetEndpoint(TStringBuilder() << "localhost:" << setup->GetGrpcPort()) + .SetDatabase("/Root"); + NYdb::TDriver driver(driverConfig); + NYdb::NFederatedTopic::TFederatedTopicClient client(driver); + + TString messageBase = "hello-"; + WriteMessages(setup, topic1, messageBase, 5); + WriteMessages(setup, topic2, messageBase + "t2-", 3); + + TVector receivedMessages; + NYdb::NFederatedTopic::TFederatedReadSessionSettings settings; + settings + .ConsumerName("test-consumer") + .MaxMemoryUsageBytes(1_MB) + .AppendTopics(NYdb::NTopic::TTopicReadSettings(topic1)) + .AppendTopics(NYdb::NTopic::TTopicReadSettings(topic2)); + + settings.EventHandlers_.SimpleDataHandlers( + [&receivedMessages](NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& event) { + for (const auto& message: event.GetMessages()) { + receivedMessages.push_back(TString(message.GetData())); + } + }, + true // commit on receive + ); + + auto session = client.CreateReadSession(settings); + std::jthread thread([&] { + Sleep(TDuration::Seconds(3)); + UNIT_ASSERT(session->Close(TDuration::MilliSeconds(10))); + }); + + auto event = session->GetEvent(/* block = */true); + UNIT_ASSERT(event.has_value()); + 
UNIT_ASSERT_VALUES_EQUAL(receivedMessages.size(), 8); + } + Y_UNIT_TEST(FallbackToSingleDbAfterBadRequest) { auto setup = std::make_shared(TEST_CASE_NAME, false); setup->Start(true, true); diff --git a/src/client/persqueue_public/ut/ut_utils/test_server.h b/src/client/persqueue_public/ut/ut_utils/test_server.h index a2c731384c..492a947782 100644 --- a/src/client/persqueue_public/ut/ut_utils/test_server.h +++ b/src/client/persqueue_public/ut/ut_utils/test_server.h @@ -23,6 +23,7 @@ class TTestServer { : PortManager(portManager.GetOrElse(MakeSimpleShared())) , Port(PortManager->GetPort(2134)) , GrpcPort(PortManager->GetPort(2135)) + , Endpoint("localhost:" + ToString(GrpcPort)) , ServerSettings(settings) , GrpcServerOptions(NYdbGrpc::TServerOptions().SetHost("[::1]").SetPort(GrpcPort)) { @@ -62,6 +63,11 @@ class TTestServer { CleverServer = MakeHolder(ServerSettings); CleverServer->EnableGRpc(GrpcServerOptions); + auto driverConfig = NYdb::TDriverConfig() + .SetEndpoint(Endpoint) + .SetDatabase("/" + ServerSettings.DomainName); + Driver = MakeHolder(driverConfig); + Log << TLOG_INFO << "TTestServer started on Port " << Port << " GrpcPort " << GrpcPort; AnnoyingClient = MakeHolder(ServerSettings, GrpcPort, databaseName); @@ -121,7 +127,7 @@ class TTestServer { } const NYdb::TDriver& GetDriver() const { - return CleverServer->GetDriver(); + return *Driver; } void KillTopicPqrbTablet(const TString& topicPath) { @@ -164,6 +170,7 @@ class TTestServer { TSimpleSharedPtr PortManager; ui16 Port; ui16 GrpcPort; + TString Endpoint; THolder CleverServer; NKikimr::Tests::TServerSettings ServerSettings; @@ -176,6 +183,8 @@ class TTestServer { static const TVector LOGGED_SERVICES; +private: + THolder Driver; }; } // namespace NPersQueue diff --git a/src/client/table/table.cpp b/src/client/table/table.cpp index 63bc32c6ce..b24afacee6 100644 --- a/src/client/table/table.cpp +++ b/src/client/table/table.cpp @@ -800,6 +800,14 @@ void TTableDescription::AddFulltextIndex(const 
std::string& indexName, EIndexTyp Impl_->AddFulltextIndex(indexName, indexType, indexColumns, dataColumns, indexSettings); } +void TTableDescription::AddJsonIndex(const std::string& indexName, const std::vector& indexColumns) { + AddSecondaryIndex(indexName, EIndexType::GlobalJson, indexColumns); +} + +void TTableDescription::AddJsonIndex(const std::string& indexName, const std::vector& indexColumns, const std::vector& dataColumns) { + AddSecondaryIndex(indexName, EIndexType::GlobalJson, indexColumns, dataColumns); +} + void TTableDescription::AddSecondaryIndex(const std::string& indexName, const std::vector& indexColumns) { AddSyncSecondaryIndex(indexName, indexColumns); } @@ -2796,6 +2804,10 @@ TIndexDescription TIndexDescription::FromProto(const TProto& proto) { specializedIndexSettings = TFulltextIndexSettings::FromProto(fulltextProto.fulltext_settings()); break; } + case TProto::kGlobalJsonIndex: + type = EIndexType::GlobalJson; + globalIndexSettings.emplace_back(TGlobalIndexSettings::FromProto(proto.global_json_index().settings())); + break; default: // fallback to global sync type = EIndexType::GlobalSync; globalIndexSettings.resize(1); @@ -2886,6 +2898,13 @@ void TIndexDescription::SerializeTo(Ydb::Table::TableIndex& proto) const { } break; } + case EIndexType::GlobalJson: { + auto& settings = *proto.mutable_global_json_index()->mutable_settings(); + if (GlobalIndexSettings_.size() == 1) { + GlobalIndexSettings_.at(0).SerializeTo(settings); + } + break; + } case EIndexType::Unknown: break; } @@ -2911,6 +2930,7 @@ void TIndexDescription::Out(IOutputStream& o) const { case EIndexType::GlobalSync: case EIndexType::GlobalAsync: case EIndexType::GlobalUnique: + case EIndexType::GlobalJson: case EIndexType::Unknown: break; case EIndexType::GlobalVectorKMeansTree: diff --git a/src/client/topic/impl/event_handlers.cpp b/src/client/topic/impl/event_handlers.cpp index 1a4fad20fa..e91024f677 100644 --- a/src/client/topic/impl/event_handlers.cpp +++ 
b/src/client/topic/impl/event_handlers.cpp @@ -1,6 +1,7 @@ #include #include +#include namespace NYdb::inline V3::NTopic { @@ -22,7 +23,8 @@ class TGracefulReleasingSimpleDataHandlers : public TThrRefBase { TDeferredCommit deferredCommit; { std::lock_guard guard(Lock); - auto& offsetSet = PartitionStreamToUncommittedOffsets[event.GetPartitionSession()->GetPartitionSessionId()]; + const std::string key = GetEventKey(event); + auto& offsetSet = PartitionStreamToUncommittedOffsets[key]; // Messages could contain holes in offset, but later commit ack will tell us right border. // So we can easily insert the whole interval with holes included. // It will be removed from set by specifying proper right border. @@ -41,16 +43,16 @@ class TGracefulReleasingSimpleDataHandlers : public TThrRefBase { void OnCommitAcknowledgement(TReadSessionEvent::TCommitOffsetAcknowledgementEvent& event) { std::lock_guard guard(Lock); - const ui64 partitionStreamId = event.GetPartitionSession()->GetPartitionSessionId(); - auto& offsetSet = PartitionStreamToUncommittedOffsets[partitionStreamId]; + const std::string key = GetEventKey(event); + auto& offsetSet = PartitionStreamToUncommittedOffsets[key]; if (offsetSet.EraseInterval(0, event.GetCommittedOffset() + 1)) { // Remove some offsets. if (offsetSet.Empty()) { // No offsets left. - auto unconfirmedDestroyIt = UnconfirmedDestroys.find(partitionStreamId); + auto unconfirmedDestroyIt = UnconfirmedDestroys.find(key); if (unconfirmedDestroyIt != UnconfirmedDestroys.end()) { // Confirm and forget about this partition stream. 
unconfirmedDestroyIt->second.Confirm(); UnconfirmedDestroys.erase(unconfirmedDestroyIt); - PartitionStreamToUncommittedOffsets.erase(partitionStreamId); + PartitionStreamToUncommittedOffsets.erase(key); } } } @@ -59,20 +61,21 @@ class TGracefulReleasingSimpleDataHandlers : public TThrRefBase { void OnCreatePartitionStream(TReadSessionEvent::TStartPartitionSessionEvent& event) { { std::lock_guard guard(Lock); - Y_ABORT_UNLESS(PartitionStreamToUncommittedOffsets[event.GetPartitionSession()->GetPartitionSessionId()].Empty()); + const std::string key = GetEventKey(event); + Y_ABORT_UNLESS(PartitionStreamToUncommittedOffsets[key].Empty()); } event.Confirm(); } void OnDestroyPartitionStream(TReadSessionEvent::TStopPartitionSessionEvent& event) { std::lock_guard guard(Lock); - const ui64 partitionStreamId = event.GetPartitionSession()->GetPartitionSessionId(); - Y_ABORT_UNLESS(UnconfirmedDestroys.find(partitionStreamId) == UnconfirmedDestroys.end()); - if (PartitionStreamToUncommittedOffsets[partitionStreamId].Empty()) { - PartitionStreamToUncommittedOffsets.erase(partitionStreamId); + auto key = GetEventKey(event); + Y_ABORT_UNLESS(UnconfirmedDestroys.find(key) == UnconfirmedDestroys.end()); + if (PartitionStreamToUncommittedOffsets[key].Empty()) { + PartitionStreamToUncommittedOffsets.erase(key); event.Confirm(); } else { - UnconfirmedDestroys.emplace(partitionStreamId, std::move(event)); + UnconfirmedDestroys.emplace(key, std::move(event)); } } @@ -82,17 +85,22 @@ class TGracefulReleasingSimpleDataHandlers : public TThrRefBase { void OnPartitionStreamClosed(TReadSessionEvent::TPartitionSessionClosedEvent& event) { std::lock_guard guard(Lock); - const ui64 partitionStreamId = event.GetPartitionSession()->GetPartitionSessionId(); - PartitionStreamToUncommittedOffsets.erase(partitionStreamId); - UnconfirmedDestroys.erase(partitionStreamId); + const std::string key = GetEventKey(event); + PartitionStreamToUncommittedOffsets.erase(key); + UnconfirmedDestroys.erase(key); } 
private: + template + std::string GetEventKey(const TEvent& event) { + return event.GetPartitionSession()->GetReadSessionId() + "_" + ToString(event.GetPartitionSession()->GetPartitionSessionId()); + } + TAdaptiveLock Lock; // For the case when user gave us multithreaded executor. const std::function DataHandler; const bool CommitAfterProcessing; - std::unordered_map> PartitionStreamToUncommittedOffsets; // Partition stream id -> set of offsets. - std::unordered_map UnconfirmedDestroys; // Partition stream id -> destroy events. + std::unordered_map> PartitionStreamToUncommittedOffsets; // Session id + Partition stream id -> set of offsets. + std::unordered_map UnconfirmedDestroys; // Session id + Partition stream id -> destroy events. }; TReadSessionSettings::TEventHandlers& TReadSessionSettings::TEventHandlers::SimpleDataHandlers(std::function dataHandler, diff --git a/src/client/topic/impl/producer.cpp b/src/client/topic/impl/producer.cpp index 8057902e0f..18b8e64221 100644 --- a/src/client/topic/impl/producer.cpp +++ b/src/client/topic/impl/producer.cpp @@ -1,8 +1,12 @@ #include #include #include +#include #include #include +#include + +#include namespace NYdb::inline V3::NTopic { @@ -71,9 +75,10 @@ TWriteMessage TProducer::TMessageInfo::BuildMessage() const { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TProducer::TWriteSessionWrapper -TProducer::TWriteSessionWrapper::TWriteSessionWrapper(WriteSessionPtr session, std::uint32_t partition) +TProducer::TWriteSessionWrapper::TWriteSessionWrapper(WriteSessionPtr session, std::uint32_t partition, bool directToPartition) : Session(std::move(session)) , Partition(partition) + , DirectToPartition(directToPartition) {} //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -171,6 +176,10 @@ void TProducer::TSplittedPartitionWorker::DoWork() { 
Producer->Partitions[child].Locked(false); } Producer->Partitions[PartitionId].Locked_ = false; + + for (const auto& [partitionId, maxSeqNo] : CachedMaxSeqNos) { + Producer->Partitions[partitionId].CachedMaxSeqNo = maxSeqNo; + } MoveTo(EState::Done); break; } @@ -181,7 +190,8 @@ void TProducer::TSplittedPartitionWorker::MoveTo(EState state) { LOG_LAZY(Producer->DbDriverState->Log, TLOG_INFO, Producer->LogPrefix() << "Moving splitted partition worker for partition " << PartitionId << " to state " << GetStateName()); } -void TProducer::TSplittedPartitionWorker::UpdateMaxSeqNo(std::uint64_t maxSeqNo) { +void TProducer::TSplittedPartitionWorker::UpdateMaxSeqNo(std::uint32_t partitionId, std::uint64_t maxSeqNo) { + CachedMaxSeqNos[partitionId] = maxSeqNo; MaxSeqNo = std::max(MaxSeqNo, maxSeqNo); } @@ -269,7 +279,12 @@ void TProducer::TSplittedPartitionWorker::LaunchGetMaxSeqNoFutures(std::unique_l NotReadyFutures = ancestors.size(); for (const auto& ancestor : ancestors) { - auto wrappedSession = Producer->SessionsWorker->GetWriteSession(ancestor, false); + if (Producer->Partitions[ancestor].CachedMaxSeqNo.has_value()) { + --NotReadyFutures; + UpdateMaxSeqNo(ancestor, Producer->Partitions[ancestor].CachedMaxSeqNo.value()); + continue; + } + auto wrappedSession = Producer->SessionsWorker->GetOrCreateWriteSession(ancestor, false); Y_ABORT_UNLESS(wrappedSession, "Write session not found"); WriteSessions.push_back(wrappedSession); @@ -297,7 +312,7 @@ void TProducer::TSplittedPartitionWorker::LaunchGetMaxSeqNoFutures(std::unique_l return; } - UpdateMaxSeqNo(result.GetValue()); + UpdateMaxSeqNo(ancestor, result.GetValue()); if (--NotReadyFutures == 0) { MoveTo(EState::GotMaxSeqNo); gotMaxSeqNo = true; @@ -307,6 +322,7 @@ void TProducer::TSplittedPartitionWorker::LaunchGetMaxSeqNoFutures(std::unique_l if (gotMaxSeqNo) { producerPtr->RunMainWorker(static_cast(PartitionId)); } + producerPtr->SessionsWorker->DestroyWriteSession(ancestor); }); lock.lock(); 
GetMaxSeqNoFutures.push_back(future); @@ -315,6 +331,12 @@ void TProducer::TSplittedPartitionWorker::LaunchGetMaxSeqNoFutures(std::unique_l if (ancestors.empty()) { LOG_LAZY(Producer->DbDriverState->Log, TLOG_INFO, Producer->LogPrefix() << "No ancestors found for partition " << PartitionId); MoveTo(EState::Init); + return; + } + + if (NotReadyFutures == 0) { + MoveTo(EState::GotMaxSeqNo); + Producer->RunMainWorker(static_cast(PartitionId)); } } @@ -373,7 +395,6 @@ bool TProducer::TEventsWorker::RunEventLoop(WrappedWriteSessionPtr wrappedSessio } if (auto acksEvent = std::get_if(&*event)) { - // Producer->SessionsWorker->OnReadFromSession(wrappedSession, acksEvent->Acks.size()); HandleAcksEvent(partition, std::move(*acksEvent)); continue; } @@ -386,16 +407,22 @@ std::optional> TProducer::TEventsWorker::DoWork() { std::unique_lock lock(Lock); while (!ReadyFutures.empty()) { - auto idx = *ReadyFutures.begin(); - ReadyFutures.erase(idx); + auto partition = *ReadyFutures.begin(); + ReadyFutures.erase(partition); + auto session = Producer->SessionsWorker->GetWriteSession(partition); + if (!session) { + continue; + } + lock.unlock(); // RunEventLoop without Lock: sub-session's WaitEvent() completion may run the Subscribe // callback (ReadyFutures.insert) synchronously; that callback takes Lock -> same-thread deadlock. 
- auto isSessionClosed = RunEventLoop(Producer->SessionsWorker->GetWriteSession(idx), idx); + auto isSessionClosed = RunEventLoop(session, partition); if (!isSessionClosed) { - SubscribeToPartition(idx); + SubscribeToPartition(partition); } else { - UnsubscribeFromPartition(idx); + session->Closed = true; + Producer->SessionsWorker->DestroyWriteSession(partition); } lock.lock(); } @@ -413,7 +440,7 @@ void TProducer::TEventsWorker::SubscribeToPartition(std::uint32_t partition) { return; } - auto wrappedSession = Producer->SessionsWorker->GetWriteSession(partition); + auto wrappedSession = Producer->SessionsWorker->GetOrCreateWriteSession(partition); auto newFuture = wrappedSession->Session->WaitEvent(); std::weak_ptr producer = Producer->shared_from_this(); std::weak_ptr self = shared_from_this(); @@ -490,7 +517,7 @@ bool TProducer::TEventsWorker::TransferEventsToOutputQueue() { TWriteSessionEvent::TAcksEvent ackEvent; if (expectedSeqNo.has_value()) { - Y_ENSURE(acksQueue.front().SeqNo == expectedSeqNo.value(), TStringBuilder() << "Expected seqNo=" << expectedSeqNo.value() << " but got " << acksQueue.front().SeqNo << " for partition " << Producer->Partitions[partition].PartitionId_); + Y_ENSURE(acksQueue.front().SeqNo == expectedSeqNo.value(), TStringBuilder() << "Expected seqNo=" << expectedSeqNo.value() << " but got " << acksQueue.front().SeqNo << " for partition " << partition); } auto ack = std::move(acksQueue.front()); @@ -682,12 +709,36 @@ TProducer::TSessionsWorker::TSessionsWorker(TProducer* producer) : Producer(producer) {} -TProducer::WrappedWriteSessionPtr TProducer::TSessionsWorker::GetWriteSession(std::uint32_t partition, bool directToPartition) { +TProducer::WrappedWriteSessionPtr TProducer::TSessionsWorker::GetOrCreateWriteSession(std::uint32_t partition, bool directToPartition) { auto sessionIter = SessionsIndex.find(partition); - if (sessionIter == SessionsIndex.end() || !directToPartition) { + if (sessionIter == SessionsIndex.end()) { return 
CreateWriteSession(partition, directToPartition); } + SessionsToRemove.erase(partition); + if (!directToPartition && sessionIter->second->DirectToPartition) { + Y_ABORT_UNLESS(sessionIter->second->Closed, "Session is not closed for partition: %u", partition); + ClosedSessionsToRemove.push_back(sessionIter->second); + SessionsIndex.erase(sessionIter); + return CreateWriteSession(partition, directToPartition); + } + + if (!sessionIter->second->DirectToPartition) { + sessionIter->second->NonDirectToPartitionOwnership++; + } + + return sessionIter->second; +} + +TProducer::WrappedWriteSessionPtr TProducer::TSessionsWorker::GetWriteSession(std::uint32_t partition, bool directToPartition) { + auto sessionIter = SessionsIndex.find(partition); + if (sessionIter == SessionsIndex.end()) { + return nullptr; + } + + SessionsToRemove.erase(partition); + Y_ABORT_UNLESS(directToPartition == sessionIter->second->DirectToPartition, "DirectToPartition mismatch: %s != %s", directToPartition ? "true" : "false", sessionIter->second->DirectToPartition ? 
"true" : "false"); + return sessionIter->second; } @@ -716,27 +767,40 @@ TProducer::WrappedWriteSessionPtr TProducer::TSessionsWorker::CreateWriteSession } auto writeSession = std::make_shared( Producer->Client->CreateWriteSession(alteredSettings), - partition); + partition, + directToPartition + ); + SessionsIndex.emplace(partition, writeSession); if (directToPartition) { - SessionsIndex.emplace(partition, writeSession); Producer->EventsWorker->SubscribeToPartition(partition); } return writeSession; } -void TProducer::TSessionsWorker::DestroyWriteSession(TSessionsIndexIterator& it, TDuration closeTimeout) { +void TProducer::TSessionsWorker::DestroyWriteSession(std::uint32_t partition) { + auto it = SessionsIndex.find(partition); if (it == SessionsIndex.end() || !it->second) { return; } - it->second->Session->Close(closeTimeout); - const auto partition = it->second->Partition; + if (!it->second->DirectToPartition && --it->second->NonDirectToPartitionOwnership > 0) { + return; + } + + if (it->second->DirectToPartition) { + Producer->EventsWorker->UnsubscribeFromPartition(partition); + } + + // Remove idle bookkeeping before erasing the session from SessionsIndex so stale + // idle markers cannot later evict a new session created for the same partition. 
+ RemoveIdleSession(partition); + if (static_cast(partition) == Producer->MainWorkerOwner) { - SessionsToRemove.push_back(it->second); + SessionsToRemove.emplace(partition); + } else { + SessionsIndex.erase(it); } - it = SessionsIndex.erase(it); - Producer->EventsWorker->UnsubscribeFromPartition(partition); } size_t TProducer::TSessionsWorker::GetSessionsCount() const { @@ -770,23 +834,40 @@ void TProducer::TSessionsWorker::RemoveIdleSession(std::uint32_t partition) { return; } + const auto idleSession = *itIdle->second; + IdlerSessions.erase(itIdle->second); + IdlerSessionsIndex.erase(itIdle); + auto wrappedSession = SessionsIndex.find(partition); if (wrappedSession == SessionsIndex.end()) { return; } - IdlerSessions.erase(itIdle->second); - IdlerSessionsIndex.erase(itIdle); - wrappedSession->second->IdleSession.reset(); + if (wrappedSession->second.get() == idleSession->Session) { + wrappedSession->second->IdleSession.reset(); + } } void TProducer::TSessionsWorker::DoWork() { - while (!SessionsToRemove.empty()) { - if (static_cast(SessionsToRemove.front()->Partition) == Producer->MainWorkerOwner) { - break; + for (auto it = SessionsToRemove.begin(); it != SessionsToRemove.end();) { + auto partition = *it; + if (static_cast(partition) == Producer->MainWorkerOwner) { + ++it; + continue; + } + + SessionsIndex.erase(partition); + it = SessionsToRemove.erase(it); + } + + for (auto it = ClosedSessionsToRemove.begin(); it != ClosedSessionsToRemove.end();) { + auto session = *it; + if (static_cast(session->Partition) == Producer->MainWorkerOwner) { + ++it; + continue; } - SessionsToRemove.pop_front(); + it = ClosedSessionsToRemove.erase(it); } while (!IdlerSessions.empty()) { @@ -795,11 +876,10 @@ void TProducer::TSessionsWorker::DoWork() { break; } - LOG_LAZY(Producer->DbDriverState->Log, TLOG_DEBUG, TStringBuilder() << Producer->LogPrefix() << "Removing idle session for partition " << (*it)->Session->Partition); - - const auto partition = (*it)->Session->Partition; + 
auto expiredIdleSession = *it; + const auto partition = expiredIdleSession->Session->Partition; if (Producer->Partitions[partition].Locked_) { - continue; + break; } // Remove idle tracking first to keep containers consistent even if the session @@ -809,8 +889,11 @@ void TProducer::TSessionsWorker::DoWork() { auto sessionIter = SessionsIndex.find(partition); if (sessionIter != SessionsIndex.end()) { + if (sessionIter->second.get() != expiredIdleSession->Session) { + continue; + } sessionIter->second->IdleSession.reset(); - DestroyWriteSession(sessionIter, TDuration::Zero()); + DestroyWriteSession(partition); } } } @@ -834,9 +917,114 @@ void TProducer::TMessagesWorker::RechoosePartitionIfNeeded(MessageIter message) message->Partition = newPartition; } +void TProducer::TMessagesWorker::HandleReadyInitSeqNoFutures() { + std::unique_lock lock(InitLock); + for (const auto& partition : GotInitSeqNoPartitions) { + auto it = InitGetMaxSeqNoFutures.find(partition); + Y_ABORT_UNLESS(it != InitGetMaxSeqNoFutures.end(), "Init get max seq no future not found"); + Y_ABORT_UNLESS(it->second.IsReady(), "Init get max seq no future is not ready"); + + auto gotMaxSeqNo = it->second.GetValue(); + CurrentSeqNo = std::max(CurrentSeqNo, gotMaxSeqNo); + Producer->Partitions[partition].CachedMaxSeqNo = gotMaxSeqNo; + InitGetMaxSeqNoFutures.erase(it); + } + + GotInitSeqNoPartitions.clear(); +} + +void TProducer::TMessagesWorker::FinishInit() { + for (const auto& partition : Producer->Partitions) { + if (!partition.second.IsSplitted() && !InFlightMessagesIndex.contains(partition.first)) { + Producer->SessionsWorker->AddIdleSession(partition.first); + } + + if (partition.second.IsSplitted()) { + Producer->SessionsWorker->DestroyWriteSession(partition.first); + } + } + + InitWriteSessions.clear(); + InitGetMaxSeqNoFutures.clear(); +} + +bool TProducer::TMessagesWorker::LazyInit() { + if (State == EState::Ready) { + return true; + } + + if (Producer->SeqNoStrategy == ESeqNoStrategy::WithSeqNo) 
{ + MoveTo(EState::Ready); + return true; + } + + if (State == EState::PendingSeqNo) { + HandleReadyInitSeqNoFutures(); + if (InitGetMaxSeqNoFutures.empty()) { + FinishInit(); + MoveTo(EState::Ready); + return true; + } + + return false; + } + + std::weak_ptr producer = Producer->shared_from_this(); + std::weak_ptr self = shared_from_this(); + for (const auto& partition : Producer->Partitions) { + auto partitionId = partition.first; + WrappedWriteSessionPtr wrappedSession = nullptr; + if (partition.second.IsSplitted()) { + wrappedSession = Producer->SessionsWorker->GetOrCreateWriteSession(partition.first, false); + } else { + wrappedSession = Producer->SessionsWorker->GetOrCreateWriteSession(partition.first); + } + + InitWriteSessions.push_back(wrappedSession); + auto initGetMaxSeqNoFuture = wrappedSession->Session->GetInitSeqNo(); + + initGetMaxSeqNoFuture.Subscribe([self, producer, partitionId](NThreading::TFuture future) { + auto selfPtr = self.lock(); + if (!selfPtr) { + return; + } + + auto producerPtr = producer.lock(); + if (!producerPtr) { + return; + } + + if (!future.IsReady()) { + return; + } + + { + std::lock_guard lock(selfPtr->InitLock); + selfPtr->GotInitSeqNoPartitions.push_back(partitionId); + } + producerPtr->RunMainWorker(partitionId); + }); + InitGetMaxSeqNoFutures.emplace(partition.first, initGetMaxSeqNoFuture); + } + + MoveTo(EState::PendingSeqNo); + return false; +} + +void TProducer::TMessagesWorker::MoveTo(EState state) { + State = state; +} + void TProducer::TMessagesWorker::DoWork() { - auto sessionsWorker = Producer->SessionsWorker; + if (MessagesToResendIndex.empty() && PendingMessagesIndex.empty()) { + return; + } + + if (!LazyInit()) { + return; + } + auto sessionsWorker = Producer->SessionsWorker; auto iterateMessagesIndex = [&](std::unordered_map>& messagesIndex, auto stopCondition) { std::vector partitionsProcessed; for (auto& [partition, messages] : messagesIndex) { @@ -846,14 +1034,17 @@ void TProducer::TMessagesWorker::DoWork() 
{ break; } - auto wrappedSession = sessionsWorker->GetWriteSession(head->Partition); + if (!head->SeqNo.has_value()) { + head->SeqNo.emplace(++CurrentSeqNo); + } + + auto wrappedSession = sessionsWorker->GetOrCreateWriteSession(head->Partition); if (!SendMessage(wrappedSession, *head)) { break; } Producer->Metrics.AddWriteLag((TInstant::Now() - head->CreateTimestamp.value_or(TInstant::Now())).MilliSeconds()); head->Sent = true; - // sessionsWorker->OnWriteToSession(wrappedSession); messages.pop_front(); } @@ -1238,22 +1429,27 @@ TProducer::TProducer( std::shared_ptr client, std::shared_ptr connections, TDbDriverStatePtr dbDriverState) - : Connections(connections), + : Id(CreateGuidAsString()), + Connections(connections), Client(client), DbDriverState(dbDriverState), Metrics(this), Settings(settings) { if (settings.ProducerIdPrefix_.empty()) { - ythrow TContractViolation("ProducerIdPrefix is required for KeyedWriteSession"); + ythrow TContractViolation("ProducerIdPrefix is required for Producer"); } if (!settings.ProducerId_.empty()) { - ythrow TContractViolation("ProducerId should be empty for KeyedWriteSession, use ProducerIdPrefix instead"); + ythrow TContractViolation("ProducerId should be empty for Producer, use ProducerIdPrefix instead"); } if (!settings.MessageGroupId_.empty()) { - ythrow TContractViolation("MessageGroupId should be empty for KeyedWriteSession"); + ythrow TContractViolation("MessageGroupId should be empty for Producer"); + } + + if (IsFederation(DbDriverState->DiscoveryEndpoint)) { + ythrow TContractViolation("Producer is not supported for federation"); } TDescribeTopicSettings describeTopicSettings; @@ -1263,7 +1459,7 @@ TProducer::TProducer( return a.GetPartitionId() < b.GetPartitionId(); }); - PartitionChooserStrategy = settings.PartitionChooserStrategy_; + auto partitionChooserStrategy = settings.PartitionChooserStrategy_; auto strategy = 
topicConfig.GetTopicDescription().GetPartitioningSettings().GetAutoPartitioningSettings().GetStrategy(); auto autoPartitioningEnabled = (strategy != EAutoPartitioningStrategy::Disabled && strategy != EAutoPartitioningStrategy::Unspecified); @@ -1302,25 +1498,26 @@ TProducer::TProducer( } } - switch (PartitionChooserStrategy) { + switch (partitionChooserStrategy) { case TProducerSettings::EPartitionChooserStrategy::Bound: PartitioningKeyHasher = settings.PartitioningKeyHasher_; PartitionChooser = std::make_unique(this); - for (size_t i = 0; i < Partitions.size(); ++i) { - if (i > 0 && Partitions[i].FromBound_.empty() && !Partitions[i].ToBound_.has_value()) { + for (size_t i = 0; i < partitions.size(); ++i) { + const auto& partition = partitions[i]; + if (i > 0 && !partition.GetFromBound().has_value() && !partition.GetToBound().has_value()) { ythrow TContractViolation("Unbounded partition is not supported for Bound partition chooser strategy"); } - if (!Partitions[i].Children_.empty()) { + if (!partition.GetChildPartitionIds().empty()) { continue; } - PartitionsIndex[Partitions[i].FromBound_] = Partitions[i].PartitionId_; + PartitionsIndex[partition.GetFromBound().value_or("")] = partition.GetPartitionId(); } break; - case TProducerSettings::EPartitionChooserStrategy::Hash: + case TProducerSettings::EPartitionChooserStrategy::KafkaHash: if (autoPartitioningEnabled) { - throw TContractViolation("Hash partition chooser strategy is not supported with auto partitioning enabled"); + throw TContractViolation("KafkaHash partition chooser strategy is not supported with auto partitioning enabled"); } std::vector partitionsIds; @@ -1353,8 +1550,7 @@ TProducer::TProducer( }); RunMainWorker(-1); - - LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Keyed write session created"); + LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Producer created"); } std::vector TProducer::GetPartitions() const { @@ -1375,11 +1571,15 @@ std::map TProducer::GetPartitionsIndex() const 
{ } size_t TProducer::GetSessionsCount() { + RunMainWorker(-1); + std::lock_guard lock(GlobalLock); return SessionsWorker->GetSessionsCount(); } size_t TProducer::GetIdleSessionsCount() { + RunMainWorker(-1); + std::lock_guard lock(GlobalLock); return SessionsWorker->GetIdleSessionsCount(); } @@ -1430,6 +1630,11 @@ void TProducer::SetCloseDeadline(const TDuration& closeTimeout) { TProducer::~TProducer() { auto _ = Close(TDuration::Zero()); // Ignore the result, because we are destroying the producer Settings.EventHandlers_.HandlersExecutor_->Stop(); + + if (MainWorkerState.load() == 0) { + ShutdownPromise.TrySetValue(); + } + ShutdownFuture.Wait(); } @@ -1565,6 +1770,11 @@ void TProducer::RunUserEventLoop() { } } +bool TProducer::IsFederation(const std::string& endpoint) { + std::string_view host = GetHost(endpoint); + return host == "logbroker.yandex.net" || host == "logbroker-prestable.yandex.net"; +} + void TProducer::GetSessionClosedEventAndDie(WrappedWriteSessionPtr wrappedSession, std::optional sessionClosedEvent) { std::optional receivedSessionClosedEvent; while (true) { @@ -1588,7 +1798,7 @@ void TProducer::GetSessionClosedEventAndDie(WrappedWriteSessionPtr wrappedSessio } TStringBuilder TProducer::LogPrefix() { - return TStringBuilder() << " SessionId: " << Settings.SessionId_ << " Epoch: " << Epoch.load() << " "; + return TStringBuilder() << " Id: " << Id << " Epoch: " << Epoch.load() << " "; } void TProducer::NextEpoch() { @@ -1729,29 +1939,25 @@ TWriteResult TProducer::WriteInternal(TContinuationToken&&, TWriteMessage&& mess } std::uint32_t chosenPartition; - if (message.Partition_.has_value()) { - if (!Partitions[message.Partition_.value()].Children_.empty()) { + std::string key; + if (message.GetPartition().has_value()) { + if (!Partitions[message.GetPartition().value()].Children_.empty()) { return TWriteResult{ .Status = EWriteStatus::Error, .ErrorMessage = "Partition was split", }; } - chosenPartition = message.Partition_.value(); - } else if 
(!message.Key_.has_value()) { - std::string key; - if (Settings.KeyProducer_) { - key = (*Settings.KeyProducer_)(message); - } else { - key = Settings.ProducerIdPrefix_; - } - message.Key(key); - chosenPartition = PartitionChooser->ChoosePartition(key); + chosenPartition = message.GetPartition().value(); + } else if (!message.GetKey().has_value()) { + key = Settings.ProducerIdPrefix_; + chosenPartition = PartitionChooser->ChoosePartition(Settings.ProducerIdPrefix_); } else { - chosenPartition = PartitionChooser->ChoosePartition(*message.Key_); + chosenPartition = PartitionChooser->ChoosePartition(*message.GetKey()); + key = *message.GetKey(); } - MessagesWorker->AddMessage(message.Key_.value_or(""), std::move(message), chosenPartition); + MessagesWorker->AddMessage(key, std::move(message), chosenPartition); eventsPromise = EventsWorker->HandleNewMessage(); RunUserEventLoop(); } @@ -1767,7 +1973,7 @@ TWriteResult TProducer::WriteInternal(TContinuationToken&&, TWriteMessage&& mess } TWriteResult TProducer::Write(TWriteMessage&& message) { - auto remainingTimeout = Settings.MaxBlock_; + auto remainingTimeout = Settings.MaxBlockTimeout_; auto sleepTimeMs = DEFAULT_START_BLOCK_TIMEOUT; for (;;) { if (Closed.load()) { @@ -1841,15 +2047,10 @@ TInstant TProducer::GetCloseDeadline() { } void TProducer::HandleAutoPartitioning(std::uint32_t partition) { - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "HandleAutoPartitioning: " << partition); auto splittedPartitionWorker = std::make_shared(this, partition); SplittedPartitionWorkers.try_emplace(partition, splittedPartitionWorker); } -std::string TProducer::GetProducerId(std::uint32_t partition) { - return std::format("{}_{}", Settings.ProducerIdPrefix_, partition); -} - TWriterCounters::TPtr TProducer::GetCounters() { return nullptr; } diff --git a/src/client/topic/impl/producer.h b/src/client/topic/impl/producer.h index ff0c997132..dca301c72a 100644 --- a/src/client/topic/impl/producer.h +++ 
b/src/client/topic/impl/producer.h @@ -37,6 +37,8 @@ class TProducer : public IProducer, FLUENT_SETTING(std::vector, Children); FLUENT_SETTING_DEFAULT(bool, Locked, false); FLUENT_SETTING_DEFAULT(NThreading::TFuture, Future, NThreading::MakeFuture()); + + std::optional CachedMaxSeqNo; }; struct TMessageInfo { @@ -63,8 +65,13 @@ class TProducer : public IProducer, WriteSessionPtr Session; const std::uint32_t Partition; std::shared_ptr IdleSession = nullptr; + bool DirectToPartition = true; + bool Closed = false; + + // This field is used only when DirectToPartition is false. + size_t NonDirectToPartitionOwnership = 1; - TWriteSessionWrapper(WriteSessionPtr session, std::uint32_t partition); + TWriteSessionWrapper(WriteSessionPtr session, std::uint32_t partition, bool directToPartition = true); }; using WrappedWriteSessionPtr = std::shared_ptr; @@ -118,18 +125,19 @@ class TProducer : public IProducer, struct TSessionsWorker { TSessionsWorker(TProducer* producer); + WrappedWriteSessionPtr GetOrCreateWriteSession(std::uint32_t partition, bool directToPartition = true); WrappedWriteSessionPtr GetWriteSession(std::uint32_t partition, bool directToPartition = true); void AddIdleSession(std::uint32_t partition); - void RemoveIdleSession(std::uint32_t partition); void DoWork(); size_t GetSessionsCount() const; size_t GetIdleSessionsCount() const; + void DestroyWriteSession(std::uint32_t partition); + void RemoveIdleSession(std::uint32_t partition); private: WrappedWriteSessionPtr CreateWriteSession(std::uint32_t partition, bool directToPartition = true); using TSessionsIndexIterator = std::unordered_map::iterator; - void DestroyWriteSession(TSessionsIndexIterator& it, TDuration closeTimeout); std::string GetProducerId(std::uint32_t partitionId); @@ -138,12 +146,11 @@ class TProducer : public IProducer, using IdlerSessionsIterator = std::set::iterator; std::unordered_map IdlerSessionsIndex; std::unordered_map SessionsIndex; - std::deque SessionsToRemove; - - static constexpr 
TDuration SESSION_REMOVE_DELAY = TDuration::Seconds(5); + std::unordered_set SessionsToRemove; + std::list ClosedSessionsToRemove; }; - struct TMessagesWorker { + struct TMessagesWorker : public std::enable_shared_from_this { TMessagesWorker(TProducer* producer); void DoWork(); @@ -160,6 +167,12 @@ class TProducer : public IProducer, void SetClosedStatusToFlushPromises(std::optional closedDescription); private: + enum class EState : std::uint8_t { + Init = 0, + PendingSeqNo = 1, + Ready = 2, + }; + using MessageIter = std::list::iterator; void PushInFlightMessage(std::uint32_t partition, TMessageInfo&& message); @@ -167,6 +180,10 @@ class TProducer : public IProducer, bool SendMessage(WrappedWriteSessionPtr wrappedSession, const TMessageInfo& message); std::optional GetContinuationToken(std::uint32_t partition); void RechoosePartitionIfNeeded(MessageIter message); + bool LazyInit(); + void MoveTo(EState state); + void HandleReadyInitSeqNoFutures(); + void FinishInit(); TProducer* Producer; @@ -177,6 +194,14 @@ class TProducer : public IProducer, std::unordered_map> ContinuationTokens; std::uint64_t MemoryUsage = 0; + std::uint64_t CurrentSeqNo = 0; + EState State = EState::Init; + + std::vector InitWriteSessions; + std::unordered_map> InitGetMaxSeqNoFutures; + + std::mutex InitLock; + std::vector GotInitSeqNoPartitions; friend class TProducer; }; @@ -193,7 +218,7 @@ class TProducer : public IProducer, }; void MoveTo(EState state); - void UpdateMaxSeqNo(uint64_t maxSeqNo); + void UpdateMaxSeqNo(std::uint32_t partitionId, std::uint64_t maxSeqNo); void LaunchGetMaxSeqNoFutures(std::unique_lock& lock); void HandleDescribeResult(); @@ -212,6 +237,7 @@ class TProducer : public IProducer, std::uint64_t MaxSeqNo = 0; std::vector WriteSessions; std::vector> GetMaxSeqNoFutures; + std::unordered_map CachedMaxSeqNos; std::mutex Lock; std::uint64_t NotReadyFutures = 0; size_t Retries = 0; @@ -239,11 +265,11 @@ class TProducer : public IProducer, std::list::iterator 
AckQueueEnd(std::uint32_t partition); std::optional GetContinuationToken(); std::optional GetSessionClosedEvent(); + bool RunEventLoop(WrappedWriteSessionPtr wrappedSession, std::uint32_t partition); private: void HandleSessionClosedEvent(TSessionClosedEvent&& event, std::uint32_t partition); void HandleReadyToAcceptEvent(std::uint32_t partition, TWriteSessionEvent::TReadyToAcceptEvent&& event); - bool RunEventLoop(WrappedWriteSessionPtr wrappedSession, std::uint32_t partition); bool TransferEventsToOutputQueue(); void AddContinuationToken(); bool AddSessionClosedIfNeeded(); @@ -338,8 +364,6 @@ class TProducer : public IProducer, TDuration GetCloseTimeout(); - std::string GetProducerId(std::uint32_t partition); - void HandleAutoPartitioning(std::uint32_t partition); bool RunSplittedPartitionWorkers(); @@ -356,6 +380,8 @@ class TProducer : public IProducer, TWriteResult WriteInternal(TContinuationToken&&, TWriteMessage&& message); + bool IsFederation(const std::string& endpoint); + public: TProducer(const TProducerSettings& settings, std::shared_ptr client, @@ -393,6 +419,9 @@ class TProducer : public IProducer, ~TProducer(); private: + // for logging + std::string Id; + std::shared_ptr Connections; std::shared_ptr Client; TDbDriverStatePtr DbDriverState; @@ -404,7 +433,6 @@ class TProducer : public IProducer, TProducerSettings Settings; ESeqNoStrategy SeqNoStrategy = ESeqNoStrategy::NotInitialized; - TProducerSettings::EPartitionChooserStrategy PartitionChooserStrategy = TProducerSettings::EPartitionChooserStrategy::Hash; NThreading::TPromise ClosePromise; NThreading::TFuture CloseFuture; diff --git a/src/client/topic/impl/read_session_impl.h b/src/client/topic/impl/read_session_impl.h index c60558caab..99e96bf241 100644 --- a/src/client/topic/impl/read_session_impl.h +++ b/src/client/topic/impl/read_session_impl.h @@ -161,6 +161,7 @@ class TDeferredActions { void DeferReconnection(TCallbackContextPtr cbContext, TPlainStatus&& status); void 
DeferStartSession(TCallbackContextPtr cbContext); void DeferSignalWaiter(TWaiter&& waiter); + void DeferOnUserRetrievedEvent(TUserRetrievedEventsInfoAccumulator&& accumulator); void DeferDestroyDecompressionInfos(std::vector>&& infos); private: @@ -175,6 +176,7 @@ class TDeferredActions { void Reconnect(); void SignalWaiters(); void StartSessions(); + void OnUserRetrievedEvent(); void DestroyDecompressionInfos(); private: @@ -219,6 +221,9 @@ class TDeferredActions { // Contexts for sessions to start std::vector> CbContexts; + // User retrieved events info accumulator. + std::vector> UserRetrievedEventsInfoAccumulator; + std::vector> DecompressionInfos; }; diff --git a/src/client/topic/impl/read_session_impl.ipp b/src/client/topic/impl/read_session_impl.ipp index 78c3b6bbee..770dee20e1 100644 --- a/src/client/topic/impl/read_session_impl.ipp +++ b/src/client/topic/impl/read_session_impl.ipp @@ -237,7 +237,7 @@ void TRawPartitionStreamEventQueue::DeleteNotReadyTail(TDe } deferred.DeferDestroyDecompressionInfos(std::move(infos)); - accumulator.OnUserRetrievedEvent(); + deferred.DeferOnUserRetrievedEvent(std::move(accumulator)); swap(ready, NotReady); } @@ -3375,6 +3375,11 @@ void TDeferredActions::DeferSignalWaiter(TWaiter&& waiter) Waiters.emplace_back(std::move(waiter)); } +template +void TDeferredActions::DeferOnUserRetrievedEvent(TUserRetrievedEventsInfoAccumulator&& accumulator) { + UserRetrievedEventsInfoAccumulator.push_back(std::move(accumulator)); +} + template void TDeferredActions::DeferDestroyDecompressionInfos(std::vector>&& infos) { @@ -3392,6 +3397,7 @@ void TDeferredActions::DoActions() { Reconnect(); SignalWaiters(); StartSessions(); + OnUserRetrievedEvent(); DestroyDecompressionInfos(); } @@ -3478,6 +3484,13 @@ void TDeferredActions::SignalWaiters() { } } +template +void TDeferredActions::OnUserRetrievedEvent() { + for (auto& accumulator : UserRetrievedEventsInfoAccumulator) { + accumulator.OnUserRetrievedEvent(); + } +} + template void 
TDeferredActions::DestroyDecompressionInfos() { for (const auto& info : DecompressionInfos) { diff --git a/src/client/topic/ut/basic_usage_ut.cpp b/src/client/topic/ut/basic_usage_ut.cpp index f4c9367780..6d34cd7c1f 100644 --- a/src/client/topic/ut/basic_usage_ut.cpp +++ b/src/client/topic/ut/basic_usage_ut.cpp @@ -53,9 +53,8 @@ TString SerializeDataChunk(ui64 seqNo, const TString& payload) { } TWriteMessage CreateMessage(std::string_view payload, const std::string& key, ui64 seqNo) { - TWriteMessage msg(payload); + TWriteMessage msg(key, payload); msg.SeqNo(seqNo); - msg.Key(key); return msg; } @@ -68,6 +67,72 @@ std::string Serialize(const TExample& value) { return value.Payload; } +// Reads exactly expectedCount messages from the topic and asserts that within each partition +// messages are ordered by seqNo (strictly increasing). Uses provided client, topic path and consumer. +void ReadMessagesAndAssertOrderedBySeqNo(TTopicClient& client, + const std::string& topicPath, + const std::string& consumerName, + const std::string& expectedPayload, + size_t expectedCount, + TDuration timeout = TDuration::Seconds(30)) { + struct TMessageInfo { + ui64 PartitionId; + std::string ProducerId; + ui64 SeqNo; + std::string Data; + }; + std::vector messages; + messages.reserve(expectedCount); + NThreading::TPromise donePromise = NThreading::NewPromise(); + + TTopicReadSettings topicSettings(topicPath); + topicSettings.ReadFromTimestamp(TInstant::Zero()); + + auto readSettings = TReadSessionSettings() + .ConsumerName(consumerName) + .AutoPartitioningSupport(true) + .AppendTopics(topicSettings); + + readSettings.EventHandlers_.SimpleDataHandlers([&](TReadSessionEvent::TDataReceivedEvent& ev) { + for (auto& msg : ev.GetMessages()) { + messages.push_back(TMessageInfo{ + msg.GetPartitionSession()->GetPartitionId(), + TString(msg.GetProducerId()), + msg.GetSeqNo(), + TString(msg.GetData()), + }); + } + if (messages.size() >= expectedCount) { + donePromise.SetValue(); + } + }, true); 
+ + auto readSession = client.CreateReadSession(readSettings); + UNIT_ASSERT_C(donePromise.GetFuture().Wait(timeout), + "Expected to read " << expectedCount << " messages within " << timeout << ", got " << messages.size()); + readSession->Close(TDuration::Seconds(5)); + + UNIT_ASSERT_VALUES_EQUAL_C(messages.size(), expectedCount, + "Read message count mismatch: got " << messages.size() << ", expected " << expectedCount); + + // SeqNo ordering is guaranteed within one producer stream. + // Multiple producers can write into the same partition with independent seqNo sequences. + std::map, std::vector> byPartitionAndProducer; + for (const auto& m : messages) { + UNIT_ASSERT_VALUES_EQUAL(m.Data, expectedPayload); + byPartitionAndProducer[{m.PartitionId, m.ProducerId}].push_back(m.SeqNo); + } + for (const auto& [key, seqNos] : byPartitionAndProducer) { + const auto& [partitionId, producerId] = key; + for (size_t i = 1; i < seqNos.size(); ++i) { + UNIT_ASSERT_C(seqNos[i] > seqNos[i - 1], + "Partition " << partitionId << ", producerId " << producerId + << ": expected seqNo strictly increasing, got " + << seqNos[i - 1] << " then " << seqNos[i] << " at index " << i); + } + } +} + // Write a message with binary (non-UTF8) producer ID using direct tablet communication // This bypasses gRPC string validation by sending directly to the PQ tablet // The SourceId field in TCmdWrite is defined as 'bytes' in protobuf, so it supports binary data @@ -164,6 +229,9 @@ static std::string FindKeyForBucket(size_t bucket, size_t bucketsCount) { void CreateTopicWithAutoPartitioning(TTopicClient& client) { TCreateTopicSettings createSettings; createSettings + .BeginAddConsumer() + .ConsumerName(TEST_CONSUMER) + .EndAddConsumer() .BeginConfigurePartitioningSettings() .MinActivePartitions(2) .MaxActivePartitions(100) @@ -178,7 +246,7 @@ void CreateTopicWithAutoPartitioning(TTopicClient& client) { } void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSessionSettings 
writeSettings, const std::string& message, std::uint32_t count, - TTopicSdkTestSetup& setup, std::shared_ptr decompressor, ui32 restartPeriod = 7, ui32 maxRestartsCount = 10) + TTopicSdkTestSetup& setup, std::shared_ptr decompressor, ui32 restartPeriod = 7, ui32 maxRestartsCount = 10, ui64 shuffleRatio = 1, TDuration shuffleDelay = TDuration::MilliSeconds(10)) { auto client = setup.MakeClient(); auto session = client.CreateSimpleBlockingWriteSession(writeSettings); @@ -194,9 +262,21 @@ void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSess TTopicClient topicClient = setup.MakeClient(); - auto WaitTasks = [&, timeout = TInstant::Now() + TDuration::Seconds(60)](auto f, size_t c) { - while (f() < c) { - UNIT_ASSERT(timeout > TInstant::Now()); + auto WaitTasks = [&](auto f, size_t c) { + const auto hardTimeout = TInstant::Now() + TDuration::Seconds(60); + const auto shuffleTimeout = TInstant::Now() + shuffleDelay; + while (true) { + const auto fVal = f(); + if (fVal >= c * shuffleRatio) { + return; + } + + const auto now = TInstant::Now(); + if (fVal >= c && now > shuffleTimeout) { + return; + } + + UNIT_ASSERT_GE(hardTimeout, now); ReadSession->WaitEvent(); std::this_thread::sleep_for(100ms); }; @@ -208,28 +288,23 @@ void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSess WaitTasks([&]() { return e->GetExecutedCount(); }, count); }; - auto RunTasks = [&](auto e, const std::vector& tasks) { - size_t n = tasks.size(); - WaitPlannedTasks(e, n); + auto RunTask = [&](auto e) { + WaitPlannedTasks(e, 1); size_t completed = e->GetExecutedCount(); - e->StartFuncs(tasks); - WaitExecutedTasks(e, completed + n); + e->StartRandomFunc(); + WaitExecutedTasks(e, completed + 1); }; - Y_UNUSED(RunTasks); - auto PlanTasksAndRestart = [&](auto e, const std::vector& tasks) { - size_t n = tasks.size(); - WaitPlannedTasks(e, n); + auto PlanTaskAndRestart = [&](auto e) { + WaitPlannedTasks(e, 1); size_t completed = e->GetExecutedCount(); 
setup.GetServer().KillTopicPqrbTablet(JoinPath({TString(setup.MakeDriverConfig().GetDatabase()), TString(setup.GetTopicPath())})); std::this_thread::sleep_for(100ms); - e->StartFuncs(tasks); - WaitExecutedTasks(e, completed + n); + e->StartRandomFunc(); + WaitExecutedTasks(e, completed + 1); }; - Y_UNUSED(PlanTasksAndRestart); - NThreading::TPromise checkedPromise = NThreading::NewPromise(); TAtomic lastOffset = 0u; @@ -250,11 +325,12 @@ void WriteAndReadToEndWithRestarts(TReadSessionSettings readSettings, TWriteSess ui32 restartCount = 0; while (AtomicGet(lastOffset) + 1 < count) { if (restartCount < maxRestartsCount && i % restartPeriod == 1) { - PlanTasksAndRestart(decompressor, {i++}); + PlanTaskAndRestart(decompressor); restartCount++; } else { - RunTasks(decompressor, {i++}); + RunTask(decompressor); } + i++; } ReadSession->Close(TDuration::MilliSeconds(10)); @@ -945,6 +1021,32 @@ Y_UNIT_TEST_SUITE(BasicUsage) { WriteAndReadToEndWithRestarts(readSettings, writeSettings, message, count, setup, decompressor); } + Y_UNIT_TEST(ReadWithRestartsAndLargeDataAndShuffle) { + TTopicSdkTestSetup setup(TEST_CASE_NAME); + auto compressor = std::make_shared(); + auto decompressor = CreateThreadPoolManagedExecutor(1); + + TReadSessionSettings readSettings; + readSettings + .ConsumerName(setup.GetConsumerName()) + .MaxMemoryUsageBytes(10_MB) + .DecompressionExecutor(decompressor) + .AppendTopics(setup.GetTopicPath()) + // .DirectRead(EnableDirectRead) + ; + + TWriteSessionSettings writeSettings; + writeSettings + .Path(setup.GetTopicPath()).MessageGroupId(TEST_MESSAGE_GROUP_ID) + .Codec(ECodec::RAW) + .CompressionExecutor(compressor); + + std::uint32_t count = 3000; + std::string message(8'000, 'x'); + + WriteAndReadToEndWithRestarts(readSettings, writeSettings, message, count, setup, decompressor, 7, 10, 10); + } + Y_UNIT_TEST(ConflictingWrites) { TTopicSdkTestSetup setup(TEST_CASE_NAME); @@ -995,9 +1097,9 @@ Y_UNIT_TEST_SUITE(BasicUsage) { 
.Path(setup.GetTopicPath(TEST_TOPIC)) .Codec(ECodec::RAW); writeSettings.ProducerIdPrefix(CreateGuidAsString()); - writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Hash); + writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); writeSettings.SubSessionIdleTimeout(TDuration::Seconds(30)); - writeSettings.MaxBlock(TDuration::Seconds(30)); + writeSettings.MaxBlockTimeout(TDuration::Seconds(30)); std::atomic acksCount{0}; std::atomic closedCount{0}; @@ -1021,10 +1123,9 @@ Y_UNIT_TEST_SUITE(BasicUsage) { const ui64 messages = 5; for (ui64 i = 0; i < messages; ++i) { std::string payload = "payload"; - TWriteMessage msg(payload); + TWriteMessage msg("key-" + ToString(i), payload); msg.SeqNo(i + 1); - msg.Key("key-" + ToString(i)); - UNIT_ASSERT_C(session->Write(std::move(msg)).IsSuccess(), "Failed to write message"); + UNIT_ASSERT_C(session->Write(std::move(msg)).IsQueued(), "Failed to write message"); } UNIT_ASSERT_C(session->Close(TDuration::Seconds(30)).IsSuccess(), "Failed to close keyed write session"); @@ -1041,12 +1142,32 @@ Y_UNIT_TEST_SUITE(BasicUsage) { writeSettings .Path(setup.GetTopicPath(TEST_TOPIC)) .Codec(ECodec::RAW); - writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Hash); + writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); writeSettings.SubSessionIdleTimeout(TDuration::Seconds(30)); UNIT_ASSERT_EXCEPTION(setup.MakeClient().CreateProducer(writeSettings), TContractViolation); } + Y_UNIT_TEST(Producer_IsNotSupportedForFederation) { + TTopicSdkTestSetup setup{TEST_CASE_NAME, TTopicSdkTestSetup::MakeServerSettings(), false}; + + auto config = setup.MakeDriverConfig(); + config.SetEndpoint("logbroker.yandex.net:2135"); + + TDriver driver(config); + TTopicClient federatedLikeClient(driver); + + TProducerSettings writeSettings; + writeSettings + .Path(setup.GetTopicPath(TEST_TOPIC)) + 
.Codec(ECodec::RAW); + writeSettings.ProducerIdPrefix(CreateGuidAsString()); + writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); + writeSettings.SubSessionIdleTimeout(TDuration::Seconds(30)); + + UNIT_ASSERT_EXCEPTION(federatedLikeClient.CreateProducer(writeSettings), TContractViolation); + } + Y_UNIT_TEST(Producer_SessionClosedDueToUserError) { TTopicSdkTestSetup setup{TEST_CASE_NAME, TTopicSdkTestSetup::MakeServerSettings(), false}; setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 2); @@ -1057,17 +1178,16 @@ Y_UNIT_TEST_SUITE(BasicUsage) { .Path(setup.GetTopicPath(TEST_TOPIC)) .Codec(ECodec::RAW); writeSettings.ProducerIdPrefix(CreateGuidAsString()); - writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Hash); + writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); writeSettings.SubSessionIdleTimeout(TDuration::Seconds(30)); - writeSettings.MaxBlock(TDuration::Seconds(30)); + writeSettings.MaxBlockTimeout(TDuration::Seconds(30)); auto session = publicClient.CreateProducer(writeSettings); std::string payload = "msg0"; - TWriteMessage msg(payload); + TWriteMessage msg("key", payload); msg.SeqNo(0); - msg.Key("key"); - UNIT_ASSERT_C(session->Write(std::move(msg)).IsSuccess(), "Failed to write message"); + UNIT_ASSERT_C(session->Write(std::move(msg)).IsQueued(), "Failed to write message"); auto flushResult = session->Flush().GetValueSync(); UNIT_ASSERT_C(flushResult.IsClosed(), "Failed to flush producer"); UNIT_ASSERT_C(flushResult.ClosedDescription->GetStatus() == EStatus::BAD_REQUEST, "Status is not BAD_REQUEST"); @@ -1094,9 +1214,9 @@ Y_UNIT_TEST_SUITE(BasicUsage) { .Path(setup.GetTopicPath(TEST_TOPIC)) .Codec(ECodec::RAW); writeSettings.ProducerIdPrefix(CreateGuidAsString()); - writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Hash); + 
writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); writeSettings.SubSessionIdleTimeout(TDuration::Seconds(30)); - writeSettings.MaxBlock(TDuration::Seconds(30)); + writeSettings.MaxBlockTimeout(TDuration::Seconds(30)); auto session = publicClient.CreateProducer(writeSettings); @@ -1109,17 +1229,15 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto seqNo = 1; for (ui64 i = 0; i < count0; ++i) { std::string payload = "msg0"; - TWriteMessage msg(payload); + TWriteMessage msg(key0, payload); msg.SeqNo(seqNo++); - msg.Key(key0); - UNIT_ASSERT_C(session->Write(std::move(msg)).IsSuccess(), "Failed to write message"); + UNIT_ASSERT_C(session->Write(std::move(msg)).IsQueued(), "Failed to write message"); } for (ui64 i = 0; i < count1; ++i) { std::string payload = "msg1"; - TWriteMessage msg(payload); + TWriteMessage msg(key1, payload); msg.SeqNo(seqNo++); - msg.Key(key1); - UNIT_ASSERT_C(session->Write(std::move(msg)).IsSuccess(), "Failed to write message"); + UNIT_ASSERT_C(session->Write(std::move(msg)).IsQueued(), "Failed to write message"); } UNIT_ASSERT_C(session->Close(TDuration::Seconds(10)).IsSuccess(), "Failed to close keyed write session"); @@ -1177,7 +1295,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { writeSettings.PartitioningKeyHasher([](const std::string_view key) -> std::string { return std::string{key}; }); - writeSettings.MaxBlock(TDuration::Seconds(30)); + writeSettings.MaxBlockTimeout(TDuration::Seconds(30)); auto producer = publicClient.CreateProducer(writeSettings); auto rawProducer = std::dynamic_pointer_cast(producer); @@ -1198,10 +1316,9 @@ Y_UNIT_TEST_SUITE(BasicUsage) { } std::string payload = "msg"; - TWriteMessage msg(payload); + TWriteMessage msg(key, payload); msg.SeqNo(i + 1); - msg.Key(key); - UNIT_ASSERT_C(producer->Write(std::move(msg)).IsSuccess(), "Failed to write message"); + UNIT_ASSERT_C(producer->Write(std::move(msg)).IsQueued(), "Failed to write message"); } 
UNIT_ASSERT_C(producer->Close(TDuration::Seconds(10)).IsSuccess(), "Failed to close producer"); @@ -1223,7 +1340,39 @@ Y_UNIT_TEST_SUITE(BasicUsage) { } } - Y_UNIT_TEST(Producer_EventLoop_Acks) { + Y_UNIT_TEST(Producer_WriteManyMessages) { + TTopicSdkTestSetup setup{TEST_CASE_NAME, TTopicSdkTestSetup::MakeServerSettings(), false}; + setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 4); + + auto client = setup.MakeClient(); + + TProducerSettings writeSettings; + writeSettings + .Path(setup.GetTopicPath(TEST_TOPIC)) + .Codec(ECodec::RAW); + writeSettings.ProducerIdPrefix(CreateGuidAsString()); + writeSettings.SubSessionIdleTimeout(TDuration::Seconds(10)); + writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); + writeSettings.MaxBlockTimeout(TDuration::Seconds(30)); + + auto producer = client.CreateProducer(writeSettings); + std::string payload = "data"; + + const ui64 count = 3000; + for (ui64 i = 1; i <= count; ++i) { + auto key = CreateGuidAsString(); + TWriteMessage msg(key, payload); + msg.SeqNo(i); + UNIT_ASSERT_C(producer->Write(std::move(msg)).IsQueued(), "Failed to write message"); + } + + UNIT_ASSERT_C(producer->Flush().GetValueSync().IsSuccess(), "Failed to flush producer"); + UNIT_ASSERT_C(producer->Close(TDuration::Seconds(10)).IsSuccess(), "Failed to close producer"); + + ReadMessagesAndAssertOrderedBySeqNo(client, setup.GetTopicPath(TEST_TOPIC), setup.GetConsumerName(), payload, count); + } + + Y_UNIT_TEST(Producer_AutoSeqNo) { TTopicSdkTestSetup setup{TEST_CASE_NAME, TTopicSdkTestSetup::MakeServerSettings(), false}; setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 4); @@ -1235,8 +1384,8 @@ Y_UNIT_TEST_SUITE(BasicUsage) { .Codec(ECodec::RAW); writeSettings.ProducerIdPrefix(CreateGuidAsString()); writeSettings.SubSessionIdleTimeout(TDuration::Seconds(10)); - writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Hash); - writeSettings.MaxBlock(TDuration::Seconds(30)); + 
writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); + writeSettings.MaxBlockTimeout(TDuration::Seconds(30)); auto producer = client.CreateProducer(writeSettings); @@ -1244,14 +1393,25 @@ Y_UNIT_TEST_SUITE(BasicUsage) { for (ui64 i = 1; i <= count; ++i) { auto key = CreateGuidAsString(); std::string payload = "data"; - TWriteMessage msg(payload); + TWriteMessage msg(key, payload); msg.SeqNo(i); - msg.Key(key); - UNIT_ASSERT_C(producer->Write(std::move(msg)).IsSuccess(), "Failed to write message"); + UNIT_ASSERT_C(producer->Write(std::move(msg)).IsQueued(), "Failed to write message"); } UNIT_ASSERT_C(producer->Flush().GetValueSync().IsSuccess(), "Failed to flush producer"); UNIT_ASSERT_C(producer->Close(TDuration::Seconds(10)).IsSuccess(), "Failed to close producer"); + + auto producer2 = client.CreateProducer(writeSettings); + std::string payload = "data"; + for (ui64 i = 1; i <= count; ++i) { + auto key = CreateGuidAsString(); + TWriteMessage msg(key, payload); + UNIT_ASSERT_C(producer2->Write(std::move(msg)).IsQueued(), "Failed to write message"); + } + UNIT_ASSERT_C(producer2->Flush().GetValueSync().IsSuccess(), "Failed to flush producer"); + UNIT_ASSERT_C(producer2->Close(TDuration::Seconds(10)).IsSuccess(), "Failed to close producer"); + + ReadMessagesAndAssertOrderedBySeqNo(client, setup.GetTopicPath(TEST_TOPIC), setup.GetConsumerName(), payload, count * 2); } Y_UNIT_TEST(Producer_WriteToClosedProducer) { @@ -1266,8 +1426,8 @@ Y_UNIT_TEST_SUITE(BasicUsage) { .Codec(ECodec::RAW); writeSettings.ProducerIdPrefix(CreateGuidAsString()); writeSettings.SubSessionIdleTimeout(TDuration::Seconds(10)); - writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Hash); - writeSettings.MaxBlock(TDuration::Seconds(30)); + writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); + writeSettings.MaxBlockTimeout(TDuration::Seconds(30)); auto producer = 
client.CreateProducer(writeSettings); @@ -1275,19 +1435,17 @@ Y_UNIT_TEST_SUITE(BasicUsage) { for (ui64 i = 1; i <= count; ++i) { auto key = CreateGuidAsString(); std::string payload = "data"; - TWriteMessage msg(payload); + TWriteMessage msg(key, payload); msg.SeqNo(i); - msg.Key(key); - UNIT_ASSERT_C(producer->Write(std::move(msg)).IsSuccess(), "Failed to write message"); + UNIT_ASSERT_C(producer->Write(std::move(msg)).IsQueued(), "Failed to write message"); } UNIT_ASSERT_C(producer->Flush().GetValueSync().IsSuccess(), "Failed to flush producer"); UNIT_ASSERT_C(producer->Close(TDuration::Seconds(10)).IsSuccess(), "Failed to close producer"); std::string payload = "data"; - TWriteMessage msg(payload); + TWriteMessage msg(CreateGuidAsString(), payload); msg.SeqNo(count + 1); - msg.Key(CreateGuidAsString()); auto writeResult = producer->Write(std::move(msg)); UNIT_ASSERT_C(writeResult.IsError(), "Failed to write message"); UNIT_ASSERT_C(writeResult.ErrorMessage == "producer is closed", "Error message is not correct"); @@ -1306,8 +1464,8 @@ Y_UNIT_TEST_SUITE(BasicUsage) { .Codec(ECodec::RAW); writeSettings.ProducerIdPrefix(CreateGuidAsString()); writeSettings.SubSessionIdleTimeout(TDuration::Seconds(30)); - writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Hash); - writeSettings.MaxBlock(TDuration::Seconds(30)); + writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); + writeSettings.MaxBlockTimeout(TDuration::Seconds(30)); auto producer = client.CreateProducer(writeSettings); @@ -1324,12 +1482,11 @@ Y_UNIT_TEST_SUITE(BasicUsage) { for (ui64 i = 0; i < perThread; ++i) { const ui64 seqNo = nextSeqNo.fetch_add(1); std::string payload = "data"; - TWriteMessage msg(payload); + TWriteMessage msg(key, payload); msg.SeqNo(seqNo); - msg.Key(key); auto writeResult = producer->Write(std::move(msg)); UNIT_ASSERT_C( - writeResult.IsSuccess(), + writeResult.IsQueued(), "Failed to write message" ); } @@ 
-1356,8 +1513,8 @@ Y_UNIT_TEST_SUITE(BasicUsage) { .Codec(ECodec::RAW); writeSettings.ProducerIdPrefix(CreateGuidAsString()); writeSettings.SubSessionIdleTimeout(TDuration::Seconds(5)); - writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Hash); - writeSettings.MaxBlock(TDuration::Seconds(30)); + writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); + writeSettings.MaxBlockTimeout(TDuration::Seconds(30)); auto producer = client.CreateProducer(writeSettings); @@ -1367,10 +1524,9 @@ Y_UNIT_TEST_SUITE(BasicUsage) { for (ui64 i = 0; i < messages; ++i) { auto key = CreateGuidAsString(); std::string payload = "data"; - TWriteMessage msg(payload); + TWriteMessage msg(key, payload); msg.SeqNo(seqNo++); - msg.Key(key); - UNIT_ASSERT_C(producer->Write(std::move(msg)).IsSuccess(), "Failed to write message"); + UNIT_ASSERT_C(producer->Write(std::move(msg)).IsQueued(), "Failed to write message"); } UNIT_ASSERT_C(producer->Flush().GetValueSync().IsSuccess(), "Failed to flush producer"); @@ -1379,17 +1535,16 @@ Y_UNIT_TEST_SUITE(BasicUsage) { for (ui64 i = 0; i < messages; ++i) { auto key = CreateGuidAsString(); std::string payload = "data"; - TWriteMessage msg(payload); + TWriteMessage msg(key, payload); msg.SeqNo(seqNo++); - msg.Key(key); - UNIT_ASSERT_C(producer->Write(std::move(msg)).IsSuccess(), "Failed to write message"); + UNIT_ASSERT_C(producer->Write(std::move(msg)).IsQueued(), "Failed to write message"); } UNIT_ASSERT_C(producer->Flush().GetValueSync().IsSuccess(), "Failed to flush producer"); UNIT_ASSERT_C(producer->Close(TDuration::Seconds(10)).IsSuccess(), "Failed to close producer"); } - Y_UNIT_TEST(KeyedWriteSession_BoundPartitionChooser_SplitPartition_MultiThreadedAcksOrder) { + Y_UNIT_TEST(Producer_BoundPartitionChooser_SplitPartition_MultiThreadedAcksOrder) { NKikimr::NPQ::NTest::TTopicSdkTestSetup setup = NKikimr::NPQ::NTest::CreateSetup(); setup.CreateTopicWithAutoscale(TEST_TOPIC, 
TEST_CONSUMER, 1, 100); @@ -1405,7 +1560,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { writeSettings.PartitioningKeyHasher([](const std::string_view key) -> std::string { return std::string{key}; }); - writeSettings.MaxBlock(TDuration::Seconds(30)); + writeSettings.MaxBlockTimeout(TDuration::Seconds(30)); auto producer = client.CreateProducer(writeSettings); @@ -1415,10 +1570,9 @@ Y_UNIT_TEST_SUITE(BasicUsage) { for (ui64 i = 1; i <= messages; ++i) { auto key = CreateGuidAsString(); std::string payload = "data"; - TWriteMessage msg(payload); + TWriteMessage msg(key, payload); msg.SeqNo(i); - msg.Key(key); - UNIT_ASSERT_C(producer->Write(std::move(msg)).IsSuccess(), "Failed to write message"); + UNIT_ASSERT_C(producer->Write(std::move(msg)).IsQueued(), "Failed to write message"); } }); @@ -1435,7 +1589,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { UNIT_ASSERT_C(producer->Close(TDuration::Seconds(30)).IsSuccess(), "Failed to close producer"); } - Y_UNIT_TEST(KeyedWriteSession_CloseTimeout) { + Y_UNIT_TEST(Producer_CloseTimeout) { TTopicSdkTestSetup setup{TEST_CASE_NAME, TTopicSdkTestSetup::MakeServerSettings(), false}; setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 3); @@ -1446,18 +1600,17 @@ Y_UNIT_TEST_SUITE(BasicUsage) { .Path(setup.GetTopicPath(TEST_TOPIC)) .Codec(ECodec::RAW); writeSettings.ProducerIdPrefix(CreateGuidAsString()); - writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Hash); + writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); writeSettings.SubSessionIdleTimeout(TDuration::Seconds(30)); - writeSettings.MaxBlock(TDuration::Seconds(30)); + writeSettings.MaxBlockTimeout(TDuration::Seconds(30)); auto producer = client.CreateProducer(writeSettings); for (int i = 0; i < 1000; ++i) { std::string payload = "message-" + ToString(i); - TWriteMessage msg(payload); + TWriteMessage msg("key1", payload); msg.SeqNo(i + 1); - msg.Key("key1"); - UNIT_ASSERT_C(producer->Write(std::move(msg)).IsSuccess(), 
"Failed to write message"); + UNIT_ASSERT_C(producer->Write(std::move(msg)).IsQueued(), "Failed to write message"); } // Test Close timeout @@ -1469,7 +1622,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { // block longer than the timeout and that the session eventually closes successfully. UNIT_ASSERT_C( result.IsSuccess() || result.IsTimeout(), - TStringBuilder() << "Failed to close keyed write session, status: " << static_cast(result.Status) + TStringBuilder() << "Failed to close producer, status: " << static_cast(result.Status) ); const TDuration actualDuration = TInstant::Now() - startTime; @@ -1499,7 +1652,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { writeSettings1.ProducerIdPrefix("autopartitioning_keyed_1"); writeSettings1.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Bound); writeSettings1.SubSessionIdleTimeout(TDuration::Seconds(30)); - writeSettings1.MaxBlock(TDuration::Seconds(30)); + writeSettings1.MaxBlockTimeout(TDuration::Seconds(30)); TProducerSettings writeSettings2 = writeSettings1; writeSettings2.ProducerIdPrefix("autopartitioning_keyed_2"); @@ -1518,7 +1671,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { if (key.empty()) { key = "lalala"; } - UNIT_ASSERT_C(s->Write(CreateMessage(payload, key, seqNo)).IsSuccess(), "Failed to write message"); + UNIT_ASSERT_C(s->Write(CreateMessage(payload, key, seqNo)).IsQueued(), "Failed to write message"); }; { @@ -1589,6 +1742,8 @@ Y_UNIT_TEST_SUITE(BasicUsage) { UNIT_ASSERT(producer1->Close(TDuration::Seconds(30)).IsSuccess()); UNIT_ASSERT(producer2->Close(TDuration::Seconds(30)).IsSuccess()); UNIT_ASSERT(producer3->Close(TDuration::Seconds(30)).IsSuccess()); + + ReadMessagesAndAssertOrderedBySeqNo(client, setup.GetTopicPath(TEST_TOPIC), setup.GetConsumerName(), msgData, 14); } Y_UNIT_TEST(AutoPartitioning_Producer_SmallMessages) { @@ -1609,7 +1764,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { writeSettings1.ProducerIdPrefix("autopartitioning_keyed_small_1"); 
writeSettings1.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Bound); writeSettings1.SubSessionIdleTimeout(TDuration::Seconds(30)); - writeSettings1.MaxBlock(TDuration::Seconds(30)); + writeSettings1.MaxBlockTimeout(TDuration::Seconds(30)); TProducerSettings writeSettings2 = writeSettings1; writeSettings2.ProducerIdPrefix("autopartitioning_keyed_small_2"); @@ -1628,7 +1783,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto writeMessage = [&](std::shared_ptr s, std::string_view payload, ui64 seqNo) { auto key = keys[seqNo % keys.size()]; if (key.empty()) key = "a"; - UNIT_ASSERT_C(s->Write(CreateMessage(payload, key, seqNo)).IsSuccess(), "Failed to write message"); + UNIT_ASSERT_C(s->Write(CreateMessage(payload, key, seqNo)).IsQueued(), "Failed to write message"); }; { @@ -1665,6 +1820,8 @@ Y_UNIT_TEST_SUITE(BasicUsage) { UNIT_ASSERT(producer1->Close(TDuration::Seconds(30)).IsSuccess()); UNIT_ASSERT(producer2->Close(TDuration::Seconds(30)).IsSuccess()); + + ReadMessagesAndAssertOrderedBySeqNo(client, setup.GetTopicPath(TEST_TOPIC), setup.GetConsumerName(), msgData, totalMessages); } Y_UNIT_TEST(Producer_BasicWrite) { @@ -1678,13 +1835,13 @@ Y_UNIT_TEST_SUITE(BasicUsage) { writeSettings.Path(setup.GetTopicPath(TEST_TOPIC)); writeSettings.Codec(ECodec::RAW); writeSettings.ProducerIdPrefix("producer_basic_write"); - writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Hash); + writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); auto producer = client.CreateProducer(writeSettings); auto msgData = TString(10_KB, 'a'); for (ui64 i = 0; i < 100; ++i) { - UNIT_ASSERT(producer->Write(TWriteMessage(msgData)).IsSuccess()); + UNIT_ASSERT(producer->Write(TWriteMessage(msgData)).IsQueued()); } UNIT_ASSERT(producer->Flush().GetValueSync().IsSuccess()); @@ -1701,6 +1858,8 @@ Y_UNIT_TEST_SUITE(BasicUsage) { UNIT_ASSERT_EQUAL(messagesWritten, 100); 
UNIT_ASSERT_C(producer->Close(TDuration::Seconds(1)).IsSuccess(), "Failed to close producer"); + + ReadMessagesAndAssertOrderedBySeqNo(client, setup.GetTopicPath(TEST_TOPIC), setup.GetConsumerName(), msgData, 100); } Y_UNIT_TEST(TypedProducer_BasicWrite) { @@ -1716,11 +1875,8 @@ Y_UNIT_TEST_SUITE(BasicUsage) { writeSettings.Path(setup.GetTopicPath(TEST_TOPIC)); writeSettings.Codec(ECodec::RAW); writeSettings.ProducerIdPrefix("producer_basic_write"); - writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Hash); - writeSettings.KeyProducer([](const TWriteMessage& message) -> std::string { - return ToString(MurmurHash(message.Data.data(), message.Data.size())); - }); - writeSettings.MaxBlock(TDuration::Seconds(1)); + writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); + writeSettings.MaxBlockTimeout(TDuration::Seconds(1)); auto producer = client.CreateTypedProducer(writeSettings); @@ -1729,7 +1885,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { for (ui64 i = 0; i < messageCount; ++i) { auto payload = CreateGuidAsString(); sentPayloads.push_back(payload); - UNIT_ASSERT(producer->Write(TExample{.Payload = payload}).IsSuccess()); + UNIT_ASSERT(producer->Write(TExample{.Payload = payload}).IsQueued()); } UNIT_ASSERT(producer->Flush().GetValueSync().IsSuccess()); @@ -1776,9 +1932,9 @@ Y_UNIT_TEST_SUITE(BasicUsage) { writeSettings.Path(setup.GetTopicPath(TEST_TOPIC)); writeSettings.Codec(ECodec::RAW); writeSettings.ProducerIdPrefix("producer_basic_write"); - writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Hash); + writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); writeSettings.SubSessionIdleTimeout(TDuration::MilliSeconds(500)); - writeSettings.MaxBlock(TDuration::Seconds(1)); + writeSettings.MaxBlockTimeout(TDuration::Seconds(1)); auto describeResult = client.DescribeTopic(TEST_TOPIC).GetValueSync(); const auto& partitions = 
describeResult.GetTopicDescription().GetPartitions(); @@ -1787,20 +1943,26 @@ Y_UNIT_TEST_SUITE(BasicUsage) { auto producer = client.CreateProducer(writeSettings); auto producerRaw = dynamic_cast(producer.get()); auto msgData = TString(10_KB, 'a'); + for (const auto& partition : partitions) { + for (ui64 i = 0; i < 10; ++i) { + TWriteMessage msg(partition.GetPartitionId(), msgData); + UNIT_ASSERT(producer->Write(std::move(msg)).IsQueued()); + } + } + UNIT_ASSERT(producer->Flush().GetValueSync().IsSuccess()); - for (ui64 i = 0; i < 3; ++i) { - for (const auto& partition : partitions) { - for (ui64 i = 0; i < 10; ++i) { - TWriteMessage msg(msgData); - msg.Partition(partition.GetPartitionId()); - UNIT_ASSERT(producer->Write(std::move(msg)).IsSuccess()); - } - UNIT_ASSERT(producer->Flush().GetValueSync().IsSuccess()); - UNIT_ASSERT((producerRaw->GetIdleSessionsCount() == 1 && producerRaw->GetSessionsCount() == 1) || - (producerRaw->GetIdleSessionsCount() == 0 && producerRaw->GetSessionsCount() == 0)); - Sleep(TDuration::Seconds(1)); + size_t idleSessionsCount = 0; + size_t sessionsCount = 0; + for (int i = 0; i < 5; ++i) { + idleSessionsCount = producerRaw->GetIdleSessionsCount(); + sessionsCount = producerRaw->GetSessionsCount(); + if (idleSessionsCount == 0 && sessionsCount == 0) { + break; } + + Sleep(TDuration::Seconds(1)); } + UNIT_ASSERT_C(idleSessionsCount == 0 && sessionsCount == 0, "Idle session count: " << idleSessionsCount << ", sessions count: " << sessionsCount); { auto describeResult = client.DescribeTopic(TEST_TOPIC, TDescribeTopicSettings().IncludeStats(true)).GetValueSync(); @@ -1810,104 +1972,11 @@ Y_UNIT_TEST_SUITE(BasicUsage) { UNIT_ASSERT(stats); messagesWritten += stats->GetEndOffset() - stats->GetStartOffset(); } - UNIT_ASSERT_EQUAL(messagesWritten, 150); + UNIT_ASSERT_EQUAL(messagesWritten, 50); } UNIT_ASSERT(producer->Close(TDuration::Seconds(1)).IsSuccess()); } - Y_UNIT_TEST(Producer_CustomKeyProducerFunction) { - TTopicSdkTestSetup 
setup{TEST_CASE_NAME, TTopicSdkTestSetup::MakeServerSettings(), false}; - setup.CreateTopic(TEST_TOPIC, TEST_CONSUMER, 2); - - // Capture partition ids in the same order as DescribeTopic returns them - // (the keyed session uses the same DescribeTopic ordering to map hash bucket -> partition id). - auto publicClient = setup.MakeClient(); - auto describeTopicSettings = TDescribeTopicSettings().IncludeStats(true); - auto before = publicClient.DescribeTopic(setup.GetTopicPath(TEST_TOPIC), describeTopicSettings).GetValueSync(); - UNIT_ASSERT_C(before.IsSuccess(), before.GetIssues().ToOneLineString()); - const auto& beforePartitions = before.GetTopicDescription().GetPartitions(); - UNIT_ASSERT_VALUES_EQUAL(beforePartitions.size(), 2); - const ui64 partitionId0 = beforePartitions[0].GetPartitionId(); - const ui64 partitionId1 = beforePartitions[1].GetPartitionId(); - - constexpr auto keyAttributeName = "__key"; - - TProducerSettings writeSettings; - writeSettings - .Path(setup.GetTopicPath(TEST_TOPIC)) - .Codec(ECodec::RAW); - writeSettings.ProducerIdPrefix(CreateGuidAsString()); - writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Hash); - writeSettings.SubSessionIdleTimeout(TDuration::Seconds(30)); - writeSettings.KeyProducer([](const TWriteMessage& message) -> std::string { - for (const auto& [attributeName, attributeValue] : message.MessageMeta_) { - if (attributeName == keyAttributeName) { - return attributeValue; - } - } - return ""; - }); - writeSettings.MaxBlock(TDuration::Seconds(1)); - - auto producer = publicClient.CreateProducer(writeSettings); - - const std::string key0 = FindKeyForBucket(0, 2); - const std::string key1 = FindKeyForBucket(1, 2); - - const ui64 count0 = 7; - const ui64 count1 = 11; - - auto seqNo = 1; - for (ui64 i = 0; i < count0; ++i) { - std::string payload = "msg0"; - TWriteMessage msg(payload); - msg.SeqNo(seqNo++); - msg.MessageMeta_.emplace_back(keyAttributeName, key0); - 
UNIT_ASSERT(producer->Write(std::move(msg)).IsSuccess()); - } - for (ui64 i = 0; i < count1; ++i) { - std::string payload = "msg1"; - TWriteMessage msg(payload); - msg.SeqNo(seqNo++); - msg.MessageMeta_.emplace_back(keyAttributeName, key1); - UNIT_ASSERT(producer->Write(std::move(msg)).IsSuccess()); - } - - UNIT_ASSERT_C(producer->Close(TDuration::Seconds(10)).IsSuccess(), "Failed to close producer"); - UNIT_ASSERT_VALUES_EQUAL(producer->GetWriteStats().MessagesWritten, count0 + count1); - UNIT_ASSERT_VALUES_EQUAL(producer->GetWriteStats().LastWrittenSeqNo, seqNo - 1); - - auto after = publicClient.DescribeTopic(setup.GetTopicPath(TEST_TOPIC), describeTopicSettings).GetValueSync(); - UNIT_ASSERT_C(after.IsSuccess(), after.GetIssues().ToOneLineString()); - const auto& afterPartitions = after.GetTopicDescription().GetPartitions(); - UNIT_ASSERT_VALUES_EQUAL(afterPartitions.size(), 2); - - std::unordered_map endOffsets; - for (const auto& p : afterPartitions) { - auto stats = p.GetPartitionStats(); - UNIT_ASSERT(stats.has_value()); - endOffsets[p.GetPartitionId()] = stats->GetEndOffset(); - } - - auto it0 = endOffsets.find(partitionId0); - auto it1 = endOffsets.find(partitionId1); - UNIT_ASSERT(it0 != endOffsets.end()); - UNIT_ASSERT(it1 != endOffsets.end()); - - const ui64 endOffset0 = it0->second; - const ui64 endOffset1 = it1->second; - - // Partition ordering in DescribeTopic is not a part of public API contract, so allow swapping. 
- UNIT_ASSERT_VALUES_EQUAL(endOffset0 + endOffset1, count0 + count1); - UNIT_ASSERT_C( - (endOffset0 == count0 && endOffset1 == count1) || (endOffset0 == count1 && endOffset1 == count0), - TStringBuilder() << "Unexpected end offsets distribution: " - << "partitionId0=" << partitionId0 << " endOffset0=" << endOffset0 << ", " - << "partitionId1=" << partitionId1 << " endOffset1=" << endOffset1 << ", " - << "expected (" << count0 << "," << count1 << ") in any order" - ); - } - Y_UNIT_TEST(Producer_BlockingWrite) { auto settings = TTopicSdkTestSetup::MakeServerSettings(); settings.PQConfig.SetUseSrcIdMetaMappingInFirstClass(true); @@ -1919,14 +1988,14 @@ Y_UNIT_TEST_SUITE(BasicUsage) { writeSettings.Path(setup.GetTopicPath(TEST_TOPIC)); writeSettings.Codec(ECodec::RAW); writeSettings.ProducerIdPrefix("simple_blocking_producer_basic_write"); - writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Hash); - writeSettings.MaxBlock(TDuration::Seconds(1)); + writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); + writeSettings.MaxBlockTimeout(TDuration::Seconds(1)); auto producer = client.CreateProducer(writeSettings); auto msgData = TString(10_KB, 'a'); for (ui64 i = 0; i < 100; ++i) { - UNIT_ASSERT(producer->Write(TWriteMessage(msgData)).IsSuccess()); + UNIT_ASSERT(producer->Write(TWriteMessage(msgData)).IsQueued()); } UNIT_ASSERT(producer->Flush().GetValueSync().IsSuccess()); @@ -1945,14 +2014,14 @@ Y_UNIT_TEST_SUITE(BasicUsage) { writeSettings.Path(setup.GetTopicPath(TEST_TOPIC)); writeSettings.Codec(ECodec::RAW); writeSettings.ProducerIdPrefix("simple_blocking_producer_basic_write"); - writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Hash); + writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); writeSettings.MaxMemoryUsage(100_KB); - writeSettings.MaxBlock(TDuration::MilliSeconds(1)); + 
writeSettings.MaxBlockTimeout(TDuration::MilliSeconds(1)); auto producer = client.CreateProducer(writeSettings); auto msgData = TString(1_MB, 'a'); - UNIT_ASSERT(producer->Write(TWriteMessage(msgData)).IsSuccess()); + UNIT_ASSERT(producer->Write(TWriteMessage(msgData)).IsQueued()); UNIT_ASSERT(producer->Write(TWriteMessage(msgData)).IsTimeout()); UNIT_ASSERT(producer->Close(TDuration::Seconds(10)).IsSuccess()); } diff --git a/src/client/topic/ut/local_partition_ut.cpp b/src/client/topic/ut/local_partition_ut.cpp index b0f94be2c8..12f1c0dd18 100644 --- a/src/client/topic/ut/local_partition_ut.cpp +++ b/src/client/topic/ut/local_partition_ut.cpp @@ -34,7 +34,6 @@ namespace NYdb::inline V3::NTopic::NTests { TTopicSdkTestSetup CreateSetupForSplitMerge(const std::string& testCaseName) { NKikimrConfig::TFeatureFlags ff; - ff.SetEnableTopicSplitMerge(true); auto settings = TTopicSdkTestSetup::MakeServerSettings(); settings.SetFeatureFlags(ff); auto setup = TTopicSdkTestSetup(testCaseName, settings, false); diff --git a/src/client/topic/ut/ut_utils/txusage_fixture.cpp b/src/client/topic/ut/ut_utils/txusage_fixture.cpp index 3b33e5f506..3a601c0dcb 100644 --- a/src/client/topic/ut/ut_utils/txusage_fixture.cpp +++ b/src/client/topic/ut/ut_utils/txusage_fixture.cpp @@ -36,10 +36,6 @@ TFixture::TTableRecord::TTableRecord(const std::string& key, const std::string& void TFixture::SetUp(NUnitTest::TTestContext&) { NKikimr::Tests::TServerSettings settings = TTopicSdkTestSetup::MakeServerSettings(); - settings.SetEnableTopicServiceTx(true); - settings.SetEnableTopicSplitMerge(true); - settings.SetEnableOltpSink(GetEnableOltpSink()); - settings.SetEnableOlapSink(GetEnableOlapSink()); settings.SetEnableHtapTx(GetEnableHtapTx()); settings.SetAllowOlapDataQuery(GetAllowOlapDataQuery()); @@ -1088,16 +1084,6 @@ auto TFixture::GetAvgWriteBytes(const std::string& topicName, return result; } -bool TFixture::GetEnableOltpSink() const -{ - return false; -} - -bool TFixture::GetEnableOlapSink() 
const -{ - return false; -} - bool TFixture::GetEnableHtapTx() const { return false; @@ -2103,16 +2089,6 @@ void TFixtureSinks::CreateColumnTable(const std::string& tablePath) UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); } -bool TFixtureSinks::GetEnableOltpSink() const -{ - return true; -} - -bool TFixtureSinks::GetEnableOlapSink() const -{ - return true; -} - bool TFixtureSinks::GetEnableHtapTx() const { return true; diff --git a/src/client/topic/ut/ut_utils/txusage_fixture.h b/src/client/topic/ut/ut_utils/txusage_fixture.h index 5ee506491b..634492186b 100644 --- a/src/client/topic/ut/ut_utils/txusage_fixture.h +++ b/src/client/topic/ut/ut_utils/txusage_fixture.h @@ -268,8 +268,6 @@ class TFixture : public NUnitTest::TBaseFixture { std::uint32_t partitionId, const std::string& boundary); - virtual bool GetEnableOltpSink() const; - virtual bool GetEnableOlapSink() const; virtual bool GetEnableHtapTx() const; virtual bool GetAllowOlapDataQuery() const; @@ -404,8 +402,6 @@ class TFixtureSinks : public TFixture { void CreateRowTable(const std::string& path); void CreateColumnTable(const std::string& tablePath); - bool GetEnableOltpSink() const override; - bool GetEnableOlapSink() const override; bool GetEnableHtapTx() const override; bool GetAllowOlapDataQuery() const override; diff --git a/tests/integration/topic/basic_usage_it.cpp b/tests/integration/topic/basic_usage_it.cpp index d8f63572e1..ceef48a74a 100644 --- a/tests/integration/topic/basic_usage_it.cpp +++ b/tests/integration/topic/basic_usage_it.cpp @@ -875,22 +875,22 @@ TEST_F(BasicUsage, TEST_NAME(TProducerBasicWrite_NoAutoPartitioning)) { .Path(GetTopicPath(TOPIC_NAME)) .Codec(ECodec::RAW); writeSettings.ProducerIdPrefix(CreateGuidAsString()); - writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::Hash); + writeSettings.PartitionChooserStrategy(TProducerSettings::EPartitionChooserStrategy::KafkaHash); 
writeSettings.SubSessionIdleTimeout(TDuration::Seconds(30)); writeSettings.PartitioningKeyHasher([](const std::string_view key) -> std::string { return std::string{key}; }); - writeSettings.MaxBlock(TDuration::Seconds(30)); + writeSettings.MaxBlockTimeout(TDuration::Seconds(30)); auto producer = client.CreateProducer(writeSettings); auto keyedSession = std::dynamic_pointer_cast(producer); + std::string data = "message"; for (size_t i = 0; i < 100; ++i) { auto key = CreateGuidAsString(); - TWriteMessage msg("msg"); + TWriteMessage msg(key, data); msg.SeqNo(i + 1); - msg.Key(key); - ASSERT_TRUE(producer->Write(std::move(msg)).IsSuccess()); + ASSERT_TRUE(producer->Write(std::move(msg)).IsQueued()); } ASSERT_TRUE(producer->Close(TDuration::Seconds(10)).IsSuccess()); diff --git a/tests/integration/topic/utils/managed_executor.cpp b/tests/integration/topic/utils/managed_executor.cpp index c96bc1d071..fa51953932 100644 --- a/tests/integration/topic/utils/managed_executor.cpp +++ b/tests/integration/topic/utils/managed_executor.cpp @@ -1,5 +1,6 @@ #include "managed_executor.h" +#include namespace NYdb::inline V3::NTopic::NTests { @@ -50,6 +51,26 @@ void TManagedExecutor::RunTask(TFunction&& func) Executor->Post(MakeTask(std::move(func))); } +void TManagedExecutor::StartRandomFunc() { + std::lock_guard lock(Mutex); + + Y_ABORT_UNLESS(Planned > 0); + size_t index = RandomNumber(Planned); + + for (size_t i = 0; i < Funcs.size(); ++i) { + if (Funcs[i] != nullptr) { + if (index == 0) { + RunTask(std::move(Funcs[i])); + Funcs[i] = nullptr; + return; + } + --index; + } + } + + Y_ABORT("No functions to start"); +} + void TManagedExecutor::StartFuncs(const std::vector& indicies) { std::lock_guard lock(Mutex); diff --git a/tests/integration/topic/utils/managed_executor.h b/tests/integration/topic/utils/managed_executor.h index d379f469a3..3076ae4044 100644 --- a/tests/integration/topic/utils/managed_executor.h +++ b/tests/integration/topic/utils/managed_executor.h @@ -19,6 +19,7 @@ 
class TManagedExecutor : public IExecutor { void Stop() override; + void StartRandomFunc(); void StartFuncs(const std::vector& indicies); size_t GetFuncsCount() const;