Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -1737,6 +1737,15 @@ public static enum ConfVars {
"How many rows in the right-most join operand Hive should buffer before emitting the join result."),
HIVE_JOIN_CACHE_SIZE("hive.join.cache.size", 25000,
"How many rows in the joining tables (except the streaming table) should be cached in memory."),
// Skew monitoring for sort-merge joins. The monitor is only created when the threshold is
// positive, and it fires once the per-key row count is greater than or equal to the threshold.
HIVE_MERGE_JOIN_SKEW_THRESHOLD("hive.merge.join.skew.threshold", -1L,
    "Maximum number of rows allowed per join key in a single sort-merge join task before a "
    + "skew event is reported. A value less than or equal to 0 disables skew detection."),
HIVE_MERGE_JOIN_SKEW_ABORT("hive.merge.join.skew.abort", false,
    "When set to true and the row count for a join key reaches or exceeds "
    + "hive.merge.join.skew.threshold, the task will be aborted. "
    + "When set to false, only a warning is logged."),
HIVE_MERGE_JOIN_SKEW_CHECK_INTERVAL("hive.merge.join.skew.check.interval", 10000L,
    "Number of rows added to a join-key group between consecutive skew checks. "
    + "A lower value detects skew earlier but adds slightly more overhead. "
    + "Only effective when hive.merge.join.skew.threshold is set to a positive value."),
HIVE_PUSH_RESIDUAL_INNER("hive.join.inner.residual", false,
"Whether to push non-equi filter predicates within inner joins. This can improve efficiency in "
+ "the evaluation of certain joins, since we will not be emitting rows which are thrown away by "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@

public static final String MASK_PATTERN = "#### A masked pattern was here ####";
public static final String PARTIAL_MASK_PATTERN = "#### A PARTIAL masked pattern was here ####";
public static final String MASKED_VERTEX_KILLED_PATTERN = "[Masked Vertex killed due to OTHER_VERTEX_FAILURE]";
private static final PatternReplacementPair MASK_STATS = new PatternReplacementPair(
Pattern.compile(" Num rows: [1-9][0-9]* Data size: [1-9][0-9]*"),
" Num rows: ###Masked### Data size: ###Masked###");
Expand Down Expand Up @@ -197,6 +198,7 @@
out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), "UTF-8"));

boolean lastWasMasked = false;
boolean lastWasVertexKilled = false;

while (null != (line = in.readLine())) {
LineProcessingResult result = processLine(line);
Expand All @@ -209,10 +211,22 @@
lastWasMasked = true;
result.partialMaskWasMatched = false;
}
lastWasVertexKilled = false;
} else if (result.line.equals(MASKED_VERTEX_KILLED_PATTERN)) {
// Deduplicate consecutive standalone vertex-killed lines — the number of sibling
// vertices still alive when the kill propagates is non-deterministic.
if (!lastWasVertexKilled) {
out.write(result.line);
out.write("\n");
lastWasVertexKilled = true;
}
lastWasMasked = false;
result.partialMaskWasMatched = false;
} else {
out.write(result.line);
out.write("\n");
lastWasMasked = false;
lastWasVertexKilled = false;
result.partialMaskWasMatched = false;
}
}
Expand Down Expand Up @@ -350,7 +364,16 @@
// We do not want the test to fail because of this.
ppm.add(new PatternReplacementPair(
Pattern.compile("Vertex killed, vertexName=(.*?),.*\\[\\1\\] killed\\/failed due to:OTHER_VERTEX_FAILURE\\]"),
"[Masked Vertex killed due to OTHER_VERTEX_FAILURE]"));
MASKED_VERTEX_KILLED_PATTERN));

// Collapse multiple consecutive embedded [Masked Vertex killed] tokens on the same line
// (the long FAILED: summary line repeats one token per killed vertex).
ppm.add(new PatternReplacementPair(Pattern.compile("(\\Q" + MASKED_VERTEX_KILLED_PATTERN + "\\E){2,}"),
MASKED_VERTEX_KILLED_PATTERN));

// The number of vertices killed when a DAG fails is a scheduling race condition —
// depends on how many sibling vertices are still running at the moment the kill propagates.
ppm.add(new PatternReplacementPair(Pattern.compile("killedVertices:[0-9]+"), "killedVertices:#Masked#"));

Check warning on line 376 in itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use concise character class syntax '\\d' instead of '[0-9]'.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ7W4viYut7bsMDRr9ku&open=AZ7W4viYut7bsMDRr9ku&pullRequest=6456

