diff --git a/apps/optimizer-analyzer/build.gradle b/apps/optimizer-analyzer/build.gradle new file mode 100644 index 000000000..f66ecc608 --- /dev/null +++ b/apps/optimizer-analyzer/build.gradle @@ -0,0 +1,20 @@ +plugins { + id 'openhouse.springboot-ext-conventions' + id 'org.springframework.boot' version '2.7.8' +} + +dependencies { + implementation project(':services:optimizer') + implementation 'org.springframework.boot:spring-boot-starter:2.7.8' + implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8' + implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8' + implementation 'org.springframework.boot:spring-boot-starter-aop:2.7.8' + runtimeOnly 'mysql:mysql-connector-java:8.0.33' + testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8' + testImplementation 'com.squareup.okhttp3:mockwebserver:4.10.0' + testRuntimeOnly 'com.h2database:h2' +} + +test { + useJUnitPlatform() +} diff --git a/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/AnalyzerApplication.java b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/AnalyzerApplication.java new file mode 100644 index 000000000..edee9c02e --- /dev/null +++ b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/AnalyzerApplication.java @@ -0,0 +1,29 @@ +package com.linkedin.openhouse.analyzer; + +import java.util.List; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.domain.EntityScan; +import org.springframework.context.annotation.Bean; +import org.springframework.data.jpa.repository.config.EnableJpaRepositories; + +/** Entry point for the Optimizer Analyzer application. */ +@SpringBootApplication +@EntityScan(basePackages = "com.linkedin.openhouse.optimizer.entity") +@EnableJpaRepositories(basePackages = "com.linkedin.openhouse.optimizer.repository") +public class AnalyzerApplication { + + public static void main(String[] args) { + SpringApplication.run(AnalyzerApplication.class, args); + } + + /** + * Runs the analyzer once per registered {@link OperationAnalyzer} per process invocation. Each + * call is scoped to one operation type; the runner iterates databases internally. + */ + @Bean + public CommandLineRunner run(AnalyzerRunner runner, List analyzers) { + return args -> analyzers.forEach(a -> runner.analyze(a.getOperationType())); + } +} diff --git a/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/AnalyzerRunner.java b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/AnalyzerRunner.java new file mode 100644 index 000000000..265b9d303 --- /dev/null +++ b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/AnalyzerRunner.java @@ -0,0 +1,146 @@ +package com.linkedin.openhouse.analyzer; + +import com.linkedin.openhouse.optimizer.model.OperationType; +import com.linkedin.openhouse.optimizer.model.Table; +import com.linkedin.openhouse.optimizer.model.TableOperation; +import com.linkedin.openhouse.optimizer.model.TableOperationsHistory; +import com.linkedin.openhouse.optimizer.repository.TableOperationsHistoryRepository; +import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; +import com.linkedin.openhouse.optimizer.repository.TableStatsRepository; +import java.time.Instant; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * Core analysis loop. For one operation type per call, iterates databases and evaluates each table + * in a database against the matching {@link OperationAnalyzer}. + * + *

Both sides of the join — current operations and latest history per (table, type) — are loaded + * into maps once per database before the table loop. This is correct at small scale (≤~100k + * tables); past that the per-db query shape and projection need further tuning. Scale-up work is + * tracked in BDP-102182. + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class AnalyzerRunner { + + private final List analyzers; + private final TableStatsRepository statsRepo; + private final TableOperationsRepository operationsRepo; + private final TableOperationsHistoryRepository historyRepo; + + /** + * Run the analysis loop for {@code operationType} across all databases, with no filters. + * Equivalent to {@link #analyze(OperationType, Optional, Optional, Optional)} with all-empty + * filters. + */ + public void analyze(OperationType operationType) { + analyze(operationType, Optional.empty(), Optional.empty(), Optional.empty()); + } + + /** + * Run the analysis loop for the given operation type, optionally scoped to a single database, + * table name, or table UUID. Iterates databases one at a time so the working set is bounded by + * tables-per-db, not tables-total. + */ + public void analyze( + OperationType operationType, + Optional databaseName, + Optional tableName, + Optional tableUuid) { + Optional analyzerOpt = + analyzers.stream().filter(a -> a.getOperationType() == operationType).findFirst(); + if (analyzerOpt.isEmpty()) { + log.warn("No analyzer registered for operation type {}; skipping", operationType); + return; + } + OperationAnalyzer analyzer = analyzerOpt.get(); + List dbs = databaseName.map(List::of).orElseGet(statsRepo::findDistinctDatabaseNames); + log.info("Analyzing {} across {} database(s)", operationType, dbs.size()); + dbs.forEach(db -> analyzeDatabase(analyzer, db, tableName, tableUuid)); + log.info("Analysis complete for {}", operationType); + } + + private void analyzeDatabase( + OperationAnalyzer analyzer, + String databaseName, + Optional tableName, + Optional tableUuid) { + + com.linkedin.openhouse.optimizer.db.OperationType dbOperationType = + analyzer.getOperationType().toDb(); + + // Pre-load the small sides of the joins — bounded by tables in this database. + Map currentOps = + operationsRepo + .find( + dbOperationType, null, tableUuid.orElse(null), databaseName, tableName.orElse(null)) + .stream() + .filter(e -> e.getTableUuid() != null) + .map(TableOperation::fromRow) + .collect( + Collectors.toMap( + TableOperation::getTableUuid, op -> op, TableOperation::mostRecent)); + + Map latestHistory = + historyRepo.findLatestPerTable(dbOperationType).stream() + .filter(r -> r.getTableUuid() != null) + .map(TableOperationsHistory::fromRow) + .collect( + Collectors.toMap( + TableOperationsHistory::getTableUuid, + h -> h, + AnalyzerRunner::moreRecentHistory)); + + List tables = + statsRepo.find(databaseName, tableName.orElse(null), tableUuid.orElse(null)).stream() + .filter(row -> row.getTableUuid() != null) + .map(Table::fromRow) + .collect(Collectors.toList()); + + /* + * For each table in this database, decide whether to create a new PENDING operation. + * + * 1. Skip tables not opted in to this operation type. The opt-in check today reads a + * table-property flag; in the future it will read a denormalized column. + * 2. Look up the table's current active operation (if any) and its most recent completed + * history entry from the maps loaded above. + * 3. Delegate the schedule-or-not decision to the analyzer's shouldSchedule — strategy + * encapsulates cadence, retry policy, and any future per-operation signals. + * 4. On true, persist a new PENDING operation. The scheduler picks it up on its next pass. + */ + tables.forEach( + table -> { + if (!analyzer.isEnabled(table)) { + return; + } + Optional currentOp = + Optional.ofNullable(currentOps.get(table.getTableUuid())); + Optional entry = + Optional.ofNullable(latestHistory.get(table.getTableUuid())); + if (analyzer.shouldSchedule(table, currentOp, entry)) { + TableOperation op = TableOperation.pending(table, analyzer.getOperationType()); + operationsRepo.save(op.toRow()); + log.info( + "Created PENDING {} operation for table {}.{}", + analyzer.getOperationType(), + table.getDatabaseName(), + table.getTableId()); + } + }); + } + + private static TableOperationsHistory moreRecentHistory( + TableOperationsHistory a, TableOperationsHistory b) { + Comparator byCompletedAt = + Comparator.comparing(r -> r.getCompletedAt() != null ? r.getCompletedAt() : Instant.EPOCH); + return byCompletedAt.compare(a, b) >= 0 ? a : b; + } +} diff --git a/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/CadenceBasedOrphanFilesDeletionAnalyzer.java b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/CadenceBasedOrphanFilesDeletionAnalyzer.java new file mode 100644 index 000000000..394b77eca --- /dev/null +++ b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/CadenceBasedOrphanFilesDeletionAnalyzer.java @@ -0,0 +1,51 @@ +package com.linkedin.openhouse.analyzer; + +import com.linkedin.openhouse.optimizer.model.OperationType; +import com.linkedin.openhouse.optimizer.model.Table; +import com.linkedin.openhouse.optimizer.model.TableOperation; +import com.linkedin.openhouse.optimizer.model.TableOperationsHistory; +import java.time.Duration; +import java.util.Optional; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** Analyzer for the {@link OperationType#ORPHAN_FILES_DELETION} operation type. */ +@Component +public class CadenceBasedOrphanFilesDeletionAnalyzer implements OperationAnalyzer { + + static final String OFD_ENABLED_PROPERTY = "maintenance.optimizer.ofd.enabled"; + + private final CadencePolicy cadencePolicy; + + @Autowired + public CadenceBasedOrphanFilesDeletionAnalyzer( + @Value("${ofd.success-retry-hours:24}") long successRetryHours, + @Value("${ofd.failure-retry-hours:1}") long failureRetryHours) { + this.cadencePolicy = + new CadencePolicy(Duration.ofHours(successRetryHours), Duration.ofHours(failureRetryHours)); + } + + /** Package-private for tests that supply a pre-built {@link CadencePolicy}. */ + CadenceBasedOrphanFilesDeletionAnalyzer(CadencePolicy cadencePolicy) { + this.cadencePolicy = cadencePolicy; + } + + @Override + public OperationType getOperationType() { + return OperationType.ORPHAN_FILES_DELETION; + } + + @Override + public boolean isEnabled(Table table) { + return "true".equals(table.getTableProperties().get(OFD_ENABLED_PROPERTY)); + } + + @Override + public boolean shouldSchedule( + Table table, + Optional currentOp, + Optional latestHistory) { + return cadencePolicy.shouldSchedule(currentOp, latestHistory); + } +} diff --git a/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/CadencePolicy.java b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/CadencePolicy.java new file mode 100644 index 000000000..6ce2db80c --- /dev/null +++ b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/CadencePolicy.java @@ -0,0 +1,57 @@ +package com.linkedin.openhouse.analyzer; + +import com.linkedin.openhouse.optimizer.model.HistoryStatus; +import com.linkedin.openhouse.optimizer.model.OperationStatus; +import com.linkedin.openhouse.optimizer.model.TableOperation; +import com.linkedin.openhouse.optimizer.model.TableOperationsHistory; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import lombok.RequiredArgsConstructor; + +/** + * Time-based scheduling policy. An analyzer delegates to {@link CadencePolicy} to decide whether to + * re-issue a recommendation for a table. + * + *

The analyzer stays out of any table that already has a non-CANCELED active operation — those + * belong to the scheduler. For tables with no active operation (or only a CANCELED one), the + * decision is based on the most recent completed-history entry: re-evaluate after {@code + * successRetryInterval} on success, or after {@code failureRetryInterval} on failure. + */ +@RequiredArgsConstructor +public class CadencePolicy { + + /** + * How long to wait after a successful operation before re-evaluating the table. For example, if + * set to 24 hours and OFD succeeded at 10:00 AM Monday, the table won't be scheduled again until + * after 10:00 AM Tuesday. + */ + private final Duration successRetryInterval; + + /** + * How long to wait after a failed operation before retrying. Shorter than the success interval to + * allow quick recovery. For example, if set to 1 hour and OFD failed at 2:00 PM, the table + * becomes eligible for retry at 3:00 PM. + */ + private final Duration failureRetryInterval; + + /** + * Returns {@code true} if a new or refreshed operation record should be upserted. + * + * @param currentOp the existing active operation record, or empty if none exists + * @param latestHistory the most recent history entry for this (table, type), or empty + */ + public boolean shouldSchedule( + Optional currentOp, Optional latestHistory) { + if (currentOp.isPresent() && currentOp.get().getStatus() != OperationStatus.CANCELED) { + return false; + } + return latestHistory.map(this::readyAfterHistoryEntry).orElse(true); + } + + private boolean readyAfterHistoryEntry(TableOperationsHistory entry) { + Duration interval = + entry.getStatus() == HistoryStatus.FAILED ? failureRetryInterval : successRetryInterval; + return Duration.between(entry.getCompletedAt(), Instant.now()).compareTo(interval) > 0; + } +} diff --git a/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/OperationAnalyzer.java b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/OperationAnalyzer.java new file mode 100644 index 000000000..ba64f558a --- /dev/null +++ b/apps/optimizer-analyzer/src/main/java/com/linkedin/openhouse/analyzer/OperationAnalyzer.java @@ -0,0 +1,41 @@ +package com.linkedin.openhouse.analyzer; + +import com.linkedin.openhouse.optimizer.model.OperationType; +import com.linkedin.openhouse.optimizer.model.Table; +import com.linkedin.openhouse.optimizer.model.TableOperation; +import com.linkedin.openhouse.optimizer.model.TableOperationsHistory; +import java.util.Optional; + +/** + * Strategy interface for a single operation type. Each implementation decides whether a given table + * needs an operation recommendation upserted in the Optimizer Service. + * + *

// TODO(circuit-breaker): a chronically-failing table currently produces a new PENDING row on + * every Analyzer pass. Add a circuit breaker that suppresses scheduling for a (table, type) after N + * consecutive FAILED history entries. Requirements: configurable threshold per operation type, + * automatic reset via exponential backoff so tables can recover, and an operator-visible signal + * (metric or query) so tripped breakers are diagnosable. + */ +public interface OperationAnalyzer { + + /** The operation type this analyzer handles. */ + OperationType getOperationType(); + + /** + * Returns {@code true} if this operation is opted-in for the given table. Tables that return + * {@code false} are skipped entirely — no upsert is issued. + */ + boolean isEnabled(Table table); + + /** + * Returns {@code true} if a new or refreshed operation record should be upserted. + * + * @param table the table entry + * @param currentOp the existing active operation record, or empty if none exists + * @param latestHistory the most recent history entry for this (table, type), or empty + */ + boolean shouldSchedule( + Table table, + Optional currentOp, + Optional latestHistory); +} diff --git a/apps/optimizer-analyzer/src/main/resources/application.properties b/apps/optimizer-analyzer/src/main/resources/application.properties new file mode 100644 index 000000000..1df0bea15 --- /dev/null +++ b/apps/optimizer-analyzer/src/main/resources/application.properties @@ -0,0 +1,8 @@ +spring.application.name=openhouse-optimizer-analyzer +spring.main.web-application-type=none +spring.datasource.url=${OPTIMIZER_DB_URL:jdbc:h2:mem:analyzerdb;DB_CLOSE_DELAY=-1;MODE=MySQL} +spring.datasource.username=${OPTIMIZER_DB_USER:sa} +spring.datasource.password=${OPTIMIZER_DB_PASSWORD:} +spring.jpa.hibernate.ddl-auto=none +ofd.success-retry-hours=24 +ofd.failure-retry-hours=1 diff --git a/apps/optimizer-analyzer/src/test/java/com/linkedin/openhouse/analyzer/AnalyzerRunnerTest.java b/apps/optimizer-analyzer/src/test/java/com/linkedin/openhouse/analyzer/AnalyzerRunnerTest.java new file mode 100644 index 000000000..fe9561eb9 --- /dev/null +++ b/apps/optimizer-analyzer/src/test/java/com/linkedin/openhouse/analyzer/AnalyzerRunnerTest.java @@ -0,0 +1,167 @@ +package com.linkedin.openhouse.analyzer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.linkedin.openhouse.optimizer.db.TableOperationsRow; +import com.linkedin.openhouse.optimizer.db.TableStatsRow; +import com.linkedin.openhouse.optimizer.model.OperationType; +import com.linkedin.openhouse.optimizer.model.Table; +import com.linkedin.openhouse.optimizer.model.TableOperation; +import com.linkedin.openhouse.optimizer.repository.TableOperationsHistoryRepository; +import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; +import com.linkedin.openhouse.optimizer.repository.TableStatsRepository; +import java.time.Instant; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class AnalyzerRunnerTest { + + private static final OperationType OFD_TYPE = OperationType.ORPHAN_FILES_DELETION; + private static final com.linkedin.openhouse.optimizer.db.OperationType OFD_DB = + com.linkedin.openhouse.optimizer.db.OperationType.ORPHAN_FILES_DELETION; + private static final String DB = "db1"; + + @Mock private TableStatsRepository statsRepo; + @Mock private TableOperationsRepository operationsRepo; + @Mock private TableOperationsHistoryRepository historyRepo; + @Mock private OperationAnalyzer analyzer; + + private AnalyzerRunner runner; + + @BeforeEach + void setUp() { + runner = new AnalyzerRunner(List.of(analyzer), statsRepo, operationsRepo, historyRepo); + when(analyzer.getOperationType()).thenReturn(OFD_TYPE); + when(statsRepo.findDistinctDatabaseNames()).thenReturn(List.of(DB)); + } + + @Test + void analyze_insertsNewRow_forEligibleTableWithNoExistingOp() { + TableStatsRow statsEntity = + TableStatsRow.builder().tableUuid("uuid-1").databaseName(DB).tableName("tbl1").build(); + + Table expectedTable = Table.fromRow(statsEntity); + + when(statsRepo.find(DB, null, null)).thenReturn(List.of(statsEntity)); + when(operationsRepo.find(OFD_DB, null, null, DB, null)).thenReturn(Collections.emptyList()); + when(historyRepo.findLatestPerTable(OFD_DB)).thenReturn(Collections.emptyList()); + when(analyzer.isEnabled(expectedTable)).thenReturn(true); + when(analyzer.shouldSchedule(expectedTable, Optional.empty(), Optional.empty())) + .thenReturn(true); + + runner.analyze(OFD_TYPE); + + ArgumentCaptor captor = ArgumentCaptor.forClass(TableOperationsRow.class); + verify(operationsRepo).save(captor.capture()); + TableOperationsRow saved = captor.getValue(); + assertThat(saved.getTableUuid()).isEqualTo("uuid-1"); + assertThat(saved.getDatabaseName()).isEqualTo(DB); + assertThat(saved.getTableName()).isEqualTo("tbl1"); + assertThat(saved.getOperationType()).isEqualTo(OFD_DB); + assertThat(saved.getStatus()) + .isEqualTo(com.linkedin.openhouse.optimizer.db.OperationStatus.PENDING); + assertThat(saved.getId()).isNotNull(); + } + + @Test + void analyze_noOp_whenCadencePolicyReturnsFalseForPending() { + TableStatsRow statsEntity = + TableStatsRow.builder().tableUuid("uuid-1").databaseName(DB).tableName("tbl1").build(); + + Table expectedTable = Table.fromRow(statsEntity); + + TableOperationsRow existingEntity = + TableOperationsRow.builder() + .id("existing-op-id") + .status(com.linkedin.openhouse.optimizer.db.OperationStatus.PENDING) + .tableUuid("uuid-1") + .operationType(OFD_DB) + .createdAt(Instant.now()) + .build(); + + when(statsRepo.find(DB, null, null)).thenReturn(List.of(statsEntity)); + when(operationsRepo.find(OFD_DB, null, null, DB, null)).thenReturn(List.of(existingEntity)); + when(historyRepo.findLatestPerTable(OFD_DB)).thenReturn(Collections.emptyList()); + when(analyzer.isEnabled(expectedTable)).thenReturn(true); + + TableOperation existingOp = TableOperation.fromRow(existingEntity); + when(analyzer.shouldSchedule(expectedTable, Optional.of(existingOp), Optional.empty())) + .thenReturn(false); + + runner.analyze(OFD_TYPE); + + verify(operationsRepo, never()).save(any()); + } + + @Test + void analyze_skipsTable_whenNotEnabled() { + TableStatsRow statsEntity = + TableStatsRow.builder().tableUuid("uuid-1").databaseName(DB).build(); + + Table expectedTable = Table.fromRow(statsEntity); + + when(statsRepo.find(DB, null, null)).thenReturn(List.of(statsEntity)); + when(operationsRepo.find(OFD_DB, null, null, DB, null)).thenReturn(Collections.emptyList()); + when(historyRepo.findLatestPerTable(OFD_DB)).thenReturn(Collections.emptyList()); + when(analyzer.isEnabled(expectedTable)).thenReturn(false); + + runner.analyze(OFD_TYPE); + + verify(operationsRepo, never()).save(any()); + } + + @Test + void analyze_skipsTable_whenShouldScheduleReturnsFalse() { + TableStatsRow statsEntity = + TableStatsRow.builder().tableUuid("uuid-1").databaseName(DB).build(); + + Table expectedTable = Table.fromRow(statsEntity); + + TableOperationsRow scheduled = + TableOperationsRow.builder() + .id("op-id") + .status(com.linkedin.openhouse.optimizer.db.OperationStatus.SCHEDULED) + .tableUuid("uuid-1") + .operationType(OFD_DB) + .createdAt(Instant.now()) + .build(); + + when(statsRepo.find(DB, null, null)).thenReturn(List.of(statsEntity)); + when(operationsRepo.find(OFD_DB, null, null, DB, null)).thenReturn(List.of(scheduled)); + when(historyRepo.findLatestPerTable(OFD_DB)).thenReturn(Collections.emptyList()); + when(analyzer.isEnabled(expectedTable)).thenReturn(true); + + TableOperation scheduledOp = TableOperation.fromRow(scheduled); + when(analyzer.shouldSchedule(expectedTable, Optional.of(scheduledOp), Optional.empty())) + .thenReturn(false); + + runner.analyze(OFD_TYPE); + + verify(operationsRepo, never()).save(any()); + } + + @Test + void analyze_skipsTable_whenTableUuidIsNull() { + TableStatsRow statsEntity = TableStatsRow.builder().databaseName(DB).build(); + + when(statsRepo.find(DB, null, null)).thenReturn(List.of(statsEntity)); + when(operationsRepo.find(OFD_DB, null, null, DB, null)).thenReturn(Collections.emptyList()); + when(historyRepo.findLatestPerTable(any())).thenReturn(Collections.emptyList()); + + runner.analyze(OFD_TYPE); + + verify(operationsRepo, never()).save(any()); + } +} diff --git a/apps/optimizer-analyzer/src/test/java/com/linkedin/openhouse/analyzer/CadenceBasedOrphanFilesDeletionAnalyzerTest.java b/apps/optimizer-analyzer/src/test/java/com/linkedin/openhouse/analyzer/CadenceBasedOrphanFilesDeletionAnalyzerTest.java new file mode 100644 index 000000000..633c9dceb --- /dev/null +++ b/apps/optimizer-analyzer/src/test/java/com/linkedin/openhouse/analyzer/CadenceBasedOrphanFilesDeletionAnalyzerTest.java @@ -0,0 +1,203 @@ +package com.linkedin.openhouse.analyzer; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.linkedin.openhouse.optimizer.model.HistoryStatus; +import com.linkedin.openhouse.optimizer.model.OperationStatus; +import com.linkedin.openhouse.optimizer.model.Table; +import com.linkedin.openhouse.optimizer.model.TableOperation; +import com.linkedin.openhouse.optimizer.model.TableOperationsHistory; +import java.time.Duration; +import java.time.Instant; +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class CadenceBasedOrphanFilesDeletionAnalyzerTest { + + private static final Duration SUCCESS_INTERVAL = Duration.ofHours(24); + private static final Duration FAILURE_INTERVAL = Duration.ofHours(1); + + private CadenceBasedOrphanFilesDeletionAnalyzer analyzer; + + @BeforeEach + void setUp() { + analyzer = + new CadenceBasedOrphanFilesDeletionAnalyzer( + new CadencePolicy(SUCCESS_INTERVAL, FAILURE_INTERVAL)); + } + + // --- isEnabled --- + + @Test + void isEnabled_returnsTrue_whenPropertySet() { + assertThat(analyzer.isEnabled(tableWithProperty("true"))).isTrue(); + } + + @Test + void isEnabled_returnsFalse_whenPropertyAbsent() { + assertThat(analyzer.isEnabled(tableWithProperty(null))).isFalse(); + } + + @Test + void isEnabled_returnsFalse_whenPropertyFalse() { + assertThat(analyzer.isEnabled(tableWithProperty("false"))).isFalse(); + } + + @Test + void isEnabled_returnsFalse_whenTablePropertiesEmpty() { + Table table = Table.builder().tableUuid("uuid").build(); + assertThat(analyzer.isEnabled(table)).isFalse(); + } + + // --- shouldSchedule: no existing op --- + + @Test + void shouldSchedule_noOp_noHistory_returnsTrue() { + assertThat( + analyzer.shouldSchedule(tableWithProperty("true"), Optional.empty(), Optional.empty())) + .isTrue(); + } + + @Test + void shouldSchedule_noOp_successHistoryAfterCooldown_returnsTrue() { + Instant longAgo = Instant.now().minus(SUCCESS_INTERVAL).minusSeconds(60); + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.empty(), + Optional.of(historyWithStatus(HistoryStatus.SUCCESS, longAgo)))) + .isTrue(); + } + + @Test + void shouldSchedule_noOp_successHistoryBeforeCooldown_returnsFalse() { + Instant recent = Instant.now().minus(SUCCESS_INTERVAL).plusSeconds(60); + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.empty(), + Optional.of(historyWithStatus(HistoryStatus.SUCCESS, recent)))) + .isFalse(); + } + + @Test + void shouldSchedule_noOp_failedHistoryAfterRetry_returnsTrue() { + Instant longAgo = Instant.now().minus(FAILURE_INTERVAL).minusSeconds(60); + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.empty(), + Optional.of(historyWithStatus(HistoryStatus.FAILED, longAgo)))) + .isTrue(); + } + + @Test + void shouldSchedule_noOp_failedHistoryBeforeRetry_returnsFalse() { + Instant recent = Instant.now().minus(FAILURE_INTERVAL).plusSeconds(60); + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.empty(), + Optional.of(historyWithStatus(HistoryStatus.FAILED, recent)))) + .isFalse(); + } + + // --- shouldSchedule: active op (non-CANCELED) → analyzer stays out --- + + @Test + void shouldSchedule_pending_returnsFalse() { + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.of(opWithStatus(OperationStatus.PENDING)), + Optional.empty())) + .isFalse(); + } + + @Test + void shouldSchedule_scheduling_returnsFalse() { + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.of(opWithStatus(OperationStatus.SCHEDULING)), + Optional.empty())) + .isFalse(); + } + + @Test + void shouldSchedule_scheduled_returnsFalse_regardlessOfHistory() { + Instant historyAt = Instant.now().minus(SUCCESS_INTERVAL).minusSeconds(60); + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.of(opWithStatus(OperationStatus.SCHEDULED)), + Optional.of(historyWithStatus(HistoryStatus.SUCCESS, historyAt)))) + .isFalse(); + } + + // --- shouldSchedule: CANCELED → cadence on history --- + + @Test + void shouldSchedule_canceled_successHistoryAfterCooldown_returnsTrue() { + Instant longAgo = Instant.now().minus(SUCCESS_INTERVAL).minusSeconds(60); + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.of(opWithStatus(OperationStatus.CANCELED)), + Optional.of(historyWithStatus(HistoryStatus.SUCCESS, longAgo)))) + .isTrue(); + } + + @Test + void shouldSchedule_canceled_successHistoryBeforeCooldown_returnsFalse() { + Instant recent = Instant.now().minus(SUCCESS_INTERVAL).plusSeconds(60); + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.of(opWithStatus(OperationStatus.CANCELED)), + Optional.of(historyWithStatus(HistoryStatus.SUCCESS, recent)))) + .isFalse(); + } + + @Test + void shouldSchedule_canceled_noHistory_returnsTrue() { + assertThat( + analyzer.shouldSchedule( + tableWithProperty("true"), + Optional.of(opWithStatus(OperationStatus.CANCELED)), + Optional.empty())) + .isTrue(); + } + + // --- helpers --- + + private Table tableWithProperty(String value) { + Map props = + value == null + ? Collections.emptyMap() + : Map.of(CadenceBasedOrphanFilesDeletionAnalyzer.OFD_ENABLED_PROPERTY, value); + return Table.builder() + .tableUuid("test-uuid") + .databaseName("db1") + .tableId("tbl1") + .tableProperties(props) + .build(); + } + + private TableOperation opWithStatus(OperationStatus status) { + return TableOperation.builder().status(status).build(); + } + + private TableOperationsHistory historyWithStatus(HistoryStatus status, Instant completedAt) { + return TableOperationsHistory.builder() + .id("hist-id") + .tableUuid("test-uuid") + .operationType(com.linkedin.openhouse.optimizer.model.OperationType.ORPHAN_FILES_DELETION) + .completedAt(completedAt) + .status(status) + .build(); + } +} diff --git a/settings.gradle b/settings.gradle index cad06785e..7942f44d3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -50,6 +50,7 @@ include ':services:common' include ':services:housetables' include ':services:jobs' include ':services:optimizer' +include ':apps:optimizer-analyzer' include ':services:tables' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.2' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.5'