Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.Constants;
Expand Down Expand Up @@ -48,6 +51,8 @@

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -58,8 +63,8 @@

import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
import static org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtil.executeStatementOnDriverAndReturnResults;
import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.dropTables;
import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;

/**
* Superclass for Test[Crud|Mm]CompactorOnTez, for setup and helper classes.
Expand Down Expand Up @@ -184,7 +189,8 @@ protected void verifySuccessfulCompaction(int expectedSuccessfulCompactions) thr
protected HiveHookEvents.HiveHookEventProto getRelatedTezEvent(String dbTableName) throws Exception {
int retryCount = 3;
while (retryCount-- > 0) {
List<ProtoMessageReader<HiveHookEvents.HiveHookEventProto>> readers = TestHiveProtoLoggingHook.getTestReader(conf, tmpFolder);
List<ProtoMessageReader<HiveHookEvents.HiveHookEventProto>>
readers = TestHiveProtoLoggingHook.getTestReader(conf, tmpFolder);
for (ProtoMessageReader<HiveHookEvents.HiveHookEventProto> reader : readers) {
do {
HiveHookEvents.HiveHookEventProto event;
Expand Down Expand Up @@ -541,9 +547,40 @@ protected List<String> getBucketData(String tblName, String bucketId) throws Exc
"select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order by ROW__ID, a, b", driver);
}

protected List<RowInfo> getStructuredBucketData(String tblName, String bucketId) throws Exception {
List<String> getBucketData = getBucketData(tblName, bucketId);

List<RowInfo> result = new ArrayList<>(getBucketData.size());
for (String row : getBucketData) {
result.add(RowInfo.fromRawString(row));
}

return result;
}

protected void dropTable(String tblName) throws Exception {
executeStatementOnDriver("drop table " + tblName, driver);
}

protected record RowInfo(long writeId, long bucketId, long rowId, TestRebalanceCompactor.RowData rowData) {
private static final ObjectMapper MAPPER = new ObjectMapper();

static RowInfo fromRawString(String row) throws JsonProcessingException {
// Example row data to parse: "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":10}\t5\t4",

String[] parts = row.split("\t");

JsonNode json = MAPPER.readTree(parts[0]);

return new RowInfo(
json.get("writeid").asLong(),
json.get("bucketid").asLong(),
json.get("rowid").asLong(),

new TestRebalanceCompactor.RowData(Arrays.copyOfRange(parts, 1, parts.length))
);
}
}
}

protected Initiator createInitiator() throws Exception {
Expand Down
Loading
Loading