partialPlanMask = ppm.toArray(new PatternReplacementPair[ppm.size()]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,19 @@
qTestResultProcessor.overwriteResults(f.getPath(), outFileName);
return QTestProcessExecResult.createWithoutOutput(0);
} else {
return qTestResultProcessor.executeDiffCommand(f.getPath(), outFileName, false);
// Apply the same masking pipeline to a temporary copy of the reference file so that
// non-deterministic values (e.g. killedVertices) are normalized on both sides.
// This preserves backward compatibility with existing .q.out files that were written
// before the masking rules were introduced.
File maskedRef = new File(outFileName + ".masked_ref");
try {
FileUtils.copyFile(new File(outFileName), maskedRef);
qOutProcessor.maskPatterns(maskedRef.getPath());
return qTestResultProcessor.executeDiffCommand(f.getPath(), maskedRef.getPath(), false);
} finally {
maskedRef.delete();

Check warning on line 1033 in itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Do something with the "boolean" value returned by "delete".

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ7W4vmCut7bsMDRr9kv&open=AZ7W4vmCut7bsMDRr9kv&pullRequest=6456

Check warning on line 1033 in itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use "java.nio.file.Files#delete" here for better messages on error conditions.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ7W4vmCut7bsMDRr9kw&open=AZ7W4vmCut7bsMDRr9kw&pullRequest=6456
new File(maskedRef.getPath() + ".orig").delete();

Check warning on line 1034 in itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Do something with the "boolean" value returned by "delete".

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ7W4vmCut7bsMDRr9kx&open=AZ7W4vmCut7bsMDRr9kx&pullRequest=6456

Check warning on line 1034 in itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use "java.nio.file.Files#delete" here for better messages on error conditions.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ7W4vmCut7bsMDRr9ky&open=AZ7W4vmCut7bsMDRr9ky&pullRequest=6456
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import java.util.TreeSet;

import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource;
import org.apache.hadoop.hive.ql.exec.tez.monitoring.SkewedJoinMonitor;
import org.apache.hadoop.hive.ql.exec.tez.monitoring.SkewedMergeJoinMonitor;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.util.NullOrdering;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.slf4j.Logger;
Expand All @@ -40,7 +43,6 @@
import org.apache.hadoop.hive.ql.exec.tez.TezContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
Expand Down Expand Up @@ -97,6 +99,10 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator<CommonMerge
transient NullOrdering nullOrdering;
transient private boolean shortcutUnmatchedRows;

// Skew monitor invoked from process() after each row is added to candidate storage;
// assigned in initializeOp() from SkewedMergeJoinMonitor.createSkewedJoinMonitor(...).
transient SkewedJoinMonitor skewedMergeJoinMonitor;
// Per-position join key column name(s) passed to the skew monitor; filled by
// initSkewJoinNames(), which falls back to "unknown" when the descriptor has no entry.
transient String[] joinSkewKeyColumns;
// Per-position table alias passed to the skew monitor; filled by initSkewJoinNames()
// with the same "unknown" fallback.
transient String[] joinSkewTableAliases;

/** Kryo ctor. */
protected CommonMergeJoinOperator() {
super();
Expand Down Expand Up @@ -139,6 +145,13 @@ public void initializeOp(Configuration hconf) throws HiveException {
int oldVar = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVE_MAPJOIN_BUCKET_CACHE_SIZE);
shortcutUnmatchedRows = HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVE_JOIN_SHORTCUT_UNMATCHED_ROWS);

skewedMergeJoinMonitor = SkewedMergeJoinMonitor.createSkewedJoinMonitor(
HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_MERGE_JOIN_SKEW_THRESHOLD),
HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVE_MERGE_JOIN_SKEW_ABORT),
HiveConf.getLongVar(hconf, HiveConf.ConfVars.HIVE_MERGE_JOIN_SKEW_CHECK_INTERVAL), maxAlias);

initSkewJoinNames(maxAlias);

