Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@

import com.linkedin.openhouse.common.api.spec.TableUri;
import com.linkedin.openhouse.tables.api.spec.v0.request.CreateUpdateTableRequestBody;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.ClusteringColumn;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.Retention;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec;
import com.linkedin.openhouse.tables.common.DefaultColumnPattern;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

Expand Down Expand Up @@ -36,6 +41,10 @@ public boolean validate(
CreateUpdateTableRequestBody createUpdateTableRequestBody, TableUri tableUri) {
Retention retention = createUpdateTableRequestBody.getPolicies().getRetention();
TimePartitionSpec timePartitioning = createUpdateTableRequestBody.getTimePartitioning();
List<ClusteringColumn> clustering =
createUpdateTableRequestBody.getClustering() == null
? Collections.emptyList()
: createUpdateTableRequestBody.getClustering();
String schema = createUpdateTableRequestBody.getSchema();

if (retention != null) {
Expand Down Expand Up @@ -82,11 +91,39 @@ public boolean validate(
tableUri);
return false;
}
// Retention DELETE must be satisfiable by partition pruning. When the table is not
// time-partitioned, the retention column must be one of the clustering columns;
// otherwise the retention job would degrade into a row-level rewrite (or fail outright
// when backup is enabled).
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add why it fails backup and the impact of a failing backup

if (timePartitioning == null
&& retention.getColumnPattern() != null
&& !isClusteringColumn(retention.getColumnPattern().getColumnName(), clustering)) {
failureMessage =
String.format(
"Retention column[%s] for table %s must be one of the clustering columns %s; "
+ "retention on non-partitioned columns is not supported because it cannot be executed as a metadata-only delete.",
retention.getColumnPattern().getColumnName(),
tableUri,
clustering.stream()
.map(ClusteringColumn::getColumnName)
.collect(Collectors.toList()));
return false;
}
}

return true;
}

/** Returns true iff {@code columnName} matches one of the table's clustering columns. */
private boolean isClusteringColumn(String columnName, List<ClusteringColumn> clustering) {
if (columnName == null) {
return false;
}
return clustering.stream()
.map(ClusteringColumn::getColumnName)
.anyMatch(name -> Objects.equals(name, columnName));
}

