Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions example/session/src/main/java/org/apache/iotdb/SessionExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static void main(String[] args)
.password("root")
.version(Version.V_1_0)
.build();
session.open(false);
session.open(true);

// set session fetchSize
session.setFetchSize(10000);
Expand Down Expand Up @@ -110,18 +110,18 @@ public static void main(String[] args)
// deleteTimeseries();
// setTimeout();

sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
sessionEnableRedirect.setEnableQueryRedirection(true);
sessionEnableRedirect.open(false);

// set session fetchSize
sessionEnableRedirect.setFetchSize(10000);

fastLastDataQueryForOneDevice();
insertRecord4Redirect();
query4Redirect();
sessionEnableRedirect.close();
session.close();
// sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
// sessionEnableRedirect.setEnableQueryRedirection(true);
// sessionEnableRedirect.open(false);
//
// // set session fetchSize
// sessionEnableRedirect.setFetchSize(10000);
//
// fastLastDataQueryForOneDevice();
// insertRecord4Redirect();
// query4Redirect();
// sessionEnableRedirect.close();
// session.close();
}

private static void createAndDropContinuousQueries()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class RpcTransportFactory extends TTransportFactory {

private final TTransportFactory inner;

private RpcTransportFactory(TTransportFactory inner) {
public RpcTransportFactory(TTransportFactory inner) {
this.inner = inner;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,19 @@
package org.apache.iotdb.rpc;

import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

public class TSnappyElasticFramedTransport extends TCompressedElasticFramedTransport {
private static final Logger LOGGER = LoggerFactory.getLogger(TSnappyElasticFramedTransport.class);
private static final AtomicLong totalUncompressionTime = new AtomicLong(0);
private static final AtomicLong totalUncompressionCount = new AtomicLong(0);
private static final AtomicLong totalOriginalDataSize = new AtomicLong(0);
private static final AtomicLong totalCompressedDataSize = new AtomicLong(0);

public static class Factory extends TElasticFramedTransport.Factory {

Expand Down Expand Up @@ -74,6 +82,23 @@ protected int compress(byte[] input, int inOff, int len, byte[] output, int outO
@Override
protected void uncompress(byte[] input, int inOff, int size, byte[] output, int outOff)
throws IOException {
Snappy.uncompress(input, inOff, size, output, outOff);

long startTime = System.nanoTime();
int uncompressedSize = Snappy.uncompress(input, inOff, size, output, outOff);
long endTime = System.nanoTime();
totalUncompressionTime.addAndGet(endTime - startTime);
totalOriginalDataSize.addAndGet(uncompressedSize);
totalCompressedDataSize.addAndGet(size);
long count = totalUncompressionCount.incrementAndGet();
if (count % 1000 == 0) {
LOGGER.info(
"Average uncompression time: {} ms, average compression rate: {}",
totalUncompressionTime.doubleValue() / count / 1000000,
totalOriginalDataSize.doubleValue() / totalCompressedDataSize.doubleValue());
totalUncompressionCount.set(0);
totalUncompressionTime.set(0);
totalOriginalDataSize.set(0);
totalCompressedDataSize.set(0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ private void init(TEndPoint endPoint, boolean useSSL, String trustStore, String
session.connectionTimeoutInMs,
trustStore,
trustStorePwd);
} else if (session.enableRPCCompression) {
RpcTransportFactory.setUseSnappy(true);
RpcTransportFactory.reInit();
transport =
RpcTransportFactory.INSTANCE.getTransport(
// as there is a try-catch already, we do not need to use TSocket.wrap
endPoint.getIp(), endPoint.getPort(), session.connectionTimeoutInMs);
} else {
transport =
RpcTransportFactory.INSTANCE.getTransport(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ dn_seed_config_node=127.0.0.1:10710
# this feature is under development, set this as false before it is done.
# dn_rpc_advanced_compression_enable=false

# dn_data_transfer_compression_enable=false

# Datatype: int
# dn_rpc_selector_thread_count=1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ public class IoTDBConfig {
/** whether to use Snappy compression before sending data through the network */
private boolean rpcAdvancedCompressionEnable = false;

private boolean dataTransportCompressionEnable = false;

/** Port which the JDBC server listens to. */
private int rpcPort = 6667;

Expand Down Expand Up @@ -2589,6 +2591,14 @@ public void setRpcAdvancedCompressionEnable(boolean rpcAdvancedCompressionEnable
RpcTransportFactory.setUseSnappy(this.rpcAdvancedCompressionEnable);
}

public boolean isDataTransportCompressionEnable() {
return dataTransportCompressionEnable;
}

public void setDataTransportCompressionEnable(boolean dataTransportCompressionEnable) {
this.dataTransportCompressionEnable = dataTransportCompressionEnable;
}

public int getMlogBufferSize() {
return mlogBufferSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,14 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
Boolean.toString(conf.isRpcAdvancedCompressionEnable()))
.trim()));

conf.setDataTransportCompressionEnable(
Boolean.parseBoolean(
properties
.getProperty(
"dn_data_transfer_compression_enable",
Boolean.toString(conf.isDataTransportCompressionEnable()))
.trim()));

conf.setConnectionTimeoutInMS(
Integer.parseInt(
properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,28 @@

package org.apache.iotdb.db.protocol.thrift.impl;

import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize;

import io.airlift.units.Duration;
import io.jsonwebtoken.lang.Strings;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
Expand Down Expand Up @@ -176,34 +198,10 @@
import org.apache.iotdb.tsfile.utils.TimeDuration;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

import io.airlift.units.Duration;
import io.jsonwebtoken.lang.Strings;
import org.apache.commons.lang3.StringUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize;

public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(ClientRPCServiceImpl.class);
Expand Down Expand Up @@ -1639,7 +1637,8 @@ public TSStatus insertRecords(TSInsertRecordsReq req) {

// check whether measurement is legal according to syntax convention
req.setMeasurementsList(
PathUtils.checkIsLegalSingleMeasurementListsAndUpdate(req.getMeasurementsList()));
PathUtils.checkIsLegalSingleMeasurementsListsAndUpdateForRows(
req.getMeasurementsSet(), req.getMeasurementsList()));

// Step 1: transfer from TSInsertRecordsReq to Statement
InsertRowsStatement statement = StatementGenerator.createStatement(req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void initThriftServiceThread() throws IllegalAccessException {
config.getRpcMaxConcurrentClientNum(),
config.getThriftServerAwaitTimeForStopService(),
new RPCServiceThriftHandler(impl),
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable());
config.isDataTransportCompressionEnable());
}

} catch (RPCServiceException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.runtime.RPCServiceException;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TimeoutChangeableTSnappyFramedTransport;

import org.apache.thrift.TBaseAsyncProcessor;
import org.apache.thrift.TProcessor;
Expand Down Expand Up @@ -208,6 +211,12 @@ protected AbstractThriftServiceThread(
serverTransport = openTransport(bindAddress, port);
TThreadPoolServer.Args poolArgs =
initSyncedPoolArgs(processor, threadsName, maxWorkerThreads, timeoutSecond);
if (compress) {
poolArgs.transportFactory(
new RpcTransportFactory(
new TimeoutChangeableTSnappyFramedTransport.Factory(
RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY, RpcUtils.THRIFT_FRAME_MAX_SIZE)));
}
poolServer = new TThreadPoolServer(poolArgs);
poolServer.setServerEventHandler(serverEventHandler);
} catch (TTransportException e) {
Expand Down
Loading