Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/import_generation.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
37
38
2 changes: 1 addition & 1 deletion .github/last_commit.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
abde8e11540c2ccb2c8b2d9d15b2bed3d3fbaf7e
688edef85e4e4e55828630ef3943a91ca9306799
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added support for the new inverted index type: JSON, intended to speed up queries on Json or JsonDocument columns.

* EXPERIMENTAL! Added `IProducer` interface to the SDK. This interface is used to write messages to a topic.
Each message can be associated with a partitioning key, which is used to determine the partition to which the message will be written.

Expand Down
7 changes: 3 additions & 4 deletions examples/topic_writer/producer/basic_write/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ std::shared_ptr<NYdb::NTopic::IProducer> CreateProducer(const std::string& topic
producerSettings.ProducerIdPrefix("producer_basic");
producerSettings.PartitionChooserStrategy(NYdb::NTopic::TProducerSettings::EPartitionChooserStrategy::Bound);
producerSettings.SubSessionIdleTimeout(TDuration::Seconds(30));
producerSettings.MaxBlock(TDuration::Seconds(30));
producerSettings.MaxBlockTimeout(TDuration::Seconds(30));
producerSettings.MaxMemoryUsage(100_MB);
return topicClient.CreateProducer(producerSettings);
}
Expand Down Expand Up @@ -66,7 +66,7 @@ void WriteWithHandlingResult(std::shared_ptr<NYdb::NTopic::IProducer> producer,

for (size_t retries = 0; retries < MAX_RETRIES; retries++) {
auto writeResult = producer->Write(std::move(writeMessage));
if (writeResult.IsSuccess()) {
if (writeResult.IsQueued()) {
// if the message was queued successfully, we can continue writing messages
continue;
}
Expand Down Expand Up @@ -115,8 +115,7 @@ int main() {
auto messageData = std::string(1_KB, 'a');

for (int i = 0; i < 10; i++) {
NYdb::NTopic::TWriteMessage writeMessage(messageData);
writeMessage.Key("key" + ToString(i));
NYdb::NTopic::TWriteMessage writeMessage("key" + ToString(i), messageData);
WriteWithHandlingResult(producer, std::move(writeMessage));
}
return 0;
Expand Down
3 changes: 3 additions & 0 deletions include/ydb-cpp-sdk/client/table/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,9 @@ class TTableDescription {
// fulltext
void AddFulltextIndex(const std::string& indexName, EIndexType type, const std::vector<std::string>& indexColumns, const TFulltextIndexSettings& indexSettings);
void AddFulltextIndex(const std::string& indexName, EIndexType type, const std::vector<std::string>& indexColumns, const std::vector<std::string>& dataColumns, const TFulltextIndexSettings& indexSettings);
// json
void AddJsonIndex(const std::string& indexName, const std::vector<std::string>& indexColumns);
void AddJsonIndex(const std::string& indexName, const std::vector<std::string>& indexColumns, const std::vector<std::string>& dataColumns);

// default
void AddSecondaryIndex(const std::string& indexName, const std::vector<std::string>& indexColumns);
Expand Down
1 change: 1 addition & 0 deletions include/ydb-cpp-sdk/client/table/table_enum.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ enum class EIndexType {
GlobalVectorKMeansTree,
GlobalFulltextPlain,
GlobalFulltextRelevance,
GlobalJson,

Unknown = std::numeric_limits<int>::max()
};
Expand Down
15 changes: 5 additions & 10 deletions include/ydb-cpp-sdk/client/topic/producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ struct TProducerSettings : public TWriteSessionSettings {

enum class EPartitionChooserStrategy {
Bound,
Hash,
KafkaHash,
};

TProducerSettings() = default;
Expand All @@ -39,14 +39,9 @@ struct TProducerSettings : public TWriteSessionSettings {
//! ProducerId is generated as ProducerIdPrefix + partition id.
FLUENT_SETTING(std::string, ProducerIdPrefix);

//! SessionID to use.
FLUENT_SETTING_DEFAULT(std::string, SessionId, "");

//! Maximum block time for write. If set, write will block for up to MaxBlockMs when the buffer is overloaded.
FLUENT_SETTING_DEFAULT(TDuration, MaxBlock, TDuration::Zero());

//! Key producer function.
FLUENT_SETTING_OPTIONAL(std::function<std::string(const TWriteMessage& message)>, KeyProducer);
//! Maximum block timeout for write. If set, write will block for up to MaxBlockTimeout when the buffer is overloaded.
//! If not set, Write will block until the message is written to the buffer.
FLUENT_SETTING_DEFAULT(TDuration, MaxBlockTimeout, TDuration::Max());

private:
using TWriteSessionSettings::ProducerId;
Expand Down Expand Up @@ -88,7 +83,7 @@ struct TWriteResult {
//! Value is std::nullopt if the session is not closed.
std::optional<TCloseDescription> ClosedDescription;

bool IsSuccess() const {
bool IsQueued() const {
return Status == EWriteStatus::Queued;
}

Expand Down
44 changes: 30 additions & 14 deletions include/ydb-cpp-sdk/client/topic/write_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,16 @@ struct TWriteMessage {
: Data(data)
{}

TWriteMessage(const std::string& key, std::string_view data)
: Data(data)
, Key(key)
{}

TWriteMessage(uint32_t partition, std::string_view data)
: Data(data)
, Partition(partition)
{}

TWriteMessage(const TWriteMessage& other)
: DataHolder(other.DataHolder)
, Data(other.DataHolder ? std::string_view(*DataHolder) : other.Data)
Expand All @@ -211,9 +221,9 @@ struct TWriteMessage {
, SeqNo_(other.SeqNo_)
, CreateTimestamp_(other.CreateTimestamp_)
, MessageMeta_(other.MessageMeta_)
, Key_(other.Key_)
, Partition_(other.Partition_)
, Tx_(other.Tx_)
, Key(other.Key)
, Partition(other.Partition)
{}

TWriteMessage(TWriteMessage&& other) noexcept
Expand All @@ -224,9 +234,9 @@ struct TWriteMessage {
, SeqNo_(std::move(other.SeqNo_))
, CreateTimestamp_(std::move(other.CreateTimestamp_))
, MessageMeta_(std::move(other.MessageMeta_))
, Key_(std::move(other.Key_))
, Partition_(std::move(other.Partition_))
, Tx_(std::move(other.Tx_))
, Key(std::move(other.Key))
, Partition(std::move(other.Partition))
{}

TWriteMessage& operator=(const TWriteMessage& other) {
Expand All @@ -241,8 +251,8 @@ struct TWriteMessage {
SeqNo_ = other.SeqNo_;
CreateTimestamp_ = other.CreateTimestamp_;
MessageMeta_ = other.MessageMeta_;
Key_ = other.Key_;
Partition_ = other.Partition_;
Key = other.Key;
Partition = other.Partition;
Tx_ = other.Tx_;

return *this;
Expand All @@ -260,8 +270,8 @@ struct TWriteMessage {
SeqNo_ = std::move(other.SeqNo_);
CreateTimestamp_ = std::move(other.CreateTimestamp_);
MessageMeta_ = std::move(other.MessageMeta_);
Key_ = std::move(other.Key_);
Partition_ = std::move(other.Partition_);
Key = std::move(other.Key);
Partition = std::move(other.Partition);
Tx_ = std::move(other.Tx_);

return *this;
Expand Down Expand Up @@ -304,19 +314,25 @@ struct TWriteMessage {
//! Message metadata. Limited to 4096 characters overall (all keys and values combined).
FLUENT_SETTING(TMessageMeta, MessageMeta);

//! Message key. It will be used to route message to the partition.
FLUENT_SETTING_OPTIONAL(std::string, Key);

//! Partition to write to. It is not recommended to use this option, use Key instead.
FLUENT_SETTING_OPTIONAL(std::uint32_t, Partition);

//! Transaction id
FLUENT_SETTING_OPTIONAL(std::reference_wrapper<TTransactionBase>, Tx);

TTransactionBase* GetTxPtr() const
{
return Tx_ ? &Tx_->get() : nullptr;
}

const std::optional<std::string>& GetKey() const {
return Key;
}

const std::optional<uint32_t>& GetPartition() const {
return Partition;
}

private:
std::optional<std::string> Key;
std::optional<uint32_t> Partition;
};

//! Simple write session. Does not need event handlers. Does not provide Events, ContinuationTokens, write Acks.
Expand Down
4 changes: 2 additions & 2 deletions src/api/protos/draft/ydb_nbs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ message DeletePartitionResult {

message GetLoadActorAdapterActorIdRequest {
Ydb.Operations.OperationParams operation_params = 1;
// Partition tablet id (actor id string).
string TabletId = 2;
// Disk id.
string DiskId = 2;
}

message GetLoadActorAdapterActorIdResponse {
Expand Down
80 changes: 80 additions & 0 deletions src/api/protos/draft/ydb_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import "src/api/protos/ydb_operation.proto";
import "src/api/protos/ydb_scheme.proto";

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";

package Ydb.Replication;
option java_package = "com.yandex.ydb.replication";
Expand Down Expand Up @@ -101,13 +102,23 @@ message DescribeTransferRequest {
Ydb.Operations.OperationParams operation_params = 1;
// Replication path.
string path = 2 [(required) = true];
// Include detailed statistics for each worker.
optional bool include_stats = 3;
}

message DescribeTransferResponse {
// Result of request will be inside operation.
Ydb.Operations.Operation operation = 1;
}

// Message representing sliding window statistics by several windows.
message MultipleWindowsStat {
// Average per minute.
google.protobuf.Duration avg_per_minute = 1;
// Average per hour.
google.protobuf.Duration avg_per_hour = 2;
}

message DescribeTransferResult {
message RunningState {
}
Expand All @@ -122,6 +133,74 @@ message DescribeTransferResult {
message PausedState {
}

message Stats {
enum WorkState {
// No reported state.
STATE_UNSPECIFIED = 0;
// Reading data from topic.
STATE_READ = 1;
// Decompressing read data.
STATE_DECOMPRESS = 2;
// Processing data with lambda.
STATE_PROCESS = 3;
// Writing data to table.
STATE_WRITE = 4;
}

message WorkerStats {
// unique worker identifier
string worker_id = 1;
// last reported work state
WorkState state = 2;
// timestamp of last state change
google.protobuf.Timestamp last_state_change = 3;
// partition of the topic that worker is reading
int64 partition_id = 4;
// current read offset in partition
int64 read_offset = 5;
// how long this worker is being run for
google.protobuf.Duration uptime = 6;
// total cumulative number of worker restarts
int64 restart_count = 7;
// worker restarts per time, sliding window
MultipleWindowsStat restarts = 8;
// topic read speed in sliding window, bytes
MultipleWindowsStat read_bytes = 9;
// topic read speed in sliding window, messages
MultipleWindowsStat read_messages = 10;
// table write speed in sliding window, bytes
MultipleWindowsStat write_bytes = 11;
// table write speed in sliding window, rows
MultipleWindowsStat write_rows = 12;

// cpu usage time on decompression, microseconds
MultipleWindowsStat decompression_cpu_time = 13;
// cpu usage time on processing, microseconds
MultipleWindowsStat processing_cpu_time = 14;
}

// detailed stats of every worker, only included if DescribeTransferRequest.include_stats flag = true in request
repeated WorkerStats workers_stats = 1;
// minimal uptime of all current workers
google.protobuf.Duration min_worker_uptime = 2;

// topic read speed in sliding window, overall for transfer, bytes
MultipleWindowsStat read_bytes = 3;
// topic read speed in sliding window, overall for transfer, messages
MultipleWindowsStat read_messages = 4;
// table write speed in sliding window, overall for transfer, bytes
MultipleWindowsStat write_bytes = 5;
// table write speed in sliding window, overall for transfer, rows
MultipleWindowsStat write_rows = 6;

// cpu usage time on decompression, overall for transfer, microseconds
MultipleWindowsStat decompression_cpu_time = 7;
// cpu usage time on processing, overall for transfer, microseconds
MultipleWindowsStat processing_cpu_time = 8;
// moment when stats collection was last started (or reset)
google.protobuf.Timestamp stats_collection_start = 9;
}

// Description of scheme object.
Ydb.Scheme.Entry self = 1;

Expand All @@ -145,4 +224,5 @@ message DescribeTransferResult {
}

optional BatchSettings batch_settings = 11;
optional Stats stats = 12;
}
12 changes: 12 additions & 0 deletions src/api/protos/ydb_query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,21 @@ message AttachSessionRequest {
string session_id = 1 [(Ydb.length).le = 1024];
}

message SessionShutdownHint {
}

message NodeShutdownHint {
}

message SessionState {
StatusIds.StatusCode status = 1;
repeated Ydb.Issue.IssueMessage issues = 2;

// The reason the session is ending, for SDK-side handling
oneof session_hint {
SessionShutdownHint session_shutdown = 3;
NodeShutdownHint node_shutdown = 4;
}
}

message SerializableModeSettings {
Expand Down
6 changes: 6 additions & 0 deletions src/api/protos/ydb_table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,10 @@ message LocalBloomNgramFilterIndex {
optional bool case_sensitive = 5;
}

message GlobalJsonIndex {
GlobalIndexSettings settings = 1;
}

// Represent table index
message TableIndex {
// Name of index
Expand All @@ -344,6 +348,7 @@ message TableIndex {
GlobalFulltextRelevanceIndex global_fulltext_relevance_index = 9;
LocalBloomFilterIndex local_bloom_filter_index = 10;
LocalBloomNgramFilterIndex local_bloom_ngram_filter_index = 11;
GlobalJsonIndex global_json_index = 12;
}
// list of columns content to be copied in to index table
repeated string data_columns = 5;
Expand Down Expand Up @@ -372,6 +377,7 @@ message TableIndexDescription {
GlobalFulltextRelevanceIndex global_fulltext_relevance_index = 11;
LocalBloomFilterIndex local_bloom_filter_index = 12;
LocalBloomNgramFilterIndex local_bloom_ngram_filter_index = 13;
GlobalJsonIndex global_json_index = 14;
}
Status status = 4;
// list of columns content to be copied in to index table
Expand Down
Loading
Loading