Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -5076,6 +5077,59 @@
vectorizedPTFMaxMemoryBufferingBatchCount);
}

/**
* Reorders partitionColumnMap and partitionColumnVectorTypes in-place so that projected
* partition-only columns come first in SELECT output order. Any partition columns not found in
* the output are appended at the end in their original plan order.
*/
private static void reorderPartitionColumnsToMatchOutputOrder(List<ColumnInfo> outputSignature,
int evaluatorCount, int[] outputColumnProjectionMap, int[] orderColumnMap,
ExprNodeDesc[] partitionExprNodeDescs, int[] partitionColumnMap,
Type[] partitionColumnVectorTypes) {
final int count = partitionColumnMap.length;
final int[] orderedMap = new int[count];
final Type[] orderedTypes = new Type[count];
final boolean[] placed = new boolean[count];

int idx = 0;
final int outputSize = outputSignature.size();

for (int outputIdx = evaluatorCount; outputIdx < outputSize && idx < count; outputIdx++) {
final int outputColumn = outputColumnProjectionMap[outputIdx];
final String colName = outputSignature.get(outputIdx).getInternalName();
int matchedPartitionIdx = IntStream.range(0, count)
.filter(p -> !placed[p])
.filter(p -> partitionExprNodeDescs[p] instanceof ExprNodeColumnDesc colDesc &&
colDesc.getColumn().equals(colName))
.findFirst()
.orElse(-1);

if (matchedPartitionIdx == -1) {
matchedPartitionIdx = IntStream.range(0, count)
.filter(p -> !placed[p] && partitionColumnMap[p] == outputColumn)
.findFirst()
.orElse(-1);
}

if (matchedPartitionIdx != -1 && !ArrayUtils.contains(orderColumnMap, outputColumn)) {
orderedMap[idx] = outputColumn;
orderedTypes[idx] = partitionColumnVectorTypes[matchedPartitionIdx];
placed[matchedPartitionIdx] = true;
idx++;
}
}

for (int p = 0; p < count; p++) {
if (!placed[p]) {
orderedMap[idx] = partitionColumnMap[p];
orderedTypes[idx] = partitionColumnVectorTypes[p];
idx++;
}
}
System.arraycopy(orderedMap, 0, partitionColumnMap, 0, count);
System.arraycopy(orderedTypes, 0, partitionColumnVectorTypes, 0, count);
}

