From a965ceda43903d34f4944f097a33b0181faa5677 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Wed, 1 Apr 2026 18:25:54 +0800 Subject: [PATCH 1/9] import. --- cpp/src/common/device_id.cc | 5 ++--- cpp/src/encoding/int32_rle_decoder.h | 2 +- cpp/src/file/tsfile_io_reader.cc | 9 +++------ 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/cpp/src/common/device_id.cc b/cpp/src/common/device_id.cc index 02cd32510..31ca88426 100644 --- a/cpp/src/common/device_id.cc +++ b/cpp/src/common/device_id.cc @@ -58,7 +58,7 @@ bool IDeviceIDComparator::operator()( // StringArrayDeviceID implementation StringArrayDeviceID::StringArrayDeviceID( const std::vector& segments) - : segments_(formalize(segments)) {} + : segments_(formalize(split_device_id_string(segments))) {} StringArrayDeviceID::StringArrayDeviceID(const std::string& device_id_string) { auto segments = split_device_id_string(device_id_string); @@ -209,11 +209,10 @@ std::vector StringArrayDeviceID::split_device_id_string( const std::string& device_id_string) { #ifdef ENABLE_ANTLR4 auto splits = storage::PathNodesGenerator::invokeParser(device_id_string); - return split_device_id_string(splits); #else auto splits = split_string(device_id_string, '.'); - return split_device_id_string(splits); #endif + return split_device_id_string(splits); } std::vector StringArrayDeviceID::split_device_id_string( diff --git a/cpp/src/encoding/int32_rle_decoder.h b/cpp/src/encoding/int32_rle_decoder.h index aee9048a1..402efa871 100644 --- a/cpp/src/encoding/int32_rle_decoder.h +++ b/cpp/src/encoding/int32_rle_decoder.h @@ -237,7 +237,7 @@ class Int32RleDecoder : public Decoder { is_length_and_bitwidth_readed_ = false; current_count_ = 0; if (current_buffer_) { - delete[] current_buffer_; + common::mem_free(current_buffer_); current_buffer_ = nullptr; } if (packer_) { diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc index 69e12e45a..6917fa316 100644 --- a/cpp/src/file/tsfile_io_reader.cc +++ b/cpp/src/file/tsfile_io_reader.cc @@ -298,17 +298,14 @@ int TsFileIOReader::load_device_index_entry( if (device_id_comparable == nullptr) { return E_INVALID_DATA_POINT; } - std::string table_name = device_id_comparable->device_id_->get_table_name(); + auto table_name = device_id_comparable->device_id_->get_table_name(); auto it = tsfile_meta_.table_metadata_index_node_map_.find(table_name); - if (it == tsfile_meta_.table_metadata_index_node_map_.end()) { + if (it == tsfile_meta_.table_metadata_index_node_map_.end() || + it->second == nullptr) { return E_DEVICE_NOT_EXIST; } auto index_node = it->second; - if (index_node == nullptr) { - return E_DEVICE_NOT_EXIST; - } if (index_node->node_type_ == LEAF_DEVICE) { - // FIXME ret = index_node->binary_search_children( device_name, true, device_index_entry, end_offset); } else { From 6e2c1fbe084da24c945f573389acebf3474bfde3 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Wed, 1 Apr 2026 18:11:42 +0800 Subject: [PATCH 2/9] tmp code. --- cpp/src/encoding/int32_rle_decoder.h | 52 ++++++- cpp/src/encoding/int64_rle_decoder.h | 34 ++++- cpp/test/encoding/int32_rle_codec_test.cc | 127 ++++++++++++++++++ .../tree_view/tsfile_reader_tree_test.cc | 97 +++++++++++++ 4 files changed, 303 insertions(+), 7 deletions(-) diff --git a/cpp/src/encoding/int32_rle_decoder.h b/cpp/src/encoding/int32_rle_decoder.h index 402efa871..4c3105109 100644 --- a/cpp/src/encoding/int32_rle_decoder.h +++ b/cpp/src/encoding/int32_rle_decoder.h @@ -109,13 +109,20 @@ class Int32RleDecoder : public Decoder { read_length_and_bitwidth(buffer); } if (current_count_ == 0) { - uint8_t header; + // The header is encoded as an unsigned varint where: + // low bit = 0 => RLE run: header_value >> 1 is the run count + // low bit = 1 => bit-packing: header_value >> 1 is the group count + uint32_t header_value = 0; int ret = common::E_OK; - if (RET_FAIL( - common::SerializationUtil::read_ui8(header, byte_cache_))) { + if (RET_FAIL(common::SerializationUtil::read_var_uint( + header_value, byte_cache_))) { return ret; } - call_read_bit_packing_buffer(header); + if (header_value & 1) { + call_read_bit_packing_buffer(header_value); + } else { + call_read_rle_run(header_value); + } } --current_count_; int32_t result = current_buffer_[bitpacking_num_ - current_count_ - 1]; @@ -125,8 +132,41 @@ class Int32RleDecoder : public Decoder { return result; } - int call_read_bit_packing_buffer(uint8_t header) { - int bit_packed_group_count = (int)(header >> 1); + int call_read_rle_run(uint32_t header_value) { + int ret = common::E_OK; + int run_length = (int)(header_value >> 1); + if (run_length <= 0) { + return common::E_DECODE_ERR; + } + int byte_width = (bit_width_ + 7) / 8; + // Read the repeated value (stored as byte_width bytes, little-endian) + int32_t value = 0; + for (int i = 0; i < byte_width; i++) { + uint8_t b; + if (RET_FAIL(common::SerializationUtil::read_ui8(b, byte_cache_))) { + return ret; + } + value |= ((int32_t)b) << (i * 8); + } + if (current_buffer_ != nullptr) { + common::mem_free(current_buffer_); + } + current_buffer_ = static_cast( + common::mem_alloc(sizeof(int32_t) * run_length, + common::MOD_DECODER_OBJ)); + if (IS_NULL(current_buffer_)) { + return common::E_OOM; + } + for (int i = 0; i < run_length; i++) { + current_buffer_[i] = value; + } + current_count_ = run_length; + bitpacking_num_ = run_length; + return ret; + } + + int call_read_bit_packing_buffer(uint32_t header_value) { + int bit_packed_group_count = (int)(header_value >> 1); // in last bit-packing group, there may be some useless value, // lastBitPackedNum indicates how many values is useful uint8_t last_bit_packed_num; diff --git a/cpp/src/encoding/int64_rle_decoder.h b/cpp/src/encoding/int64_rle_decoder.h index 8010fe0f7..4a33e0518 100644 --- a/cpp/src/encoding/int64_rle_decoder.h +++ b/cpp/src/encoding/int64_rle_decoder.h @@ -112,7 +112,11 @@ class Int64RleDecoder : public Decoder { common::SerializationUtil::read_ui8(header, byte_cache_))) { return ret; } - call_read_bit_packing_buffer(header); + if (header & 1) { + call_read_bit_packing_buffer(header); + } else { + call_read_rle_run(header); + } } --current_count_; int64_t result = current_buffer_[bitpacking_num_ - current_count_ - 1]; @@ -122,6 +126,34 @@ class Int64RleDecoder : public Decoder { return result; } + int call_read_rle_run(uint8_t header) { + int ret = common::E_OK; + int run_length = (int)(header >> 1); + if (run_length <= 0) { + return common::E_DECODE_ERR; + } + int byte_width = (bit_width_ + 7) / 8; + // Read the repeated value (stored as byte_width bytes, little-endian) + int64_t value = 0; + for (int i = 0; i < byte_width; i++) { + uint8_t b; + if (RET_FAIL(common::SerializationUtil::read_ui8(b, byte_cache_))) { + return ret; + } + value |= ((int64_t)b) << (i * 8); + } + if (current_buffer_ != nullptr) { + delete[] current_buffer_; + } + current_buffer_ = new int64_t[run_length]; + for (int i = 0; i < run_length; i++) { + current_buffer_[i] = value; + } + current_count_ = run_length; + bitpacking_num_ = run_length; + return ret; + } + int call_read_bit_packing_buffer(uint8_t header) { int bit_packed_group_count = (int)(header >> 1); // in last bit-packing group, there may be some useless value, diff --git a/cpp/test/encoding/int32_rle_codec_test.cc b/cpp/test/encoding/int32_rle_codec_test.cc index c580a0eb1..0d27d0211 100644 --- a/cpp/test/encoding/int32_rle_codec_test.cc +++ b/cpp/test/encoding/int32_rle_codec_test.cc @@ -164,4 +164,131 @@ TEST_F(Int32RleEncoderTest, EncodeFlushWithoutData) { EXPECT_EQ(stream.total_size(), 0u); } +// Helper: write a manually crafted RLE segment (Java/Parquet hybrid RLE format): +// [length_varint] [bit_width] [group_header_varint] [value_bytes...] +// run_count must be the actual count (written as (run_count<<1)|0 varint). +static void write_rle_segment(common::ByteStream& stream, uint8_t bit_width, + uint32_t run_count, int32_t value) { + common::ByteStream content(32, common::MOD_ENCODER_OBJ); + common::SerializationUtil::write_ui8(bit_width, content); + // Group header: (run_count << 1) | 0 = even varint + common::SerializationUtil::write_var_uint(run_count << 1, content); + // Value: ceil(bit_width / 8) bytes, little-endian + int byte_width = (bit_width + 7) / 8; + for (int i = 0; i < byte_width; i++) { + common::SerializationUtil::write_ui8((value >> (i * 8)) & 0xFF, + content); + } + uint32_t length = content.total_size(); + common::SerializationUtil::write_var_uint(length, stream); + // Append content bytes to stream + uint8_t buf[64]; + uint32_t read_len = 0; + content.read_buf(buf, length, read_len); + stream.write_buf(buf, read_len); +} + +// Regression test: run_count=64 requires a 2-byte LEB128 varint header +// ((64<<1)|0 = 128 = [0x80, 0x01]). Before the fix, only 1 byte was read, +// causing byte misalignment and incorrect decoding. +TEST_F(Int32RleEncoderTest, DecodeRleRunCountExactly64) { + common::ByteStream stream(32, common::MOD_ENCODER_OBJ); + write_rle_segment(stream, /*bit_width=*/7, /*run_count=*/64, + /*value=*/42); + + Int32RleDecoder decoder; + std::vector decoded; + while (decoder.has_next(stream)) { + int32_t v; + decoder.read_int32(v, stream); + decoded.push_back(v); + } + + ASSERT_EQ(decoded.size(), 64u); + for (int32_t v : decoded) { + EXPECT_EQ(v, 42); + } +} + +// Run counts of 128 and 256 each need a 2-byte varint header. +TEST_F(Int32RleEncoderTest, DecodeRleRunCountLarge) { + for (uint32_t count : {128u, 256u, 500u}) { + common::ByteStream stream(64, common::MOD_ENCODER_OBJ); + write_rle_segment(stream, /*bit_width=*/8, /*run_count=*/count, + /*value=*/100); + + Int32RleDecoder decoder; + std::vector decoded; + while (decoder.has_next(stream)) { + int32_t v; + decoder.read_int32(v, stream); + decoded.push_back(v); + } + + ASSERT_EQ(decoded.size(), (size_t)count) + << "Failed for run_count=" << count; + for (int32_t v : decoded) { + EXPECT_EQ(v, 100); + } + } +} + +// Multiple consecutive RLE runs including large ones (simulates real sensor +// data with repeated values and occasional changes). +TEST_F(Int32RleEncoderTest, DecodeMultipleRleRunsWithLargeCount) { + common::ByteStream stream(128, common::MOD_ENCODER_OBJ); + write_rle_segment(stream, /*bit_width=*/8, /*run_count=*/64, + /*value=*/25); + write_rle_segment(stream, /*bit_width=*/8, /*run_count=*/8, + /*value=*/26); + write_rle_segment(stream, /*bit_width=*/8, /*run_count=*/100, + /*value=*/25); + + Int32RleDecoder decoder; + std::vector decoded; + while (decoder.has_next(stream)) { + int32_t v; + decoder.read_int32(v, stream); + decoded.push_back(v); + } + + ASSERT_EQ(decoded.size(), 172u); // 64 + 8 + 100 + for (size_t i = 0; i < 64; i++) EXPECT_EQ(decoded[i], 25); + for (size_t i = 64; i < 72; i++) EXPECT_EQ(decoded[i], 26); + for (size_t i = 72; i < 172; i++) EXPECT_EQ(decoded[i], 25); +} + +// Regression test: Int32RleDecoder::reset() previously called delete[] on +// current_buffer_ which was allocated with mem_alloc (malloc). This is +// undefined behaviour and typically causes a crash. The fix uses mem_free. +TEST_F(Int32RleEncoderTest, ResetAfterDecodeNoCrash) { + common::ByteStream stream(1024, common::MOD_ENCODER_OBJ); + Int32RleEncoder encoder; + for (int i = 0; i < 16; i++) encoder.encode(i, stream); + encoder.flush(stream); + + Int32RleDecoder decoder; + // Decode at least one value to populate current_buffer_ via mem_alloc. + int32_t v; + ASSERT_TRUE(decoder.has_next(stream)); + decoder.read_int32(v, stream); + + // reset() must use mem_free, not delete[]. Before the fix this would crash. + decoder.reset(); + + // Verify the decoder is functional after reset. + common::ByteStream stream2(1024, common::MOD_ENCODER_OBJ); + Int32RleEncoder encoder2; + std::vector input = {7, 7, 7, 7, 7, 7, 7, 7}; + for (int32_t x : input) encoder2.encode(x, stream2); + encoder2.flush(stream2); + + std::vector decoded; + while (decoder.has_next(stream2)) { + decoder.read_int32(v, stream2); + decoded.push_back(v); + } + ASSERT_EQ(decoded, input); +} + } // namespace storage diff --git a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc index aa4ff2544..258db4d06 100644 --- a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc +++ b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc @@ -20,6 +20,7 @@ #include +#include "reader/result_set.h" #include "common/record.h" #include "common/schema.h" #include "common/tablet.h" @@ -425,3 +426,99 @@ TEST_F(TsFileTreeReaderTest, ExtendedRowsAndColumnsTest) { delete measurement; } } + +// Regression test: query_table_on_tree on a device path with three or more +// dot-segments (e.g. "root.sensors.TH") previously SEGVed because: +// 1. StringArrayDeviceID split "root.sensors.TH" into ["root","sensors","TH"] +// instead of the correct ["root.sensors","TH"], so get_table_name() returned +// "root" instead of "root.sensors". +// 2. load_device_index_entry used operator[] on the table map which inserted a +// null entry, then asserted on it. +TEST_F(TsFileTreeReaderTest, QueryTableOnTreeDeepDevicePath) { + TsFileTreeWriter writer(&write_file_); + // Device paths with 3 dot-segments: table_name="root.sensors", device="TH" + std::string device_id = "root.sensors.TH"; + std::string m_temp = "temperature"; + std::string m_humi = "humidity"; + auto* ms_temp = new MeasurementSchema(m_temp, INT32); + auto* ms_humi = new MeasurementSchema(m_humi, INT32); + ASSERT_EQ(E_OK, writer.register_timeseries(device_id, ms_temp)); + ASSERT_EQ(E_OK, writer.register_timeseries(device_id, ms_humi)); + delete ms_temp; + delete ms_humi; + + for (int ts = 0; ts < 5; ts++) { + TsRecord rec(device_id, ts); + rec.add_point(m_temp, static_cast(20 + ts)); + rec.add_point(m_humi, static_cast(50 + ts)); + ASSERT_EQ(E_OK, writer.write(rec)); + } + writer.flush(); + writer.close(); + + TsFileReader reader; + ASSERT_EQ(E_OK, reader.open(file_name_)); + ResultSet* result; + // query_table_on_tree used to SEGV here due to wrong table-name lookup + ASSERT_EQ(E_OK, + reader.query_table_on_tree({m_temp, m_humi}, INT64_MIN, + INT64_MAX, result)); + + auto* trs = static_cast(result); + bool has_next = false; + int row_cnt = 0; + while (IS_SUCC(trs->next(has_next)) && has_next) { + row_cnt++; + } + EXPECT_EQ(row_cnt, 5); + reader.destroy_query_data_set(result); + reader.close(); +} + +// Regression test: load_device_index_entry previously used operator[] to look +// up the table node, which silently inserted a null entry and then asserted. +// After the fix it uses find() and returns E_DEVICE_NOT_EXIST gracefully. +// This is triggered when querying a measurement that no device in the file has. +TEST_F(TsFileTreeReaderTest, QueryTableOnTreeMissingMeasurement) { + // Use the same multi-device setup as ReadTreeByTable to ensure a valid file. + TsFileTreeWriter writer(&write_file_); + std::vector device_ids = {"root.db1.t1", "root.db2.t1"}; + std::string m_temp = "temperature"; + for (auto dev : device_ids) { + auto* ms = new MeasurementSchema(m_temp, INT32); + ASSERT_EQ(E_OK, writer.register_timeseries(dev, ms)); + delete ms; + TsRecord rec(dev, 0); + rec.add_point(m_temp, static_cast(25)); + ASSERT_EQ(E_OK, writer.write(rec)); + } + writer.flush(); + writer.close(); + + TsFileReader reader; + ASSERT_EQ(E_OK, reader.open(file_name_)); + ResultSet* result = nullptr; + // "nonexistent" is not present in any device. Before the fix, + // load_device_index_entry used operator[] which inserted null and crashed. + // After the fix it returns E_DEVICE_NOT_EXIST or E_COLUMN_NOT_EXIST. + int ret = reader.query_table_on_tree({"nonexistent"}, INT64_MIN, + INT64_MAX, result); + EXPECT_NE(ret, E_OK); // Must not succeed (measurement not found) + if (result != nullptr) { + reader.destroy_query_data_set(result); + } + reader.close(); +} + +TEST_F(TsFileTreeReaderTest, simpletest) { + TsFileReader reader; + reader.open("/Users/colin/Library/Containers/com.tencent.xinWeChat/Data/Documents/xwechat_files/wxid_197w1jpv66ag22_cc63/msg/file/2026-03/1761643915818-1-0-0.tsfile"); + ResultSet* result; + int ret = reader.query_table_on_tree({"t", "h"}, INT64_MIN, + INT64_MAX, result); + ASSERT_EQ(ret, E_OK); + + auto* table_result_set = (storage::TableResultSet*)result; + bool has_next = false; + print_table_result_set((table_result_set)); +} From 8fbd41a2828ac698e3f72c9d3aea3975dc50c861 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Wed, 1 Apr 2026 19:18:29 +0800 Subject: [PATCH 3/9] fix some code. --- cpp/src/file/tsfile_io_reader.cc | 2 +- .../tree_view/tsfile_reader_tree_test.cc | 27 +++++-------------- 2 files changed, 8 insertions(+), 21 deletions(-) diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc index 6917fa316..31d4a303a 100644 --- a/cpp/src/file/tsfile_io_reader.cc +++ b/cpp/src/file/tsfile_io_reader.cc @@ -298,7 +298,7 @@ int TsFileIOReader::load_device_index_entry( if (device_id_comparable == nullptr) { return E_INVALID_DATA_POINT; } - auto table_name = device_id_comparable->device_id_->get_table_name(); + std::string table_name = device_id_comparable->device_id_->get_table_name(); auto it = tsfile_meta_.table_metadata_index_node_map_.find(table_name); if (it == tsfile_meta_.table_metadata_index_node_map_.end() || it->second == nullptr) { diff --git a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc index 258db4d06..8181b6130 100644 --- a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc +++ b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc @@ -20,11 +20,11 @@ #include -#include "reader/result_set.h" #include "common/record.h" #include "common/schema.h" #include "common/tablet.h" #include "file/write_file.h" +#include "reader/result_set.h" #include "reader/tsfile_reader.h" #include "reader/tsfile_tree_reader.h" #include "writer/tsfile_table_writer.h" @@ -460,9 +460,8 @@ TEST_F(TsFileTreeReaderTest, QueryTableOnTreeDeepDevicePath) { ASSERT_EQ(E_OK, reader.open(file_name_)); ResultSet* result; // query_table_on_tree used to SEGV here due to wrong table-name lookup - ASSERT_EQ(E_OK, - reader.query_table_on_tree({m_temp, m_humi}, INT64_MIN, - INT64_MAX, result)); + ASSERT_EQ(E_OK, reader.query_table_on_tree({m_temp, m_humi}, INT64_MIN, + INT64_MAX, result)); auto* trs = static_cast(result); bool has_next = false; @@ -480,7 +479,8 @@ TEST_F(TsFileTreeReaderTest, QueryTableOnTreeDeepDevicePath) { // After the fix it uses find() and returns E_DEVICE_NOT_EXIST gracefully. // This is triggered when querying a measurement that no device in the file has. TEST_F(TsFileTreeReaderTest, QueryTableOnTreeMissingMeasurement) { - // Use the same multi-device setup as ReadTreeByTable to ensure a valid file. + // Use the same multi-device setup as ReadTreeByTable to ensure a valid + // file. TsFileTreeWriter writer(&write_file_); std::vector device_ids = {"root.db1.t1", "root.db2.t1"}; std::string m_temp = "temperature"; @@ -501,24 +501,11 @@ TEST_F(TsFileTreeReaderTest, QueryTableOnTreeMissingMeasurement) { // "nonexistent" is not present in any device. Before the fix, // load_device_index_entry used operator[] which inserted null and crashed. // After the fix it returns E_DEVICE_NOT_EXIST or E_COLUMN_NOT_EXIST. - int ret = reader.query_table_on_tree({"nonexistent"}, INT64_MIN, - INT64_MAX, result); + int ret = reader.query_table_on_tree({"nonexistent"}, INT64_MIN, INT64_MAX, + result); EXPECT_NE(ret, E_OK); // Must not succeed (measurement not found) if (result != nullptr) { reader.destroy_query_data_set(result); } reader.close(); } - -TEST_F(TsFileTreeReaderTest, simpletest) { - TsFileReader reader; - reader.open("/Users/colin/Library/Containers/com.tencent.xinWeChat/Data/Documents/xwechat_files/wxid_197w1jpv66ag22_cc63/msg/file/2026-03/1761643915818-1-0-0.tsfile"); - ResultSet* result; - int ret = reader.query_table_on_tree({"t", "h"}, INT64_MIN, - INT64_MAX, result); - ASSERT_EQ(ret, E_OK); - - auto* table_result_set = (storage::TableResultSet*)result; - bool has_next = false; - print_table_result_set((table_result_set)); -} From fb6ac4b57def7d70c60b229ea401181a27bccedf Mon Sep 17 00:00:00 2001 From: ColinLee Date: Wed, 1 Apr 2026 19:36:58 +0800 Subject: [PATCH 4/9] fix comment. --- cpp/src/encoding/int64_rle_decoder.h | 23 +++++++++++++---------- cpp/test/encoding/int32_rle_codec_test.cc | 3 ++- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/cpp/src/encoding/int64_rle_decoder.h b/cpp/src/encoding/int64_rle_decoder.h index 4a33e0518..6fb874056 100644 --- a/cpp/src/encoding/int64_rle_decoder.h +++ b/cpp/src/encoding/int64_rle_decoder.h @@ -106,16 +106,19 @@ class Int64RleDecoder : public Decoder { read_length_and_bitwidth(buffer); } if (current_count_ == 0) { - uint8_t header; + // The header is encoded as an unsigned varint where: + // low bit = 0 => RLE run: header_value >> 1 is the run count + // low bit = 1 => bit-packing: header_value >> 1 is the group count + uint32_t header_value = 0; int ret = common::E_OK; - if (RET_FAIL( - common::SerializationUtil::read_ui8(header, byte_cache_))) { + if (RET_FAIL(common::SerializationUtil::read_var_uint( + header_value, byte_cache_))) { return ret; } - if (header & 1) { - call_read_bit_packing_buffer(header); + if (header_value & 1) { + call_read_bit_packing_buffer(header_value); } else { - call_read_rle_run(header); + call_read_rle_run(header_value); } } --current_count_; @@ -126,9 +129,9 @@ class Int64RleDecoder : public Decoder { return result; } - int call_read_rle_run(uint8_t header) { + int call_read_rle_run(uint32_t header_value) { int ret = common::E_OK; - int run_length = (int)(header >> 1); + int run_length = (int)(header_value >> 1); if (run_length <= 0) { return common::E_DECODE_ERR; } @@ -154,8 +157,8 @@ class Int64RleDecoder : public Decoder { return ret; } - int call_read_bit_packing_buffer(uint8_t header) { - int bit_packed_group_count = (int)(header >> 1); + int call_read_bit_packing_buffer(uint32_t header_value) { + int bit_packed_group_count = (int)(header_value >> 1); // in last bit-packing group, there may be some useless value, // lastBitPackedNum indicates how many values is useful uint8_t last_bit_packed_num; diff --git a/cpp/test/encoding/int32_rle_codec_test.cc b/cpp/test/encoding/int32_rle_codec_test.cc index 0d27d0211..4b0b76d0a 100644 --- a/cpp/test/encoding/int32_rle_codec_test.cc +++ b/cpp/test/encoding/int32_rle_codec_test.cc @@ -175,8 +175,9 @@ static void write_rle_segment(common::ByteStream& stream, uint8_t bit_width, common::SerializationUtil::write_var_uint(run_count << 1, content); // Value: ceil(bit_width / 8) bytes, little-endian int byte_width = (bit_width + 7) / 8; + uint32_t uvalue = static_cast(value); for (int i = 0; i < byte_width; i++) { - common::SerializationUtil::write_ui8((value >> (i * 8)) & 0xFF, + common::SerializationUtil::write_ui8((uvalue >> (i * 8)) & 0xFF, content); } uint32_t length = content.total_size(); From f5ee23c6da7c32943ad030d7a28aec508a1133f7 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Wed, 1 Apr 2026 20:26:45 +0800 Subject: [PATCH 5/9] tmp code. --- cpp/src/common/device_id.cc | 2 +- cpp/src/encoding/int32_rle_decoder.h | 10 +++++----- cpp/src/encoding/int64_rle_decoder.h | 5 +++-- cpp/test/encoding/int32_rle_codec_test.cc | 5 +++-- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/cpp/src/common/device_id.cc b/cpp/src/common/device_id.cc index 31ca88426..b35a8593f 100644 --- a/cpp/src/common/device_id.cc +++ b/cpp/src/common/device_id.cc @@ -58,7 +58,7 @@ bool IDeviceIDComparator::operator()( // StringArrayDeviceID implementation StringArrayDeviceID::StringArrayDeviceID( const std::vector& segments) - : segments_(formalize(split_device_id_string(segments))) {} + : segments_(formalize(segments)) {} StringArrayDeviceID::StringArrayDeviceID(const std::string& device_id_string) { auto segments = split_device_id_string(device_id_string); diff --git a/cpp/src/encoding/int32_rle_decoder.h b/cpp/src/encoding/int32_rle_decoder.h index 4c3105109..cc0d207f3 100644 --- a/cpp/src/encoding/int32_rle_decoder.h +++ b/cpp/src/encoding/int32_rle_decoder.h @@ -110,8 +110,9 @@ class Int32RleDecoder : public Decoder { } if (current_count_ == 0) { // The header is encoded as an unsigned varint where: - // low bit = 0 => RLE run: header_value >> 1 is the run count - // low bit = 1 => bit-packing: header_value >> 1 is the group count + // low bit = 0 => RLE run: header_value >> 1 is the run + // count low bit = 1 => bit-packing: header_value >> 1 is the + // group count uint32_t header_value = 0; int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::read_var_uint( @@ -151,9 +152,8 @@ class Int32RleDecoder : public Decoder { if (current_buffer_ != nullptr) { common::mem_free(current_buffer_); } - current_buffer_ = static_cast( - common::mem_alloc(sizeof(int32_t) * run_length, - common::MOD_DECODER_OBJ)); + current_buffer_ = static_cast(common::mem_alloc( + sizeof(int32_t) * run_length, common::MOD_DECODER_OBJ)); if (IS_NULL(current_buffer_)) { return common::E_OOM; } diff --git a/cpp/src/encoding/int64_rle_decoder.h b/cpp/src/encoding/int64_rle_decoder.h index 6fb874056..832153701 100644 --- a/cpp/src/encoding/int64_rle_decoder.h +++ b/cpp/src/encoding/int64_rle_decoder.h @@ -107,8 +107,9 @@ class Int64RleDecoder : public Decoder { } if (current_count_ == 0) { // The header is encoded as an unsigned varint where: - // low bit = 0 => RLE run: header_value >> 1 is the run count - // low bit = 1 => bit-packing: header_value >> 1 is the group count + // low bit = 0 => RLE run: header_value >> 1 is the run + // count low bit = 1 => bit-packing: header_value >> 1 is the + // group count uint32_t header_value = 0; int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::read_var_uint( diff --git a/cpp/test/encoding/int32_rle_codec_test.cc b/cpp/test/encoding/int32_rle_codec_test.cc index 4b0b76d0a..dfc737c8b 100644 --- a/cpp/test/encoding/int32_rle_codec_test.cc +++ b/cpp/test/encoding/int32_rle_codec_test.cc @@ -164,11 +164,12 @@ TEST_F(Int32RleEncoderTest, EncodeFlushWithoutData) { EXPECT_EQ(stream.total_size(), 0u); } -// Helper: write a manually crafted RLE segment (Java/Parquet hybrid RLE format): +// Helper: write a manually crafted RLE segment (Java/Parquet hybrid RLE +// format): // [length_varint] [bit_width] [group_header_varint] [value_bytes...] // run_count must be the actual count (written as (run_count<<1)|0 varint). static void write_rle_segment(common::ByteStream& stream, uint8_t bit_width, - uint32_t run_count, int32_t value) { + uint32_t run_count, int32_t value) { common::ByteStream content(32, common::MOD_ENCODER_OBJ); common::SerializationUtil::write_ui8(bit_width, content); // Group header: (run_count << 1) | 0 = even varint From 74eba446f872f0bac2c5c7101f3972905910e181 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 2 Apr 2026 10:16:41 +0800 Subject: [PATCH 6/9] fix comment. --- cpp/src/encoding/int32_rle_decoder.h | 26 ++++++++++++++------------ cpp/src/encoding/int64_rle_decoder.h | 22 ++++++++++++++-------- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/cpp/src/encoding/int32_rle_decoder.h b/cpp/src/encoding/int32_rle_decoder.h index cc0d207f3..29c8fa6ce 100644 --- a/cpp/src/encoding/int32_rle_decoder.h +++ b/cpp/src/encoding/int32_rle_decoder.h @@ -37,6 +37,8 @@ class Int32RleDecoder : public Decoder { int bitpacking_num_; bool is_length_and_bitwidth_readed_; int current_count_; + bool is_rle_run_; + int32_t rle_value_; common::ByteStream byte_cache_{common::MOD_DECODER_OBJ}; int32_t* current_buffer_; Int32Packer* packer_; @@ -49,6 +51,8 @@ class Int32RleDecoder : public Decoder { bitpacking_num_(0), is_length_and_bitwidth_readed_(false), current_count_(0), + is_rle_run_(false), + rle_value_(0), byte_cache_(1024, common::MOD_DECODER_OBJ), current_buffer_(nullptr), packer_(nullptr), @@ -89,6 +93,8 @@ class Int32RleDecoder : public Decoder { bit_width_ = 0; bitpacking_num_ = 0; current_count_ = 0; + is_rle_run_ = false; + rle_value_ = 0; } bool has_next(common::ByteStream& buffer) { @@ -126,7 +132,9 @@ class Int32RleDecoder : public Decoder { } } --current_count_; - int32_t result = current_buffer_[bitpacking_num_ - current_count_ - 1]; + int32_t result = + is_rle_run_ ? rle_value_ + : current_buffer_[bitpacking_num_ - current_count_ - 1]; if (!has_next_package()) { is_length_and_bitwidth_readed_ = false; } @@ -149,17 +157,8 @@ class Int32RleDecoder : public Decoder { } value |= ((int32_t)b) << (i * 8); } - if (current_buffer_ != nullptr) { - common::mem_free(current_buffer_); - } - current_buffer_ = static_cast(common::mem_alloc( - sizeof(int32_t) * run_length, common::MOD_DECODER_OBJ)); - if (IS_NULL(current_buffer_)) { - return common::E_OOM; - } - for (int i = 0; i < run_length; i++) { - current_buffer_[i] = value; - } + rle_value_ = value; + is_rle_run_ = true; current_count_ = run_length; bitpacking_num_ = run_length; return ret; @@ -179,6 +178,7 @@ class Int32RleDecoder : public Decoder { current_count_ = (bit_packed_group_count - 1) * 8 + last_bit_packed_num; bitpacking_num_ = current_count_; + is_rle_run_ = false; } else { return common::E_DECODE_ERR; } @@ -276,6 +276,8 @@ class Int32RleDecoder : public Decoder { bitpacking_num_ = 0; is_length_and_bitwidth_readed_ = false; current_count_ = 0; + is_rle_run_ = false; + rle_value_ = 0; if (current_buffer_) { common::mem_free(current_buffer_); current_buffer_ = nullptr; diff --git a/cpp/src/encoding/int64_rle_decoder.h b/cpp/src/encoding/int64_rle_decoder.h index 832153701..58f2c2aaa 100644 --- a/cpp/src/encoding/int64_rle_decoder.h +++ b/cpp/src/encoding/int64_rle_decoder.h @@ -37,6 +37,8 @@ class Int64RleDecoder : public Decoder { int bitpacking_num_; bool is_length_and_bitwidth_readed_; int current_count_; + bool is_rle_run_; + int64_t rle_value_; common::ByteStream byte_cache_{common::MOD_DECODER_OBJ}; int64_t* current_buffer_; Int64Packer* packer_; @@ -49,6 +51,8 @@ class Int64RleDecoder : public Decoder { bitpacking_num_(0), is_length_and_bitwidth_readed_(false), current_count_(0), + is_rle_run_(false), + rle_value_(0), byte_cache_(1024, common::MOD_DECODER_OBJ), current_buffer_(nullptr), packer_(nullptr), @@ -86,6 +90,8 @@ class Int64RleDecoder : public Decoder { bit_width_ = 0; bitpacking_num_ = 0; current_count_ = 0; + is_rle_run_ = false; + rle_value_ = 0; } bool has_next(common::ByteStream& buffer) { @@ -123,7 +129,9 @@ class Int64RleDecoder : public Decoder { } } --current_count_; - int64_t result = current_buffer_[bitpacking_num_ - current_count_ - 1]; + int64_t result = + is_rle_run_ ? rle_value_ + : current_buffer_[bitpacking_num_ - current_count_ - 1]; if (!has_next_package()) { is_length_and_bitwidth_readed_ = false; } @@ -146,13 +154,8 @@ class Int64RleDecoder : public Decoder { } value |= ((int64_t)b) << (i * 8); } - if (current_buffer_ != nullptr) { - delete[] current_buffer_; - } - current_buffer_ = new int64_t[run_length]; - for (int i = 0; i < run_length; i++) { - current_buffer_[i] = value; - } + rle_value_ = value; + is_rle_run_ = true; current_count_ = run_length; bitpacking_num_ = run_length; return ret; @@ -172,6 +175,7 @@ class Int64RleDecoder : public Decoder { current_count_ = (bit_packed_group_count - 1) * 8 + last_bit_packed_num; bitpacking_num_ = current_count_; + is_rle_run_ = false; } else { printf( "tsfile-encoding IntRleDecoder: bit_packed_group_count %d, " @@ -258,6 +262,8 @@ class Int64RleDecoder : public Decoder { bitpacking_num_ = 0; is_length_and_bitwidth_readed_ = false; current_count_ = 0; + is_rle_run_ = false; + rle_value_ = 0; if (current_buffer_) { common::mem_free(current_buffer_); current_buffer_ = nullptr; From 4a8222266da72f1bd362128afec754a4349ce92a Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 2 Apr 2026 15:21:44 +0800 Subject: [PATCH 7/9] fix return code. --- cpp/src/encoding/int32_rle_decoder.h | 34 ++++++++++++++++------------ cpp/src/encoding/int64_rle_decoder.h | 26 ++++++++++++--------- 2 files changed, 35 insertions(+), 25 deletions(-) diff --git a/cpp/src/encoding/int32_rle_decoder.h b/cpp/src/encoding/int32_rle_decoder.h index 29c8fa6ce..dbf02a881 100644 --- a/cpp/src/encoding/int32_rle_decoder.h +++ b/cpp/src/encoding/int32_rle_decoder.h @@ -64,13 +64,14 @@ class Int32RleDecoder : public Decoder { } int read_boolean(bool& ret_value, common::ByteStream& in) override { int32_t bool_value; - read_int32(bool_value, in); - ret_value = bool_value == 0 ? false : true; - return common::E_OK; + int ret = read_int32(bool_value, in); + if (ret == common::E_OK) { + ret_value = bool_value != 0; + } + return ret; } int read_int32(int32_t& ret_value, common::ByteStream& in) override { - ret_value = static_cast(read_int(in)); - return common::E_OK; + return read_int(ret_value, in); } int read_int64(int64_t& ret_value, common::ByteStream& in) override { return common::E_TYPE_NOT_MATCH; @@ -109,10 +110,13 @@ class Int32RleDecoder : public Decoder { return current_count_ > 0 || byte_cache_.remaining_size() > 0; } - int32_t read_int(common::ByteStream& buffer) { + int read_int(int32_t& result, common::ByteStream& buffer) { + int ret = common::E_OK; if (!is_length_and_bitwidth_readed_) { // start to reader a new rle+bit-packing pattern - read_length_and_bitwidth(buffer); + if (RET_FAIL(read_length_and_bitwidth(buffer))) { + return ret; + } } if (current_count_ == 0) { // The header is encoded as an unsigned varint where: @@ -120,25 +124,27 @@ class Int32RleDecoder : public Decoder { // count low bit = 1 => bit-packing: header_value >> 1 is the // group count uint32_t header_value = 0; - int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::read_var_uint( header_value, byte_cache_))) { return ret; } if (header_value & 1) { - call_read_bit_packing_buffer(header_value); + if (RET_FAIL(call_read_bit_packing_buffer(header_value))) { + return ret; + } } else { - call_read_rle_run(header_value); + if (RET_FAIL(call_read_rle_run(header_value))) { + return ret; + } } } --current_count_; - int32_t result = - is_rle_run_ ? rle_value_ - : current_buffer_[bitpacking_num_ - current_count_ - 1]; + result = is_rle_run_ ? rle_value_ + : current_buffer_[bitpacking_num_ - current_count_ - 1]; if (!has_next_package()) { is_length_and_bitwidth_readed_ = false; } - return result; + return ret; } int call_read_rle_run(uint32_t header_value) { diff --git a/cpp/src/encoding/int64_rle_decoder.h b/cpp/src/encoding/int64_rle_decoder.h index 58f2c2aaa..c2fab8752 100644 --- a/cpp/src/encoding/int64_rle_decoder.h +++ b/cpp/src/encoding/int64_rle_decoder.h @@ -69,8 +69,7 @@ class Int64RleDecoder : public Decoder { return common::E_TYPE_NOT_MATCH; } int read_int64(int64_t& ret_value, common::ByteStream& in) override { - ret_value = read_int(in); - return common::E_OK; + return read_int(ret_value, in); } int read_float(float& ret_value, common::ByteStream& in) override { return common::E_TYPE_NOT_MATCH; @@ -106,10 +105,13 @@ class Int64RleDecoder : public Decoder { return current_count_ > 0 || byte_cache_.remaining_size() > 0; } - int64_t read_int(common::ByteStream& buffer) { + int read_int(int64_t& result, common::ByteStream& buffer) { + int ret = common::E_OK; if (!is_length_and_bitwidth_readed_) { // start to reader a new rle+bit-packing pattern - read_length_and_bitwidth(buffer); + if (RET_FAIL(read_length_and_bitwidth(buffer))) { + return ret; + } } if (current_count_ == 0) { // The header is encoded as an unsigned varint where: @@ -117,25 +119,27 @@ class Int64RleDecoder : public Decoder { // count low bit = 1 => bit-packing: header_value >> 1 is the // group count uint32_t header_value = 0; - int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::read_var_uint( header_value, byte_cache_))) { return ret; } if (header_value & 1) { - call_read_bit_packing_buffer(header_value); + if (RET_FAIL(call_read_bit_packing_buffer(header_value))) { + return ret; + } } else { - call_read_rle_run(header_value); + if (RET_FAIL(call_read_rle_run(header_value))) { + return ret; + } } } --current_count_; - int64_t result = - is_rle_run_ ? rle_value_ - : current_buffer_[bitpacking_num_ - current_count_ - 1]; + result = is_rle_run_ ? rle_value_ + : current_buffer_[bitpacking_num_ - current_count_ - 1]; if (!has_next_package()) { is_length_and_bitwidth_readed_ = false; } - return result; + return ret; } int call_read_rle_run(uint32_t header_value) { From 70d2ef9f0ded9d8fb807d8bf38d6854d92b40fc8 Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 2 Apr 2026 15:27:56 +0800 Subject: [PATCH 8/9] fix fmt. --- cpp/src/encoding/int32_rle_decoder.h | 5 +++-- cpp/src/encoding/int64_rle_decoder.h | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cpp/src/encoding/int32_rle_decoder.h b/cpp/src/encoding/int32_rle_decoder.h index dbf02a881..ef7b6f095 100644 --- a/cpp/src/encoding/int32_rle_decoder.h +++ b/cpp/src/encoding/int32_rle_decoder.h @@ -139,8 +139,9 @@ class Int32RleDecoder : public Decoder { } } --current_count_; - result = is_rle_run_ ? rle_value_ - : current_buffer_[bitpacking_num_ - current_count_ - 1]; + result = is_rle_run_ + ? rle_value_ + : current_buffer_[bitpacking_num_ - current_count_ - 1]; if (!has_next_package()) { is_length_and_bitwidth_readed_ = false; } diff --git a/cpp/src/encoding/int64_rle_decoder.h b/cpp/src/encoding/int64_rle_decoder.h index c2fab8752..e9fd6e8e8 100644 --- a/cpp/src/encoding/int64_rle_decoder.h +++ b/cpp/src/encoding/int64_rle_decoder.h @@ -134,8 +134,9 @@ class Int64RleDecoder : public Decoder { } } --current_count_; - result = is_rle_run_ ? rle_value_ - : current_buffer_[bitpacking_num_ - current_count_ - 1]; + result = is_rle_run_ + ? rle_value_ + : current_buffer_[bitpacking_num_ - current_count_ - 1]; if (!has_next_package()) { is_length_and_bitwidth_readed_ = false; } From 9e1dce87c12891f58f2970b47cb0eea304b2796a Mon Sep 17 00:00:00 2001 From: ColinLee Date: Thu, 2 Apr 2026 15:44:56 +0800 Subject: [PATCH 9/9] Fix compile --- cpp/src/encoding/dictionary_decoder.h | 3 ++- cpp/src/encoding/int32_sprintz_decoder.h | 4 +++- cpp/src/encoding/int64_rle_decoder.h | 26 ++++++++++++++++-------- cpp/src/encoding/int64_sprintz_decoder.h | 4 +++- 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/cpp/src/encoding/dictionary_decoder.h b/cpp/src/encoding/dictionary_decoder.h index 5f64b5873..2962c66ba 100644 --- a/cpp/src/encoding/dictionary_decoder.h +++ b/cpp/src/encoding/dictionary_decoder.h @@ -73,7 +73,8 @@ class DictionaryDecoder : public Decoder { if (entry_index_.empty()) { init_map(buffer); } - int code = value_decoder_.read_int(buffer); + int32_t code = 0; + value_decoder_.read_int(code, buffer); return entry_index_[code]; } diff --git a/cpp/src/encoding/int32_sprintz_decoder.h b/cpp/src/encoding/int32_sprintz_decoder.h index 3d15597ee..a7c92eede 100644 --- a/cpp/src/encoding/int32_sprintz_decoder.h +++ b/cpp/src/encoding/int32_sprintz_decoder.h @@ -125,7 +125,9 @@ class Int32SprintzDecoder : public SprintzDecoder { decode_size_ = bit_width_ & ~(1 << 7); Int32RleDecoder decoder; for (int i = 0; i < decode_size_; ++i) { - current_buffer_[i] = decoder.read_int(input); + if (RET_FAIL(decoder.read_int(current_buffer_[i], input))) { + return ret; + } } } else { decode_size_ = block_size_ + 1; diff --git a/cpp/src/encoding/int64_rle_decoder.h b/cpp/src/encoding/int64_rle_decoder.h index e9fd6e8e8..df8e17838 100644 --- a/cpp/src/encoding/int64_rle_decoder.h +++ b/cpp/src/encoding/int64_rle_decoder.h @@ -182,24 +182,25 @@ class Int64RleDecoder : public Decoder { bitpacking_num_ = current_count_; is_rle_run_ = false; } else { - printf( - "tsfile-encoding IntRleDecoder: bit_packed_group_count %d, " - "smaller " - "than 1", - bit_packed_group_count); + return common::E_DECODE_ERR; } - read_bit_packing_buffer(bit_packed_group_count, last_bit_packed_num); + ret = read_bit_packing_buffer(bit_packed_group_count, + last_bit_packed_num); return ret; } - void read_bit_packing_buffer(int bit_packed_group_count, - int last_bit_packed_num) { + int read_bit_packing_buffer(int bit_packed_group_count, + int last_bit_packed_num) { + int ret = common::E_OK; if (current_buffer_ != nullptr) { common::mem_free(current_buffer_); } current_buffer_ = static_cast( common::mem_alloc(sizeof(int64_t) * bit_packed_group_count * 8, common::MOD_DECODER_OBJ)); + if (IS_NULL(current_buffer_)) { + return common::E_OOM; + } int bytes_to_read = bit_packed_group_count * bit_width_; if (bytes_to_read > (int)byte_cache_.remaining_size()) { bytes_to_read = byte_cache_.remaining_size(); @@ -207,13 +208,17 @@ class Int64RleDecoder : public Decoder { std::vector bytes(bytes_to_read); for (int i = 0; i < bytes_to_read; i++) { - common::SerializationUtil::read_ui8(bytes[i], byte_cache_); + if (RET_FAIL(common::SerializationUtil::read_ui8(bytes[i], + byte_cache_))) { + return ret; + } } // save all int values in currentBuffer packer_->unpack_all_values( bytes.data(), bytes_to_read, current_buffer_); // decode from bytes, save in currentBuffer + return ret; } int read_length_and_bitwidth(common::ByteStream& buffer) { @@ -222,6 +227,9 @@ class Int64RleDecoder : public Decoder { common::SerializationUtil::read_var_uint(length_, buffer))) { return common::E_PARTIAL_READ; } else { + if (tmp_buf_) { + common::mem_free(tmp_buf_); + } tmp_buf_ = (uint8_t*)common::mem_alloc(length_, common::MOD_DECODER_OBJ); if (tmp_buf_ == nullptr) { diff --git a/cpp/src/encoding/int64_sprintz_decoder.h b/cpp/src/encoding/int64_sprintz_decoder.h index a7e3fdd27..7b0827688 100644 --- a/cpp/src/encoding/int64_sprintz_decoder.h +++ b/cpp/src/encoding/int64_sprintz_decoder.h @@ -124,7 +124,9 @@ class Int64SprintzDecoder : public SprintzDecoder { decode_size_ = bit_width_ & ~(1 << 7); Int64RleDecoder decoder; for (int i = 0; i < decode_size_; ++i) { - current_buffer_[i] = decoder.read_int(input); + if (RET_FAIL(decoder.read_int(current_buffer_[i], input))) { + return ret; + } } } else { decode_size_ = block_size_ + 1;