-
Notifications
You must be signed in to change notification settings - Fork 77
[BDP-102028] feat(optimizer): [3/N] Analyzer #533
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: mkuchenb/optimizer-2
Are you sure you want to change the base?
Changes from all commits
c0802cb
63b0768
936f3f3
67538f8
11dd115
4f4158d
7170599
5f6fa3b
8054586
5af5f14
442f1e4
c4f194a
6da624a
52ba858
beedad8
e4a1ad1
f663537
d7e3a65
a5585f4
dd4faf2
91ba362
35bcd38
0dbe3d9
d44783f
a5df7e4
b0898e3
fb71bd9
7ea8868
d7767e8
4a8796c
ef453ca
ad11533
51dab67
619df83
0b30130
6a23755
95456be
d82c17f
1d56fa6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| plugins { | ||
| id 'openhouse.springboot-ext-conventions' | ||
| id 'org.springframework.boot' version '2.7.8' | ||
| } | ||
|
|
||
| dependencies { | ||
| implementation project(':services:optimizer') | ||
| implementation 'org.springframework.boot:spring-boot-starter:2.7.8' | ||
| implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8' | ||
| implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8' | ||
| implementation 'org.springframework.boot:spring-boot-starter-aop:2.7.8' | ||
| runtimeOnly 'mysql:mysql-connector-java:8.0.33' | ||
| testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8' | ||
| testImplementation 'com.squareup.okhttp3:mockwebserver:4.10.0' | ||
| testRuntimeOnly 'com.h2database:h2' | ||
| } | ||
|
|
||
| test { | ||
| useJUnitPlatform() | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| package com.linkedin.openhouse.analyzer; | ||
|
|
||
| import java.util.List; | ||
| import org.springframework.boot.CommandLineRunner; | ||
| import org.springframework.boot.SpringApplication; | ||
| import org.springframework.boot.autoconfigure.SpringBootApplication; | ||
| import org.springframework.boot.autoconfigure.domain.EntityScan; | ||
| import org.springframework.context.annotation.Bean; | ||
| import org.springframework.data.jpa.repository.config.EnableJpaRepositories; | ||
|
|
||
| /** Spring Boot bootstrap class for the Optimizer Analyzer process. */ | ||
| @SpringBootApplication | ||
| @EntityScan(basePackages = "com.linkedin.openhouse.optimizer.entity") | ||
| @EnableJpaRepositories(basePackages = "com.linkedin.openhouse.optimizer.repository") | ||
| public class AnalyzerApplication { | ||
|
|
||
| /** Starts the Spring context; the {@link CommandLineRunner} bean below performs the analysis. */ | ||
| public static void main(String[] args) { | ||
| SpringApplication.run(AnalyzerApplication.class, args); | ||
| } | ||
|
|
||
| /** | ||
| * Builds the startup hook that drives one analysis pass per process invocation. | ||
| * | ||
| * <p>For every injected {@link OperationAnalyzer}, in list order, the runner is asked to analyze | ||
| * that analyzer's operation type. Iteration over databases happens inside the runner. | ||
| * | ||
| * @param runner the analysis loop to invoke | ||
| * @param analyzers every {@link OperationAnalyzer} bean in the context | ||
| * @return a runner that ignores its command-line arguments | ||
| */ | ||
| @Bean | ||
| public CommandLineRunner run(AnalyzerRunner runner, List<OperationAnalyzer> analyzers) { | ||
| return args -> { | ||
| for (OperationAnalyzer registered : analyzers) { | ||
| runner.analyze(registered.getOperationType()); | ||
| } | ||
| }; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,146 @@ | ||
| package com.linkedin.openhouse.analyzer; | ||
|
|
||
| import com.linkedin.openhouse.optimizer.model.OperationType; | ||
| import com.linkedin.openhouse.optimizer.model.Table; | ||
| import com.linkedin.openhouse.optimizer.model.TableOperation; | ||
| import com.linkedin.openhouse.optimizer.model.TableOperationsHistory; | ||
| import com.linkedin.openhouse.optimizer.repository.TableOperationsHistoryRepository; | ||
| import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository; | ||
| import com.linkedin.openhouse.optimizer.repository.TableStatsRepository; | ||
| import java.time.Instant; | ||
| import java.util.Comparator; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Optional; | ||
| import java.util.stream.Collectors; | ||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.springframework.stereotype.Component; | ||
|
|
||
| /** | ||
| * Core analysis loop. For one operation type per call, iterates databases and evaluates each table | ||
| * in a database against the matching {@link OperationAnalyzer}. | ||
| * | ||
| * <p>Both sides of the join — current operations and latest history per (table, type) — are loaded | ||
| * into maps once per database before the table loop. This is correct at small scale (≤~100k | ||
| * tables); past that the per-db query shape and projection need further tuning. Scale-up work is | ||
| * tracked in <a href="https://linkedin.atlassian.net/browse/BDP-102182">BDP-102182</a>. | ||
| */ | ||
| @Slf4j | ||
| @Component | ||
| @RequiredArgsConstructor | ||
| public class AnalyzerRunner { | ||
|
|
||
| private final List<OperationAnalyzer> analyzers; | ||
| private final TableStatsRepository statsRepo; | ||
| private final TableOperationsRepository operationsRepo; | ||
| private final TableOperationsHistoryRepository historyRepo; | ||
|
|
||
| /** | ||
| * Run the analysis loop for {@code operationType} across all databases, with no filters. | ||
| * Equivalent to {@link #analyze(OperationType, Optional, Optional, Optional)} with all-empty | ||
| * filters. | ||
| * | ||
| * <p>If no registered analyzer handles {@code operationType}, the delegate logs a warning and | ||
| * returns without doing any work. | ||
| * | ||
| * @param operationType the operation type to evaluate tables for | ||
| */ | ||
| public void analyze(OperationType operationType) { | ||
| analyze(operationType, Optional.empty(), Optional.empty(), Optional.empty()); | ||
| } | ||
|
|
||
| /** | ||
| * Run the analysis loop for the given operation type, optionally scoped to a single database, | ||
| * table name, or table UUID. Iterates databases one at a time so the working set is bounded by | ||
| * tables-per-db, not tables-total. | ||
| */ | ||
| public void analyze( | ||
| OperationType operationType, | ||
| Optional<String> databaseName, | ||
| Optional<String> tableName, | ||
| Optional<String> tableUuid) { | ||
| Optional<OperationAnalyzer> analyzerOpt = | ||
| analyzers.stream().filter(a -> a.getOperationType() == operationType).findFirst(); | ||
| if (analyzerOpt.isEmpty()) { | ||
| log.warn("No analyzer registered for operation type {}; skipping", operationType); | ||
| return; | ||
| } | ||
| OperationAnalyzer analyzer = analyzerOpt.get(); | ||
| List<String> dbs = databaseName.map(List::of).orElseGet(statsRepo::findDistinctDatabaseNames); | ||
| log.info("Analyzing {} across {} database(s)", operationType, dbs.size()); | ||
| dbs.forEach(db -> analyzeDatabase(analyzer, db, tableName, tableUuid)); | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. .get is free. OperationAnalyzer analyzer = analyzerOpt.get(); |
||
| log.info("Analysis complete for {}", operationType); | ||
| } | ||
|
|
||
| private void analyzeDatabase( | ||
| OperationAnalyzer analyzer, | ||
| String databaseName, | ||
| Optional<String> tableName, | ||
| Optional<String> tableUuid) { | ||
|
|
||
| com.linkedin.openhouse.optimizer.db.OperationType dbOperationType = | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Import vs using fully qualified names. apply the rule across all prs in the stack. make a plan. |
||
| analyzer.getOperationType().toDb(); | ||
|
|
||
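| // Pre-load the small sides of the joins once, before the per-table loop. The current-operations | ||
| // query is filtered by this database. NOTE(review): the history lookup below is passed only the | ||
| // operation type, so it appears to load the latest history for every database — confirm. | ||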
| Map<String, TableOperation> currentOps = | ||
| operationsRepo | ||
| .find( | ||
| dbOperationType, null, tableUuid.orElse(null), databaseName, tableName.orElse(null)) | ||
| .stream() | ||
| .filter(e -> e.getTableUuid() != null) | ||
| .map(TableOperation::fromRow) | ||
| .collect( | ||
| Collectors.toMap( | ||
| TableOperation::getTableUuid, op -> op, TableOperation::mostRecent)); | ||
|
|
||
| Map<String, TableOperationsHistory> latestHistory = | ||
| historyRepo.findLatestPerTable(dbOperationType).stream() | ||
| .filter(r -> r.getTableUuid() != null) | ||
| .map(TableOperationsHistory::fromRow) | ||
| .collect( | ||
| Collectors.toMap( | ||
| TableOperationsHistory::getTableUuid, | ||
| h -> h, | ||
| AnalyzerRunner::moreRecentHistory)); | ||
|
|
||
| List<Table> tables = | ||
| statsRepo.find(databaseName, tableName.orElse(null), tableUuid.orElse(null)).stream() | ||
| .filter(row -> row.getTableUuid() != null) | ||
| .map(Table::fromRow) | ||
| .collect(Collectors.toList()); | ||
|
|
||
| /* | ||
| * For each table in this database, decide whether to create a new PENDING operation. | ||
| * | ||
| * 1. Skip tables not opted in to this operation type. The opt-in check today reads a | ||
| * table-property flag; in the future it will read a denormalized column. | ||
| * 2. Look up the table's current active operation (if any) and its most recent completed | ||
| * history entry from the maps loaded above. | ||
| * 3. Delegate the schedule-or-not decision to the analyzer's shouldSchedule — strategy | ||
| * encapsulates cadence, retry policy, and any future per-operation signals. | ||
| * 4. On true, persist a new PENDING operation. The scheduler picks it up on its next pass. | ||
| */ | ||
| tables.forEach( | ||
|
mkuchenbecker marked this conversation as resolved.
|
||
| table -> { | ||
| if (!analyzer.isEnabled(table)) { | ||
| return; | ||
| } | ||
| Optional<TableOperation> currentOp = | ||
| Optional.ofNullable(currentOps.get(table.getTableUuid())); | ||
| Optional<TableOperationsHistory> entry = | ||
| Optional.ofNullable(latestHistory.get(table.getTableUuid())); | ||
| if (analyzer.shouldSchedule(table, currentOp, entry)) { | ||
| TableOperation op = TableOperation.pending(table, analyzer.getOperationType()); | ||
| operationsRepo.save(op.toRow()); | ||
| log.info( | ||
| "Created PENDING {} operation for table {}.{}", | ||
| analyzer.getOperationType(), | ||
| table.getDatabaseName(), | ||
| table.getTableId()); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Merge function for the latest-history map: keeps whichever entry completed later, ranking an | ||
| * entry without a completion time as if it completed at {@link Instant#EPOCH}. Ties keep | ||
| * {@code a}. | ||
| */ | ||
| private static TableOperationsHistory moreRecentHistory( | ||
| TableOperationsHistory a, TableOperationsHistory b) { | ||
| Comparator<TableOperationsHistory> byCompletion = | ||
| Comparator.comparing(AnalyzerRunner::completedAtOrEpoch); | ||
| if (byCompletion.compare(a, b) < 0) { | ||
| return b; | ||
| } | ||
| return a; | ||
| } | ||
|
|
||
| /** Returns the entry's completion time, or {@link Instant#EPOCH} when it has none. */ | ||
| private static Instant completedAtOrEpoch(TableOperationsHistory entry) { | ||
| Instant completedAt = entry.getCompletedAt(); | ||
| return completedAt != null ? completedAt : Instant.EPOCH; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| package com.linkedin.openhouse.analyzer; | ||
|
|
||
| import com.linkedin.openhouse.optimizer.model.OperationType; | ||
| import com.linkedin.openhouse.optimizer.model.Table; | ||
| import com.linkedin.openhouse.optimizer.model.TableOperation; | ||
| import com.linkedin.openhouse.optimizer.model.TableOperationsHistory; | ||
| import java.time.Duration; | ||
| import java.util.Optional; | ||
| import org.springframework.beans.factory.annotation.Autowired; | ||
| import org.springframework.beans.factory.annotation.Value; | ||
| import org.springframework.stereotype.Component; | ||
|
|
||
| /** | ||
| * Analyzer for the {@link OperationType#ORPHAN_FILES_DELETION} operation type. | ||
| * | ||
| * <p>A table is opted in when its {@value #OFD_ENABLED_PROPERTY} table property is exactly | ||
| * {@code "true"}. Whether to schedule is delegated entirely to a {@link CadencePolicy}. | ||
| */ | ||
| @Component | ||
| public class CadenceBasedOrphanFilesDeletionAnalyzer implements OperationAnalyzer { | ||
|
|
||
| /** Table-property key that opts a table in to orphan-files deletion. */ | ||
| static final String OFD_ENABLED_PROPERTY = "maintenance.optimizer.ofd.enabled"; | ||
|
|
||
| private final CadencePolicy cadencePolicy; | ||
|
|
||
| /** | ||
| * Spring constructor; builds the {@link CadencePolicy} from configuration. | ||
| * | ||
| * @param successRetryHours hours to wait after a successful run ({@code | ||
| *     ofd.success-retry-hours}, default 24) | ||
| * @param failureRetryHours hours to wait after a failed run ({@code ofd.failure-retry-hours}, | ||
| *     default 1) | ||
| */ | ||
| @Autowired | ||
| public CadenceBasedOrphanFilesDeletionAnalyzer( | ||
| @Value("${ofd.success-retry-hours:24}") long successRetryHours, | ||
| @Value("${ofd.failure-retry-hours:1}") long failureRetryHours) { | ||
| this.cadencePolicy = | ||
| new CadencePolicy(Duration.ofHours(successRetryHours), Duration.ofHours(failureRetryHours)); | ||
| } | ||
|
|
||
| /** Package-private for tests that supply a pre-built {@link CadencePolicy}. */ | ||
| CadenceBasedOrphanFilesDeletionAnalyzer(CadencePolicy cadencePolicy) { | ||
| this.cadencePolicy = cadencePolicy; | ||
| } | ||
|
|
||
| @Override | ||
| public OperationType getOperationType() { | ||
| return OperationType.ORPHAN_FILES_DELETION; | ||
| } | ||
|
|
||
| /** | ||
| * Returns {@code true} only when the table carries the opt-in property with the exact value | ||
| * {@code "true"} (case-sensitive). A table with no properties map is treated as not opted in, so | ||
| * one such row cannot abort the caller's loop over the remaining tables. | ||
| */ | ||
| @Override | ||
| public boolean isEnabled(Table table) { | ||
| // NOTE(review): assumes getTableProperties() can be null for rows without properties — confirm. | ||
| return table.getTableProperties() != null | ||
| && "true".equals(table.getTableProperties().get(OFD_ENABLED_PROPERTY)); | ||
| } | ||
|
|
||
| /** Delegates to the {@link CadencePolicy}; {@code table} itself is not consulted. */ | ||
| @Override | ||
| public boolean shouldSchedule( | ||
| Table table, | ||
| Optional<TableOperation> currentOp, | ||
| Optional<TableOperationsHistory> latestHistory) { | ||
| return cadencePolicy.shouldSchedule(currentOp, latestHistory); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| package com.linkedin.openhouse.analyzer; | ||
|
|
||
| import com.linkedin.openhouse.optimizer.model.HistoryStatus; | ||
| import com.linkedin.openhouse.optimizer.model.OperationStatus; | ||
| import com.linkedin.openhouse.optimizer.model.TableOperation; | ||
| import com.linkedin.openhouse.optimizer.model.TableOperationsHistory; | ||
| import java.time.Duration; | ||
| import java.time.Instant; | ||
| import java.util.Optional; | ||
| import lombok.RequiredArgsConstructor; | ||
|
|
||
| /** | ||
| * Time-based scheduling policy. An analyzer delegates to {@link CadencePolicy} to decide whether to | ||
| * re-issue a recommendation for a table. | ||
| * | ||
| * <p>The analyzer stays out of any table that already has a non-CANCELED active operation — those | ||
| * belong to the scheduler. For tables with no active operation (or only a CANCELED one), the | ||
| * decision is based on the most recent completed-history entry: re-evaluate after {@code | ||
| * successRetryInterval} on success, or after {@code failureRetryInterval} on failure. | ||
| */ | ||
| @RequiredArgsConstructor | ||
| public class CadencePolicy { | ||
|
|
||
| /** | ||
| * How long to wait after a successful operation before re-evaluating the table. For example, if | ||
| * set to 24 hours and OFD succeeded at 10:00 AM Monday, the table won't be scheduled again until | ||
| * after 10:00 AM Tuesday. | ||
| */ | ||
| private final Duration successRetryInterval; | ||
|
|
||
| /** | ||
| * How long to wait after a failed operation before retrying. Shorter than the success interval to | ||
| * allow quick recovery. For example, if set to 1 hour and OFD failed at 2:00 PM, the table | ||
| * becomes eligible for retry at 3:00 PM. | ||
| */ | ||
| private final Duration failureRetryInterval; | ||
|
|
||
| /** | ||
| * Returns {@code true} if a new or refreshed operation record should be upserted. | ||
| * | ||
| * @param currentOp the existing active operation record, or empty if none exists | ||
| * @param latestHistory the most recent history entry for this (table, type), or empty | ||
| */ | ||
| public boolean shouldSchedule( | ||
| Optional<TableOperation> currentOp, Optional<TableOperationsHistory> latestHistory) { | ||
| if (currentOp.isPresent() && currentOp.get().getStatus() != OperationStatus.CANCELED) { | ||
| return false; | ||
| } | ||
| return latestHistory.map(this::readyAfterHistoryEntry).orElse(true); | ||
| } | ||
|
|
||
| private boolean readyAfterHistoryEntry(TableOperationsHistory entry) { | ||
| Duration interval = | ||
| entry.getStatus() == HistoryStatus.FAILED ? failureRetryInterval : successRetryInterval; | ||
| return Duration.between(entry.getCompletedAt(), Instant.now()).compareTo(interval) > 0; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| package com.linkedin.openhouse.analyzer; | ||
|
|
||
| import com.linkedin.openhouse.optimizer.model.OperationType; | ||
| import com.linkedin.openhouse.optimizer.model.Table; | ||
| import com.linkedin.openhouse.optimizer.model.TableOperation; | ||
| import com.linkedin.openhouse.optimizer.model.TableOperationsHistory; | ||
| import java.util.Optional; | ||
|
|
||
| /** | ||
| * Strategy interface for a single operation type. Each implementation decides whether a given table | ||
| * needs an operation recommendation upserted in the Optimizer Service. | ||
| * | ||
| * <p>{@code AnalyzerRunner} selects the implementation whose {@link #getOperationType()} matches | ||
| * the requested type, then for each table calls {@link #isEnabled(Table)} and, only if that returns | ||
| * {@code true}, {@link #shouldSchedule(Table, Optional, Optional)}. | ||
| * | ||
| * <p>TODO(circuit-breaker): a chronically-failing table currently produces a new PENDING row on | ||
| * every Analyzer pass. Add a circuit breaker that suppresses scheduling for a (table, type) after N | ||
| * consecutive FAILED history entries. Requirements: configurable threshold per operation type, | ||
| * automatic reset via exponential backoff so tables can recover, and an operator-visible signal | ||
| * (metric or query) so tripped breakers are diagnosable. | ||
| */ | ||
| public interface OperationAnalyzer { | ||
|
|
||
| /** The operation type this analyzer handles. */ | ||
| OperationType getOperationType(); | ||
|
|
||
| /** | ||
| * Returns {@code true} if this operation is opted-in for the given table. Tables that return | ||
| * {@code false} are skipped entirely — no upsert is issued and {@code shouldSchedule} is not | ||
| * consulted. | ||
| * | ||
| * @param table the table entry | ||
| */ | ||
| boolean isEnabled(Table table); | ||
|
|
||
| /** | ||
| * Returns {@code true} if a new or refreshed operation record should be upserted. When this | ||
| * returns {@code true}, {@code AnalyzerRunner} persists a new PENDING operation for the table. | ||
| * | ||
| * @param table the table entry | ||
| * @param currentOp the existing active operation record, or empty if none exists | ||
| * @param latestHistory the most recent history entry for this (table, type), or empty | ||
| */ | ||
| boolean shouldSchedule( | ||
| Table table, | ||
| Optional<TableOperation> currentOp, | ||
| Optional<TableOperationsHistory> latestHistory); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| spring.application.name=openhouse-optimizer-analyzer | ||
| spring.main.web-application-type=none | ||
| spring.datasource.url=${OPTIMIZER_DB_URL:jdbc:h2:mem:analyzerdb;DB_CLOSE_DELAY=-1;MODE=MySQL} | ||
| spring.datasource.username=${OPTIMIZER_DB_USER:sa} | ||
| spring.datasource.password=${OPTIMIZER_DB_PASSWORD:} | ||
| spring.jpa.hibernate.ddl-auto=none | ||
| ofd.success-retry-hours=24 | ||
| ofd.failure-retry-hours=1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
impossible this is false