if (oldVar != 100) {
bucketSize = oldVar;
} else {
Expand Down Expand Up @@ -222,6 +235,23 @@ private Set<Integer> getFetchInputAtCloseList() {
return retval;
}

/**
 * Fills {@link #joinSkewKeyColumns} and {@link #joinSkewTableAliases} with one entry per
 * position in {@code [0, maxAlias)}, taken from the operator descriptor. Any position for
 * which the descriptor has no array, a too-short array, or a {@code null} element is
 * recorded as {@code "unknown"}.
 *
 * @param maxAlias number of positions to populate (also the length of both arrays)
 */
private void initSkewJoinNames(int maxAlias) {
  joinSkewKeyColumns = new String[maxAlias];
  joinSkewTableAliases = new String[maxAlias];

  final String[] keyNamesFromDesc = conf.getSkewJoinKeyNames();
  final String[] aliasesFromDesc = conf.getSkewJoinTableAliases();

  for (int i = 0; i < maxAlias; i++) {
    joinSkewKeyColumns[i] = skewNameOrUnknown(keyNamesFromDesc, i);
    joinSkewTableAliases[i] = skewNameOrUnknown(aliasesFromDesc, i);
  }
}

/** Returns {@code names[pos]} when it exists and is non-null, otherwise {@code "unknown"}. */
private static String skewNameOrUnknown(String[] names, int pos) {
  if (names == null || pos >= names.length) {
    return "unknown";
  }
  final String candidate = names[pos];
  return candidate != null ? candidate : "unknown";
}

@Override
public void endGroup() throws HiveException {
// we do not want the end group to cause a checkAndGenObject
Expand Down Expand Up @@ -322,6 +352,9 @@ public void process(Object row, int tag) throws HiveException {

assert !nextKeyGroup;
candidateStorage[tag].addRow(value);

skewedMergeJoinMonitor.checkMergeJoinSkew(alias, candidateStorage[tag].rowCount(),
joinSkewKeyColumns[alias], joinSkewTableAliases[alias]);
}

private void emitUnmatchedRows(int tag, boolean force) throws HiveException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.ql.exec.tez.monitoring;

/**
 * {@link SkewedJoinMonitor} implementation that performs no skew detection at all.
 * {@code SkewedMergeJoinMonitor.createSkewedJoinMonitor} returns an instance of this class
 * when the configured skew threshold is not positive.
 */
public class NoopSkewedMergeJoinMonitor implements SkewedJoinMonitor {

  /**
   * Does nothing; every argument is ignored and no exception is thrown.
   */
  @Override
  public void checkMergeJoinSkew(byte alias, long rowCount, String joinKeyColumns, String tableAlias) {
    // Intentionally empty: this implementation never reports or aborts on skew.
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.ql.exec.tez.monitoring;

import org.apache.hadoop.hive.ql.metadata.HiveException;

/**
 * Hook invoked by a merge join while rows accumulate for a join key, giving the
 * implementation a chance to react to data skew.
 */
public interface SkewedJoinMonitor {

  /**
   * Inspects the current row count of one join alias and reacts if it indicates skew.
   * Depending on the configured action, an implementation either logs a warning or
   * throws an exception when skew is detected.
   *
   * @param alias          byte identifier of the join alias being checked
   * @param rowCount       rows accumulated so far for the current join key on this alias
   * @param joinKeyColumns name(s) of the join key column(s) for this alias; this is schema
   *                       information, not data values (for example "customer_id")
   * @param tableAlias     table or subquery alias for this join alias (big or small table)
   * @throws HiveException if skew is detected and abort mode is enabled
   */
  void checkMergeJoinSkew(
      byte alias,
      long rowCount,
      String joinKeyColumns,
      String tableAlias) throws HiveException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.ql.exec.tez.monitoring;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SkewedJoinMonitor} that reports data skew in a merge join once the number of rows
 * accumulated for an alias reaches a configured threshold. Each alias is reported at most
 * once per monitor instance; the report is either a logged warning or a
 * {@link HiveException}, depending on the abort setting.
 */
public class SkewedMergeJoinMonitor implements SkewedJoinMonitor {

  private static final Logger LOG = LoggerFactory.getLogger(SkewedMergeJoinMonitor.class.getName());

  /** Row count at or above which an alias is considered skewed. */
  private final long mergeJoinSkewThreshold;
  /** When true, a detected skew raises an exception instead of logging a warning. */
  private final boolean mergeJoinSkewAbort;
  /** Row interval used by {@link #isDueForCheck(long)}; always at least 1. */
  private final long mergeJoinSkewCheckInterval;
  /** Indexed by alias; set once skew has been reported for that alias. */
  private final boolean[] skewedKeyFlagged;

  /**
   * Creates a monitor.
   *
   * @param mergeJoinSkewThreshold     row count at or above which skew is reported
   * @param mergeJoinSkewAbort         whether a detected skew throws instead of logging
   * @param mergeJoinSkewCheckInterval row interval between checks; non-positive values
   *                                   are replaced by 1
   * @param maxAlias                   number of aliases to track (size of the flag array)
   */
  public SkewedMergeJoinMonitor(long mergeJoinSkewThreshold, boolean mergeJoinSkewAbort,
      long mergeJoinSkewCheckInterval, int maxAlias) {
    this.mergeJoinSkewThreshold = mergeJoinSkewThreshold;
    this.mergeJoinSkewAbort = mergeJoinSkewAbort;
    // Clamp to 1 so the modulo in isDueForCheck never divides by zero.
    this.mergeJoinSkewCheckInterval = Math.max(mergeJoinSkewCheckInterval, 1L);
    this.skewedKeyFlagged = new boolean[maxAlias];
  }

  /** Returns whether monitoring is enabled, i.e. the threshold is positive. */
  @VisibleForTesting
  public boolean isActive() {
    return mergeJoinSkewThreshold > 0;
  }

  /**
   * Returns whether the given row count reaches the threshold for an alias that has not
   * been flagged yet.
   */
  @VisibleForTesting
  public boolean shouldBeFlagged(byte alias, long rowCount) {
    return rowCount >= mergeJoinSkewThreshold && !skewedKeyFlagged[alias];
  }

  /** Returns whether skew has already been reported for the given alias. */
  @VisibleForTesting
  public boolean isFlagged(int alias) {
    return skewedKeyFlagged[alias];
  }

  /**
   * Returns whether a skew check should run for this row count: either the count is a
   * multiple of the check interval, or it has reached the threshold. Because of the second
   * clause, every row count at or above the threshold is due.
   */
  @VisibleForTesting
  public boolean isDueForCheck(long rowCount) {
    return (rowCount % mergeJoinSkewCheckInterval == 0) || (rowCount >= mergeJoinSkewThreshold);
  }

  /**
   * Reports skew for {@code alias} the first time its row count is due for a check and has
   * reached the threshold. Later calls for the same alias return immediately.
   *
   * @throws HiveException if skew is detected and abort mode is enabled
   */
  @Override
  public void checkMergeJoinSkew(byte alias, long rowCount, String joinKeyColumns, String tableAlias)
      throws HiveException {
    // Guards are evaluated left to right; the cheap "already flagged" test comes before
    // the arithmetic checks.
    if (!isActive()
        || skewedKeyFlagged[alias]
        || !isDueForCheck(rowCount)
        || !shouldBeFlagged(alias, rowCount)) {
      return;
    }

    // Mark first so the alias is reported only once, even when we throw below.
    skewedKeyFlagged[alias] = true;

    final String report = String.format(
        "Data skew detected in merge join: %d rows accumulated for join column(s) [%s]"
            + " in table alias [%s]. Consider reviewing data distribution.",
        rowCount, joinKeyColumns, tableAlias);

    if (mergeJoinSkewAbort) {
      throw new HiveException(report);
    }
    LOG.warn(report);
  }

  /**
   * Factory that returns a real monitor when the threshold is positive and a
   * {@link NoopSkewedMergeJoinMonitor} otherwise.
   */
  public static SkewedJoinMonitor createSkewedJoinMonitor(long mergeJoinSkewThreshold, boolean mergeJoinSkewAbort,
      long mergeJoinSkewCheckInterval, int maxAlias) {
    if (mergeJoinSkewThreshold <= 0) {
      return new NoopSkewedMergeJoinMonitor();
    }
    return new SkewedMergeJoinMonitor(mergeJoinSkewThreshold, mergeJoinSkewAbort, mergeJoinSkewCheckInterval,
        maxAlias);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.hadoop.hive.ql.lib.SemanticNodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.metainfo.query.JoinOperationMetadataResolver;
import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile;
import org.apache.hadoop.hive.ql.parse.GenTezUtils;
import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
Expand Down Expand Up @@ -626,6 +627,30 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont
}
}
mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators());

// Resolve original table names and key column names from the compile-time
// operator tree only when skew monitoring is actually enabled
// (hive.merge.join.skew.threshold > 0). Tree traversal is skipped
// entirely when the feature is off so there is no overhead for the
// common case.
if (HiveConf.getLongVar(context.conf, HiveConf.ConfVars.HIVE_MERGE_JOIN_SKEW_THRESHOLD) > 0) {
JoinOperationMetadataResolver resolver = new JoinOperationMetadataResolver();
populateSkewJoinNames(resolver, joinOp, mergeJoinOp);
}
}

/**
 * Resolves join metadata for {@code joinOp} and copies the resulting key names and table
 * aliases onto the descriptor of {@code mergeJoinOp}.
 *
 * The results are stored as non-transient fields in
 * {@link CommonMergeJoinDesc} so they survive plan serialization to the Tez task
 * and can be read by the skew-join monitor at runtime.
 *
 * @param resolver    resolver that is run against {@code joinOp} and then queried for names
 * @param joinOp      the original join operator whose metadata is resolved
 * @param mergeJoinOp the merge join operator whose descriptor receives the names
 */
private void populateSkewJoinNames(JoinOperationMetadataResolver resolver, JoinOperator joinOp,
CommonMergeJoinOperator mergeJoinOp) {
// Must run before the getters below are read.
resolver.resolveJoinMetadata(joinOp);

mergeJoinOp.getConf().setSkewJoinKeyNames(resolver.getKeyNames());
mergeJoinOp.getConf().setSkewJoinTableAliases(resolver.getTableAliases());
}

private void setAllChildrenTraits(Operator<? extends OperatorDesc> currentOp, OpTraits opTraits) {
Expand Down
Loading
Loading