Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
e1fbbfc
feat(optimizer): add scheduler app — claims PENDING ops and submits S…
mkuchenbecker Apr 7, 2026
185a7bd
Merge mkuchenb/optimizer-3 into optimizer-4
mkuchenbecker Apr 7, 2026
aa0ba6b
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker Apr 30, 2026
275ec88
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 1, 2026
d4b9bca
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 1, 2026
3e03fdb
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 1, 2026
6e44c50
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 12, 2026
a9104ae
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 12, 2026
cbddc5d
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 12, 2026
790994e
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 12, 2026
9913b77
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 12, 2026
f63a952
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 12, 2026
8ad9cac
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 13, 2026
0069b92
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 13, 2026
0668486
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 13, 2026
b315c11
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 13, 2026
e80f9ce
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 14, 2026
ded97d0
refactor(optimizer-scheduler): per-op BinPacker interface, TableOpera…
mkuchenbecker May 14, 2026
ef78c17
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 14, 2026
7a0d6d2
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 14, 2026
8142af3
refactor(optimizer-scheduler): depend on :services:optimizer; drop de…
mkuchenbecker May 14, 2026
3ec0b0a
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 14, 2026
949e814
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 14, 2026
0c36ff2
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 14, 2026
3a4f0a8
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 14, 2026
3fee7ab
refactor(optimizer-scheduler): consume model/ types and ModelDbMapper
mkuchenbecker May 14, 2026
a04cad6
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 14, 2026
e183906
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 14, 2026
b2fd321
fix(optimizer-scheduler): drop .version(0L) from SchedulerRunnerTest
mkuchenbecker May 14, 2026
b9620e3
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 14, 2026
de9b0e1
refactor(optimizer-scheduler): use type to/from methods; drop ModelDb…
mkuchenbecker May 14, 2026
f74c6f9
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 15, 2026
d0ec73e
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 15, 2026
2903e0b
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 18, 2026
9ad8861
refactor(scheduler): TableStats at BinPacker interface; Bin owns its …
mkuchenbecker May 18, 2026
a325684
refactor(scheduler): BinPacker takes List<SchedulingCandidate> pairs
mkuchenbecker May 18, 2026
9e5fbae
style(scheduler): convert FileCountBinPacker for-loops to functional …
mkuchenbecker May 18, 2026
7dd1e97
refactor(scheduler): final resultsEndpoint, throw on missing packer, …
mkuchenbecker May 18, 2026
7935f43
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 18, 2026
c73d8cb
fix(scheduler): only launch + mark SCHEDULED the ops actually claimed
mkuchenbecker May 18, 2026
ad1bf4c
feat(scheduler): add SingletonBinPacker; register for SNAPSHOT_EXPIRA…
mkuchenbecker May 18, 2026
d9ffe48
docs(scheduler): strip editorial commentary from SingletonBinPacker j…
mkuchenbecker May 18, 2026
ad88893
Revert "docs(scheduler): strip editorial commentary from SingletonBin…
mkuchenbecker May 18, 2026
ac7f84d
Revert "feat(scheduler): add SingletonBinPacker; register for SNAPSHO…
mkuchenbecker May 18, 2026
e492f7c
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 19, 2026
9a0ab09
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 19, 2026
d315227
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 19, 2026
bb52e7a
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 20, 2026
0b87381
fix(scheduler): point @EntityScan at the actual db package
mkuchenbecker May 20, 2026
428cb17
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 20, 2026
f6c4674
refactor(scheduler): update model refs after Dto suffix swap
mkuchenbecker May 20, 2026
5cbdbcb
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 20, 2026
a89a600
fix(scheduler): four runtime gaps surfaced by docker e2e
mkuchenbecker May 20, 2026
c99bc3a
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 20, 2026
6dd07bd
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 21, 2026
66aa3e7
refactor(scheduler): switch to Optional repo API + dedup per cycle
mkuchenbecker May 21, 2026
231320c
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 21, 2026
3309983
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 21, 2026
e8d2c7a
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 22, 2026
1866033
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 22, 2026
020e94a
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 22, 2026
d7dead0
Merge branch 'mkuchenb/optimizer-3' into mkuchenb/optimizer-4
mkuchenbecker May 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions apps/optimizer-scheduler/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
plugins {
id 'openhouse.springboot-ext-conventions'
id 'org.springframework.boot' version '2.7.8'
}

