Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
c0802cb
feat(optimizer): add analyzer app — continuous table operation schedu…
mkuchenbecker Apr 7, 2026
63b0768
fix: address PR review feedback on optimizer-3 analyzer
mkuchenbecker Apr 7, 2026
936f3f3
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker Apr 30, 2026
67538f8
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 1, 2026
11dd115
fix(optimizer): propagate optimizer-0 renames into apps/optimizer + a…
mkuchenbecker May 1, 2026
4f4158d
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 1, 2026
7170599
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 1, 2026
5f6fa3b
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 12, 2026
8054586
refactor(optimizer-analyzer): delete unused AnalyzerConfig
mkuchenbecker May 12, 2026
5af5f14
refactor(optimizer-analyzer): remove circuit breaker, defer with TODO
mkuchenbecker May 12, 2026
442f1e4
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 12, 2026
c4f194a
perf(optimizer-analyzer): use findLatestPerTable for history lookup
mkuchenbecker May 12, 2026
6da624a
refactor(optimizer-analyzer): typed OperationType/Status, polish cade…
mkuchenbecker May 12, 2026
52ba858
refactor(optimizer-analyzer): rename OrphanFilesDeletionAnalyzer → Ca…
mkuchenbecker May 12, 2026
beedad8
fix(optimizer-analyzer): update class name inside renamed files
mkuchenbecker May 12, 2026
e4a1ad1
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 13, 2026
f663537
docs(optimizer-analyzer): add scale roadmap as block comment
mkuchenbecker May 13, 2026
d7e3a65
docs(optimizer-analyzer): move scale roadmap to BDP-102182
mkuchenbecker May 13, 2026
a5585f4
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 13, 2026
dd4faf2
refactor(optimizer-analyzer): address PR review — required op, per-db…
mkuchenbecker May 13, 2026
91ba362
style(optimizer-analyzer): tighten AnalyzerRunner.analyze body
mkuchenbecker May 13, 2026
35bcd38
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 14, 2026
0dbe3d9
refactor(optimizer-analyzer): import shared model + explanatory comment
mkuchenbecker May 14, 2026
d44783f
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 14, 2026
a5df7e4
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 14, 2026
b0898e3
refactor(optimizer-analyzer): depend on :services:optimizer
mkuchenbecker May 14, 2026
fb71bd9
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 14, 2026
7ea8868
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 14, 2026
d7767e8
fix(optimizer-analyzer): rewrite AnalyzerRunnerTest to use entity bui…
mkuchenbecker May 14, 2026
4a8796c
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 14, 2026
ef453ca
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 14, 2026
ad11533
refactor(optimizer-analyzer): consume model/ types and ModelDbMapper
mkuchenbecker May 14, 2026
51dab67
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 14, 2026
619df83
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 14, 2026
0b30130
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 14, 2026
6a23755
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 14, 2026
95456be
refactor(optimizer-analyzer): use type to/from methods; drop ModelDbM…
mkuchenbecker May 14, 2026
d82c17f
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 15, 2026
1d56fa6
Merge branch 'mkuchenb/optimizer-2' into mkuchenb/optimizer-3
mkuchenbecker May 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions apps/optimizer-analyzer/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
plugins {
id 'openhouse.springboot-ext-conventions'
id 'org.springframework.boot' version '2.7.8'
}

