diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 662ece1c907b..600f836db283 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1737,6 +1737,15 @@ public static enum ConfVars { "How many rows in the right-most join operand Hive should buffer before emitting the join result."), HIVE_JOIN_CACHE_SIZE("hive.join.cache.size", 25000, "How many rows in the joining tables (except the streaming table) should be cached in memory."), + HIVE_MERGE_JOIN_SKEW_THRESHOLD("hive.merge.join.skew.threshold", -1L, + "Maximum number of rows allowed per join key in a single sort-merge join task before a " + + "skew event is reported."), + HIVE_MERGE_JOIN_SKEW_ABORT("hive.merge.join.skew.abort", false, + "When set to true and the row count is equal to hive.merge.join.skew.threshold, the task will be aborted."), + HIVE_MERGE_JOIN_SKEW_CHECK_INTERVAL("hive.merge.join.skew.check.interval", 10000L, + "Number of rows added to a join-key group between consecutive skew checks. " + + "A lower value detects skew earlier but adds slightly more overhead. " + + "Only effective when hive.merge.join.skew.threshold is set to a positive value."), HIVE_PUSH_RESIDUAL_INNER("hive.join.inner.residual", false, "Whether to push non-equi filter predicates within inner joins. This can improve efficiency in " + "the evaluation of certain joins, since we will not be emitting rows which are thrown away by " diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java index b418c70538ca..2bd2bd83584e 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java @@ -58,6 +58,7 @@ public class QOutProcessor { public static final String MASK_PATTERN = "#### A masked pattern was here ####"; public static final String PARTIAL_MASK_PATTERN = "#### A PARTIAL masked pattern was here ####"; + public static final String MASKED_VERTEX_KILLED_PATTERN = "[Masked Vertex killed due to OTHER_VERTEX_FAILURE]"; private static final PatternReplacementPair MASK_STATS = new PatternReplacementPair( Pattern.compile(" Num rows: [1-9][0-9]* Data size: [1-9][0-9]*"), " Num rows: ###Masked### Data size: ###Masked###"); @@ -197,6 +198,7 @@ public void maskPatterns(String fname) throws Exception { out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), "UTF-8")); boolean lastWasMasked = false; + boolean lastWasVertexKilled = false; while (null != (line = in.readLine())) { LineProcessingResult result = processLine(line); @@ -209,10 +211,22 @@ public void maskPatterns(String fname) throws Exception { lastWasMasked = true; result.partialMaskWasMatched = false; } + lastWasVertexKilled = false; + } else if (result.line.equals(MASKED_VERTEX_KILLED_PATTERN)) { + // Deduplicate consecutive standalone vertex-killed lines — the number of sibling + // vertices still alive when the kill propagates is non-deterministic. + if (!lastWasVertexKilled) { + out.write(result.line); + out.write("\n"); + lastWasVertexKilled = true; + } + lastWasMasked = false; + result.partialMaskWasMatched = false; } else { out.write(result.line); out.write("\n"); lastWasMasked = false; + lastWasVertexKilled = false; result.partialMaskWasMatched = false; } } @@ -350,7 +364,16 @@ private final static class PatternReplacementPair { // We do not want the test to fail because of this. ppm.add(new PatternReplacementPair( Pattern.compile("Vertex killed, vertexName=(.*?),.*\\[\\1\\] killed\\/failed due to:OTHER_VERTEX_FAILURE\\]"), - "[Masked Vertex killed due to OTHER_VERTEX_FAILURE]")); + MASKED_VERTEX_KILLED_PATTERN)); + + // Collapse multiple consecutive embedded [Masked Vertex killed] tokens on the same line + // (the long FAILED: summary line repeats one token per killed vertex). + ppm.add(new PatternReplacementPair(Pattern.compile("(\\Q" + MASKED_VERTEX_KILLED_PATTERN + "\\E){2,}"), + MASKED_VERTEX_KILLED_PATTERN)); + + // The number of vertices killed when a DAG fails is a scheduling race condition — + // depends on how many sibling vertices are still running at the moment the kill propagates. + ppm.add(new PatternReplacementPair(Pattern.compile("killedVertices:[0-9]+"), "killedVertices:#Masked#")); partialPlanMask = ppm.toArray(new PatternReplacementPair[ppm.size()]); } diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 447102f91c49..307f96baea84 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -1020,7 +1020,19 @@ public QTestProcessExecResult checkCliDriverResults() throws Exception { qTestResultProcessor.overwriteResults(f.getPath(), outFileName); return QTestProcessExecResult.createWithoutOutput(0); } else { - return qTestResultProcessor.executeDiffCommand(f.getPath(), outFileName, false); + // Apply the same masking pipeline to a temporary copy of the reference file so that + // non-deterministic values (e.g. killedVertices) are normalized on both sides. + // This preserves backward compatibility with existing .q.out files that were written + // before the masking rules were introduced. + File maskedRef = new File(outFileName + ".masked_ref"); + try { + FileUtils.copyFile(new File(outFileName), maskedRef); + qOutProcessor.maskPatterns(maskedRef.getPath()); + return qTestResultProcessor.executeDiffCommand(f.getPath(), maskedRef.getPath(), false); + } finally { + maskedRef.delete(); + new File(maskedRef.getPath() + ".orig").delete(); + } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java index f9e7a40e10e7..33c4cf336839 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonMergeJoinOperator.java @@ -27,6 +27,9 @@ import java.util.TreeSet; import org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.SkewedJoinMonitor; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.SkewedMergeJoinMonitor; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.util.NullOrdering; import org.apache.hadoop.hive.serde.serdeConstants; import org.slf4j.Logger; @@ -40,7 +43,6 @@ import org.apache.hadoop.hive.ql.exec.tez.TezContext; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc; -import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.JoinCondDesc; import org.apache.hadoop.hive.ql.plan.JoinDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; @@ -97,6 +99,10 @@ public class CommonMergeJoinOperator extends AbstractMapJoinOperator getFetchInputAtCloseList() { return retval; } + private void initSkewJoinNames(int maxAlias) { + joinSkewKeyColumns = new String[maxAlias]; + joinSkewTableAliases = new String[maxAlias]; + + String[] descKeyNames = conf.getSkewJoinKeyNames(); + String[] descTableAliases = conf.getSkewJoinTableAliases(); + + for (int pos = 0; pos < maxAlias; pos++) { + joinSkewKeyColumns[pos] = + (descKeyNames != null && pos < descKeyNames.length && descKeyNames[pos] != null) ? descKeyNames[pos] + : "unknown"; + joinSkewTableAliases[pos] = + (descTableAliases != null && pos < descTableAliases.length && descTableAliases[pos] != null) + ? descTableAliases[pos] : "unknown"; + } + } + @Override public void endGroup() throws HiveException { // we do not want the end group to cause a checkAndGenObject @@ -322,6 +352,9 @@ public void process(Object row, int tag) throws HiveException { assert !nextKeyGroup; candidateStorage[tag].addRow(value); + + skewedMergeJoinMonitor.checkMergeJoinSkew(alias, candidateStorage[tag].rowCount(), + joinSkewKeyColumns[alias], joinSkewTableAliases[alias]); } private void emitUnmatchedRows(int tag, boolean force) throws HiveException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/NoopSkewedMergeJoinMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/NoopSkewedMergeJoinMonitor.java new file mode 100644 index 000000000000..d74bd18b26c1 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/NoopSkewedMergeJoinMonitor.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez.monitoring; + +public class NoopSkewedMergeJoinMonitor implements SkewedJoinMonitor { + + @Override + public void checkMergeJoinSkew(byte alias, long rowCount, String joinKeyColumns, String tableAlias) { + // No-op + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/SkewedJoinMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/SkewedJoinMonitor.java new file mode 100644 index 000000000000..efba7b990412 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/SkewedJoinMonitor.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez.monitoring; + +import org.apache.hadoop.hive.ql.metadata.HiveException; + +public interface SkewedJoinMonitor { + + /** + * Checks whether a skewed data event is detected during a merge join for a given alias and its + * row count. If skew is detected, a warning is logged or an exception is thrown, depending on + * the configured action. + * + * @param alias the byte identifier of the join alias + * @param rowCount the number of rows accumulated for the current join key on this alias + * @param joinKeyColumns the join key column name(s) resolved via RowSchema for the given alias + * (schema info, not data values — e.g. "customer_id") + * @param tableAlias the table/subquery alias for the given alias (big or small table) + * @throws HiveException if skew is detected and abort mode is enabled + */ + void checkMergeJoinSkew(byte alias, long rowCount, String joinKeyColumns, String tableAlias) throws HiveException; +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/SkewedMergeJoinMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/SkewedMergeJoinMonitor.java new file mode 100644 index 000000000000..f177084e9c9c --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/SkewedMergeJoinMonitor.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.tez.monitoring; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SkewedMergeJoinMonitor implements SkewedJoinMonitor { + + private final long mergeJoinSkewThreshold; + private final boolean mergeJoinSkewAbort; + private final long mergeJoinSkewCheckInterval; + private final boolean[] skewedKeyFlagged; + + private static final Logger LOG = LoggerFactory.getLogger(SkewedMergeJoinMonitor.class.getName()); + + public SkewedMergeJoinMonitor(long mergeJoinSkewThreshold, boolean mergeJoinSkewAbort, + long mergeJoinSkewCheckInterval, int maxAlias) { + this.mergeJoinSkewThreshold = mergeJoinSkewThreshold; + this.mergeJoinSkewAbort = mergeJoinSkewAbort; + this.mergeJoinSkewCheckInterval = mergeJoinSkewCheckInterval > 0 ? mergeJoinSkewCheckInterval : 1; + skewedKeyFlagged = new boolean[maxAlias]; + } + + @VisibleForTesting + public boolean isActive() { + return mergeJoinSkewThreshold > 0; + } + + @VisibleForTesting + public boolean shouldBeFlagged(byte alias, long rowCount) { + return rowCount >= mergeJoinSkewThreshold && !skewedKeyFlagged[alias]; + } + + @VisibleForTesting + public boolean isFlagged(int alias) { + return skewedKeyFlagged[alias]; + } + + @VisibleForTesting + public boolean isDueForCheck(long rowCount) { + return (rowCount % mergeJoinSkewCheckInterval == 0) || (rowCount >= mergeJoinSkewThreshold); + } + + @Override + public void checkMergeJoinSkew(byte alias, long rowCount, String joinKeyColumns, String tableAlias) + throws HiveException { + if (!isActive()) { + return; + } + if (skewedKeyFlagged[alias]) { + return; + } + if (!isDueForCheck(rowCount)) { + return; + } + if (!shouldBeFlagged(alias, rowCount)) { + return; + } + + skewedKeyFlagged[alias] = true; + + String msg = String.format( + "Data skew detected in merge join: %d rows accumulated for join column(s) [%s]" + + " in table alias [%s]. Consider reviewing data distribution.", + rowCount, joinKeyColumns, tableAlias); + + if (mergeJoinSkewAbort) { + throw new HiveException(msg); + } else { + LOG.warn(msg); + } + } + + public static SkewedJoinMonitor createSkewedJoinMonitor(long mergeJoinSkewThreshold, boolean mergeJoinSkewAbort, + long mergeJoinSkewCheckInterval, int maxAlias) { + if (mergeJoinSkewThreshold > 0) { + return new SkewedMergeJoinMonitor(mergeJoinSkewThreshold, mergeJoinSkewAbort, mergeJoinSkewCheckInterval, + maxAlias); + } else { + return new NoopSkewedMergeJoinMonitor(); + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java index 0d94dff357c0..9a25df32082c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.lib.SemanticNodeProcessor; import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.optimizer.metainfo.query.JoinOperationMetadataResolver; import org.apache.hadoop.hive.ql.optimizer.physical.LlapClusterStateForCompile; import org.apache.hadoop.hive.ql.parse.GenTezUtils; import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext; @@ -626,6 +627,30 @@ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext cont } } mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators()); + + // Resolve original table names and key column names from the compile-time + // operator tree only when skew monitoring is actually enabled + // (hive.merge.join.skew.threshold > 0). Tree traversal is skipped + // entirely when the feature is off so there is no overhead for the + // common case. + if (HiveConf.getLongVar(context.conf, HiveConf.ConfVars.HIVE_MERGE_JOIN_SKEW_THRESHOLD) > 0) { + JoinOperationMetadataResolver resolver = new JoinOperationMetadataResolver(); + populateSkewJoinNames(resolver, joinOp, mergeJoinOp); + } + } + + /** + * The results are stored as non-transient fields in + * {@link CommonMergeJoinDesc} so they survive plan serialization to the Tez task + * and can be read by the skew-join monitor at runtime. + * + */ + private void populateSkewJoinNames(JoinOperationMetadataResolver resolver, JoinOperator joinOp, + CommonMergeJoinOperator mergeJoinOp) { + resolver.resolveJoinMetadata(joinOp); + + mergeJoinOp.getConf().setSkewJoinKeyNames(resolver.getKeyNames()); + mergeJoinOp.getConf().setSkewJoinTableAliases(resolver.getTableAliases()); } private void setAllChildrenTraits(Operator currentOp, OpTraits opTraits) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/query/JoinOperationMetadataResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/query/JoinOperationMetadataResolver.java new file mode 100644 index 000000000000..abc2b6b77791 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/query/JoinOperationMetadataResolver.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.metainfo.query; + +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.OperatorUtils; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; + +import java.util.List; + +/** + * Resolves Join Operation metainformation (tablename, join column names, etc). + * + */ +public class JoinOperationMetadataResolver { + + private String[] keyNames; + private String[] tableAliases; + + private static final String UNKNOWN = "unknown"; + + /** + * Resolves the original table name (or query alias) and join key column names + * for each join input position at compile time, while the full operator + * tree is still available. + * + */ + public void resolveJoinMetadata(JoinOperator joinOp) { + List> parents = joinOp.getParentOperators(); + if (parents == null || parents.isEmpty()) { + return; + } + + int numPositions = parents.size(); + keyNames = new String[numPositions]; + tableAliases = new String[numPositions]; + + for (int pos = 0; pos < numPositions; pos++) { + Operator parent = parents.get(pos); + if (parent == null) { + keyNames[pos] = UNKNOWN; + tableAliases[pos] = UNKNOWN; + continue; + } + + resolveTableName(parent, pos); + resolveColumns(parent, pos); + } + + } + + private void resolveColumns(Operator parent, int pos) { + if (!(parent instanceof ReduceSinkOperator)) { + keyNames[pos] = UNKNOWN; + return; + } + + ReduceSinkOperator rsOp = (ReduceSinkOperator) parent; + ReduceSinkDesc rsConf = rsOp.getConf(); + if (rsConf == null) { + keyNames[pos] = UNKNOWN; + return; + } + + List keyCols = rsConf.getKeyCols(); + if (keyCols == null || keyCols.isEmpty()) { + keyNames[pos] = UNKNOWN; + return; + } + + List> rsParents = rsOp.getParentOperators(); + RowSchema inputSchema = + (rsParents != null && !rsParents.isEmpty() && rsParents.get(0) != null) ? rsParents.get(0).getSchema() : null; + + StringBuilder sb = new StringBuilder(); + for (int k = 0; k < keyCols.size(); k++) { + if (k > 0) { + sb.append(", "); + } + ExprNodeDesc keyExpr = keyCols.get(k); + sb.append(resolveKeyColumnName(keyExpr, inputSchema)); + } + keyNames[pos] = sb.toString(); + } + + private void resolveTableName(Operator parent, int pos) { + TableScanOperator tso = OperatorUtils.findSingleOperatorUpstream(parent, TableScanOperator.class); + if (tso != null && tso.getConf() != null) { + String alias = tso.getConf().getAlias(); + String tableName = tso.getConf().getTableName(); + if (alias != null && !alias.isEmpty()) { + tableAliases[pos] = alias; + } else if (tableName != null && !tableName.isEmpty()) { + tableAliases[pos] = tableName; + } else { + tableAliases[pos] = UNKNOWN; + } + } else { + tableAliases[pos] = UNKNOWN; + } + } + + /** + * Returns the most human-readable name for a single join-key expression. + * Never returns {@code null}: falls back to {@code "unknown"} if all else fails. + */ + private String resolveKeyColumnName(ExprNodeDesc keyExpr, RowSchema inputSchema) { + if (keyExpr == null) { + return UNKNOWN; + } + if (keyExpr instanceof ExprNodeColumnDesc) { + String internalColName = ((ExprNodeColumnDesc) keyExpr).getColumn(); + if (internalColName != null && inputSchema != null) { + ColumnInfo ci = inputSchema.getColumnInfo(internalColName); + if (ci != null && ci.getAlias() != null && !ci.getAlias().isEmpty()) { + return ci.getAlias(); + } + } + // Fall back to the internal column name, at least it's something. + return internalColName != null ? internalColName : UNKNOWN; + } + // For computed expressions (UDFs, casts, …) use the expression string. + String exprStr = keyExpr.getExprString(); + return exprStr != null && !exprStr.isEmpty() ? exprStr : UNKNOWN; + } + + public String[] getKeyNames() { + return keyNames; + } + + public String[] getTableAliases() { + return tableAliases; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java index 181b4c963296..259132a97bdf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CommonMergeJoinDesc.java @@ -30,6 +30,18 @@ public class CommonMergeJoinDesc extends MapJoinDesc implements Serializable { private int numBuckets; private int mapJoinConversionPos; + /** + * Human-readable key column names per join position (indexed by alias/tag). + * Populated at compile time so they are available at task runtime for skew monitoring. + */ + private String[] skewJoinKeyNames; + + /** + * Human-readable table names per join position (indexed by alias/tag). + * Populated at compile time so they are available at task runtime for skew monitoring. + */ + private String[] skewJoinTableAliases; + CommonMergeJoinDesc() { } @@ -60,6 +72,22 @@ public void setBigTablePosition(int pos) { mapJoinConversionPos = pos; } + public String[] getSkewJoinKeyNames() { + return skewJoinKeyNames; + } + + public void setSkewJoinKeyNames(String[] skewJoinKeyNames) { + this.skewJoinKeyNames = skewJoinKeyNames; + } + + public String[] getSkewJoinTableAliases() { + return skewJoinTableAliases; + } + + public void setSkewJoinTableAliases(String[] skewJoinTableAliases) { + this.skewJoinTableAliases = skewJoinTableAliases; + } + @Override public boolean isSame(OperatorDesc other) { if (super.isSame(other)) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestCommonMergeJoinSkewThreshold.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestCommonMergeJoinSkewThreshold.java new file mode 100644 index 000000000000..074d3c2dd55a --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestCommonMergeJoinSkewThreshold.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec; + +import org.apache.hadoop.hive.ql.exec.tez.monitoring.SkewedMergeJoinMonitor; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Unit tests for merge join skew threshold detection in {@link CommonMergeJoinOperator}. + */ +public class TestCommonMergeJoinSkewThreshold { + + private static final String KEY_COLS_A = "customer_id"; + private static final String TABLE_ALIAS_A = "orders"; + + private CommonMergeJoinOperator op; + + @Before + public void setUp() { + op = new CommonMergeJoinOperator(); + } + + @Test + public void testDisabledNoWarnNoThrow() throws HiveException { + SkewedMergeJoinMonitor monitor = new SkewedMergeJoinMonitor(-1L, false, 1L, 4); + op.skewedMergeJoinMonitor = monitor; + + op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, Long.MAX_VALUE, KEY_COLS_A, TABLE_ALIAS_A); + Assert.assertFalse(monitor.isActive()); + Assert.assertFalse("tag 0 should still be clear", monitor.isFlagged(0)); + } + + @Test + public void testBelowThresholdIsOk() throws HiveException { + SkewedMergeJoinMonitor monitor = new SkewedMergeJoinMonitor(1000L, false, 1L, 4); + op.skewedMergeJoinMonitor = monitor; + + op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, 999L, KEY_COLS_A, TABLE_ALIAS_A); + Assert.assertFalse("tag 0 should still be clear", monitor.isFlagged(0)); + } + + @Test + public void testAtThresholdWarnOnce() throws HiveException { + SkewedMergeJoinMonitor monitor = new SkewedMergeJoinMonitor(500L, false, 1L, 4); + op.skewedMergeJoinMonitor = monitor; + + op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, 500L, KEY_COLS_A, TABLE_ALIAS_A); + + Assert.assertTrue("skewedKeyFlagged[0] must be set after the first crossing", monitor.isFlagged(0)); + } + + @Test + public void testFlagsAreIndependentPerTag() throws HiveException { + SkewedMergeJoinMonitor monitor = new SkewedMergeJoinMonitor(100L, false, 1L, 4); + op.skewedMergeJoinMonitor = monitor; + + op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, 200L, KEY_COLS_A, TABLE_ALIAS_A); + Assert.assertTrue("tag 0 should be flagged", monitor.isFlagged(0)); + Assert.assertFalse("tag 1 should still be clear", monitor.isFlagged(1)); + + op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 1, 150L, KEY_COLS_A, TABLE_ALIAS_A); + Assert.assertTrue("tag 1 should now be flagged", monitor.isFlagged(1)); + } + + @Test + public void testAbortModeBelowThresholdNoThrow() throws HiveException { + op.skewedMergeJoinMonitor = new SkewedMergeJoinMonitor(100L, true, 1L, 4); + op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, 99L, KEY_COLS_A, TABLE_ALIAS_A); + } + + @Test + public void testAbortModeThrowsHiveException() { + op.skewedMergeJoinMonitor = new SkewedMergeJoinMonitor(100L, true, 1L, 4); + + try { + op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, 200L, KEY_COLS_A, TABLE_ALIAS_A); + Assert.fail("Expected HiveException to be thrown in abort mode"); + } catch (HiveException e) { + String msg = e.getMessage(); + Assert.assertNotNull(msg); + Assert.assertTrue("Message should mention row count 200", msg.contains("200")); + Assert.assertTrue("Message should mention join column name", msg.contains(KEY_COLS_A)); + Assert.assertTrue("Message should mention table alias", msg.contains(TABLE_ALIAS_A)); + } + } + + @Test + public void testIntervalSkipsCheckOnNonBoundaryRows() throws HiveException { + SkewedMergeJoinMonitor monitor = new SkewedMergeJoinMonitor(500L, false, 10000L, 4); + op.skewedMergeJoinMonitor = monitor; + + op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, 100L, KEY_COLS_A, TABLE_ALIAS_A); + Assert.assertFalse("Should not be flagged at non-boundary row below threshold", monitor.isFlagged(0)); + } + + @Test + public void testIntervalChecksOnBoundaryRow() throws HiveException { + SkewedMergeJoinMonitor monitor = new SkewedMergeJoinMonitor(50L, false, 100L, 4); + op.skewedMergeJoinMonitor = monitor; + + op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, 100L, KEY_COLS_A, TABLE_ALIAS_A); + Assert.assertTrue("Should be flagged at boundary row that exceeds threshold", monitor.isFlagged(0)); + } + + @Test + public void testIntervalAlwaysChecksWhenRowCountExceedsThreshold() throws HiveException { + SkewedMergeJoinMonitor monitor = new SkewedMergeJoinMonitor(500L, false, 10000L, 4); + op.skewedMergeJoinMonitor = monitor; + + op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, 600L, KEY_COLS_A, TABLE_ALIAS_A); + Assert.assertTrue("Should be flagged when rowCount >= threshold even at non-boundary row", monitor.isFlagged(0)); + } + + @Test + public void testIsDueForCheckBoundaryAndThreshold() { + SkewedMergeJoinMonitor monitor = new SkewedMergeJoinMonitor(500L, false, 100L, 4); + + Assert.assertTrue("Row 100 is a boundary (100 % 100 == 0)", monitor.isDueForCheck(100L)); + Assert.assertFalse("Row 50 is not a boundary and below threshold", monitor.isDueForCheck(50L)); + Assert.assertTrue("Row 500 equals threshold, always due", monitor.isDueForCheck(500L)); + Assert.assertTrue("Row 700 exceeds threshold, always due", monitor.isDueForCheck(700L)); + Assert.assertTrue("Row 200 is a boundary (200 % 100 == 0)", monitor.isDueForCheck(200L)); + } + + @Test + public void testSkewMessageContainsJoinKeyColumnsAndRowCount() { + op.skewedMergeJoinMonitor = new SkewedMergeJoinMonitor(10L, true, 1L, 4); + + try { + op.skewedMergeJoinMonitor.checkMergeJoinSkew((byte) 0, 50L, KEY_COLS_A, TABLE_ALIAS_A); + Assert.fail("Expected HiveException"); + } catch (HiveException e) { + String msg = e.getMessage(); + Assert.assertTrue("Message must contain join key column name", msg.contains(KEY_COLS_A)); + Assert.assertTrue("Message must contain row count", msg.contains("50")); + Assert.assertTrue("Message must contain table alias", msg.contains(TABLE_ALIAS_A)); + } + } +} diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/metainfo/query/TestJoinOperationMetadataResolver.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/metainfo/query/TestJoinOperationMetadataResolver.java new file mode 100644 index 000000000000..3a3838ec1489 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/metainfo/query/TestJoinOperationMetadataResolver.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.optimizer.metainfo.query; + +import org.apache.hadoop.hive.ql.exec.ColumnInfo; +import org.apache.hadoop.hive.ql.exec.JoinOperator; +import org.apache.hadoop.hive.ql.exec.Operator; +import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator; +import org.apache.hadoop.hive.ql.exec.RowSchema; +import org.apache.hadoop.hive.ql.exec.TableScanOperator; +import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; +import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; +import org.apache.hadoop.hive.ql.plan.OperatorDesc; +import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link JoinOperationMetadataResolver}. + *

+ * All Hive operator/plan objects are mocked so the test has no Hadoop/Hive + * runtime dependency. + */ +class TestJoinOperationMetadataResolver { + + /** + * Tracks the intermediate "schema-parent" operator created by rsWithColumnKey/rsWithColumnKeys + * so that attachTso() can wire the TSO into the correct position in the chain. + */ + private final Map> schemaParents = new HashMap<>(); + private JoinOperationMetadataResolver resolver; + + @BeforeEach + void setUp() { + resolver = new JoinOperationMetadataResolver(); + schemaParents.clear(); + } + + @Test + void resolveJoinMetadataNullParentsLeavesArraysNull() { + JoinOperator join = mock(JoinOperator.class); + when(join.getParentOperators()).thenReturn(null); + + resolver.resolveJoinMetadata(join); + + assertNull(resolver.getKeyNames(), "keyNames should remain null when parent list is null"); + assertNull(resolver.getTableAliases(), "tableAliases should remain null when parent list is null"); + } + + @Test + void resolveJoinMetadataEmptyParentsLeavesArraysNull() { + JoinOperator join = mock(JoinOperator.class); + when(join.getParentOperators()).thenReturn(Collections.emptyList()); + + resolver.resolveJoinMetadata(join); + + assertNull(resolver.getKeyNames(), "keyNames should remain null when parent list is empty"); + assertNull(resolver.getTableAliases(), "tableAliases should remain null when parent list is empty"); + } + + @Test + void resolveJoinMetadataNullParentSlotProducesUnknown() { + JoinOperator join = mock(JoinOperator.class); + when(join.getParentOperators()).thenReturn(Collections.singletonList(null)); + + resolver.resolveJoinMetadata(join); + + assertEquals("unknown", resolver.getKeyNames()[0]); + assertEquals("unknown", resolver.getTableAliases()[0]); + } + + @Test + void resolveTableAliasPrefersTsoAliasOverTableName() { + RowSchema schema = mock(RowSchema.class); + ReduceSinkOperator rs = rsWithColumnKey("_col0", schema); + attachTso(rs, "a", "orders"); + + JoinOperator join = joinOp(rs); + resolver.resolveJoinMetadata(join); + + assertEquals("a", resolver.getTableAliases()[0], "Should prefer the TSO alias over the physical table name"); + } + + @Test + void resolveTableAliasFallsBackToTableNameWhenAliasBlank() { + RowSchema schema = mock(RowSchema.class); + ReduceSinkOperator rs = rsWithColumnKey("_col0", schema); + attachTso(rs, "", "orders"); + + JoinOperator join = joinOp(rs); + resolver.resolveJoinMetadata(join); + + assertEquals("orders", resolver.getTableAliases()[0], "Should fall back to table name when alias is blank"); + } + + @Test + void resolveTableAliasUnknownWhenNoTsoFound() { + // Parent is not a ReduceSinkOperator (no TS reachable) + Operator oddParent = mock(Operator.class); + when(oddParent.getParentOperators()).thenReturn(Collections.emptyList()); + + JoinOperator join = joinOp(oddParent); + resolver.resolveJoinMetadata(join); + + assertEquals("unknown", resolver.getTableAliases()[0]); + } + + @Test + void resolveKeyNameColumnRefResolvesToSchemaAlias() { + ColumnInfo ci = mock(ColumnInfo.class); + when(ci.getAlias()).thenReturn("key"); + + RowSchema schema = mock(RowSchema.class); + when(schema.getColumnInfo("_col0")).thenReturn(ci); + + ReduceSinkOperator rs = rsWithColumnKey("_col0", schema); + + // attach a TSO so table alias is set too + attachTso(rs, "a", "t"); + + resolver.resolveJoinMetadata(joinOp(rs)); + + assertEquals("key", resolver.getKeyNames()[0]); + } + + @Test + void resolveKeyNameColumnRefFallsBackToInternalNameWhenNoSchemaAlias() { + ColumnInfo ci = mock(ColumnInfo.class); + when(ci.getAlias()).thenReturn(""); // empty alias + + RowSchema schema = mock(RowSchema.class); + when(schema.getColumnInfo("_col0")).thenReturn(ci); + + ReduceSinkOperator rs = rsWithColumnKey("_col0", schema); + attachTso(rs, "a", "t"); + + resolver.resolveJoinMetadata(joinOp(rs)); + + assertEquals("_col0", resolver.getKeyNames()[0]); + } + + @Test + void resolveKeyNameColumnRefFallsBackToInternalNameWhenColumnInfoNull() { + RowSchema schema = mock(RowSchema.class); + when(schema.getColumnInfo("_col0")).thenReturn(null); + + ReduceSinkOperator rs = rsWithColumnKey("_col0", schema); + attachTso(rs, "a", "t"); + + resolver.resolveJoinMetadata(joinOp(rs)); + + assertEquals("_col0", resolver.getKeyNames()[0]); + } + + @Test + void resolveKeyNameColumnRefUnknownWhenInternalNameNull() { + ExprNodeColumnDesc keyExpr = mock(ExprNodeColumnDesc.class); + when(keyExpr.getColumn()).thenReturn(null); + + ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class); + when(rsConf.getKeyCols()).thenReturn(Collections.singletonList(keyExpr)); + + ReduceSinkOperator rs = mock(ReduceSinkOperator.class); + when(rs.getConf()).thenReturn(rsConf); + when(rs.getParentOperators()).thenReturn(Collections.emptyList()); + + resolver.resolveJoinMetadata(joinOp(rs)); + + assertEquals("unknown", resolver.getKeyNames()[0]); + } + + @Test + void resolveKeyNameExprKeyUsesExprString() { + ReduceSinkOperator rs = rsWithExprKey("upper(val)"); + + resolver.resolveJoinMetadata(joinOp(rs)); + + assertEquals("upper(val)", resolver.getKeyNames()[0]); + } + + @Test + void resolveKeyNameExprKeyUnknownWhenExprStringNull() { + ExprNodeDesc keyExpr = mock(ExprNodeDesc.class); + when(keyExpr.getExprString()).thenReturn(null); + + ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class); + when(rsConf.getKeyCols()).thenReturn(Collections.singletonList(keyExpr)); + + ReduceSinkOperator rs = mock(ReduceSinkOperator.class); + when(rs.getConf()).thenReturn(rsConf); + when(rs.getParentOperators()).thenReturn(Collections.emptyList()); + + resolver.resolveJoinMetadata(joinOp(rs)); + + assertEquals("unknown", resolver.getKeyNames()[0]); + } + + @Test + void resolveKeyNameUnknownWhenReduceSinkConfNull() { + ReduceSinkOperator rs = mock(ReduceSinkOperator.class); + when(rs.getConf()).thenReturn(null); + when(rs.getParentOperators()).thenReturn(Collections.emptyList()); + + resolver.resolveJoinMetadata(joinOp(rs)); + + assertEquals("unknown", resolver.getKeyNames()[0]); + } + + @Test + void resolveKeyNameUnknownWhenKeyColsNull() { + ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class); + when(rsConf.getKeyCols()).thenReturn(null); + + ReduceSinkOperator rs = mock(ReduceSinkOperator.class); + when(rs.getConf()).thenReturn(rsConf); + when(rs.getParentOperators()).thenReturn(Collections.emptyList()); + + resolver.resolveJoinMetadata(joinOp(rs)); + + assertEquals("unknown", resolver.getKeyNames()[0]); + } + + @Test + void resolveKeyNameUnknownWhenKeyColsEmpty() { + ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class); + when(rsConf.getKeyCols()).thenReturn(Collections.emptyList()); + + ReduceSinkOperator rs = mock(ReduceSinkOperator.class); + when(rs.getConf()).thenReturn(rsConf); + when(rs.getParentOperators()).thenReturn(Collections.emptyList()); + + resolver.resolveJoinMetadata(joinOp(rs)); + + assertEquals("unknown", resolver.getKeyNames()[0]); + } + + @Test + void resolveKeyNameCompoundKeyJoinedWithCommaSpace() { + ColumnInfo ci1 = mock(ColumnInfo.class); + when(ci1.getAlias()).thenReturn("dept_id"); + ColumnInfo ci2 = mock(ColumnInfo.class); + when(ci2.getAlias()).thenReturn("year"); + + RowSchema schema = mock(RowSchema.class); + when(schema.getColumnInfo("_col0")).thenReturn(ci1); + when(schema.getColumnInfo("_col1")).thenReturn(ci2); + + ReduceSinkOperator rs = rsWithColumnKeys(Arrays.asList("_col0", "_col1"), schema); + attachTso(rs, "a", "t"); + + resolver.resolveJoinMetadata(joinOp(rs)); + + assertEquals("dept_id, year", resolver.getKeyNames()[0]); + } + + @Test + void resolveJoinMetadataThreePositionsPopulatesAllThreeSlots() { + String[] tables = {"employees", "salaries", "departments"}; + String[] aliases = {"e", "s", "d"}; + String[] internalCols = {"_col0", "_col0", "_col0"}; + String[] expectedKeys = {"emp_id", "sid", "dept_id"}; + + ReduceSinkOperator[] rsList = new ReduceSinkOperator[expectedKeys.length]; + for (int i = 0; i < expectedKeys.length; i++) { + ColumnInfo ci = mock(ColumnInfo.class); + when(ci.getAlias()).thenReturn(expectedKeys[i]); + + RowSchema schema = mock(RowSchema.class); + when(schema.getColumnInfo(internalCols[i])).thenReturn(ci); + + ReduceSinkOperator rs = rsWithColumnKey(internalCols[i], schema); + attachTso(rs, aliases[i], tables[i]); + rsList[i] = rs; + } + + resolver.resolveJoinMetadata(joinOp(rsList[0], rsList[1], rsList[2])); + + assertArrayEquals(expectedKeys, resolver.getKeyNames()); + assertArrayEquals(aliases, resolver.getTableAliases()); + } + + @Test + void resolveJoinMetadataArraySizeMatchesParentCount() { + ReduceSinkOperator rs0 = rsWithExprKey("id"); + ReduceSinkOperator rs1 = rsWithExprKey("id"); + ReduceSinkOperator rs2 = rsWithExprKey("id"); + + resolver.resolveJoinMetadata(joinOp(rs0, rs1, rs2)); + + assertEquals(3, resolver.getKeyNames().length); + assertEquals(3, resolver.getTableAliases().length); + } + + // helpers + + /** + * Build a mocked ReduceSinkOperator whose key list is a single ExprNodeColumnDesc. + *

+ * The RS parent is a generic operator that carries {@code inputSchema}. + * When {@link #attachTso} is later called on the RS, it places the TSO as the + * parent of that schema operator (not directly as parent of RS), so + * that both the schema and the upstream TSO remain reachable for traversal. + */ + private ReduceSinkOperator rsWithColumnKey(String internalColName, RowSchema inputSchema) { + ExprNodeColumnDesc keyExpr = mock(ExprNodeColumnDesc.class); + when(keyExpr.getColumn()).thenReturn(internalColName); + + ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class); + when(rsConf.getKeyCols()).thenReturn(Collections.singletonList(keyExpr)); + + // schemaParent is the direct parent of RS; it exposes the input schema and + // will have the TSO attached as its own parent by attachTso(). + @SuppressWarnings("unchecked") + Operator schemaParent = mock(Operator.class); + when(schemaParent.getSchema()).thenReturn(inputSchema); + when(schemaParent.getParentOperators()).thenReturn(Collections.emptyList()); + + @SuppressWarnings("unchecked") + ReduceSinkOperator rs = mock(ReduceSinkOperator.class); + when(rs.getConf()).thenReturn(rsConf); + when(rs.getParentOperators()).thenReturn(Collections.singletonList(schemaParent)); + // store schemaParent so attachTso() can wire the TSO into it + schemaParents.put(rs, schemaParent); + return rs; + } + + /** + * Build a mocked ReduceSinkOperator with multiple key columns. + */ + private ReduceSinkOperator rsWithColumnKeys(List internalColNames, RowSchema inputSchema) { + List keyExprs = new java.util.ArrayList<>(); + for (String name : internalColNames) { + ExprNodeColumnDesc keyExpr = mock(ExprNodeColumnDesc.class); + when(keyExpr.getColumn()).thenReturn(name); + keyExprs.add(keyExpr); + } + + ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class); + when(rsConf.getKeyCols()).thenReturn(keyExprs); + + @SuppressWarnings("unchecked") + Operator schemaParent = mock(Operator.class); + when(schemaParent.getSchema()).thenReturn(inputSchema); + when(schemaParent.getParentOperators()).thenReturn(Collections.emptyList()); + + @SuppressWarnings("unchecked") + ReduceSinkOperator rs = mock(ReduceSinkOperator.class); + when(rs.getConf()).thenReturn(rsConf); + when(rs.getParentOperators()).thenReturn(Collections.singletonList(schemaParent)); + schemaParents.put(rs, schemaParent); + return rs; + } + + /** + * Build a mocked ReduceSinkOperator whose key is a generic (non-column) expression. + */ + private ReduceSinkOperator rsWithExprKey(String exprString) { + ExprNodeDesc keyExpr = mock(ExprNodeDesc.class); + when(keyExpr.getExprString()).thenReturn(exprString); + + ReduceSinkDesc rsConf = mock(ReduceSinkDesc.class); + when(rsConf.getKeyCols()).thenReturn(Collections.singletonList(keyExpr)); + + @SuppressWarnings("unchecked") + ReduceSinkOperator rs = mock(ReduceSinkOperator.class); + when(rs.getConf()).thenReturn(rsConf); + when(rs.getParentOperators()).thenReturn(Collections.emptyList()); + return rs; + } + + /** + * Attach a mocked TableScanOperator into the upstream chain of {@code rs}. + *

+ * The resolver resolves the table name by calling + * {@code OperatorUtils.findSingleOperatorUpstream(parent, TableScanOperator.class)}, + * where {@code parent} is the direct parent of the join-input RS. + * That traversal starts at the RS, checks if it is a TSO, then recurses into + * {@code rs.getParentOperators()}. The RS's direct parent is the schema-carrying + * operator created by {@code rsWithColumnKey}; we wire the TSO as that + * operator's parent so the BFS finds it without disturbing the schema lookup. + *

+ * Chain: Join → RS → schemaParent → TSO + */ + @SuppressWarnings("unchecked") + private TableScanOperator attachTso(ReduceSinkOperator rs, String alias, String tableName) { + TableScanDesc tsoConf = mock(TableScanDesc.class); + when(tsoConf.getAlias()).thenReturn(alias); + when(tsoConf.getTableName()).thenReturn(tableName); + + TableScanOperator tso = mock(TableScanOperator.class); + when(tso.getConf()).thenReturn(tsoConf); + when(tso.getParentOperators()).thenReturn(Collections.emptyList()); + + // Wire TSO as parent of the schema-parent so findSingleOperatorUpstream finds it. + Operator schemaParent = schemaParents.get(rs); + if (schemaParent != null) { + when(schemaParent.getParentOperators()).thenReturn(Collections.singletonList(tso)); + } + return tso; + } + + /** + * Build a JoinOperator mock whose parent list is the supplied operators. + */ + @SuppressWarnings("unchecked") + private JoinOperator joinOp(Operator... parents) { + JoinOperator join = mock(JoinOperator.class); + when(join.getParentOperators()).thenReturn(Arrays.asList(parents)); + return join; + } +} + diff --git a/ql/src/test/queries/clientnegative/mergejoin_skew_abort.q b/ql/src/test/queries/clientnegative/mergejoin_skew_abort.q new file mode 100644 index 000000000000..10de1d8bd95d --- /dev/null +++ b/ql/src/test/queries/clientnegative/mergejoin_skew_abort.q @@ -0,0 +1,18 @@ +set hive.explain.user=false; +set hive.auto.convert.join=false; +-- merge join observability config, with true should throw exception after skew +-- join detected beyond the threshold +set hive.merge.join.skew.threshold=2; +set hive.merge.join.skew.abort=true; +-- interval=1 means check every row (most aggressive, catches skew at first boundary crossing) +set hive.merge.join.skew.check.interval=1; + +CREATE TABLE merge_skew_abort_a (testColKey_A int, testColValue_A string); +CREATE TABLE merge_skew_abort_b (testColKey_B int, testColValue_B string); + +INSERT INTO TABLE merge_skew_abort_a VALUES (1, 'a1'), (1, 'a2'), (1, 'a3'), (1, 'a4'),(2, 'b1'); +INSERT INTO TABLE merge_skew_abort_b VALUES (1, 'x1'), (2, 'y1'); + +SELECT a.testColKey_A, a.testColValue_A, b.testColValue_B +FROM merge_skew_abort_a a JOIN merge_skew_abort_b b ON a.testColKey_A = b.testColKey_B; + diff --git a/ql/src/test/queries/clientnegative/mergejoin_skew_abort_compound_key.q b/ql/src/test/queries/clientnegative/mergejoin_skew_abort_compound_key.q new file mode 100644 index 000000000000..1f0b4e6bc19d --- /dev/null +++ b/ql/src/test/queries/clientnegative/mergejoin_skew_abort_compound_key.q @@ -0,0 +1,20 @@ +-- compound key join with skewed data, merge join should abort when skew threshold is exceeded +set hive.explain.user=false; +set hive.auto.convert.join=false; +set hive.merge.join.skew.threshold=2; +set hive.merge.join.skew.abort=true; +set hive.merge.join.skew.check.interval=1; + +CREATE TABLE merge_skew_compound_a (k1 int, k2 string, val string); +CREATE TABLE merge_skew_compound_b (k1 int, k2 string, val string); + +INSERT INTO TABLE merge_skew_compound_a VALUES + (1, 'x', 'a1'), (1, 'x', 'a2'), (1, 'x', 'a3'), (1, 'x', 'a4'), + (2, 'y', 'b1'); +INSERT INTO TABLE merge_skew_compound_b VALUES + (1, 'x', 'r1'), (2, 'y', 'r2'); + +SELECT a.k1, a.k2, a.val, b.val +FROM merge_skew_compound_a a JOIN merge_skew_compound_b b + ON a.k1 = b.k1 AND a.k2 = b.k2; + diff --git a/ql/src/test/queries/clientnegative/mergejoin_skew_abort_expr_key.q b/ql/src/test/queries/clientnegative/mergejoin_skew_abort_expr_key.q new file mode 100644 index 000000000000..f857d0d1a4b0 --- /dev/null +++ b/ql/src/test/queries/clientnegative/mergejoin_skew_abort_expr_key.q @@ -0,0 +1,25 @@ +-- Skew abort with a UDF expression as join key. +-- The skew message should show the expression string (e.g. "upper(val)") +-- rather than an internal column name, because resolveKeyColumnName() falls +-- through to ExprNodeDesc.getExprString() for non-column key expressions. +set hive.explain.user=false; +set hive.auto.convert.join=false; +set hive.merge.join.skew.threshold=2; +set hive.merge.join.skew.abort=true; +set hive.merge.join.skew.check.interval=1; + +CREATE TABLE merge_skew_expr_a (id int, val string); +CREATE TABLE merge_skew_expr_b (id int, val string); + +-- key UPPER('x') appears 4 times in table a -> triggers abort at threshold=2 +INSERT INTO TABLE merge_skew_expr_a VALUES + (1, 'x'), (2, 'x'), (3, 'x'), (4, 'x'), (5, 'y'); +INSERT INTO TABLE merge_skew_expr_b VALUES + (10, 'x'), (20, 'y'); + +SELECT a.id, b.id +FROM merge_skew_expr_a a JOIN merge_skew_expr_b b ON UPPER(a.val) = UPPER(b.val); + +DROP TABLE merge_skew_expr_a; +DROP TABLE merge_skew_expr_b; + diff --git a/ql/src/test/queries/clientnegative/mergejoin_skew_abort_three_tables_join.q b/ql/src/test/queries/clientnegative/mergejoin_skew_abort_three_tables_join.q new file mode 100644 index 000000000000..50d48daba19d --- /dev/null +++ b/ql/src/test/queries/clientnegative/mergejoin_skew_abort_three_tables_join.q @@ -0,0 +1,23 @@ +set hive.auto.convert.join=false; +set hive.merge.join.skew.threshold=2; +set hive.merge.join.skew.abort=true; +set hive.merge.join.skew.check.interval=1; + +CREATE TABLE merge_skew_abort_3w_a (key int, val string); +CREATE TABLE merge_skew_abort_3w_b (key int, val string); +CREATE TABLE merge_skew_abort_3w_c (key int, val string); + +-- key=10 has 4 rows in table a -> abort threshold exceeded +INSERT INTO TABLE merge_skew_abort_3w_a VALUES (10, 's1'), (20, 's5'); +INSERT INTO TABLE merge_skew_abort_3w_b VALUES (10, 't1'), (20, 't2'); +INSERT INTO TABLE merge_skew_abort_3w_c VALUES (10, 'u1'), (20, 'u2'), (10, 's2'), (10, 's3'), (10, 's4'); + +SELECT count(*) +FROM merge_skew_abort_3w_a a + JOIN merge_skew_abort_3w_b b ON a.key = b.key + JOIN merge_skew_abort_3w_c c ON b.key = c.key; + +DROP TABLE merge_skew_abort_3w_a; +DROP TABLE merge_skew_abort_3w_b; +DROP TABLE merge_skew_abort_3w_c; + diff --git a/ql/src/test/queries/clientnegative/mergejoin_skew_abort_union_joins.q b/ql/src/test/queries/clientnegative/mergejoin_skew_abort_union_joins.q new file mode 100644 index 000000000000..333356c082f8 --- /dev/null +++ b/ql/src/test/queries/clientnegative/mergejoin_skew_abort_union_joins.q @@ -0,0 +1,27 @@ +-- 2 joins in single query: first join has unique keys (no skew), second join has skew +set hive.merge.join.skew.threshold=2; +set hive.merge.join.skew.abort=true; +set hive.merge.join.skew.check.interval=1; + +CREATE TABLE merge_skew_warn_2j_unique_a (key int, val string); +CREATE TABLE merge_skew_warn_2j_unique_b (key int, val string); +CREATE TABLE merge_skew_warn_2j_skew_a (key int, val string); +CREATE TABLE merge_skew_warn_2j_skew_b (key int, val string); + +-- unique side: 1-to-1 +INSERT INTO TABLE merge_skew_warn_2j_unique_a VALUES (10, 'u1'), (20, 'u2'), (30, 'u3'); +INSERT INTO TABLE merge_skew_warn_2j_unique_b VALUES (10, 'v1'), (20, 'v2'), (30, 'v3'); +-- skewed side: key=10 has 4 rows in skew_a -> abort the task +INSERT INTO TABLE merge_skew_warn_2j_skew_a VALUES (10, 's1'), (10, 's2'), (10, 's3'), (10, 's4'), (20, 's5'); +INSERT INTO TABLE merge_skew_warn_2j_skew_b VALUES (10, 't1'), (20, 't2'); + +SELECT count(*) +FROM merge_skew_warn_2j_unique_a ua JOIN merge_skew_warn_2j_unique_b ub ON ua.key = ub.key +UNION ALL +SELECT count(*) +FROM merge_skew_warn_2j_skew_a sa JOIN merge_skew_warn_2j_skew_b sb ON sa.key = sb.key; + +DROP TABLE merge_skew_warn_2j_unique_a; +DROP TABLE merge_skew_warn_2j_unique_b; +DROP TABLE merge_skew_warn_2j_skew_a; +DROP TABLE merge_skew_warn_2j_skew_b; diff --git a/ql/src/test/queries/clientpositive/mergejoin_skew_warn.q b/ql/src/test/queries/clientpositive/mergejoin_skew_warn.q new file mode 100644 index 000000000000..75a28d7ac8d3 --- /dev/null +++ b/ql/src/test/queries/clientpositive/mergejoin_skew_warn.q @@ -0,0 +1,117 @@ +set hive.explain.user=false; +set hive.auto.convert.join=false; +set hive.merge.join.skew.threshold=2; +set hive.merge.join.skew.abort=false; +set hive.merge.join.skew.check.interval=1; + +-- SORT_QUERY_RESULTS + +CREATE TABLE merge_skew_warn_a (key int, value string); +CREATE TABLE merge_skew_warn_b (key int, value string); + +INSERT INTO TABLE merge_skew_warn_a VALUES (1, 'a1'), (1, 'a2'), (1, 'a3'), (1, 'a4'), +(2, 'b1'), (3, 'c1'); +INSERT INTO TABLE merge_skew_warn_b VALUES (1, 'x1'), (2, 'y1'), (3, 'z1'); + +EXPLAIN +SELECT a.key, a.value, b.value +FROM merge_skew_warn_a a JOIN merge_skew_warn_b b ON a.key = b.key; + +SELECT a.key, a.value, b.value +FROM merge_skew_warn_a a JOIN merge_skew_warn_b b ON a.key = b.key; + +SELECT count(*) FROM merge_skew_warn_a a JOIN merge_skew_warn_b b ON a.key = b.key; + +-- no warning run +set hive.merge.join.skew.threshold=-1; + +SELECT count(*) FROM merge_skew_warn_a a JOIN merge_skew_warn_b b ON a.key = b.key; + +-- interval test: threshold=2, interval=3 -- skew key (key=1 has 4 rows) must still be detected +-- even though it may not be evaluated on every row +set hive.merge.join.skew.threshold=2; +set hive.merge.join.skew.check.interval=3; + +SELECT count(*) FROM merge_skew_warn_a a JOIN merge_skew_warn_b b ON a.key = b.key; + +-- unique-mapping test: 1-to-1 join between tables with unique keys should never trip threshold +-- even with a low threshold of 2. Each key appears only once, so no skew. +set hive.merge.join.skew.abort=true; + +CREATE TABLE merge_skew_warn_unique_a (key int, value string); +CREATE TABLE merge_skew_warn_unique_b (key int, value string); + +INSERT INTO TABLE merge_skew_warn_unique_a VALUES (1, 'u1'), (2, 'u2'), (3, 'u3'); +INSERT INTO TABLE merge_skew_warn_unique_b VALUES (1, 'v1'), (2, 'v2'), (3, 'v3'); + +set hive.merge.join.skew.threshold=2; +set hive.merge.join.skew.check.interval=1; + +-- must complete without abort: every key appears exactly once on both sides +SELECT count(*) FROM merge_skew_warn_unique_a a JOIN merge_skew_warn_unique_b b ON a.key = b.key; + +DROP TABLE merge_skew_warn_unique_a; +DROP TABLE merge_skew_warn_unique_b; + +DROP TABLE merge_skew_warn_a; +DROP TABLE merge_skew_warn_b; + +-- three table join + +set hive.explain.user=false; +set hive.auto.convert.join=false; +set hive.merge.join.skew.threshold=2; +set hive.merge.join.skew.abort=false; +set hive.merge.join.skew.check.interval=1; + +CREATE TABLE merge_skew_warn_3w_a (key int, val string); +CREATE TABLE merge_skew_warn_3w_b (key int, val string); +CREATE TABLE merge_skew_warn_3w_c (key int, val string); + +-- key=1 has 4 rows in table a -> triggers skew warning +INSERT INTO TABLE merge_skew_warn_3w_a VALUES (1, 'a1'), (1, 'a2'), (1, 'a3'), (1, 'a4'), (2, 'a5'), (3, 'a6'); +INSERT INTO TABLE merge_skew_warn_3w_b VALUES (1, 'b1'), (2, 'b2'), (3, 'b3'); +INSERT INTO TABLE merge_skew_warn_3w_c VALUES (1, 'c1'), (2, 'c2'), (3, 'c3'); + +EXPLAIN +SELECT a.key, a.val, b.val, c.val +FROM merge_skew_warn_3w_a a + JOIN merge_skew_warn_3w_b b ON a.key = b.key + JOIN merge_skew_warn_3w_c c ON b.key = c.key; + +-- Should complete with a skew warning for key=1 +SELECT a.key, a.val, b.val, c.val +FROM merge_skew_warn_3w_a a + JOIN merge_skew_warn_3w_b b ON a.key = b.key + JOIN merge_skew_warn_3w_c c ON b.key = c.key; + +SELECT count(*) +FROM merge_skew_warn_3w_a a + JOIN merge_skew_warn_3w_b b ON a.key = b.key + JOIN merge_skew_warn_3w_c c ON b.key = c.key; + +-- unique-keys run: no skew expected even with abort=true +set hive.merge.join.skew.abort=true; + +CREATE TABLE merge_skew_warn_3w_uniq_a (key int, val string); +CREATE TABLE merge_skew_warn_3w_uniq_b (key int, val string); +CREATE TABLE merge_skew_warn_3w_uniq_c (key int, val string); + +INSERT INTO TABLE merge_skew_warn_3w_uniq_a VALUES (1, 'u1'), (2, 'u2'), (3, 'u3'); +INSERT INTO TABLE merge_skew_warn_3w_uniq_b VALUES (1, 'v1'), (2, 'v2'), (3, 'v3'); +INSERT INTO TABLE merge_skew_warn_3w_uniq_c VALUES (1, 'w1'), (2, 'w2'), (3, 'w3'); + +-- must complete without abort: every key appears exactly once in all three tables +SELECT count(*) +FROM merge_skew_warn_3w_uniq_a a + JOIN merge_skew_warn_3w_uniq_b b ON a.key = b.key + JOIN merge_skew_warn_3w_uniq_c c ON b.key = c.key; + +DROP TABLE merge_skew_warn_3w_uniq_a; +DROP TABLE merge_skew_warn_3w_uniq_b; +DROP TABLE merge_skew_warn_3w_uniq_c; + +DROP TABLE merge_skew_warn_3w_a; +DROP TABLE merge_skew_warn_3w_b; +DROP TABLE merge_skew_warn_3w_c; + diff --git a/ql/src/test/results/clientnegative/mergejoin_skew_abort.q.out b/ql/src/test/results/clientnegative/mergejoin_skew_abort.q.out new file mode 100644 index 000000000000..9284b39bcbe4 --- /dev/null +++ b/ql/src/test/results/clientnegative/mergejoin_skew_abort.q.out @@ -0,0 +1,70 @@ +PREHOOK: query: CREATE TABLE merge_skew_abort_a (testColKey_A int, testColValue_A string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_abort_a +POSTHOOK: query: CREATE TABLE merge_skew_abort_a (testColKey_A int, testColValue_A string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_abort_a +PREHOOK: query: CREATE TABLE merge_skew_abort_b (testColKey_B int, testColValue_B string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_abort_b +POSTHOOK: query: CREATE TABLE merge_skew_abort_b (testColKey_B int, testColValue_B string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_abort_b +PREHOOK: query: INSERT INTO TABLE merge_skew_abort_a VALUES (1, 'a1'), (1, 'a2'), (1, 'a3'), (1, 'a4'),(2, 'b1') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_abort_a +POSTHOOK: query: INSERT INTO TABLE merge_skew_abort_a VALUES (1, 'a1'), (1, 'a2'), (1, 'a3'), (1, 'a4'),(2, 'b1') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_abort_a +POSTHOOK: Lineage: merge_skew_abort_a.testcolkey_a SCRIPT [] +POSTHOOK: Lineage: merge_skew_abort_a.testcolvalue_a SCRIPT [] +PREHOOK: query: INSERT INTO TABLE merge_skew_abort_b VALUES (1, 'x1'), (2, 'y1') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_abort_b +POSTHOOK: query: INSERT INTO TABLE merge_skew_abort_b VALUES (1, 'x1'), (2, 'y1') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_abort_b +POSTHOOK: Lineage: merge_skew_abort_b.testcolkey_b SCRIPT [] +POSTHOOK: Lineage: merge_skew_abort_b.testcolvalue_b SCRIPT [] +PREHOOK: query: SELECT a.testColKey_A, a.testColValue_A, b.testColValue_B +FROM merge_skew_abort_a a JOIN merge_skew_abort_b b ON a.testColKey_A = b.testColKey_B +PREHOOK: type: QUERY +PREHOOK: Input: default@merge_skew_abort_a +PREHOOK: Input: default@merge_skew_abort_b +#### A masked pattern was here #### +Status: Failed +Vertex failed, vertexName=Reducer 2, vertexId=vertex_#ID#, diagnostics=[Task failed, taskId=task_#ID#, diagnostics=[TaskAttempt 0 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [testcolkey_a] in table alias [a]. Consider reviewing data distribution. +#### A masked pattern was here #### +], TaskAttempt 1 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [testcolkey_a] in table alias [a]. Consider reviewing data distribution. +#### A masked pattern was here #### +]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:0, Vertex vertex_#ID# [Reducer 2] killed/failed due to:OWN_TASK_FAILURE] +DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:0 +FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask. Vertex failed, vertexName=Reducer 2, vertexId=vertex_#ID#, diagnostics=[Task failed, taskId=task_#ID#, diagnostics=[TaskAttempt 0 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [testcolkey_a] in table alias [a]. Consider reviewing data distribution. +#### A masked pattern was here #### +], TaskAttempt 1 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [testcolkey_a] in table alias [a]. Consider reviewing data distribution. +#### A masked pattern was here #### +]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:0, Vertex vertex_#ID# [Reducer 2] killed/failed due to:OWN_TASK_FAILURE]DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:0 diff --git a/ql/src/test/results/clientnegative/mergejoin_skew_abort_compound_key.q.out b/ql/src/test/results/clientnegative/mergejoin_skew_abort_compound_key.q.out new file mode 100644 index 000000000000..e9ff48422127 --- /dev/null +++ b/ql/src/test/results/clientnegative/mergejoin_skew_abort_compound_key.q.out @@ -0,0 +1,79 @@ +PREHOOK: query: CREATE TABLE merge_skew_compound_a (k1 int, k2 string, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_compound_a +POSTHOOK: query: CREATE TABLE merge_skew_compound_a (k1 int, k2 string, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_compound_a +PREHOOK: query: CREATE TABLE merge_skew_compound_b (k1 int, k2 string, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_compound_b +POSTHOOK: query: CREATE TABLE merge_skew_compound_b (k1 int, k2 string, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_compound_b +PREHOOK: query: INSERT INTO TABLE merge_skew_compound_a VALUES + (1, 'x', 'a1'), (1, 'x', 'a2'), (1, 'x', 'a3'), (1, 'x', 'a4'), + (2, 'y', 'b1') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_compound_a +POSTHOOK: query: INSERT INTO TABLE merge_skew_compound_a VALUES + (1, 'x', 'a1'), (1, 'x', 'a2'), (1, 'x', 'a3'), (1, 'x', 'a4'), + (2, 'y', 'b1') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_compound_a +POSTHOOK: Lineage: merge_skew_compound_a.k1 SCRIPT [] +POSTHOOK: Lineage: merge_skew_compound_a.k2 SCRIPT [] +POSTHOOK: Lineage: merge_skew_compound_a.val SCRIPT [] +PREHOOK: query: INSERT INTO TABLE merge_skew_compound_b VALUES + (1, 'x', 'r1'), (2, 'y', 'r2') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_compound_b +POSTHOOK: query: INSERT INTO TABLE merge_skew_compound_b VALUES + (1, 'x', 'r1'), (2, 'y', 'r2') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_compound_b +POSTHOOK: Lineage: merge_skew_compound_b.k1 SCRIPT [] +POSTHOOK: Lineage: merge_skew_compound_b.k2 SCRIPT [] +POSTHOOK: Lineage: merge_skew_compound_b.val SCRIPT [] +PREHOOK: query: SELECT a.k1, a.k2, a.val, b.val +FROM merge_skew_compound_a a JOIN merge_skew_compound_b b + ON a.k1 = b.k1 AND a.k2 = b.k2 +PREHOOK: type: QUERY +PREHOOK: Input: default@merge_skew_compound_a +PREHOOK: Input: default@merge_skew_compound_b +#### A masked pattern was here #### +Status: Failed +Vertex failed, vertexName=Reducer 2, vertexId=vertex_#ID#, diagnostics=[Task failed, taskId=task_#ID#, diagnostics=[TaskAttempt 0 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [k1, k2] in table alias [a]. Consider reviewing data distribution. +#### A masked pattern was here #### +], TaskAttempt 1 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [k1, k2] in table alias [a]. Consider reviewing data distribution. +#### A masked pattern was here #### +]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:0, Vertex vertex_#ID# [Reducer 2] killed/failed due to:OWN_TASK_FAILURE] +DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:0 +FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask. Vertex failed, vertexName=Reducer 2, vertexId=vertex_#ID#, diagnostics=[Task failed, taskId=task_#ID#, diagnostics=[TaskAttempt 0 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [k1, k2] in table alias [a]. Consider reviewing data distribution. +#### A masked pattern was here #### +], TaskAttempt 1 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [k1, k2] in table alias [a]. Consider reviewing data distribution. +#### A masked pattern was here #### +]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:0, Vertex vertex_#ID# [Reducer 2] killed/failed due to:OWN_TASK_FAILURE]DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:0 diff --git a/ql/src/test/results/clientnegative/mergejoin_skew_abort_expr_key.q.out b/ql/src/test/results/clientnegative/mergejoin_skew_abort_expr_key.q.out new file mode 100644 index 000000000000..bf365d53921d --- /dev/null +++ b/ql/src/test/results/clientnegative/mergejoin_skew_abort_expr_key.q.out @@ -0,0 +1,74 @@ +PREHOOK: query: CREATE TABLE merge_skew_expr_a (id int, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_expr_a +POSTHOOK: query: CREATE TABLE merge_skew_expr_a (id int, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_expr_a +PREHOOK: query: CREATE TABLE merge_skew_expr_b (id int, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_expr_b +POSTHOOK: query: CREATE TABLE merge_skew_expr_b (id int, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_expr_b +PREHOOK: query: INSERT INTO TABLE merge_skew_expr_a VALUES + (1, 'x'), (2, 'x'), (3, 'x'), (4, 'x'), (5, 'y') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_expr_a +POSTHOOK: query: INSERT INTO TABLE merge_skew_expr_a VALUES + (1, 'x'), (2, 'x'), (3, 'x'), (4, 'x'), (5, 'y') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_expr_a +POSTHOOK: Lineage: merge_skew_expr_a.id SCRIPT [] +POSTHOOK: Lineage: merge_skew_expr_a.val SCRIPT [] +PREHOOK: query: INSERT INTO TABLE merge_skew_expr_b VALUES + (10, 'x'), (20, 'y') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_expr_b +POSTHOOK: query: INSERT INTO TABLE merge_skew_expr_b VALUES + (10, 'x'), (20, 'y') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_expr_b +POSTHOOK: Lineage: merge_skew_expr_b.id SCRIPT [] +POSTHOOK: Lineage: merge_skew_expr_b.val SCRIPT [] +PREHOOK: query: SELECT a.id, b.id +FROM merge_skew_expr_a a JOIN merge_skew_expr_b b ON UPPER(a.val) = UPPER(b.val) +PREHOOK: type: QUERY +PREHOOK: Input: default@merge_skew_expr_a +PREHOOK: Input: default@merge_skew_expr_b +#### A masked pattern was here #### +Status: Failed +Vertex failed, vertexName=Reducer 2, vertexId=vertex_#ID#, diagnostics=[Task failed, taskId=task_#ID#, diagnostics=[TaskAttempt 0 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [expr$0] in table alias [a]. Consider reviewing data distribution. +#### A masked pattern was here #### +], TaskAttempt 1 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [expr$0] in table alias [a]. Consider reviewing data distribution. +#### A masked pattern was here #### +]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:0, Vertex vertex_#ID# [Reducer 2] killed/failed due to:OWN_TASK_FAILURE] +DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:0 +FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask. Vertex failed, vertexName=Reducer 2, vertexId=vertex_#ID#, diagnostics=[Task failed, taskId=task_#ID#, diagnostics=[TaskAttempt 0 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [expr$0] in table alias [a]. Consider reviewing data distribution. +#### A masked pattern was here #### +], TaskAttempt 1 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [expr$0] in table alias [a]. Consider reviewing data distribution. +#### A masked pattern was here #### +]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:0, Vertex vertex_#ID# [Reducer 2] killed/failed due to:OWN_TASK_FAILURE]DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:0 diff --git a/ql/src/test/results/clientnegative/mergejoin_skew_abort_three_tables_join.q.out b/ql/src/test/results/clientnegative/mergejoin_skew_abort_three_tables_join.q.out new file mode 100644 index 000000000000..88d34d190868 --- /dev/null +++ b/ql/src/test/results/clientnegative/mergejoin_skew_abort_three_tables_join.q.out @@ -0,0 +1,92 @@ +PREHOOK: query: CREATE TABLE merge_skew_abort_3w_a (key int, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_abort_3w_a +POSTHOOK: query: CREATE TABLE merge_skew_abort_3w_a (key int, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_abort_3w_a +PREHOOK: query: CREATE TABLE merge_skew_abort_3w_b (key int, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_abort_3w_b +POSTHOOK: query: CREATE TABLE merge_skew_abort_3w_b (key int, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_abort_3w_b +PREHOOK: query: CREATE TABLE merge_skew_abort_3w_c (key int, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_abort_3w_c +POSTHOOK: query: CREATE TABLE merge_skew_abort_3w_c (key int, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_abort_3w_c +PREHOOK: query: INSERT INTO TABLE merge_skew_abort_3w_a VALUES (10, 's1'), (20, 's5') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_abort_3w_a +POSTHOOK: query: INSERT INTO TABLE merge_skew_abort_3w_a VALUES (10, 's1'), (20, 's5') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_abort_3w_a +POSTHOOK: Lineage: merge_skew_abort_3w_a.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_abort_3w_a.val SCRIPT [] +PREHOOK: query: INSERT INTO TABLE merge_skew_abort_3w_b VALUES (10, 't1'), (20, 't2') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_abort_3w_b +POSTHOOK: query: INSERT INTO TABLE merge_skew_abort_3w_b VALUES (10, 't1'), (20, 't2') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_abort_3w_b +POSTHOOK: Lineage: merge_skew_abort_3w_b.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_abort_3w_b.val SCRIPT [] +PREHOOK: query: INSERT INTO TABLE merge_skew_abort_3w_c VALUES (10, 'u1'), (20, 'u2'), (10, 's2'), (10, 's3'), (10, 's4') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_abort_3w_c +POSTHOOK: query: INSERT INTO TABLE merge_skew_abort_3w_c VALUES (10, 'u1'), (20, 'u2'), (10, 's2'), (10, 's3'), (10, 's4') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_abort_3w_c +POSTHOOK: Lineage: merge_skew_abort_3w_c.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_abort_3w_c.val SCRIPT [] +PREHOOK: query: SELECT count(*) +FROM merge_skew_abort_3w_a a + JOIN merge_skew_abort_3w_b b ON a.key = b.key + JOIN merge_skew_abort_3w_c c ON b.key = c.key +PREHOOK: type: QUERY +PREHOOK: Input: default@merge_skew_abort_3w_a +PREHOOK: Input: default@merge_skew_abort_3w_b +PREHOOK: Input: default@merge_skew_abort_3w_c +#### A masked pattern was here #### +Status: Failed +Vertex failed, vertexName=Reducer 3, vertexId=vertex_#ID#, diagnostics=[Task failed, taskId=task_#ID#, diagnostics=[TaskAttempt 0 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [key] in table alias [c]. Consider reviewing data distribution. +#### A masked pattern was here #### +], TaskAttempt 1 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [key] in table alias [c]. Consider reviewing data distribution. +#### A masked pattern was here #### +]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:0, Vertex vertex_#ID# [Reducer 3] killed/failed due to:OWN_TASK_FAILURE] +[Masked Vertex killed due to OTHER_VERTEX_FAILURE] +DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:1 +FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask. Vertex failed, vertexName=Reducer 3, vertexId=vertex_#ID#, diagnostics=[Task failed, taskId=task_#ID#, diagnostics=[TaskAttempt 0 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [key] in table alias [c]. Consider reviewing data distribution. +#### A masked pattern was here #### +], TaskAttempt 1 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [key] in table alias [c]. Consider reviewing data distribution. +#### A masked pattern was here #### +]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:0, Vertex vertex_#ID# [Reducer 3] killed/failed due to:OWN_TASK_FAILURE][Masked Vertex killed due to OTHER_VERTEX_FAILURE]DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:1 diff --git a/ql/src/test/results/clientnegative/mergejoin_skew_abort_union_joins.q.out b/ql/src/test/results/clientnegative/mergejoin_skew_abort_union_joins.q.out new file mode 100644 index 000000000000..7b39824e8b65 --- /dev/null +++ b/ql/src/test/results/clientnegative/mergejoin_skew_abort_union_joins.q.out @@ -0,0 +1,112 @@ +PREHOOK: query: CREATE TABLE merge_skew_warn_2j_unique_a (key int, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_2j_unique_a +POSTHOOK: query: CREATE TABLE merge_skew_warn_2j_unique_a (key int, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_2j_unique_a +PREHOOK: query: CREATE TABLE merge_skew_warn_2j_unique_b (key int, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_2j_unique_b +POSTHOOK: query: CREATE TABLE merge_skew_warn_2j_unique_b (key int, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_2j_unique_b +PREHOOK: query: CREATE TABLE merge_skew_warn_2j_skew_a (key int, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_2j_skew_a +POSTHOOK: query: CREATE TABLE merge_skew_warn_2j_skew_a (key int, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_2j_skew_a +PREHOOK: query: CREATE TABLE merge_skew_warn_2j_skew_b (key int, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_2j_skew_b +POSTHOOK: query: CREATE TABLE merge_skew_warn_2j_skew_b (key int, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_2j_skew_b +PREHOOK: query: INSERT INTO TABLE merge_skew_warn_2j_unique_a VALUES (10, 'u1'), (20, 'u2'), (30, 'u3') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_warn_2j_unique_a +POSTHOOK: query: INSERT INTO TABLE merge_skew_warn_2j_unique_a VALUES (10, 'u1'), (20, 'u2'), (30, 'u3') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_warn_2j_unique_a +POSTHOOK: Lineage: merge_skew_warn_2j_unique_a.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_warn_2j_unique_a.val SCRIPT [] +PREHOOK: query: INSERT INTO TABLE merge_skew_warn_2j_unique_b VALUES (10, 'v1'), (20, 'v2'), (30, 'v3') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_warn_2j_unique_b +POSTHOOK: query: INSERT INTO TABLE merge_skew_warn_2j_unique_b VALUES (10, 'v1'), (20, 'v2'), (30, 'v3') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_warn_2j_unique_b +POSTHOOK: Lineage: merge_skew_warn_2j_unique_b.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_warn_2j_unique_b.val SCRIPT [] +PREHOOK: query: INSERT INTO TABLE merge_skew_warn_2j_skew_a VALUES (10, 's1'), (10, 's2'), (10, 's3'), (10, 's4'), (20, 's5') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_warn_2j_skew_a +POSTHOOK: query: INSERT INTO TABLE merge_skew_warn_2j_skew_a VALUES (10, 's1'), (10, 's2'), (10, 's3'), (10, 's4'), (20, 's5') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_warn_2j_skew_a +POSTHOOK: Lineage: merge_skew_warn_2j_skew_a.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_warn_2j_skew_a.val SCRIPT [] +PREHOOK: query: INSERT INTO TABLE merge_skew_warn_2j_skew_b VALUES (10, 't1'), (20, 't2') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_warn_2j_skew_b +POSTHOOK: query: INSERT INTO TABLE merge_skew_warn_2j_skew_b VALUES (10, 't1'), (20, 't2') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_warn_2j_skew_b +POSTHOOK: Lineage: merge_skew_warn_2j_skew_b.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_warn_2j_skew_b.val SCRIPT [] +PREHOOK: query: SELECT count(*) +FROM merge_skew_warn_2j_unique_a ua JOIN merge_skew_warn_2j_unique_b ub ON ua.key = ub.key +UNION ALL +SELECT count(*) +FROM merge_skew_warn_2j_skew_a sa JOIN merge_skew_warn_2j_skew_b sb ON sa.key = sb.key +PREHOOK: type: QUERY +PREHOOK: Input: default@merge_skew_warn_2j_skew_a +PREHOOK: Input: default@merge_skew_warn_2j_skew_b +PREHOOK: Input: default@merge_skew_warn_2j_unique_a +PREHOOK: Input: default@merge_skew_warn_2j_unique_b +#### A masked pattern was here #### +Status: Failed +Vertex failed, vertexName=Reducer 7, vertexId=vertex_#ID#, diagnostics=[Task failed, taskId=task_#ID#, diagnostics=[TaskAttempt 0 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [key] in table alias [sa]. Consider reviewing data distribution. +#### A masked pattern was here #### +], TaskAttempt 1 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [key] in table alias [sa]. Consider reviewing data distribution. +#### A masked pattern was here #### +]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:0, Vertex vertex_#ID# [Reducer 7] killed/failed due to:OWN_TASK_FAILURE] +[Masked Vertex killed due to OTHER_VERTEX_FAILURE] +DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:#Masked# +FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask. Vertex failed, vertexName=Reducer 7, vertexId=vertex_#ID#, diagnostics=[Task failed, taskId=task_#ID#, diagnostics=[TaskAttempt 0 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [key] in table alias [sa]. Consider reviewing data distribution. +#### A masked pattern was here #### +], TaskAttempt 1 failed, info=[Error: Error while running task ( failure ) : attempt_#ID#:java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row +#### A masked pattern was here #### +Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: Data skew detected in merge join: 2 rows accumulated for join column(s) [key] in table alias [sa]. Consider reviewing data distribution. +#### A masked pattern was here #### +]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:0, Vertex vertex_#ID# [Reducer 7] killed/failed due to:OWN_TASK_FAILURE][Masked Vertex killed due to OTHER_VERTEX_FAILURE]DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:#Masked# diff --git a/ql/src/test/results/clientpositive/llap/mergejoin_skew_warn.q.out b/ql/src/test/results/clientpositive/llap/mergejoin_skew_warn.q.out new file mode 100644 index 000000000000..ae72d6af6a89 --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/mergejoin_skew_warn.q.out @@ -0,0 +1,651 @@ +PREHOOK: query: CREATE TABLE merge_skew_warn_a (key int, value string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_a +POSTHOOK: query: CREATE TABLE merge_skew_warn_a (key int, value string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_a +PREHOOK: query: CREATE TABLE merge_skew_warn_b (key int, value string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_b +POSTHOOK: query: CREATE TABLE merge_skew_warn_b (key int, value string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_b +PREHOOK: query: INSERT INTO TABLE merge_skew_warn_a VALUES (1, 'a1'), (1, 'a2'), (1, 'a3'), (1, 'a4'), +(2, 'b1'), (3, 'c1') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_warn_a +POSTHOOK: query: INSERT INTO TABLE merge_skew_warn_a VALUES (1, 'a1'), (1, 'a2'), (1, 'a3'), (1, 'a4'), +(2, 'b1'), (3, 'c1') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_warn_a +POSTHOOK: Lineage: merge_skew_warn_a.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_warn_a.value SCRIPT [] +PREHOOK: query: INSERT INTO TABLE merge_skew_warn_b VALUES (1, 'x1'), (2, 'y1'), (3, 'z1') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_warn_b +POSTHOOK: query: INSERT INTO TABLE merge_skew_warn_b VALUES (1, 'x1'), (2, 'y1'), (3, 'z1') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_warn_b +POSTHOOK: Lineage: merge_skew_warn_b.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_warn_b.value SCRIPT [] +PREHOOK: query: EXPLAIN +SELECT a.key, a.value, b.value +FROM merge_skew_warn_a a JOIN merge_skew_warn_b b ON a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@merge_skew_warn_a +PREHOOK: Input: default@merge_skew_warn_b +#### A masked pattern was here #### +POSTHOOK: query: EXPLAIN +SELECT a.key, a.value, b.value +FROM merge_skew_warn_a a JOIN merge_skew_warn_b b ON a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@merge_skew_warn_a +POSTHOOK: Input: default@merge_skew_warn_b +#### A masked pattern was here #### +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 3 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: a + filterExpr: key is not null (type: boolean) + Statistics: Num rows: 6 Data size: 540 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 6 Data size: 540 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: int), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 6 Data size: 540 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 6 Data size: 540 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string) + Execution mode: vectorized, llap + LLAP IO: all inputs + Map 3 + Map Operator Tree: + TableScan + alias: b + filterExpr: key is not null (type: boolean) + Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: int), value (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string) + Execution mode: vectorized, llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + outputColumnNames: _col0, _col1, _col3 + Statistics: Num rows: 6 Data size: 1056 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col0 (type: int), _col1 (type: string), _col3 (type: string) + outputColumnNames: _col0, _col1, _col2 + Statistics: Num rows: 6 Data size: 1056 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 6 Data size: 1056 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT a.key, a.value, b.value +FROM merge_skew_warn_a a JOIN merge_skew_warn_b b ON a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@merge_skew_warn_a +PREHOOK: Input: default@merge_skew_warn_b +#### A masked pattern was here #### +POSTHOOK: query: SELECT a.key, a.value, b.value +FROM merge_skew_warn_a a JOIN merge_skew_warn_b b ON a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@merge_skew_warn_a +POSTHOOK: Input: default@merge_skew_warn_b +#### A masked pattern was here #### +1 a1 x1 +1 a2 x1 +1 a3 x1 +1 a4 x1 +2 b1 y1 +3 c1 z1 +PREHOOK: query: SELECT count(*) FROM merge_skew_warn_a a JOIN merge_skew_warn_b b ON a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@merge_skew_warn_a +PREHOOK: Input: default@merge_skew_warn_b +#### A masked pattern was here #### +POSTHOOK: query: SELECT count(*) FROM merge_skew_warn_a a JOIN merge_skew_warn_b b ON a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@merge_skew_warn_a +POSTHOOK: Input: default@merge_skew_warn_b +#### A masked pattern was here #### +6 +PREHOOK: query: SELECT count(*) FROM merge_skew_warn_a a JOIN merge_skew_warn_b b ON a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@merge_skew_warn_a +PREHOOK: Input: default@merge_skew_warn_b +#### A masked pattern was here #### +POSTHOOK: query: SELECT count(*) FROM merge_skew_warn_a a JOIN merge_skew_warn_b b ON a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@merge_skew_warn_a +POSTHOOK: Input: default@merge_skew_warn_b +#### A masked pattern was here #### +6 +PREHOOK: query: SELECT count(*) FROM merge_skew_warn_a a JOIN merge_skew_warn_b b ON a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@merge_skew_warn_a +PREHOOK: Input: default@merge_skew_warn_b +#### A masked pattern was here #### +POSTHOOK: query: SELECT count(*) FROM merge_skew_warn_a a JOIN merge_skew_warn_b b ON a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@merge_skew_warn_a +POSTHOOK: Input: default@merge_skew_warn_b +#### A masked pattern was here #### +6 +PREHOOK: query: CREATE TABLE merge_skew_warn_unique_a (key int, value string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_unique_a +POSTHOOK: query: CREATE TABLE merge_skew_warn_unique_a (key int, value string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_unique_a +PREHOOK: query: CREATE TABLE merge_skew_warn_unique_b (key int, value string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_unique_b +POSTHOOK: query: CREATE TABLE merge_skew_warn_unique_b (key int, value string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_unique_b +PREHOOK: query: INSERT INTO TABLE merge_skew_warn_unique_a VALUES (1, 'u1'), (2, 'u2'), (3, 'u3') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_warn_unique_a +POSTHOOK: query: INSERT INTO TABLE merge_skew_warn_unique_a VALUES (1, 'u1'), (2, 'u2'), (3, 'u3') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_warn_unique_a +POSTHOOK: Lineage: merge_skew_warn_unique_a.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_warn_unique_a.value SCRIPT [] +PREHOOK: query: INSERT INTO TABLE merge_skew_warn_unique_b VALUES (1, 'v1'), (2, 'v2'), (3, 'v3') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_warn_unique_b +POSTHOOK: query: INSERT INTO TABLE merge_skew_warn_unique_b VALUES (1, 'v1'), (2, 'v2'), (3, 'v3') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_warn_unique_b +POSTHOOK: Lineage: merge_skew_warn_unique_b.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_warn_unique_b.value SCRIPT [] +PREHOOK: query: SELECT count(*) FROM merge_skew_warn_unique_a a JOIN merge_skew_warn_unique_b b ON a.key = b.key +PREHOOK: type: QUERY +PREHOOK: Input: default@merge_skew_warn_unique_a +PREHOOK: Input: default@merge_skew_warn_unique_b +#### A masked pattern was here #### +POSTHOOK: query: SELECT count(*) FROM merge_skew_warn_unique_a a JOIN merge_skew_warn_unique_b b ON a.key = b.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@merge_skew_warn_unique_a +POSTHOOK: Input: default@merge_skew_warn_unique_b +#### A masked pattern was here #### +3 +PREHOOK: query: DROP TABLE merge_skew_warn_unique_a +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@merge_skew_warn_unique_a +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_unique_a +POSTHOOK: query: DROP TABLE merge_skew_warn_unique_a +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@merge_skew_warn_unique_a +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_unique_a +PREHOOK: query: DROP TABLE merge_skew_warn_unique_b +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@merge_skew_warn_unique_b +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_unique_b +POSTHOOK: query: DROP TABLE merge_skew_warn_unique_b +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@merge_skew_warn_unique_b +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_unique_b +PREHOOK: query: DROP TABLE merge_skew_warn_a +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@merge_skew_warn_a +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_a +POSTHOOK: query: DROP TABLE merge_skew_warn_a +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@merge_skew_warn_a +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_a +PREHOOK: query: DROP TABLE merge_skew_warn_b +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@merge_skew_warn_b +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_b +POSTHOOK: query: DROP TABLE merge_skew_warn_b +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@merge_skew_warn_b +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_b +PREHOOK: query: CREATE TABLE merge_skew_warn_3w_a (key int, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_3w_a +POSTHOOK: query: CREATE TABLE merge_skew_warn_3w_a (key int, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_3w_a +PREHOOK: query: CREATE TABLE merge_skew_warn_3w_b (key int, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_3w_b +POSTHOOK: query: CREATE TABLE merge_skew_warn_3w_b (key int, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_3w_b +PREHOOK: query: CREATE TABLE merge_skew_warn_3w_c (key int, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_3w_c +POSTHOOK: query: CREATE TABLE merge_skew_warn_3w_c (key int, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_3w_c +PREHOOK: query: INSERT INTO TABLE merge_skew_warn_3w_a VALUES (1, 'a1'), (1, 'a2'), (1, 'a3'), (1, 'a4'), (2, 'a5'), (3, 'a6') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_warn_3w_a +POSTHOOK: query: INSERT INTO TABLE merge_skew_warn_3w_a VALUES (1, 'a1'), (1, 'a2'), (1, 'a3'), (1, 'a4'), (2, 'a5'), (3, 'a6') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_warn_3w_a +POSTHOOK: Lineage: merge_skew_warn_3w_a.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_warn_3w_a.val SCRIPT [] +PREHOOK: query: INSERT INTO TABLE merge_skew_warn_3w_b VALUES (1, 'b1'), (2, 'b2'), (3, 'b3') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_warn_3w_b +POSTHOOK: query: INSERT INTO TABLE merge_skew_warn_3w_b VALUES (1, 'b1'), (2, 'b2'), (3, 'b3') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_warn_3w_b +POSTHOOK: Lineage: merge_skew_warn_3w_b.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_warn_3w_b.val SCRIPT [] +PREHOOK: query: INSERT INTO TABLE merge_skew_warn_3w_c VALUES (1, 'c1'), (2, 'c2'), (3, 'c3') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_warn_3w_c +POSTHOOK: query: INSERT INTO TABLE merge_skew_warn_3w_c VALUES (1, 'c1'), (2, 'c2'), (3, 'c3') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_warn_3w_c +POSTHOOK: Lineage: merge_skew_warn_3w_c.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_warn_3w_c.val SCRIPT [] +PREHOOK: query: EXPLAIN +SELECT a.key, a.val, b.val, c.val +FROM merge_skew_warn_3w_a a + JOIN merge_skew_warn_3w_b b ON a.key = b.key + JOIN merge_skew_warn_3w_c c ON b.key = c.key +PREHOOK: type: QUERY +PREHOOK: Input: default@merge_skew_warn_3w_a +PREHOOK: Input: default@merge_skew_warn_3w_b +PREHOOK: Input: default@merge_skew_warn_3w_c +#### A masked pattern was here #### +POSTHOOK: query: EXPLAIN +SELECT a.key, a.val, b.val, c.val +FROM merge_skew_warn_3w_a a + JOIN merge_skew_warn_3w_b b ON a.key = b.key + JOIN merge_skew_warn_3w_c c ON b.key = c.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@merge_skew_warn_3w_a +POSTHOOK: Input: default@merge_skew_warn_3w_b +POSTHOOK: Input: default@merge_skew_warn_3w_c +#### A masked pattern was here #### +STAGE DEPENDENCIES: + Stage-1 is a root stage + Stage-0 depends on stages: Stage-1 + +STAGE PLANS: + Stage: Stage-1 + Tez +#### A masked pattern was here #### + Edges: + Reducer 2 <- Map 1 (SIMPLE_EDGE), Map 4 (SIMPLE_EDGE) + Reducer 3 <- Map 5 (SIMPLE_EDGE), Reducer 2 (SIMPLE_EDGE) +#### A masked pattern was here #### + Vertices: + Map 1 + Map Operator Tree: + TableScan + alias: b + filterExpr: key is not null (type: boolean) + Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: int), val (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string) + Execution mode: vectorized, llap + LLAP IO: all inputs + Map 4 + Map Operator Tree: + TableScan + alias: c + filterExpr: key is not null (type: boolean) + Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: int), val (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 3 Data size: 270 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string) + Execution mode: vectorized, llap + LLAP IO: all inputs + Map 5 + Map Operator Tree: + TableScan + alias: a + filterExpr: key is not null (type: boolean) + Statistics: Num rows: 6 Data size: 540 Basic stats: COMPLETE Column stats: COMPLETE + Filter Operator + predicate: key is not null (type: boolean) + Statistics: Num rows: 6 Data size: 540 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: key (type: int), val (type: string) + outputColumnNames: _col0, _col1 + Statistics: Num rows: 6 Data size: 540 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 6 Data size: 540 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string) + Execution mode: vectorized, llap + LLAP IO: all inputs + Reducer 2 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + outputColumnNames: _col0, _col1, _col3 + Statistics: Num rows: 3 Data size: 528 Basic stats: COMPLETE Column stats: COMPLETE + Reduce Output Operator + key expressions: _col0 (type: int) + null sort order: z + sort order: + + Map-reduce partition columns: _col0 (type: int) + Statistics: Num rows: 3 Data size: 528 Basic stats: COMPLETE Column stats: COMPLETE + value expressions: _col1 (type: string), _col3 (type: string) + Reducer 3 + Execution mode: llap + Reduce Operator Tree: + Merge Join Operator + condition map: + Inner Join 0 to 1 + keys: + 0 _col0 (type: int) + 1 _col0 (type: int) + outputColumnNames: _col1, _col3, _col4, _col5 + Statistics: Num rows: 6 Data size: 1572 Basic stats: COMPLETE Column stats: COMPLETE + Select Operator + expressions: _col4 (type: int), _col5 (type: string), _col1 (type: string), _col3 (type: string) + outputColumnNames: _col0, _col1, _col2, _col3 + Statistics: Num rows: 6 Data size: 1572 Basic stats: COMPLETE Column stats: COMPLETE + File Output Operator + compressed: false + Statistics: Num rows: 6 Data size: 1572 Basic stats: COMPLETE Column stats: COMPLETE + table: + input format: org.apache.hadoop.mapred.SequenceFileInputFormat + output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat + serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe + + Stage: Stage-0 + Fetch Operator + limit: -1 + Processor Tree: + ListSink + +PREHOOK: query: SELECT a.key, a.val, b.val, c.val +FROM merge_skew_warn_3w_a a + JOIN merge_skew_warn_3w_b b ON a.key = b.key + JOIN merge_skew_warn_3w_c c ON b.key = c.key +PREHOOK: type: QUERY +PREHOOK: Input: default@merge_skew_warn_3w_a +PREHOOK: Input: default@merge_skew_warn_3w_b +PREHOOK: Input: default@merge_skew_warn_3w_c +#### A masked pattern was here #### +POSTHOOK: query: SELECT a.key, a.val, b.val, c.val +FROM merge_skew_warn_3w_a a + JOIN merge_skew_warn_3w_b b ON a.key = b.key + JOIN merge_skew_warn_3w_c c ON b.key = c.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@merge_skew_warn_3w_a +POSTHOOK: Input: default@merge_skew_warn_3w_b +POSTHOOK: Input: default@merge_skew_warn_3w_c +#### A masked pattern was here #### +1 a1 b1 c1 +1 a2 b1 c1 +1 a3 b1 c1 +1 a4 b1 c1 +2 a5 b2 c2 +3 a6 b3 c3 +PREHOOK: query: SELECT count(*) +FROM merge_skew_warn_3w_a a + JOIN merge_skew_warn_3w_b b ON a.key = b.key + JOIN merge_skew_warn_3w_c c ON b.key = c.key +PREHOOK: type: QUERY +PREHOOK: Input: default@merge_skew_warn_3w_a +PREHOOK: Input: default@merge_skew_warn_3w_b +PREHOOK: Input: default@merge_skew_warn_3w_c +#### A masked pattern was here #### +POSTHOOK: query: SELECT count(*) +FROM merge_skew_warn_3w_a a + JOIN merge_skew_warn_3w_b b ON a.key = b.key + JOIN merge_skew_warn_3w_c c ON b.key = c.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@merge_skew_warn_3w_a +POSTHOOK: Input: default@merge_skew_warn_3w_b +POSTHOOK: Input: default@merge_skew_warn_3w_c +#### A masked pattern was here #### +6 +PREHOOK: query: CREATE TABLE merge_skew_warn_3w_uniq_a (key int, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_3w_uniq_a +POSTHOOK: query: CREATE TABLE merge_skew_warn_3w_uniq_a (key int, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_3w_uniq_a +PREHOOK: query: CREATE TABLE merge_skew_warn_3w_uniq_b (key int, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_3w_uniq_b +POSTHOOK: query: CREATE TABLE merge_skew_warn_3w_uniq_b (key int, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_3w_uniq_b +PREHOOK: query: CREATE TABLE merge_skew_warn_3w_uniq_c (key int, val string) +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_3w_uniq_c +POSTHOOK: query: CREATE TABLE merge_skew_warn_3w_uniq_c (key int, val string) +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_3w_uniq_c +PREHOOK: query: INSERT INTO TABLE merge_skew_warn_3w_uniq_a VALUES (1, 'u1'), (2, 'u2'), (3, 'u3') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_warn_3w_uniq_a +POSTHOOK: query: INSERT INTO TABLE merge_skew_warn_3w_uniq_a VALUES (1, 'u1'), (2, 'u2'), (3, 'u3') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_warn_3w_uniq_a +POSTHOOK: Lineage: merge_skew_warn_3w_uniq_a.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_warn_3w_uniq_a.val SCRIPT [] +PREHOOK: query: INSERT INTO TABLE merge_skew_warn_3w_uniq_b VALUES (1, 'v1'), (2, 'v2'), (3, 'v3') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_warn_3w_uniq_b +POSTHOOK: query: INSERT INTO TABLE merge_skew_warn_3w_uniq_b VALUES (1, 'v1'), (2, 'v2'), (3, 'v3') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_warn_3w_uniq_b +POSTHOOK: Lineage: merge_skew_warn_3w_uniq_b.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_warn_3w_uniq_b.val SCRIPT [] +PREHOOK: query: INSERT INTO TABLE merge_skew_warn_3w_uniq_c VALUES (1, 'w1'), (2, 'w2'), (3, 'w3') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@merge_skew_warn_3w_uniq_c +POSTHOOK: query: INSERT INTO TABLE merge_skew_warn_3w_uniq_c VALUES (1, 'w1'), (2, 'w2'), (3, 'w3') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@merge_skew_warn_3w_uniq_c +POSTHOOK: Lineage: merge_skew_warn_3w_uniq_c.key SCRIPT [] +POSTHOOK: Lineage: merge_skew_warn_3w_uniq_c.val SCRIPT [] +PREHOOK: query: SELECT count(*) +FROM merge_skew_warn_3w_uniq_a a + JOIN merge_skew_warn_3w_uniq_b b ON a.key = b.key + JOIN merge_skew_warn_3w_uniq_c c ON b.key = c.key +PREHOOK: type: QUERY +PREHOOK: Input: default@merge_skew_warn_3w_uniq_a +PREHOOK: Input: default@merge_skew_warn_3w_uniq_b +PREHOOK: Input: default@merge_skew_warn_3w_uniq_c +#### A masked pattern was here #### +POSTHOOK: query: SELECT count(*) +FROM merge_skew_warn_3w_uniq_a a + JOIN merge_skew_warn_3w_uniq_b b ON a.key = b.key + JOIN merge_skew_warn_3w_uniq_c c ON b.key = c.key +POSTHOOK: type: QUERY +POSTHOOK: Input: default@merge_skew_warn_3w_uniq_a +POSTHOOK: Input: default@merge_skew_warn_3w_uniq_b +POSTHOOK: Input: default@merge_skew_warn_3w_uniq_c +#### A masked pattern was here #### +3 +PREHOOK: query: DROP TABLE merge_skew_warn_3w_uniq_a +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@merge_skew_warn_3w_uniq_a +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_3w_uniq_a +POSTHOOK: query: DROP TABLE merge_skew_warn_3w_uniq_a +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@merge_skew_warn_3w_uniq_a +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_3w_uniq_a +PREHOOK: query: DROP TABLE merge_skew_warn_3w_uniq_b +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@merge_skew_warn_3w_uniq_b +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_3w_uniq_b +POSTHOOK: query: DROP TABLE merge_skew_warn_3w_uniq_b +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@merge_skew_warn_3w_uniq_b +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_3w_uniq_b +PREHOOK: query: DROP TABLE merge_skew_warn_3w_uniq_c +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@merge_skew_warn_3w_uniq_c +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_3w_uniq_c +POSTHOOK: query: DROP TABLE merge_skew_warn_3w_uniq_c +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@merge_skew_warn_3w_uniq_c +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_3w_uniq_c +PREHOOK: query: DROP TABLE merge_skew_warn_3w_a +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@merge_skew_warn_3w_a +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_3w_a +POSTHOOK: query: DROP TABLE merge_skew_warn_3w_a +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@merge_skew_warn_3w_a +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_3w_a +PREHOOK: query: DROP TABLE merge_skew_warn_3w_b +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@merge_skew_warn_3w_b +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_3w_b +POSTHOOK: query: DROP TABLE merge_skew_warn_3w_b +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@merge_skew_warn_3w_b +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_3w_b +PREHOOK: query: DROP TABLE merge_skew_warn_3w_c +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@merge_skew_warn_3w_c +PREHOOK: Output: database:default +PREHOOK: Output: default@merge_skew_warn_3w_c +POSTHOOK: query: DROP TABLE merge_skew_warn_3w_c +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@merge_skew_warn_3w_c +POSTHOOK: Output: database:default +POSTHOOK: Output: default@merge_skew_warn_3w_c