Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cpp/src/common/device_id.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,10 @@ std::vector<std::string> StringArrayDeviceID::split_device_id_string(
const std::string& device_id_string) {
#ifdef ENABLE_ANTLR4
auto splits = storage::PathNodesGenerator::invokeParser(device_id_string);
return split_device_id_string(splits);
#else
auto splits = split_string(device_id_string, '.');
return split_device_id_string(splits);
#endif
return split_device_id_string(splits);
}

std::vector<std::string> StringArrayDeviceID::split_device_id_string(
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/encoding/dictionary_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class DictionaryDecoder : public Decoder {
if (entry_index_.empty()) {
init_map(buffer);
}
int code = value_decoder_.read_int(buffer);
int32_t code = 0;
value_decoder_.read_int(code, buffer);
return entry_index_[code];
}

Expand Down
83 changes: 66 additions & 17 deletions cpp/src/encoding/int32_rle_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Int32RleDecoder : public Decoder {
int bitpacking_num_;
bool is_length_and_bitwidth_readed_;
int current_count_;
bool is_rle_run_;
int32_t rle_value_;
common::ByteStream byte_cache_{common::MOD_DECODER_OBJ};
int32_t* current_buffer_;
Int32Packer* packer_;
Expand All @@ -49,6 +51,8 @@ class Int32RleDecoder : public Decoder {
bitpacking_num_(0),
is_length_and_bitwidth_readed_(false),
current_count_(0),
is_rle_run_(false),
rle_value_(0),
byte_cache_(1024, common::MOD_DECODER_OBJ),
current_buffer_(nullptr),
packer_(nullptr),
Expand All @@ -60,13 +64,14 @@ class Int32RleDecoder : public Decoder {
}
int read_boolean(bool& ret_value, common::ByteStream& in) override {
int32_t bool_value;
read_int32(bool_value, in);
ret_value = bool_value == 0 ? false : true;
return common::E_OK;
int ret = read_int32(bool_value, in);
if (ret == common::E_OK) {
ret_value = bool_value != 0;
}
return ret;
}
int read_int32(int32_t& ret_value, common::ByteStream& in) override {
ret_value = static_cast<int32_t>(read_int(in));
return common::E_OK;
return read_int(ret_value, in);
}
int read_int64(int64_t& ret_value, common::ByteStream& in) override {
return common::E_TYPE_NOT_MATCH;
Expand All @@ -89,6 +94,8 @@ class Int32RleDecoder : public Decoder {
bit_width_ = 0;
bitpacking_num_ = 0;
current_count_ = 0;
is_rle_run_ = false;
rle_value_ = 0;
}

bool has_next(common::ByteStream& buffer) {
Expand All @@ -103,30 +110,69 @@ class Int32RleDecoder : public Decoder {
return current_count_ > 0 || byte_cache_.remaining_size() > 0;
}

int32_t read_int(common::ByteStream& buffer) {
int read_int(int32_t& result, common::ByteStream& buffer) {
int ret = common::E_OK;
if (!is_length_and_bitwidth_readed_) {
// start to reader a new rle+bit-packing pattern
read_length_and_bitwidth(buffer);
if (RET_FAIL(read_length_and_bitwidth(buffer))) {
return ret;
}
}
if (current_count_ == 0) {
uint8_t header;
int ret = common::E_OK;
if (RET_FAIL(
common::SerializationUtil::read_ui8(header, byte_cache_))) {
// The header is encoded as an unsigned varint where:
// low bit = 0 => RLE run: header_value >> 1 is the run
// count low bit = 1 => bit-packing: header_value >> 1 is the
// group count
uint32_t header_value = 0;
if (RET_FAIL(common::SerializationUtil::read_var_uint(
header_value, byte_cache_))) {
return ret;
}
call_read_bit_packing_buffer(header);
if (header_value & 1) {
if (RET_FAIL(call_read_bit_packing_buffer(header_value))) {
return ret;
}
} else {
if (RET_FAIL(call_read_rle_run(header_value))) {
return ret;
}
}
}
--current_count_;
int32_t result = current_buffer_[bitpacking_num_ - current_count_ - 1];
result = is_rle_run_
? rle_value_
: current_buffer_[bitpacking_num_ - current_count_ - 1];
if (!has_next_package()) {
is_length_and_bitwidth_readed_ = false;
}
return result;
return ret;
}

int call_read_bit_packing_buffer(uint8_t header) {
int bit_packed_group_count = (int)(header >> 1);
int call_read_rle_run(uint32_t header_value) {
int ret = common::E_OK;
int run_length = (int)(header_value >> 1);
if (run_length <= 0) {
return common::E_DECODE_ERR;
}
int byte_width = (bit_width_ + 7) / 8;
// Read the repeated value (stored as byte_width bytes, little-endian)
int32_t value = 0;
for (int i = 0; i < byte_width; i++) {
uint8_t b;
if (RET_FAIL(common::SerializationUtil::read_ui8(b, byte_cache_))) {
return ret;
}
value |= ((int32_t)b) << (i * 8);
}
rle_value_ = value;
is_rle_run_ = true;
current_count_ = run_length;
bitpacking_num_ = run_length;
return ret;
}

int call_read_bit_packing_buffer(uint32_t header_value) {
int bit_packed_group_count = (int)(header_value >> 1);
// in last bit-packing group, there may be some useless value,
// lastBitPackedNum indicates how many values is useful
uint8_t last_bit_packed_num;
Expand All @@ -139,6 +185,7 @@ class Int32RleDecoder : public Decoder {
current_count_ =
(bit_packed_group_count - 1) * 8 + last_bit_packed_num;
bitpacking_num_ = current_count_;
is_rle_run_ = false;
} else {
return common::E_DECODE_ERR;
}
Expand Down Expand Up @@ -236,8 +283,10 @@ class Int32RleDecoder : public Decoder {
bitpacking_num_ = 0;
is_length_and_bitwidth_readed_ = false;
current_count_ = 0;
is_rle_run_ = false;
rle_value_ = 0;
if (current_buffer_) {
delete[] current_buffer_;
common::mem_free(current_buffer_);
current_buffer_ = nullptr;
}
if (packer_) {
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/encoding/int32_sprintz_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ class Int32SprintzDecoder : public SprintzDecoder {
decode_size_ = bit_width_ & ~(1 << 7);
Int32RleDecoder decoder;
for (int i = 0; i < decode_size_; ++i) {
current_buffer_[i] = decoder.read_int(input);
if (RET_FAIL(decoder.read_int(current_buffer_[i], input))) {
return ret;
}
}
} else {
decode_size_ = block_size_ + 1;
Expand Down
99 changes: 77 additions & 22 deletions cpp/src/encoding/int64_rle_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Int64RleDecoder : public Decoder {
int bitpacking_num_;
bool is_length_and_bitwidth_readed_;
int current_count_;
bool is_rle_run_;
int64_t rle_value_;
common::ByteStream byte_cache_{common::MOD_DECODER_OBJ};
int64_t* current_buffer_;
Int64Packer* packer_;
Expand All @@ -49,6 +51,8 @@ class Int64RleDecoder : public Decoder {
bitpacking_num_(0),
is_length_and_bitwidth_readed_(false),
current_count_(0),
is_rle_run_(false),
rle_value_(0),
byte_cache_(1024, common::MOD_DECODER_OBJ),
current_buffer_(nullptr),
packer_(nullptr),
Expand All @@ -65,8 +69,7 @@ class Int64RleDecoder : public Decoder {
return common::E_TYPE_NOT_MATCH;
}
int read_int64(int64_t& ret_value, common::ByteStream& in) override {
ret_value = read_int(in);
return common::E_OK;
return read_int(ret_value, in);
}
int read_float(float& ret_value, common::ByteStream& in) override {
return common::E_TYPE_NOT_MATCH;
Expand All @@ -86,6 +89,8 @@ class Int64RleDecoder : public Decoder {
bit_width_ = 0;
bitpacking_num_ = 0;
current_count_ = 0;
is_rle_run_ = false;
rle_value_ = 0;
}

bool has_next(common::ByteStream& buffer) {
Expand All @@ -100,30 +105,69 @@ class Int64RleDecoder : public Decoder {
return current_count_ > 0 || byte_cache_.remaining_size() > 0;
}

int64_t read_int(common::ByteStream& buffer) {
int read_int(int64_t& result, common::ByteStream& buffer) {
int ret = common::E_OK;
if (!is_length_and_bitwidth_readed_) {
// start to reader a new rle+bit-packing pattern
read_length_and_bitwidth(buffer);
if (RET_FAIL(read_length_and_bitwidth(buffer))) {
return ret;
}
}
if (current_count_ == 0) {
uint8_t header;
int ret = common::E_OK;
if (RET_FAIL(
common::SerializationUtil::read_ui8(header, byte_cache_))) {
// The header is encoded as an unsigned varint where:
// low bit = 0 => RLE run: header_value >> 1 is the run
// count low bit = 1 => bit-packing: header_value >> 1 is the
// group count
uint32_t header_value = 0;
if (RET_FAIL(common::SerializationUtil::read_var_uint(
header_value, byte_cache_))) {
return ret;
}
call_read_bit_packing_buffer(header);
if (header_value & 1) {
if (RET_FAIL(call_read_bit_packing_buffer(header_value))) {
return ret;
}
} else {
if (RET_FAIL(call_read_rle_run(header_value))) {
return ret;
}
}
}
--current_count_;
int64_t result = current_buffer_[bitpacking_num_ - current_count_ - 1];
result = is_rle_run_
? rle_value_
: current_buffer_[bitpacking_num_ - current_count_ - 1];
if (!has_next_package()) {
is_length_and_bitwidth_readed_ = false;
}
return result;
return ret;
}

int call_read_bit_packing_buffer(uint8_t header) {
int bit_packed_group_count = (int)(header >> 1);
int call_read_rle_run(uint32_t header_value) {
int ret = common::E_OK;
int run_length = (int)(header_value >> 1);
if (run_length <= 0) {
return common::E_DECODE_ERR;
}
int byte_width = (bit_width_ + 7) / 8;
// Read the repeated value (stored as byte_width bytes, little-endian)
int64_t value = 0;
for (int i = 0; i < byte_width; i++) {
uint8_t b;
if (RET_FAIL(common::SerializationUtil::read_ui8(b, byte_cache_))) {
return ret;
}
value |= ((int64_t)b) << (i * 8);
}
rle_value_ = value;
is_rle_run_ = true;
current_count_ = run_length;
bitpacking_num_ = run_length;
return ret;
}

int call_read_bit_packing_buffer(uint32_t header_value) {
int bit_packed_group_count = (int)(header_value >> 1);
// in last bit-packing group, there may be some useless value,
// lastBitPackedNum indicates how many values is useful
uint8_t last_bit_packed_num;
Expand All @@ -136,39 +180,45 @@ class Int64RleDecoder : public Decoder {
current_count_ =
(bit_packed_group_count - 1) * 8 + last_bit_packed_num;
bitpacking_num_ = current_count_;
is_rle_run_ = false;
} else {
printf(
"tsfile-encoding IntRleDecoder: bit_packed_group_count %d, "
"smaller "
"than 1",
bit_packed_group_count);
return common::E_DECODE_ERR;
}
read_bit_packing_buffer(bit_packed_group_count, last_bit_packed_num);
ret = read_bit_packing_buffer(bit_packed_group_count,
last_bit_packed_num);
return ret;
}

void read_bit_packing_buffer(int bit_packed_group_count,
int last_bit_packed_num) {
int read_bit_packing_buffer(int bit_packed_group_count,
int last_bit_packed_num) {
int ret = common::E_OK;
if (current_buffer_ != nullptr) {
common::mem_free(current_buffer_);
}
current_buffer_ = static_cast<int64_t*>(
common::mem_alloc(sizeof(int64_t) * bit_packed_group_count * 8,
common::MOD_DECODER_OBJ));
if (IS_NULL(current_buffer_)) {
return common::E_OOM;
}
int bytes_to_read = bit_packed_group_count * bit_width_;
if (bytes_to_read > (int)byte_cache_.remaining_size()) {
bytes_to_read = byte_cache_.remaining_size();
}
std::vector<unsigned char> bytes(bytes_to_read);

for (int i = 0; i < bytes_to_read; i++) {
common::SerializationUtil::read_ui8(bytes[i], byte_cache_);
if (RET_FAIL(common::SerializationUtil::read_ui8(bytes[i],
byte_cache_))) {
return ret;
}
}

// save all int values in currentBuffer
packer_->unpack_all_values(
bytes.data(), bytes_to_read,
current_buffer_); // decode from bytes, save in currentBuffer
return ret;
}

int read_length_and_bitwidth(common::ByteStream& buffer) {
Expand All @@ -177,6 +227,9 @@ class Int64RleDecoder : public Decoder {
common::SerializationUtil::read_var_uint(length_, buffer))) {
return common::E_PARTIAL_READ;
} else {
if (tmp_buf_) {
common::mem_free(tmp_buf_);
}
tmp_buf_ =
(uint8_t*)common::mem_alloc(length_, common::MOD_DECODER_OBJ);
if (tmp_buf_ == nullptr) {
Expand Down Expand Up @@ -222,6 +275,8 @@ class Int64RleDecoder : public Decoder {
bitpacking_num_ = 0;
is_length_and_bitwidth_readed_ = false;
current_count_ = 0;
is_rle_run_ = false;
rle_value_ = 0;
if (current_buffer_) {
common::mem_free(current_buffer_);
current_buffer_ = nullptr;
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/encoding/int64_sprintz_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@ class Int64SprintzDecoder : public SprintzDecoder {
decode_size_ = bit_width_ & ~(1 << 7);
Int64RleDecoder decoder;
for (int i = 0; i < decode_size_; ++i) {
current_buffer_[i] = decoder.read_int(input);
if (RET_FAIL(decoder.read_int(current_buffer_[i], input))) {
return ret;
}
}
} else {
decode_size_ = block_size_ + 1;
Expand Down
Loading
Loading