/**
* Validate the pattern provided by users are legit pattern that complies with {@link
* DateTimeFormatter} symbols. Also, the provided column name needs to be part of schema.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@

import com.linkedin.openhouse.common.api.spec.TableUri;
import com.linkedin.openhouse.tables.api.spec.v0.request.CreateUpdateTableRequestBody;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.ClusteringColumn;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.Retention;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.RetentionColumnPattern;
import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -147,16 +150,50 @@ void testValidate() {
validator.validate(
requestBodyMissingPatternAndTimePartitionSpec, TableUri.builder().build()));

// Positive: Only have pattern but no timepartitionSpec
// Positive: Only have pattern but no timepartitionSpec, and retention column is a
// clustering column.
CreateUpdateTableRequestBody requestBodyNoTimePartitionSpec =
createRequestBodyWithRetentionPolicy(
RetentionColumnPattern.builder().pattern("yyyy-mm-dd-hh").build(),
RetentionColumnPattern.builder().pattern("yyyy-mm-dd-hh").columnName("aa").build(),
1,
TimePartitionSpec.Granularity.DAY,
null);
null,
Collections.singletonList(ClusteringColumn.builder().columnName("aa").build()));
Assertions.assertTrue(
validator.validate(requestBodyNoTimePartitionSpec, TableUri.builder().build()));

// Negative: non-time-partitioned table with retention column that is NOT a clustering column.
CreateUpdateTableRequestBody requestBodyNonPartitionRetentionColumn =
createRequestBodyWithRetentionPolicy(
RetentionColumnPattern.builder().pattern("yyyy-mm-dd-hh").columnName("aa").build(),
1,
TimePartitionSpec.Granularity.DAY,
null,
Collections.singletonList(ClusteringColumn.builder().columnName("id").build()));
Assertions.assertFalse(
validator.validate(requestBodyNonPartitionRetentionColumn, TableUri.builder().build()));

Field failureMessageField =
org.springframework.util.ReflectionUtils.findField(
RetentionPolicySpecValidator.class, "failureMessage");
Assertions.assertNotNull(failureMessageField);
org.springframework.util.ReflectionUtils.makeAccessible(failureMessageField);
Assertions.assertTrue(
((String) org.springframework.util.ReflectionUtils.getField(failureMessageField, validator))
.contains("must be one of the clustering columns"));

// Negative: non-time-partitioned table with retention column when no clustering is defined
// (fully non-partitioned table).
CreateUpdateTableRequestBody requestBodyNoPartitioningAtAll =
createRequestBodyWithRetentionPolicy(
RetentionColumnPattern.builder().pattern("yyyy-mm-dd-hh").columnName("aa").build(),
1,
TimePartitionSpec.Granularity.DAY,
null,
null);
Assertions.assertFalse(
validator.validate(requestBodyNoPartitioningAtAll, TableUri.builder().build()));

// Negative: Having both timepartitionspec AND pattern
CreateUpdateTableRequestBody requestBodyBothPatternAndTimePartitionSpec =
createRequestBodyWithRetentionPolicy(
Expand Down Expand Up @@ -225,6 +262,16 @@ private CreateUpdateTableRequestBody createRequestBodyWithRetentionPolicy(
int retentionCount,
TimePartitionSpec.Granularity granularity,
TimePartitionSpec timePartitioning) {
return createRequestBodyWithRetentionPolicy(
pattern, retentionCount, granularity, timePartitioning, null);
}

private CreateUpdateTableRequestBody createRequestBodyWithRetentionPolicy(
RetentionColumnPattern pattern,
int retentionCount,
TimePartitionSpec.Granularity granularity,
TimePartitionSpec timePartitioning,
List<ClusteringColumn> clustering) {
Retention retention =
Retention.builder()
.count(retentionCount)
Expand All @@ -236,6 +283,7 @@ private CreateUpdateTableRequestBody createRequestBodyWithRetentionPolicy(
.policies(policiesInvalidColumnNameCasing)
.schema(getSchemaJsonFromSchema(dummySchema))
.timePartitioning(timePartitioning)
.clustering(clustering)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ public void testUpdatePolicies() throws Exception {
.columnPattern(
RetentionColumnPattern.builder()
.pattern("yyyy-MM-dd-HH")
.columnName("timestampCol")
.columnName("name")
.build())
.build();
Policies newPolicies = Policies.builder().retention(retention).build();
Expand Down Expand Up @@ -644,7 +644,7 @@ public void testUpdatePolicies() throws Exception {
Assertions.assertEquals(updatedPolicies.get("retention").get("count"), 4);
Assertions.assertEquals(
((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("columnName"),
"timestampCol");
"name");
Assertions.assertEquals(
((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("pattern"),
"yyyy-MM-dd-HH");
Expand Down Expand Up @@ -1284,10 +1284,7 @@ public void testUpdateSucceedsForReplicationAndRetention() throws Exception {
.count(4)
.granularity(TimePartitionSpec.Granularity.HOUR)
.columnPattern(
RetentionColumnPattern.builder()
.pattern("yyyy-MM-dd")
.columnName("timestampCol")
.build())
RetentionColumnPattern.builder().pattern("yyyy-MM-dd").columnName("name").build())
.build();
Policies newPolicies = Policies.builder().replication(replication).retention(retention).build();

Expand All @@ -1314,7 +1311,7 @@ public void testUpdateSucceedsForReplicationAndRetention() throws Exception {
Assertions.assertEquals(updatedPolicies.get("retention").get("count"), 4);
Assertions.assertEquals(
((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("columnName"),
"timestampCol");
"name");
Assertions.assertEquals(
((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("pattern"),
"yyyy-MM-dd");
Expand Down
Loading