private static void determineKeyAndNonKeyInputColumnMap(int[] outputColumnProjectionMap,
boolean isPartitionOrderBy, int[] orderColumnMap, int[] partitionColumnMap,
int evaluatorCount, ArrayList<Integer> keyInputColumns,
Expand Down Expand Up @@ -5113,7 +5167,7 @@
* Create the additional vectorization PTF information needed by the VectorPTFOperator during
* execution.
*/
private static VectorPTFInfo createVectorPTFInfo(Operator<? extends OperatorDesc> ptfOp,

Check failure on line 5170 in ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 17 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ5qZJevokFn1UQ2ggY6&open=AZ5qZJevokFn1UQ2ggY6&pullRequest=6512
PTFDesc ptfDesc, VectorizationContext vContext, VectorPTFDesc vectorPTFDesc)
throws HiveException {

Expand Down Expand Up @@ -5190,6 +5244,11 @@
int[] keyInputColumnMap = ArrayUtils.toPrimitive(keyInputColumns.toArray(new Integer[0]));
int[] nonKeyInputColumnMap = ArrayUtils.toPrimitive(nonKeyInputColumns.toArray(new Integer[0]));

if (isPartitionOrderBy && partitionKeyCount > 1) {
reorderPartitionColumnsToMatchOutputOrder(outputSignature, evaluatorCount, outputColumnProjectionMap,
orderColumnMap, partitionExprNodeDescs, partitionColumnMap, partitionColumnVectorTypes);
}

VectorExpression[][] evaluatorInputExpressions = new VectorExpression[evaluatorCount][];
Type[][] evaluatorInputColumnVectorTypes = new Type[evaluatorCount][];
for (int i = 0; i < evaluatorCount; i++) {
Expand Down
155 changes: 155 additions & 0 deletions ql/src/test/queries/clientpositive/vector_ptf_cols_order.q
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
SET hive.vectorized.execution.enabled=true;
create table web_sales_txt
(
ws_sold_date_sk int,
ws_sold_time_sk int,
ws_ship_date_sk int,
ws_item_sk int,
ws_bill_customer_sk int,
ws_bill_cdemo_sk int,
ws_bill_hdemo_sk int,
ws_bill_addr_sk int,
ws_ship_customer_sk int,
ws_ship_cdemo_sk int,
ws_ship_hdemo_sk int,
ws_ship_addr_sk int,
ws_web_page_sk int,
ws_web_site_sk int,
ws_ship_mode_sk int,
ws_warehouse_sk int,
ws_promo_sk int,
ws_order_number int,
ws_quantity int,
ws_wholesale_cost decimal(7,2),
ws_list_price decimal(7,2),
ws_sales_price decimal(7,2),
ws_ext_discount_amt decimal(7,2),
ws_ext_sales_price decimal(7,2),
ws_ext_wholesale_cost decimal(7,2),
ws_ext_list_price decimal(7,2),
ws_ext_tax decimal(7,2),
ws_coupon_amt decimal(7,2),
ws_ext_ship_cost decimal(7,2),
ws_net_paid decimal(7,2),
ws_net_paid_inc_tax decimal(7,2),
ws_net_paid_inc_ship decimal(7,2),
ws_net_paid_inc_ship_tax decimal(7,2),
ws_net_profit decimal(7,2)
)
row format delimited fields terminated by '|'
stored as textfile;

LOAD DATA LOCAL INPATH '../../data/files/web_sales_2k' OVERWRITE INTO TABLE web_sales_txt;

-- Baseline query to verify data load.
select ws_bill_customer_sk, ws_item_sk from web_sales_txt;

-- Vectorized LAG: verify output column order follows the SELECT list
-- (ws_bill_customer_sk, ws_item_sk), not the PARTITION BY order, when window
-- functions use different PARTITION BY column orderings.
SELECT
ws_bill_customer_sk,
ws_item_sk,
ws_sold_date_sk,
ws_sales_price,
LAG(ws_sales_price) OVER (
PARTITION BY ws_item_sk, ws_bill_customer_sk
ORDER BY ws_sold_date_sk
) AS prev_sales_price,
ws_sales_price - LAG(ws_sales_price) OVER (
PARTITION BY ws_bill_customer_sk, ws_item_sk
ORDER BY ws_sold_date_sk
) AS sales_price_diff
FROM
web_sales_txt;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add comments explaining what test case is being tested before this and other SELECT queries in this .q file?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, comments would be essential to explain this Qtest. Added comments to explain all the SELECT queries being run in the Qtest in commit: a5e3bfc


-- Vectorized LEAD: same column-ordering check using LEAD window functions.
SELECT
ws_bill_customer_sk,
ws_item_sk,
ws_sold_date_sk,
ws_sales_price,
LEAD(ws_sales_price) OVER (
PARTITION BY ws_item_sk, ws_bill_customer_sk
ORDER BY ws_sold_date_sk
) AS next_sales_price,
LEAD(ws_sales_price) OVER (
PARTITION BY ws_bill_customer_sk, ws_item_sk
ORDER BY ws_sold_date_sk
) - ws_sales_price AS sales_price_diff
FROM
web_sales_txt;

-- Vectorized FIRST_VALUE/LAST_VALUE: same column-ordering check using FIRST_VALUE and
-- LAST_VALUE window functions with differing PARTITION BY orderings.
SELECT
ws_bill_customer_sk,
ws_item_sk,
ws_sold_date_sk,
ws_sales_price,
FIRST_VALUE(ws_sales_price) OVER (
PARTITION BY ws_item_sk, ws_bill_customer_sk
ORDER BY ws_sold_date_sk
) AS first_price,
LAST_VALUE(ws_sales_price) OVER (
PARTITION BY ws_bill_customer_sk, ws_item_sk
ORDER BY ws_sold_date_sk
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS last_price
FROM
web_sales_txt;

SET hive.vectorized.execution.enabled=false;

-- Non-vectorized LAG: reference results for validating vectorized LAG query execution above.
SELECT
ws_bill_customer_sk,
ws_item_sk,
ws_sold_date_sk,
ws_sales_price,
LAG(ws_sales_price) OVER (
PARTITION BY ws_item_sk, ws_bill_customer_sk
ORDER BY ws_sold_date_sk
) AS prev_sales_price,
ws_sales_price - LAG(ws_sales_price) OVER (
PARTITION BY ws_bill_customer_sk, ws_item_sk
ORDER BY ws_sold_date_sk
) AS sales_price_diff
FROM
web_sales_txt;

-- Non-vectorized LEAD: reference results for validating vectorized LEAD query execution above.
SELECT
ws_bill_customer_sk,
ws_item_sk,
ws_sold_date_sk,
ws_sales_price,
LEAD(ws_sales_price) OVER (
PARTITION BY ws_item_sk, ws_bill_customer_sk
ORDER BY ws_sold_date_sk
) AS next_sales_price,
LEAD(ws_sales_price) OVER (
PARTITION BY ws_bill_customer_sk, ws_item_sk
ORDER BY ws_sold_date_sk
) - ws_sales_price AS sales_price_diff
FROM
web_sales_txt;

-- Non-vectorized FIRST_VALUE/LAST_VALUE: reference results for validating vectorized
-- FIRST_VALUE/LAST_VALUE query execution above.
SELECT
ws_bill_customer_sk,
ws_item_sk,
ws_sold_date_sk,
ws_sales_price,
FIRST_VALUE(ws_sales_price) OVER (
PARTITION BY ws_item_sk, ws_bill_customer_sk
ORDER BY ws_sold_date_sk
) AS first_price,
LAST_VALUE(ws_sales_price) OVER (
PARTITION BY ws_bill_customer_sk, ws_item_sk
ORDER BY ws_sold_date_sk
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS last_price
FROM
web_sales_txt;
Loading
Loading