dependencies {
implementation project(':services:optimizer')
implementation 'org.springframework.boot:spring-boot-starter:2.7.8'
implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8'
implementation 'org.springframework.boot:spring-boot-starter-aop:2.7.8'
runtimeOnly 'mysql:mysql-connector-java:8.0.33'
testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8'
testRuntimeOnly 'com.h2database:h2'
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.linkedin.openhouse.scheduler;

import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
import com.linkedin.openhouse.optimizer.model.TableOperationDto;
import com.linkedin.openhouse.scheduler.client.JobsServiceClient;
import java.time.Instant;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* A set of operations the scheduler will submit together as a single Spark job. A bin owns its own
* launch — callers ask it to schedule itself and react to the returned job id. The surrounding
* status-update machinery (claim, mark-scheduled, revert-to-pending) lives in the scheduler because
* it is shared across all bins regardless of operation type.
*/
@RequiredArgsConstructor
public class Bin {

@Getter private final OperationTypeDto operationType;
@Getter private final List<TableOperationDto> operations;

/** Operation UUIDs in this bin, parallel to {@link #getTableNames()}. */
public List<String> getOperationIds() {
return operations.stream().map(TableOperationDto::getId).collect(Collectors.toList());
}

/** Fully-qualified {@code database.table} identifiers for the operations in this bin. */
public List<String> getTableNames() {
return operations.stream()
.map(op -> op.getDatabaseName() + "." + op.getTableName())
.collect(Collectors.toList());
}

/**
* Return a new {@link Bin} containing only the operations whose IDs are in {@code keepIds}. Used
* by the scheduler to narrow the bin to the rows it actually claimed before launching the job.
*/
public Bin subset(Collection<String> keepIds) {
Set<String> keep = new HashSet<>(keepIds);
List<TableOperationDto> filtered =
operations.stream().filter(op -> keep.contains(op.getId())).collect(Collectors.toList());
return new Bin(operationType, filtered);
}

/**
* Submit this bin as a single Spark job. Returns the job id on success, or empty on submission
* failure — the caller is responsible for the surrounding status updates.
*/
public Optional<String> schedule(JobsServiceClient client, String resultsEndpoint) {
String jobName =
"batched-" + operationType.name().toLowerCase() + "-" + Instant.now().toEpochMilli();
return client.launch(
jobName, operationType.name(), getTableNames(), getOperationIds(), resultsEndpoint);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.linkedin.openhouse.scheduler;

import com.linkedin.openhouse.optimizer.model.TableStatsDto;
import java.util.List;

/**
* Strategy for packing a set of operations into bins for batched job submission. Implementations
* encode the constraints of a particular packing dimension (file count, partition count, etc.);
* binding to an operation type is the responsibility of the scheduler configuration, not the
* strategy class.
*
* <p>{@link TableStatsDto} is the cost source at the interface boundary, carried alongside each
* operation in a {@link SchedulingCandidate}. Implementations project the stats down to the minimal
* data needed to make their packing decision (e.g. file count for OFD) and do not retain the full
* stats payload in the returned bins.
*/
public interface BinPacker {

/**
* Pack {@code pending} into one or more {@link Bin}s. Each returned bin is non-empty; the
* scheduler dispatches one Spark job per bin.
*/
List<Bin> pack(List<SchedulingCandidate> pending);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package com.linkedin.openhouse.scheduler;

import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
import com.linkedin.openhouse.optimizer.model.TableOperationDto;
import com.linkedin.openhouse.optimizer.model.TableStatsDto;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import lombok.RequiredArgsConstructor;

/**
* Greedy first-fit-descending bin-packer keyed on per-table file count, projected from each
* candidate's {@link TableStatsDto}.
*
* <p>Candidates are sorted by descending file count, then assigned to the first bin whose running
* total stays at or below {@code maxFilesPerBin}. An operation larger than the limit gets its own
* bin (oversized bins are allowed — we never drop an operation).
*/
@RequiredArgsConstructor
public class FileCountBinPacker implements BinPacker {

private final OperationTypeDto operationType;
private final long maxFilesPerBin;

@Override
public List<Bin> pack(List<SchedulingCandidate> pending) {
if (pending.isEmpty()) {
return List.of();
}

// Project once: each candidate's packing cost is just a long, keyed by operation id.
Map<String, Long> costByOperationId =
pending.stream()
.collect(Collectors.toMap(c -> c.getOperation().getId(), c -> cost(c.getStats())));

List<TableOperationDto> sorted =
pending.stream()
.map(SchedulingCandidate::getOperation)
.sorted(
Comparator.comparingLong(
(TableOperationDto op) -> costByOperationId.get(op.getId()))
.reversed())
.collect(Collectors.toList());

// First-fit-descending is inherently stateful — each placement depends on the running totals
// for bins assembled so far.
List<List<TableOperationDto>> binContents = new ArrayList<>();
List<Long> binTotals = new ArrayList<>();
sorted.forEach(
op -> {
long c = costByOperationId.get(op.getId());
OptionalInt placed =
IntStream.range(0, binContents.size())
.filter(i -> binTotals.get(i) + c <= maxFilesPerBin || binTotals.get(i) == 0)
.findFirst();
if (placed.isPresent()) {
int idx = placed.getAsInt();
binContents.get(idx).add(op);
binTotals.set(idx, binTotals.get(idx) + c);
} else {
List<TableOperationDto> newBin = new ArrayList<>();
newBin.add(op);
binContents.add(newBin);
binTotals.add(c);
}
});

return binContents.stream()
.map(ops -> new Bin(operationType, ops))
.collect(Collectors.toList());
}

private static long cost(TableStatsDto stats) {
if (stats == null || stats.getSnapshot() == null) {
return 0L;
}
Long n = stats.getSnapshot().getNumCurrentFiles();
return n != null ? n : 0L;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.linkedin.openhouse.scheduler;

import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
import java.util.Map;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.context.annotation.Bean;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

/** Entry point for the Optimizer Scheduler application. */
@SpringBootApplication
@EntityScan(basePackages = "com.linkedin.openhouse.optimizer.db")
@EnableJpaRepositories(basePackages = "com.linkedin.openhouse.optimizer.repository")
public class SchedulerApplication {

public static void main(String[] args) {
SpringApplication.run(SchedulerApplication.class, args);
}

/**
* Runs the scheduler once per registered {@link BinPacker} per process invocation. Each call is
* scoped to one operation type.
*/
@Bean
public CommandLineRunner run(
SchedulerRunner runner, Map<OperationTypeDto, BinPacker> binPackers) {
return args -> binPackers.keySet().forEach(runner::schedule);
}
}
Loading