dependencies {
implementation project(':services:optimizer')
implementation 'org.springframework.boot:spring-boot-starter:2.7.8'
implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8'
implementation 'org.springframework.boot:spring-boot-starter-aop:2.7.8'
runtimeOnly 'mysql:mysql-connector-java:8.0.33'
testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8'
testImplementation 'com.squareup.okhttp3:mockwebserver:4.10.0'
testRuntimeOnly 'com.h2database:h2'
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.openhouse.analyzer;

import java.util.List;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

/** Entry point for the Optimizer Analyzer application. */
@SpringBootApplication
@EntityScan(basePackages = "com.linkedin.openhouse.optimizer.entity")
@EnableJpaRepositories(basePackages = "com.linkedin.openhouse.optimizer.repository")
public class AnalyzerApplication {

public static void main(String[] args) {
SpringApplication.run(AnalyzerApplication.class, args);
}

/**
* Runs the analyzer once per registered {@link OperationAnalyzer} per process invocation. Each
* call is scoped to one operation type; the runner iterates databases internally.
*/
@Bean
public CommandLineRunner run(AnalyzerRunner runner, List<OperationAnalyzer> analyzers) {
return args -> analyzers.forEach(a -> runner.analyze(a.getOperationType()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package com.linkedin.openhouse.analyzer;

import com.linkedin.openhouse.optimizer.model.OperationType;
import com.linkedin.openhouse.optimizer.model.Table;
import com.linkedin.openhouse.optimizer.model.TableOperation;
import com.linkedin.openhouse.optimizer.model.TableOperationsHistory;
import com.linkedin.openhouse.optimizer.repository.TableOperationsHistoryRepository;
import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository;
import com.linkedin.openhouse.optimizer.repository.TableStatsRepository;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* Core analysis loop. For one operation type per call, iterates databases and evaluates each table
* in a database against the matching {@link OperationAnalyzer}.
*
* <p>Both sides of the join — current operations and latest history per (table, type) — are loaded
* into maps once per database before the table loop. This is correct at small scale (≤~100k
* tables); past that the per-db query shape and projection need further tuning. Scale-up work is
* tracked in <a href="https://linkedin.atlassian.net/browse/BDP-102182">BDP-102182</a>.
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class AnalyzerRunner {

private final List<OperationAnalyzer> analyzers;
private final TableStatsRepository statsRepo;
private final TableOperationsRepository operationsRepo;
private final TableOperationsHistoryRepository historyRepo;

/**
* Run the analysis loop for {@code operationType} across all databases, with no filters.
* Equivalent to {@link #analyze(OperationType, Optional, Optional, Optional)} with all-empty
* filters.
*/
public void analyze(OperationType operationType) {
analyze(operationType, Optional.empty(), Optional.empty(), Optional.empty());
}

/**
* Run the analysis loop for the given operation type, optionally scoped to a single database,
* table name, or table UUID. Iterates databases one at a time so the working set is bounded by
* tables-per-db, not tables-total.
*/
public void analyze(
OperationType operationType,
Optional<String> databaseName,
Optional<String> tableName,
Optional<String> tableUuid) {
Optional<OperationAnalyzer> analyzerOpt =
analyzers.stream().filter(a -> a.getOperationType() == operationType).findFirst();
if (analyzerOpt.isEmpty()) {
log.warn("No analyzer registered for operation type {}; skipping", operationType);
return;
}
Comment on lines +60 to +63
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

impossible this is false

OperationAnalyzer analyzer = analyzerOpt.get();
List<String> dbs = databaseName.map(List::of).orElseGet(statsRepo::findDistinctDatabaseNames);
log.info("Analyzing {} across {} database(s)", operationType, dbs.size());
dbs.forEach(db -> analyzeDatabase(analyzer, db, tableName, tableUuid));
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.get is free. OperationAnalyzer analyzer = analyzerOpt.get();

log.info("Analysis complete for {}", operationType);
}

private void analyzeDatabase(
OperationAnalyzer analyzer,
String databaseName,
Optional<String> tableName,
Optional<String> tableUuid) {

com.linkedin.openhouse.optimizer.db.OperationType dbOperationType =
Copy link
Copy Markdown
Collaborator Author

@mkuchenbecker mkuchenbecker May 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import vs using fully qualified names. apply the rule across all prs in the stack. make a plan.

analyzer.getOperationType().toDb();

// Pre-load the small sides of the joins — bounded by tables in this database.
Map<String, TableOperation> currentOps =
operationsRepo
.find(
dbOperationType, null, tableUuid.orElse(null), databaseName, tableName.orElse(null))
.stream()
.filter(e -> e.getTableUuid() != null)
.map(TableOperation::fromRow)
.collect(
Collectors.toMap(
TableOperation::getTableUuid, op -> op, TableOperation::mostRecent));

Map<String, TableOperationsHistory> latestHistory =
historyRepo.findLatestPerTable(dbOperationType).stream()
.filter(r -> r.getTableUuid() != null)
.map(TableOperationsHistory::fromRow)
.collect(
Collectors.toMap(
TableOperationsHistory::getTableUuid,
h -> h,
AnalyzerRunner::moreRecentHistory));

List<Table> tables =
statsRepo.find(databaseName, tableName.orElse(null), tableUuid.orElse(null)).stream()
.filter(row -> row.getTableUuid() != null)
.map(Table::fromRow)
.collect(Collectors.toList());

/*
* For each table in this database, decide whether to create a new PENDING operation.
*
* 1. Skip tables not opted in to this operation type. The opt-in check today reads a
* table-property flag; in the future it will read a denormalized column.
* 2. Look up the table's current active operation (if any) and its most recent completed
* history entry from the maps loaded above.
* 3. Delegate the schedule-or-not decision to the analyzer's shouldSchedule — strategy
* encapsulates cadence, retry policy, and any future per-operation signals.
* 4. On true, persist a new PENDING operation. The scheduler picks it up on its next pass.
*/
tables.forEach(
Comment thread
mkuchenbecker marked this conversation as resolved.
table -> {
if (!analyzer.isEnabled(table)) {
return;
}
Optional<TableOperation> currentOp =
Optional.ofNullable(currentOps.get(table.getTableUuid()));
Optional<TableOperationsHistory> entry =
Optional.ofNullable(latestHistory.get(table.getTableUuid()));
if (analyzer.shouldSchedule(table, currentOp, entry)) {
TableOperation op = TableOperation.pending(table, analyzer.getOperationType());
operationsRepo.save(op.toRow());
log.info(
"Created PENDING {} operation for table {}.{}",
analyzer.getOperationType(),
table.getDatabaseName(),
table.getTableId());
}
});
}

private static TableOperationsHistory moreRecentHistory(
TableOperationsHistory a, TableOperationsHistory b) {
Comparator<TableOperationsHistory> byCompletedAt =
Comparator.comparing(r -> r.getCompletedAt() != null ? r.getCompletedAt() : Instant.EPOCH);
return byCompletedAt.compare(a, b) >= 0 ? a : b;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.linkedin.openhouse.analyzer;

import com.linkedin.openhouse.optimizer.model.OperationType;
import com.linkedin.openhouse.optimizer.model.Table;
import com.linkedin.openhouse.optimizer.model.TableOperation;
import com.linkedin.openhouse.optimizer.model.TableOperationsHistory;
import java.time.Duration;
import java.util.Optional;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/** Analyzer for the {@link OperationType#ORPHAN_FILES_DELETION} operation type. */
@Component
public class CadenceBasedOrphanFilesDeletionAnalyzer implements OperationAnalyzer {

static final String OFD_ENABLED_PROPERTY = "maintenance.optimizer.ofd.enabled";

private final CadencePolicy cadencePolicy;

@Autowired
public CadenceBasedOrphanFilesDeletionAnalyzer(
@Value("${ofd.success-retry-hours:24}") long successRetryHours,
@Value("${ofd.failure-retry-hours:1}") long failureRetryHours) {
this.cadencePolicy =
new CadencePolicy(Duration.ofHours(successRetryHours), Duration.ofHours(failureRetryHours));
}

/** Package-private for tests that supply a pre-built {@link CadencePolicy}. */
CadenceBasedOrphanFilesDeletionAnalyzer(CadencePolicy cadencePolicy) {
this.cadencePolicy = cadencePolicy;
}

@Override
public OperationType getOperationType() {
return OperationType.ORPHAN_FILES_DELETION;
}

@Override
public boolean isEnabled(Table table) {
return "true".equals(table.getTableProperties().get(OFD_ENABLED_PROPERTY));
}

@Override
public boolean shouldSchedule(
Table table,
Optional<TableOperation> currentOp,
Optional<TableOperationsHistory> latestHistory) {
return cadencePolicy.shouldSchedule(currentOp, latestHistory);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.linkedin.openhouse.analyzer;

import com.linkedin.openhouse.optimizer.model.HistoryStatus;
import com.linkedin.openhouse.optimizer.model.OperationStatus;
import com.linkedin.openhouse.optimizer.model.TableOperation;
import com.linkedin.openhouse.optimizer.model.TableOperationsHistory;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import lombok.RequiredArgsConstructor;

/**
* Time-based scheduling policy. An analyzer delegates to {@link CadencePolicy} to decide whether to
* re-issue a recommendation for a table.
*
* <p>The analyzer stays out of any table that already has a non-CANCELED active operation — those
* belong to the scheduler. For tables with no active operation (or only a CANCELED one), the
* decision is based on the most recent completed-history entry: re-evaluate after {@code
* successRetryInterval} on success, or after {@code failureRetryInterval} on failure.
*/
@RequiredArgsConstructor
public class CadencePolicy {

/**
* How long to wait after a successful operation before re-evaluating the table. For example, if
* set to 24 hours and OFD succeeded at 10:00 AM Monday, the table won't be scheduled again until
* after 10:00 AM Tuesday.
*/
private final Duration successRetryInterval;

/**
* How long to wait after a failed operation before retrying. Shorter than the success interval to
* allow quick recovery. For example, if set to 1 hour and OFD failed at 2:00 PM, the table
* becomes eligible for retry at 3:00 PM.
*/
private final Duration failureRetryInterval;

/**
* Returns {@code true} if a new or refreshed operation record should be upserted.
*
* @param currentOp the existing active operation record, or empty if none exists
* @param latestHistory the most recent history entry for this (table, type), or empty
*/
public boolean shouldSchedule(
Optional<TableOperation> currentOp, Optional<TableOperationsHistory> latestHistory) {
if (currentOp.isPresent() && currentOp.get().getStatus() != OperationStatus.CANCELED) {
return false;
}
return latestHistory.map(this::readyAfterHistoryEntry).orElse(true);
}

private boolean readyAfterHistoryEntry(TableOperationsHistory entry) {
Duration interval =
entry.getStatus() == HistoryStatus.FAILED ? failureRetryInterval : successRetryInterval;
return Duration.between(entry.getCompletedAt(), Instant.now()).compareTo(interval) > 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.linkedin.openhouse.analyzer;

import com.linkedin.openhouse.optimizer.model.OperationType;
import com.linkedin.openhouse.optimizer.model.Table;
import com.linkedin.openhouse.optimizer.model.TableOperation;
import com.linkedin.openhouse.optimizer.model.TableOperationsHistory;
import java.util.Optional;

/**
* Strategy interface for a single operation type. Each implementation decides whether a given table
* needs an operation recommendation upserted in the Optimizer Service.
*
* <p>// TODO(circuit-breaker): a chronically-failing table currently produces a new PENDING row on
* every Analyzer pass. Add a circuit breaker that suppresses scheduling for a (table, type) after N
* consecutive FAILED history entries. Requirements: configurable threshold per operation type,
* automatic reset via exponential backoff so tables can recover, and an operator-visible signal
* (metric or query) so tripped breakers are diagnosable.
*/
public interface OperationAnalyzer {

/** The operation type this analyzer handles. */
OperationType getOperationType();

/**
* Returns {@code true} if this operation is opted-in for the given table. Tables that return
* {@code false} are skipped entirely — no upsert is issued.
*/
boolean isEnabled(Table table);

/**
* Returns {@code true} if a new or refreshed operation record should be upserted.
*
* @param table the table entry
* @param currentOp the existing active operation record, or empty if none exists
* @param latestHistory the most recent history entry for this (table, type), or empty
*/
boolean shouldSchedule(
Table table,
Optional<TableOperation> currentOp,
Optional<TableOperationsHistory> latestHistory);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
spring.application.name=openhouse-optimizer-analyzer
spring.main.web-application-type=none
spring.datasource.url=${OPTIMIZER_DB_URL:jdbc:h2:mem:analyzerdb;DB_CLOSE_DELAY=-1;MODE=MySQL}
spring.datasource.username=${OPTIMIZER_DB_USER:sa}
spring.datasource.password=${OPTIMIZER_DB_PASSWORD:}
spring.jpa.hibernate.ddl-auto=none
ofd.success-retry-hours=24
ofd.failure-retry-hours=1
Loading