diff --git a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/validator/impl/RetentionPolicySpecValidator.java b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/validator/impl/RetentionPolicySpecValidator.java index 9f68ac32a..53a8bb692 100644 --- a/services/tables/src/main/java/com/linkedin/openhouse/tables/api/validator/impl/RetentionPolicySpecValidator.java +++ b/services/tables/src/main/java/com/linkedin/openhouse/tables/api/validator/impl/RetentionPolicySpecValidator.java @@ -4,11 +4,16 @@ import com.linkedin.openhouse.common.api.spec.TableUri; import com.linkedin.openhouse.tables.api.spec.v0.request.CreateUpdateTableRequestBody; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.ClusteringColumn; import com.linkedin.openhouse.tables.api.spec.v0.request.components.Retention; import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec; import com.linkedin.openhouse.tables.common.DefaultColumnPattern; import java.time.format.DateTimeFormatter; import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ -36,6 +41,10 @@ public boolean validate( CreateUpdateTableRequestBody createUpdateTableRequestBody, TableUri tableUri) { Retention retention = createUpdateTableRequestBody.getPolicies().getRetention(); TimePartitionSpec timePartitioning = createUpdateTableRequestBody.getTimePartitioning(); + List clustering = + createUpdateTableRequestBody.getClustering() == null + ? Collections.emptyList() + : createUpdateTableRequestBody.getClustering(); String schema = createUpdateTableRequestBody.getSchema(); if (retention != null) { @@ -82,11 +91,39 @@ public boolean validate( tableUri); return false; } + // Retention DELETE must be satisfiable by partition pruning. When the table is not + // time-partitioned, the retention column must be one of the clustering columns; + // otherwise the retention job would degrade into a row-level rewrite (or fail outright + // when backup is enabled). + if (timePartitioning == null + && retention.getColumnPattern() != null + && !isClusteringColumn(retention.getColumnPattern().getColumnName(), clustering)) { + failureMessage = + String.format( + "Retention column[%s] for table %s must be one of the clustering columns %s; " + + "retention on non-partitioned columns is not supported because it cannot be executed as a metadata-only delete.", + retention.getColumnPattern().getColumnName(), + tableUri, + clustering.stream() + .map(ClusteringColumn::getColumnName) + .collect(Collectors.toList())); + return false; + } } return true; } + /** Returns true iff {@code columnName} matches one of the table's clustering columns. */ + private boolean isClusteringColumn(String columnName, List clustering) { + if (columnName == null) { + return false; + } + return clustering.stream() + .map(ClusteringColumn::getColumnName) + .anyMatch(name -> Objects.equals(name, columnName)); + } + /** * Validate the pattern provided by users are legit pattern that complies with {@link * DateTimeFormatter} symbols. Also, the provided column name needs to be part of schema. diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/api/validator/impl/RetentionPolicySpecValidatorTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/api/validator/impl/RetentionPolicySpecValidatorTest.java index 3f50a8906..289cf7b16 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/api/validator/impl/RetentionPolicySpecValidatorTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/api/validator/impl/RetentionPolicySpecValidatorTest.java @@ -5,11 +5,14 @@ import com.linkedin.openhouse.common.api.spec.TableUri; import com.linkedin.openhouse.tables.api.spec.v0.request.CreateUpdateTableRequestBody; +import com.linkedin.openhouse.tables.api.spec.v0.request.components.ClusteringColumn; import com.linkedin.openhouse.tables.api.spec.v0.request.components.Policies; import com.linkedin.openhouse.tables.api.spec.v0.request.components.Retention; import com.linkedin.openhouse.tables.api.spec.v0.request.components.RetentionColumnPattern; import com.linkedin.openhouse.tables.api.spec.v0.request.components.TimePartitionSpec; import java.lang.reflect.Field; +import java.util.Collections; +import java.util.List; import org.apache.iceberg.Schema; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Assertions; @@ -147,16 +150,50 @@ void testValidate() { validator.validate( requestBodyMissingPatternAndTimePartitionSpec, TableUri.builder().build())); - // Positive: Only have pattern but no timepartitionSpec + // Positive: Only have pattern but no timepartitionSpec, and retention column is a + // clustering column. CreateUpdateTableRequestBody requestBodyNoTimePartitionSpec = createRequestBodyWithRetentionPolicy( - RetentionColumnPattern.builder().pattern("yyyy-mm-dd-hh").build(), + RetentionColumnPattern.builder().pattern("yyyy-mm-dd-hh").columnName("aa").build(), 1, TimePartitionSpec.Granularity.DAY, - null); + null, + Collections.singletonList(ClusteringColumn.builder().columnName("aa").build())); Assertions.assertTrue( validator.validate(requestBodyNoTimePartitionSpec, TableUri.builder().build())); + // Negative: non-time-partitioned table with retention column that is NOT a clustering column. + CreateUpdateTableRequestBody requestBodyNonPartitionRetentionColumn = + createRequestBodyWithRetentionPolicy( + RetentionColumnPattern.builder().pattern("yyyy-mm-dd-hh").columnName("aa").build(), + 1, + TimePartitionSpec.Granularity.DAY, + null, + Collections.singletonList(ClusteringColumn.builder().columnName("id").build())); + Assertions.assertFalse( + validator.validate(requestBodyNonPartitionRetentionColumn, TableUri.builder().build())); + + Field failureMessageField = + org.springframework.util.ReflectionUtils.findField( + RetentionPolicySpecValidator.class, "failureMessage"); + Assertions.assertNotNull(failureMessageField); + org.springframework.util.ReflectionUtils.makeAccessible(failureMessageField); + Assertions.assertTrue( + ((String) org.springframework.util.ReflectionUtils.getField(failureMessageField, validator)) + .contains("must be one of the clustering columns")); + + // Negative: non-time-partitioned table with retention column when no clustering is defined + // (fully non-partitioned table). + CreateUpdateTableRequestBody requestBodyNoPartitioningAtAll = + createRequestBodyWithRetentionPolicy( + RetentionColumnPattern.builder().pattern("yyyy-mm-dd-hh").columnName("aa").build(), + 1, + TimePartitionSpec.Granularity.DAY, + null, + null); + Assertions.assertFalse( + validator.validate(requestBodyNoPartitioningAtAll, TableUri.builder().build())); + // Negative: Having both timepartitionspec AND pattern CreateUpdateTableRequestBody requestBodyBothPatternAndTimePartitionSpec = createRequestBodyWithRetentionPolicy( @@ -225,6 +262,16 @@ private CreateUpdateTableRequestBody createRequestBodyWithRetentionPolicy( int retentionCount, TimePartitionSpec.Granularity granularity, TimePartitionSpec timePartitioning) { + return createRequestBodyWithRetentionPolicy( + pattern, retentionCount, granularity, timePartitioning, null); + } + + private CreateUpdateTableRequestBody createRequestBodyWithRetentionPolicy( + RetentionColumnPattern pattern, + int retentionCount, + TimePartitionSpec.Granularity granularity, + TimePartitionSpec timePartitioning, + List clustering) { Retention retention = Retention.builder() .count(retentionCount) @@ -236,6 +283,7 @@ private CreateUpdateTableRequestBody createRequestBodyWithRetentionPolicy( .policies(policiesInvalidColumnNameCasing) .schema(getSchemaJsonFromSchema(dummySchema)) .timePartitioning(timePartitioning) + .clustering(clustering) .build(); } } diff --git a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java index 777adc6fa..335e0e2ec 100644 --- a/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java +++ b/services/tables/src/test/java/com/linkedin/openhouse/tables/e2e/h2/TablesControllerTest.java @@ -616,7 +616,7 @@ public void testUpdatePolicies() throws Exception { .columnPattern( RetentionColumnPattern.builder() .pattern("yyyy-MM-dd-HH") - .columnName("timestampCol") + .columnName("name") .build()) .build(); Policies newPolicies = Policies.builder().retention(retention).build(); @@ -644,7 +644,7 @@ public void testUpdatePolicies() throws Exception { Assertions.assertEquals(updatedPolicies.get("retention").get("count"), 4); Assertions.assertEquals( ((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("columnName"), - "timestampCol"); + "name"); Assertions.assertEquals( ((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("pattern"), "yyyy-MM-dd-HH"); @@ -1284,10 +1284,7 @@ public void testUpdateSucceedsForReplicationAndRetention() throws Exception { .count(4) .granularity(TimePartitionSpec.Granularity.HOUR) .columnPattern( - RetentionColumnPattern.builder() - .pattern("yyyy-MM-dd") - .columnName("timestampCol") - .build()) + RetentionColumnPattern.builder().pattern("yyyy-MM-dd").columnName("name").build()) .build(); Policies newPolicies = Policies.builder().replication(replication).retention(retention).build(); @@ -1314,7 +1311,7 @@ public void testUpdateSucceedsForReplicationAndRetention() throws Exception { Assertions.assertEquals(updatedPolicies.get("retention").get("count"), 4); Assertions.assertEquals( ((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("columnName"), - "timestampCol"); + "name"); Assertions.assertEquals( ((HashMap) updatedPolicies.get("retention").get("columnPattern")).get("pattern"), "yyyy-MM-dd");