diff --git a/apps/optimizer-scheduler/build.gradle b/apps/optimizer-scheduler/build.gradle
new file mode 100644
index 000000000..48def1c0c
--- /dev/null
+++ b/apps/optimizer-scheduler/build.gradle
@@ -0,0 +1,19 @@
+plugins {
+  id 'openhouse.springboot-ext-conventions'
+  id 'org.springframework.boot' version '2.7.8'
+}
+
+dependencies {
+  implementation project(':services:optimizer')
+  implementation 'org.springframework.boot:spring-boot-starter:2.7.8'
+  implementation 'org.springframework.boot:spring-boot-starter-webflux:2.7.8'
+  implementation 'org.springframework.boot:spring-boot-starter-data-jpa:2.7.8'
+  implementation 'org.springframework.boot:spring-boot-starter-aop:2.7.8'
+  runtimeOnly 'mysql:mysql-connector-java:8.0.33'
+  testImplementation 'org.springframework.boot:spring-boot-starter-test:2.7.8'
+  testRuntimeOnly 'com.h2database:h2'
+}
+
+test {
+  useJUnitPlatform()
+}
diff --git a/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/Bin.java b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/Bin.java
new file mode 100644
index 000000000..f72a3eaa8
--- /dev/null
+++ b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/Bin.java
@@ -0,0 +1,69 @@
+package com.linkedin.openhouse.scheduler;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.scheduler.client.JobsServiceClient;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * A set of operations the scheduler will submit together as a single Spark job. A bin owns its own
+ * launch — callers ask it to schedule itself and react to the returned job id. The surrounding
+ * status-update machinery (claim, mark-scheduled, revert-to-pending) lives in the scheduler because
+ * it is shared across all bins regardless of operation type.
+ */
+@RequiredArgsConstructor
+public class Bin {
+
+  @Getter private final OperationTypeDto operationType;
+  @Getter private final List<TableOperationDto> operations;
+
+  /** Operation UUIDs in this bin, parallel to {@link #getTableNames()}. */
+  public List<String> getOperationIds() {
+    return operations.stream().map(TableOperationDto::getId).collect(Collectors.toList());
+  }
+
+  /** Fully-qualified {@code database.table} identifiers for the operations in this bin. */
+  public List<String> getTableNames() {
+    return operations.stream()
+        .map(op -> op.getDatabaseName() + "." + op.getTableName())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Return a new {@link Bin} containing only the operations whose IDs are in {@code keepIds}. Used
+   * by the scheduler to narrow the bin to the rows it actually claimed before launching the job.
+   */
+  public Bin subset(Collection<String> keepIds) {
+    Set<String> keep = new HashSet<>(keepIds);
+    List<TableOperationDto> filtered =
+        operations.stream().filter(op -> keep.contains(op.getId())).collect(Collectors.toList());
+    return new Bin(operationType, filtered);
+  }
+
+  /**
+   * Submit this bin as a single Spark job. Returns the job id on success, or empty on submission
+   * failure — the caller is responsible for the surrounding status updates.
+   *
+   * <p>The job name is {@code batched-<type>-<epochMillis>}; the type is lower-cased with
+   * {@link Locale#ROOT} so the name does not vary with the JVM's default locale.
+   *
+   * @param client jobs-service client used to launch the job
+   * @param resultsEndpoint passed through unchanged to {@link JobsServiceClient#launch}
+   * @return whatever {@code client.launch} returns: the job id, or empty
+   */
+  public Optional<String> schedule(JobsServiceClient client, String resultsEndpoint) {
+    String typeSlug = operationType.name().toLowerCase(Locale.ROOT);
+    String jobName = "batched-" + typeSlug + "-" + Instant.now().toEpochMilli();
+    return client.launch(
+        jobName, operationType.name(), getTableNames(), getOperationIds(), resultsEndpoint);
+  }
+}
diff --git a/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/BinPacker.java b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/BinPacker.java
new file mode 100644
index 000000000..504c2b910
--- /dev/null
+++ b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/BinPacker.java
@@ -0,0 +1,24 @@
+package com.linkedin.openhouse.scheduler;
+
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import java.util.List;
+
+/**
+ * Strategy for packing a set of operations into bins for batched job submission. Implementations
+ * encode the constraints of a particular packing dimension (file count, partition count, etc.);
+ * binding to an operation type is the responsibility of the scheduler configuration, not the
+ * strategy class.
+ *
+ *

<p>{@link TableStatsDto} is the cost source at the interface boundary, carried alongside each
+ * operation in a {@link SchedulingCandidate}. Implementations project the stats down to the minimal
+ * data needed to make their packing decision (e.g. file count for OFD) and do not retain the full
+ * stats payload in the returned bins.
+ */
+public interface BinPacker {
+
+  /**
+   * Pack {@code pending} into one or more {@link Bin}s. Each returned bin is non-empty; the
+   * scheduler dispatches one Spark job per bin.
+   */
+  List<Bin> pack(List<SchedulingCandidate> pending);
+}
diff --git a/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/FileCountBinPacker.java b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/FileCountBinPacker.java
new file mode 100644
index 000000000..c8c5e83fe
--- /dev/null
+++ b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/FileCountBinPacker.java
@@ -0,0 +1,90 @@
+package com.linkedin.openhouse.scheduler;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import lombok.RequiredArgsConstructor;
+
+/**
+ * Greedy first-fit-descending bin-packer keyed on per-table file count, projected from each
+ * candidate's {@link TableStatsDto}.
+ *
+ * <p>Candidates are sorted by descending file count, then assigned to the first bin whose running
+ * total stays at or below {@code maxFilesPerBin}. An operation larger than the limit gets its own
+ * bin (oversized bins are allowed — we never drop an operation).
+ */
+@RequiredArgsConstructor
+public class FileCountBinPacker implements BinPacker {
+
+  private final OperationTypeDto operationType;
+  private final long maxFilesPerBin;
+
+  /**
+   * Packs {@code pending} first-fit-descending by file count. Empty input yields an empty list;
+   * otherwise every operation in {@code pending} lands in exactly one returned bin.
+   */
+  @Override
+  public List<Bin> pack(List<SchedulingCandidate> pending) {
+    if (pending.isEmpty()) {
+      return List.of();
+    }
+
+    // Project once: each candidate's packing cost is just a long, keyed by operation id.
+    Map<String, Long> costByOperationId =
+        pending.stream()
+            .collect(Collectors.toMap(c -> c.getOperation().getId(), c -> cost(c.getStats())));
+
+    List<TableOperationDto> sorted =
+        pending.stream()
+            .map(SchedulingCandidate::getOperation)
+            .sorted(
+                Comparator.comparingLong(
+                        (TableOperationDto op) -> costByOperationId.get(op.getId()))
+                    .reversed())
+            .collect(Collectors.toList());
+
+    // First-fit-descending is inherently stateful — each placement depends on the running totals
+    // for bins assembled so far, so this is a plain loop rather than a stream pipeline.
+    List<List<TableOperationDto>> binContents = new ArrayList<>();
+    List<Long> binTotals = new ArrayList<>();
+    for (TableOperationDto op : sorted) {
+      long c = costByOperationId.get(op.getId());
+      // NOTE(review): the "== 0" arm admits any item into a bin whose running total is still
+      // zero, regardless of the limit — confirm this is intended.
+      OptionalInt placed =
+          IntStream.range(0, binContents.size())
+              .filter(i -> binTotals.get(i) + c <= maxFilesPerBin || binTotals.get(i) == 0)
+              .findFirst();
+      if (placed.isPresent()) {
+        int idx = placed.getAsInt();
+        binContents.get(idx).add(op);
+        binTotals.set(idx, binTotals.get(idx) + c);
+      } else {
+        List<TableOperationDto> newBin = new ArrayList<>();
+        newBin.add(op);
+        binContents.add(newBin);
+        binTotals.add(c);
+      }
+    }
+
+    return binContents.stream()
+        .map(ops -> new Bin(operationType, ops))
+        .collect(Collectors.toList());
+  }
+
+  /** File count used as the packing cost; a missing stats, snapshot, or count costs zero. */
+  private static long cost(TableStatsDto stats) {
+    if (stats == null || stats.getSnapshot() == null) {
+      return 0L;
+    }
+    Long n = stats.getSnapshot().getNumCurrentFiles();
+    return n != null ? n : 0L;
+  }
+}
diff --git a/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulerApplication.java b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulerApplication.java
new file mode 100644
index 000000000..9b6cb4579
--- /dev/null
+++ b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulerApplication.java
@@ -0,0 +1,31 @@
+package com.linkedin.openhouse.scheduler;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import java.util.Map;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.domain.EntityScan;
+import org.springframework.context.annotation.Bean;
+import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
+
+/** Entry point for the Optimizer Scheduler application. */
+@SpringBootApplication
+@EntityScan(basePackages = "com.linkedin.openhouse.optimizer.db")
+@EnableJpaRepositories(basePackages = "com.linkedin.openhouse.optimizer.repository")
+public class SchedulerApplication {
+
+  public static void main(String[] args) {
+    SpringApplication.run(SchedulerApplication.class, args);
+  }
+
+  /**
+   * Runs the scheduler once per registered {@link BinPacker} per process invocation. Each call is
+   * scoped to one operation type.
+   */
+  @Bean
+  public CommandLineRunner run(
+      SchedulerRunner runner, Map<OperationTypeDto, BinPacker> binPackers) {
+    return args -> binPackers.keySet().forEach(runner::schedule);
+  }
+}
diff --git a/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulerRunner.java b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulerRunner.java
new file mode 100644
index 000000000..25cee0f46
--- /dev/null
+++ b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulerRunner.java
@@ -0,0 +1,261 @@
+package com.linkedin.openhouse.scheduler;
+
+import com.linkedin.openhouse.optimizer.db.OperationStatus;
+import com.linkedin.openhouse.optimizer.db.TableOperationsRow;
+import com.linkedin.openhouse.optimizer.db.TableStatsRow;
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository;
+import com.linkedin.openhouse.optimizer.repository.TableStatsRepository;
+import com.linkedin.openhouse.scheduler.client.JobsServiceClient;
+import java.time.Instant;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * For one operation type per call, reads PENDING rows, looks up per-table stats, dispatches to the
+ * registered {@link BinPacker}, and submits one Spark job per returned {@link Bin}. The {@link
+ * com.linkedin.openhouse.scheduler.SchedulerApplication}'s CommandLineRunner loops over the
+ * registered packers and invokes {@code schedule(opType)} for each.
+ */
+@Slf4j
+@Component
+public class SchedulerRunner {
+  private final TableOperationsRepository operationsRepo;
+  private final TableStatsRepository statsRepo;
+  private final JobsServiceClient jobsClient;
+  private final Map<OperationTypeDto, BinPacker> binPackers;
+  private final String resultsEndpoint;
+
+  @Value("${optimizer.repo.default-limit:10000}")
+  private int defaultLimit = 10_000;
+
+  public SchedulerRunner(
+      TableOperationsRepository operationsRepo,
+      TableStatsRepository statsRepo,
+      JobsServiceClient jobsClient,
+      Map<OperationTypeDto, BinPacker> binPackers,
+      @Value("${scheduler.results-endpoint}") String resultsEndpoint) {
+    this.operationsRepo = operationsRepo;
+    this.statsRepo = statsRepo;
+    this.jobsClient = jobsClient;
+    this.binPackers = binPackers;
+    this.resultsEndpoint = resultsEndpoint;
+  }
+
+  /** Schedule all PENDING operations of the given type across all databases. */
+  @Transactional
+  public void schedule(OperationTypeDto operationType) {
+    schedule(operationType, Optional.empty(), Optional.empty());
+  }
+
+  /**
+   * Schedule PENDING operations for {@code operationType}, optionally scoped to a single database
+   * or table name.
+   *
+   * <p>Requests one page of up to {@code defaultLimit} PENDING rows, cancels duplicate rows per
+   * table, skips operations whose table has no stats row, packs the rest, and submits each bin.
+   *
+   * <p>NOTE(review): the whole cycle, including the remote job submissions in {@code submitBin},
+   * runs inside this method's transaction. Presumably a rollback after a job has launched would
+   * return its rows to PENDING — confirm that is acceptable.
+   *
+   * @throws IllegalStateException if no {@link BinPacker} is registered for {@code operationType}
+   */
+  @Transactional
+  public void schedule(
+      OperationTypeDto operationType, Optional<String> databaseName, Optional<String> tableName) {
+
+    BinPacker packer = binPackers.get(operationType);
+    if (packer == null) {
+      throw new IllegalStateException(
+          "No BinPacker registered for operation type " + operationType);
+    }
+
+    PageRequest page = PageRequest.of(0, defaultLimit);
+    List<TableOperationsRow> pendingRows =
+        operationsRepo.find(
+            Optional.of(operationType.toDb()),
+            Optional.of(OperationStatus.PENDING),
+            Optional.empty(),
+            databaseName,
+            tableName,
+            Optional.empty(),
+            Optional.empty(),
+            page);
+    if (pendingRows.isEmpty()) {
+      log.info("No PENDING operations of type {}; nothing to schedule", operationType);
+      return;
+    }
+
+    // Deduplicate before claiming: if multiple PENDING rows exist for the same tableUuid, keep
+    // the oldest (lex-tiebreak on id) and cancel the rest. Per-cycle, not per-bin — running this
+    // inside the bin loop nuked rows belonging to other bins of the same cycle.
+    List<TableOperationsRow> survivors = cancelDuplicates(pendingRows);
+    if (survivors.isEmpty()) {
+      return;
+    }
+
+    List<TableOperationDto> pending =
+        survivors.stream().map(TableOperationDto::fromRow).collect(Collectors.toList());
+
+    // Tradeoff: we fetch fresh table_stats per scheduling cycle (one batched query) rather than
+    // denormalizing the relevant fields onto TableOperationDto. The denormalized alternative would
+    // remove the per-cycle lookup but widen the TableOperationDto row and serve staler data; the
+    // current shape favors smaller operations + freshness over fewer queries.
+    Set<String> uuids =
+        pending.stream().map(TableOperationDto::getTableUuid).collect(Collectors.toSet());
+    Map<String, TableStatsDto> statsByUuid =
+        statsRepo.findAllById(uuids).stream()
+            .collect(Collectors.toMap(TableStatsRow::getTableUuid, TableStatsDto::fromRow));
+
+    // Filter at the boundary so SchedulingCandidate.stats is guaranteed non-null. A table without
+    // a stats row gets skipped this cycle and reconsidered after stats land.
+    List<TableOperationDto> withStats =
+        pending.stream()
+            .filter(op -> statsByUuid.containsKey(op.getTableUuid()))
+            .collect(Collectors.toList());
+    if (withStats.size() < pending.size()) {
+      log.warn(
+          "Skipped {} {} operations with no table_stats row",
+          pending.size() - withStats.size(),
+          operationType);
+    }
+    if (withStats.isEmpty()) {
+      return;
+    }
+
+    List<SchedulingCandidate> candidates =
+        withStats.stream()
+            .map(op -> new SchedulingCandidate(op, statsByUuid.get(op.getTableUuid())))
+            .collect(Collectors.toList());
+
+    List<Bin> bins = packer.pack(candidates);
+    log.info(
+        "Packed {} PENDING {} operations into {} bins",
+        candidates.size(),
+        operationType,
+        bins.size());
+
+    bins.forEach(this::submitBin);
+  }
+
+  /**
+   * Group {@code pendingRows} by {@code tableUuid}; for any group with more than one row, cancel
+   * all but the oldest (lex-tiebreak on id). Returns the survivors in input order. Deterministic.
+   */
+  private List<TableOperationsRow> cancelDuplicates(List<TableOperationsRow> pendingRows) {
+    Map<String, List<TableOperationsRow>> byTableUuid =
+        pendingRows.stream().collect(Collectors.groupingBy(TableOperationsRow::getTableUuid));
+
+    List<String> duplicateIds =
+        byTableUuid.values().stream()
+            .filter(rows -> rows.size() > 1)
+            .flatMap(
+                rows ->
+                    rows.stream()
+                        .sorted(
+                            Comparator.comparing(TableOperationsRow::getCreatedAt)
+                                .thenComparing(TableOperationsRow::getId))
+                        .skip(1))
+            .map(TableOperationsRow::getId)
+            .collect(Collectors.toList());
+
+    if (duplicateIds.isEmpty()) {
+      return pendingRows;
+    }
+
+    int cancelled = operationsRepo.cancel(duplicateIds);
+    log.warn("Cancelled {} duplicate PENDING rows", cancelled);
+
+    Set<String> cancelledIds = Set.copyOf(duplicateIds);
+    return pendingRows.stream()
+        .filter(r -> !cancelledIds.contains(r.getId()))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Claim the operations in {@code bin}, launch one job for the claimed subset, then mark those
+   * rows SCHEDULED on success or revert them to PENDING when the launch returns no job id.
+   */
+  private void submitBin(Bin bin) {
+    List<String> ids = bin.getOperationIds();
+
+    // Claim the rows in one batched UPDATE: PENDING → SCHEDULING. The UPDATE's row count is just
+    // an aggregate — to know *which* rows we own, re-query for SCHEDULING rows tagged with our
+    // scheduledAt watermark. Anything not in that subset belongs to another instance or was
+    // canceled, and must not be submitted or marked SCHEDULED.
+    // NOTE(review): the re-query matches scheduledAt by equality; this presumably relies on the
+    // column storing claimedAt without losing precision — confirm against the schema.
+    Instant claimedAt = Instant.now();
+    operationsRepo.updateBatch(
+        ids,
+        OperationStatus.PENDING,
+        OperationStatus.SCHEDULING,
+        Optional.of(claimedAt),
+        Optional.empty());
+    List<String> claimedIds =
+        operationsRepo
+            .find(
+                Optional.empty(),
+                Optional.of(OperationStatus.SCHEDULING),
+                Optional.empty(),
+                Optional.empty(),
+                Optional.empty(),
+                Optional.of(claimedAt),
+                Optional.of(ids),
+                PageRequest.of(0, defaultLimit))
+            .stream()
+            .map(TableOperationsRow::getId)
+            .collect(Collectors.toList());
+    if (claimedIds.isEmpty()) {
+      log.info("All rows in bin already claimed by another scheduler instance; skipping");
+      return;
+    }
+    if (claimedIds.size() < ids.size()) {
+      log.info(
+          "Partial claim: {} of {} ops in bin claimed; launching job for claimed subset only",
+          claimedIds.size(),
+          ids.size());
+    }
+
+    Bin claimedBin = bin.subset(claimedIds);
+    Optional<String> jobId = claimedBin.schedule(jobsClient, resultsEndpoint);
+    if (jobId.isPresent()) {
+      int updated =
+          operationsRepo.updateBatch(
+              claimedIds,
+              OperationStatus.SCHEDULING,
+              OperationStatus.SCHEDULED,
+              Optional.empty(),
+              jobId);
+      log.info(
+          "Submitted job {} for {} tables ({} rows marked SCHEDULED)",
+          jobId.get(),
+          claimedBin.getOperations().size(),
+          updated);
+    } else {
+      int reverted =
+          operationsRepo.updateBatch(
+              claimedIds,
+              OperationStatus.SCHEDULING,
+              OperationStatus.PENDING,
+              Optional.empty(),
+              Optional.empty());
+      log.warn(
+          "Job submission failed; reverted {} claimed rows back to PENDING for retry on the next"
+              + " pass",
+          reverted);
+    }
+  }
+}
diff --git a/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulingCandidate.java
b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulingCandidate.java
new file mode 100644
index 000000000..d375e6a6f
--- /dev/null
+++ b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/SchedulingCandidate.java
@@ -0,0 +1,19 @@
+package com.linkedin.openhouse.scheduler;
+
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import lombok.NonNull;
+import lombok.Value;
+
+/**
+ * A pending operation paired with the stats the bin packer will use as its cost source. Built by
+ * the scheduler at scheduling time and handed to the {@link BinPacker} as the unit of packing.
+ *
+ * <p>Both fields are non-null. The scheduler filters out operations whose tables have no stats row
+ * before constructing candidates.
+ */
+@Value
+public class SchedulingCandidate {
+  @NonNull TableOperationDto operation;
+  @NonNull TableStatsDto stats;
+}
diff --git a/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/client/JobsServiceClient.java b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/client/JobsServiceClient.java
new file mode 100644
index 000000000..76a96f957
--- /dev/null
+++ b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/client/JobsServiceClient.java
@@ -0,0 +1,92 @@
+package com.linkedin.openhouse.scheduler.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.reactive.function.client.WebClient;
+
+/**
+ * Client for the OpenHouse Jobs Service.
+ *
+ * <p>Submits one {@code BatchedOrphanFilesDeletionSparkApp} job per bin via {@code POST /jobs}.
+ */
+@Slf4j
+public class JobsServiceClient {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final Duration TIMEOUT = Duration.ofSeconds(30);
+
+  private final WebClient webClient;
+  private final String clusterId;
+
+  public JobsServiceClient(WebClient webClient, String clusterId) {
+    this.webClient = webClient;
+    this.clusterId = clusterId;
+  }
+
+  /**
+   * Submit a batched Spark job for the given tables.
+   *
+   * <p>Exceptions raised while building, sending, or parsing the request are caught, logged with
+   * their stack trace, and reported as an empty result rather than propagated.
+   *
+   * @param jobName human-readable name for the job
+   * @param jobType operation type string (e.g. "ORPHAN_FILES_DELETION")
+   * @param tableNames fully-qualified table names (db.table)
+   * @param operationIds operation UUIDs — parallel to tableNames
+   * @param resultsEndpoint base URL the Spark app PATCHes results back to
+   * @return job ID if the submission succeeded; empty if an error occurred or the response
+   *     carried no non-empty {@code jobId}
+   */
+  public Optional<String> launch(
+      String jobName,
+      String jobType,
+      List<String> tableNames,
+      List<String> operationIds,
+      String resultsEndpoint) {
+    try {
+      ObjectNode body = MAPPER.createObjectNode();
+      body.put("jobName", jobName);
+      body.put("clusterId", clusterId);
+
+      ObjectNode jobConf = body.putObject("jobConf");
+      jobConf.put("jobType", jobType);
+
+      ArrayNode args = jobConf.putArray("args");
+      args.add("--tableNames");
+      args.add(String.join(",", tableNames));
+      args.add("--operationIds");
+      args.add(String.join(",", operationIds));
+      args.add("--resultsEndpoint");
+      args.add(resultsEndpoint);
+
+      String responseBody =
+          webClient
+              .post()
+              .uri("/jobs")
+              .bodyValue(body)
+              .retrieve()
+              .bodyToMono(String.class)
+              .timeout(TIMEOUT)
+              .block();
+
+      if (responseBody == null) {
+        log.error("Failed to submit job '{}': empty response body", jobName);
+        return Optional.empty();
+      }
+      String jobId = MAPPER.readTree(responseBody).path("jobId").asText(null);
+      if (jobId == null || jobId.isEmpty()) {
+        log.error("Failed to submit job '{}': response carried no jobId", jobName);
+        return Optional.empty();
+      }
+      return Optional.of(jobId);
+    } catch (Exception e) {
+      log.error("Failed to submit job '{}': {}", jobName, e.getMessage(), e);
+      return Optional.empty();
+    }
+  }
+}
diff --git
a/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/config/SchedulerConfig.java b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/config/SchedulerConfig.java
new file mode 100644
index 000000000..5ac86c070
--- /dev/null
+++ b/apps/optimizer-scheduler/src/main/java/com/linkedin/openhouse/scheduler/config/SchedulerConfig.java
@@ -0,0 +1,49 @@
+package com.linkedin.openhouse.scheduler.config;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.scheduler.BinPacker;
+import com.linkedin.openhouse.scheduler.FileCountBinPacker;
+import com.linkedin.openhouse.scheduler.client.JobsServiceClient;
+import java.util.Map;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.reactive.function.client.WebClient;
+
+/** Spring wiring for the scheduler: the Jobs Service client and the per-type bin packers. */
+@Configuration
+public class SchedulerConfig {
+
+  @Value("${jobs.base-uri}")
+  private String jobsBaseUri;
+
+  @Value("${scheduler.cluster-id}")
+  private String clusterId;
+
+  @Value("${scheduler.ofd.max-files-per-bin}")
+  private long ofdMaxFilesPerBin;
+
+  /** {@link WebClient} whose base URL is the {@code jobs.base-uri} property. */
+  @Bean
+  public WebClient jobsWebClient() {
+    return WebClient.builder().baseUrl(jobsBaseUri).build();
+  }
+
+  /** Jobs Service client built on {@code jobsWebClient} and the configured cluster id. */
+  @Bean
+  public JobsServiceClient jobsServiceClient(WebClient jobsWebClient) {
+    return new JobsServiceClient(jobsWebClient, clusterId);
+  }
+
+  /**
+   * Map of {@link OperationTypeDto} to the {@link BinPacker} strategy that handles it. Adding a new
+   * operation type means adding an entry here and configuring its packer; the strategy class itself
+   * stays generic.
+   */
+  @Bean
+  public Map<OperationTypeDto, BinPacker> binPackers() {
+    return Map.of(
+        OperationTypeDto.ORPHAN_FILES_DELETION,
+        new FileCountBinPacker(OperationTypeDto.ORPHAN_FILES_DELETION, ofdMaxFilesPerBin));
+  }
+}
diff --git a/apps/optimizer-scheduler/src/main/resources/application.properties b/apps/optimizer-scheduler/src/main/resources/application.properties
new file mode 100644
index 000000000..442cd98a4
--- /dev/null
+++ b/apps/optimizer-scheduler/src/main/resources/application.properties
@@ -0,0 +1,11 @@
+spring.application.name=openhouse-optimizer-scheduler
+spring.main.web-application-type=none
+spring.datasource.url=${OPTIMIZER_DB_URL:jdbc:h2:mem:schedulerdb;DB_CLOSE_DELAY=-1;MODE=MySQL}
+spring.datasource.username=${OPTIMIZER_DB_USER:sa}
+spring.datasource.password=${OPTIMIZER_DB_PASSWORD:}
+spring.jpa.hibernate.ddl-auto=none
+jobs.base-uri=${JOBS_BASE_URI:http://localhost:8002}
+scheduler.ofd.max-files-per-bin=${SCHEDULER_OFD_MAX_FILES_PER_BIN:1000000}
+scheduler.results-endpoint=${SCHEDULER_RESULTS_ENDPOINT:http://openhouse-optimizer:8080/v1/optimizer/operations}
+scheduler.cluster-id=${SCHEDULER_CLUSTER_ID:LocalHadoopCluster}
+optimizer.repo.default-limit=${OPTIMIZER_REPO_DEFAULT_LIMIT:10000}
diff --git a/apps/optimizer-scheduler/src/test/java/com/linkedin/openhouse/scheduler/FileCountBinPackerTest.java b/apps/optimizer-scheduler/src/test/java/com/linkedin/openhouse/scheduler/FileCountBinPackerTest.java
new file mode 100644
index 000000000..2cda81bd9
--- /dev/null
+++ b/apps/optimizer-scheduler/src/test/java/com/linkedin/openhouse/scheduler/FileCountBinPackerTest.java
@@ -0,0 +1,104 @@
+package com.linkedin.openhouse.scheduler;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.model.TableOperationDto;
+import com.linkedin.openhouse.optimizer.model.TableStatsDto;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import
org.junit.jupiter.api.Test; + +class FileCountBinPackerTest { + + private static final long MAX = 1_000_000L; + private final FileCountBinPacker packer = + new FileCountBinPacker(OperationTypeDto.ORPHAN_FILES_DELETION, MAX); + + private static TableOperationDto op(String uuid) { + return TableOperationDto.builder() + .id(UUID.randomUUID().toString()) + .tableUuid(uuid) + .databaseName("db") + .tableName("tbl_" + uuid) + .operationType(OperationTypeDto.ORPHAN_FILES_DELETION) + .build(); + } + + private static TableStatsDto stats(Long fileCount) { + return TableStatsDto.builder() + .snapshot(TableStatsDto.SnapshotMetrics.builder().numCurrentFiles(fileCount).build()) + .build(); + } + + private static SchedulingCandidate candidate(String uuid, Long fileCount) { + return new SchedulingCandidate(op(uuid), stats(fileCount)); + } + + @Test + void emptyInput_returnsEmptyBins() { + assertThat(packer.pack(List.of())).isEmpty(); + } + + @Test + void singleTable_oneBin() { + SchedulingCandidate c = candidate("uuid-1", 100L); + List bins = packer.pack(List.of(c)); + assertThat(bins).hasSize(1); + assertThat(bins.get(0).getOperations()).containsExactly(c.getOperation()); + } + + @Test + void tablesUnderLimit_oneBin() { + List bins = + packer.pack( + List.of(candidate("a", 300_000L), candidate("b", 300_000L), candidate("c", 300_000L))); + assertThat(bins).hasSize(1); + assertThat(bins.get(0).getOperations()).hasSize(3); + } + + @Test + void tablesOverLimit_twoBins() { + List bins = + packer.pack( + List.of(candidate("a", 600_000L), candidate("b", 600_000L), candidate("c", 400_000L))); + assertThat(bins).hasSize(2); + assertThat(bins.get(0).getOperations()).hasSize(2); // 600k + 400k + assertThat(bins.get(1).getOperations()).hasSize(1); // 600k alone + } + + @Test + void largeTableAlone_exceedsLimitSingleBin() { + SchedulingCandidate big = candidate("big", 5_000_000L); + List bins = packer.pack(List.of(big)); + assertThat(bins).hasSize(1); + 
assertThat(bins.get(0).getOperations()).containsExactly(big.getOperation()); + } + + @Test + void nullFileCount_treatedAsZero() { + List bins = packer.pack(List.of(candidate("x", null), candidate("y", null))); + assertThat(bins).hasSize(1); + assertThat(bins.get(0).getOperations()).hasSize(2); + } + + @Test + void sortedDescending_largestFirst() { + SchedulingCandidate small = candidate("small", 100L); + SchedulingCandidate large = candidate("large", 900_000L); + List bins = packer.pack(List.of(small, large)); + assertThat(bins).hasSize(1); + List ordered = + bins.get(0).getOperations().stream() + .map(TableOperationDto::getTableUuid) + .collect(Collectors.toList()); + assertThat(ordered).containsExactly("large", "small"); + } + + @Test + void binCarriesOperationType() { + List bins = packer.pack(List.of(candidate("u", 1L))); + assertThat(bins.get(0).getOperationType()).isEqualTo(OperationTypeDto.ORPHAN_FILES_DELETION); + } +} diff --git a/apps/optimizer-scheduler/src/test/java/com/linkedin/openhouse/scheduler/SchedulerRunnerTest.java b/apps/optimizer-scheduler/src/test/java/com/linkedin/openhouse/scheduler/SchedulerRunnerTest.java new file mode 100644 index 000000000..71e91eb39 --- /dev/null +++ b/apps/optimizer-scheduler/src/test/java/com/linkedin/openhouse/scheduler/SchedulerRunnerTest.java @@ -0,0 +1,364 @@ +package com.linkedin.openhouse.scheduler; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.linkedin.openhouse.optimizer.db.OperationStatus; +import com.linkedin.openhouse.optimizer.db.SnapshotMetrics; +import 
com.linkedin.openhouse.optimizer.db.TableOperationsRow;
+import com.linkedin.openhouse.optimizer.db.TableStatsRow;
+import com.linkedin.openhouse.optimizer.model.OperationTypeDto;
+import com.linkedin.openhouse.optimizer.repository.TableOperationsRepository;
+import com.linkedin.openhouse.optimizer.repository.TableStatsRepository;
+import com.linkedin.openhouse.scheduler.client.JobsServiceClient;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Unit tests for {@link SchedulerRunner#schedule}, driven entirely through Mockito mocks of the
+ * operations repository, the stats repository, the jobs client and the bin packer.
+ *
+ * <p>Each test stubs only the calls it needs (pending lookup, stats lookup, the PENDING to
+ * SCHEDULING claim, the post-claim lookup, the job launch) and then verifies which status
+ * transitions and job launches were requested.
+ */
+@ExtendWith(MockitoExtension.class)
+class SchedulerRunnerTest {
+
+  private static final OperationTypeDto OFD = OperationTypeDto.ORPHAN_FILES_DELETION;
+  private static final com.linkedin.openhouse.optimizer.db.OperationType OFD_DB =
+      com.linkedin.openhouse.optimizer.db.OperationType.ORPHAN_FILES_DELETION;
+  private static final String OFD_STR = OFD.name();
+  private static final String RESULTS_ENDPOINT = "http://localhost:8080/v1/optimizer/operations";
+
+  @Mock private TableOperationsRepository operationsRepo;
+  @Mock private TableStatsRepository statsRepo;
+  @Mock private JobsServiceClient jobsClient;
+  @Mock private BinPacker binPacker;
+
+  private SchedulerRunner runner;
+
+  /** Builds a runner with a single packer registered for {@link #OFD}. */
+  @BeforeEach
+  void setUp() {
+    runner =
+        new SchedulerRunner(
+            operationsRepo, statsRepo, jobsClient, Map.of(OFD, binPacker), RESULTS_ENDPOINT);
+  }
+
+  // ---- Stubbing helpers ----
+
+  /** Stubs the initial "find PENDING" call. */
+  private void stubFindPending(List<TableOperationsRow> rows) {
+    when(operationsRepo.find(
+            eq(Optional.of(OFD_DB)),
+            eq(Optional.of(OperationStatus.PENDING)),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            any()))
+        .thenReturn(rows);
+  }
+
+  /** Stubs the post-claim "find SCHEDULING" call. */
+  private void stubFindClaimed(List<TableOperationsRow> rows) {
+    when(operationsRepo.find(
+            eq(Optional.empty()),
+            eq(Optional.of(OperationStatus.SCHEDULING)),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            eq(Optional.empty()),
+            any(),
+            any(),
+            any()))
+        .thenReturn(rows);
+  }
+
+  /** Stubs the bin packer to return one bin containing every candidate. */
+  private void stubOneBinForAllCandidates() {
+    when(binPacker.pack(anyList()))
+        .thenAnswer(
+            inv ->
+                List.of(
+                    new Bin(
+                        OFD,
+                        inv.<List<SchedulingCandidate>>getArgument(0).stream()
+                            .map(SchedulingCandidate::getOperation)
+                            .collect(Collectors.toList()))));
+  }
+
+  /**
+   * Creates a captor for {@code List<String>} arguments. {@link ArgumentCaptor#forClass} only sees
+   * the raw {@code List} class, so the unchecked conversion is confined to this one helper.
+   */
+  @SuppressWarnings("unchecked")
+  private static ArgumentCaptor<List<String>> stringListCaptor() {
+    return ArgumentCaptor.forClass(List.class);
+  }
+
+  /** Builds a PENDING operation row with a fresh random id, created "now". */
+  private TableOperationsRow pendingRow(String uuid, String db, String table) {
+    return TableOperationsRow.builder()
+        .id(UUID.randomUUID().toString())
+        .tableUuid(uuid)
+        .databaseName(db)
+        .tableName(table)
+        .operationType(OFD_DB)
+        .status(OperationStatus.PENDING)
+        .createdAt(Instant.now())
+        .build();
+  }
+
+  /** Copies {@code source} with its status switched to SCHEDULING. */
+  private TableOperationsRow schedulingRow(TableOperationsRow source) {
+    return source.toBuilder().status(OperationStatus.SCHEDULING).build();
+  }
+
+  /** Builds a stats row for {@code uuid} whose snapshot reports {@code numCurrentFiles}. */
+  private TableStatsRow statsRow(String uuid, long numCurrentFiles) {
+    return TableStatsRow.builder()
+        .tableUuid(uuid)
+        .snapshot(SnapshotMetrics.builder().numCurrentFiles(numCurrentFiles).build())
+        .build();
+  }
+
+  // ---- Tests ----
+
+  /** With no PENDING rows, neither the packer nor the jobs client is invoked. */
+  @Test
+  void schedule_noPendingOps_noJobSubmitted() {
+    stubFindPending(List.of());
+
+    runner.schedule(OFD);
+
+    verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString());
+    verify(binPacker, never()).pack(anyList());
+  }
+
+  /** A runner without a packer for the type fails before any repository or client call. */
+  @Test
+  void schedule_unknownOperationType_throws() {
+    SchedulerRunner emptyRunner =
+        new SchedulerRunner(operationsRepo, statsRepo, jobsClient, Map.of(), RESULTS_ENDPOINT);
+
+    assertThatThrownBy(() -> emptyRunner.schedule(OFD))
+        .isInstanceOf(IllegalStateException.class)
+        .hasMessageContaining("No BinPacker registered");
+
+    verify(operationsRepo, never()).find(any(), any(), any(), any(), any(), any(), any(), any());
+    verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString());
+  }
+
+  /** A successful launch moves the claimed row SCHEDULING to SCHEDULED with the job id. */
+  @Test
+  void schedule_singleBin_claimsAndMarksScheduled() {
+    String uuid = UUID.randomUUID().toString();
+    TableOperationsRow row = pendingRow(uuid, "db1", "tbl1");
+
+    stubFindPending(List.of(row));
+    when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100_000L)));
+    stubOneBinForAllCandidates();
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+        .thenReturn(1);
+    stubFindClaimed(List.of(schedulingRow(row)));
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any()))
+        .thenReturn(1);
+    when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString()))
+        .thenReturn(Optional.of("job-123"));
+
+    runner.schedule(OFD);
+
+    verify(operationsRepo)
+        .updateBatch(
+            eq(List.of(row.getId())),
+            eq(OperationStatus.SCHEDULING),
+            eq(OperationStatus.SCHEDULED),
+            eq(Optional.empty()),
+            eq(Optional.of("job-123")));
+    verify(operationsRepo, never())
+        .updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.PENDING), any(), any());
+
+    ArgumentCaptor<List<String>> tableNames = stringListCaptor();
+    verify(jobsClient)
+        .launch(anyString(), eq(OFD_STR), tableNames.capture(), anyList(), anyString());
+    assertThat(tableNames.getValue()).containsExactly("db1.tbl1");
+  }
+
+  /** An empty launch result reverts the claimed row SCHEDULING to PENDING. */
+  @Test
+  void schedule_jobLaunchFails_marksPendingForRetry() {
+    String uuid = UUID.randomUUID().toString();
+    TableOperationsRow row = pendingRow(uuid, "db1", "tbl1");
+
+    stubFindPending(List.of(row));
+    when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100L)));
+    stubOneBinForAllCandidates();
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+        .thenReturn(1);
+    stubFindClaimed(List.of(schedulingRow(row)));
+    when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString()))
+        .thenReturn(Optional.empty());
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.PENDING), any(), any()))
+        .thenReturn(1);
+
+    runner.schedule(OFD);
+
+    verify(operationsRepo)
+        .updateBatch(
+            eq(List.of(row.getId())),
+            eq(OperationStatus.SCHEDULING),
+            eq(OperationStatus.PENDING),
+            eq(Optional.empty()),
+            eq(Optional.empty()));
+    verify(operationsRepo, never())
+        .updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any());
+  }
+
+  /** When the claim updates nothing and no row is SCHEDULING, no job is launched. */
+  @Test
+  void schedule_rowsAlreadyClaimed_skipsSubmit() {
+    String uuid = UUID.randomUUID().toString();
+    TableOperationsRow row = pendingRow(uuid, "db1", "tbl1");
+
+    stubFindPending(List.of(row));
+    when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100L)));
+    stubOneBinForAllCandidates();
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+        .thenReturn(0);
+    stubFindClaimed(List.of());
+
+    runner.schedule(OFD);
+
+    verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString());
+    verify(operationsRepo, never())
+        .updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any());
+    verify(operationsRepo, never())
+        .updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.PENDING), any(), any());
+  }
+
+  /** Two PENDING rows for the same table result in exactly one id being cancelled. */
+  @Test
+  void schedule_cancelsDuplicatePendingPerCycle() {
+    String uuid = UUID.randomUUID().toString();
+    TableOperationsRow row1 = pendingRow(uuid, "db1", "tbl1");
+    TableOperationsRow row2 = pendingRow(uuid, "db1", "tbl1");
+
+    stubFindPending(List.of(row1, row2));
+    when(operationsRepo.cancel(anyList())).thenReturn(1);
+    when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(uuid, 100L)));
+    stubOneBinForAllCandidates();
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+        .thenReturn(1);
+    // After dedup only the older row survives (createdAt first, id as the tie-breaker). Both rows
+    // are stamped with Instant.now(), so either may be the older one; work out which it is.
+    TableOperationsRow survivor = row1.getCreatedAt().isBefore(row2.getCreatedAt()) ? row1 : row2;
+    if (row1.getCreatedAt().equals(row2.getCreatedAt())) {
+      survivor = row1.getId().compareTo(row2.getId()) <= 0 ? row1 : row2;
+    }
+    stubFindClaimed(List.of(schedulingRow(survivor)));
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any()))
+        .thenReturn(1);
+    when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString()))
+        .thenReturn(Optional.of("job-dedup"));
+
+    runner.schedule(OFD);
+
+    // Exactly one ID was cancelled (the duplicate).
+    ArgumentCaptor<List<String>> cancelled = stringListCaptor();
+    verify(operationsRepo).cancel(cancelled.capture());
+    assertThat(cancelled.getValue()).hasSize(1);
+  }
+
+  /** When only one of two binned rows comes back as claimed, only that row is launched. */
+  @Test
+  void schedule_partialClaim_launchesAndMarksOnlyClaimedSubset() {
+    String uuidA = UUID.randomUUID().toString();
+    String uuidB = UUID.randomUUID().toString();
+    TableOperationsRow rowA = pendingRow(uuidA, "db1", "tblA");
+    TableOperationsRow rowB = pendingRow(uuidB, "db1", "tblB");
+
+    stubFindPending(List.of(rowA, rowB));
+    when(statsRepo.findAllById(any()))
+        .thenReturn(List.of(statsRow(uuidA, 100L), statsRow(uuidB, 100L)));
+    stubOneBinForAllCandidates();
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+        .thenReturn(1);
+    // Only A actually claimed (B owned by another instance).
+    stubFindClaimed(List.of(schedulingRow(rowA)));
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any()))
+        .thenReturn(1);
+    when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString()))
+        .thenReturn(Optional.of("job-partial"));
+
+    runner.schedule(OFD);
+
+    ArgumentCaptor<List<String>> launchedTableNames = stringListCaptor();
+    ArgumentCaptor<List<String>> launchedOpIds = stringListCaptor();
+    verify(jobsClient)
+        .launch(
+            anyString(),
+            anyString(),
+            launchedTableNames.capture(),
+            launchedOpIds.capture(),
+            anyString());
+    assertThat(launchedTableNames.getValue()).containsExactly("db1.tblA");
+    assertThat(launchedOpIds.getValue()).containsExactly(rowA.getId());
+
+    verify(operationsRepo)
+        .updateBatch(
+            eq(List.of(rowA.getId())),
+            eq(OperationStatus.SCHEDULING),
+            eq(OperationStatus.SCHEDULED),
+            eq(Optional.empty()),
+            eq(Optional.of("job-partial")));
+  }
+
+  /** A row whose table has no stats row is left out of the PENDING to SCHEDULING claim. */
+  @Test
+  void schedule_opsWithoutStats_skipped() {
+    String withStats = UUID.randomUUID().toString();
+    String missing = UUID.randomUUID().toString();
+    TableOperationsRow withStatsRow = pendingRow(withStats, "db1", "tblA");
+    TableOperationsRow missingRow = pendingRow(missing, "db1", "tblB");
+
+    stubFindPending(List.of(withStatsRow, missingRow));
+    when(statsRepo.findAllById(any())).thenReturn(List.of(statsRow(withStats, 50L)));
+    stubOneBinForAllCandidates();
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.PENDING), eq(OperationStatus.SCHEDULING), any(), any()))
+        .thenReturn(1);
+    stubFindClaimed(List.of(schedulingRow(withStatsRow)));
+    when(operationsRepo.updateBatch(
+            anyList(), eq(OperationStatus.SCHEDULING), eq(OperationStatus.SCHEDULED), any(), any()))
+        .thenReturn(1);
+    when(jobsClient.launch(anyString(), anyString(), anyList(), anyList(), anyString()))
+        .thenReturn(Optional.of("job-skip"));
+
+    runner.schedule(OFD);
+
+    ArgumentCaptor<List<String>> ids = stringListCaptor();
+    verify(operationsRepo)
+        .updateBatch(
+            ids.capture(),
+            eq(OperationStatus.PENDING),
+            eq(OperationStatus.SCHEDULING),
+            any(),
+            any());
+    assertThat(ids.getValue()).containsExactly(withStatsRow.getId());
+  }
+
+  /** When no pending row has stats, the packer and the jobs client are never invoked. */
+  @Test
+  void schedule_allOpsWithoutStats_noJobSubmitted() {
+    TableOperationsRow row = pendingRow(UUID.randomUUID().toString(), "db1", "tbl1");
+
+    stubFindPending(List.of(row));
+    when(statsRepo.findAllById(any())).thenReturn(List.of());
+
+    runner.schedule(OFD);
+
+    verify(binPacker, never()).pack(anyList());
+    verify(jobsClient, never()).launch(anyString(), anyString(), anyList(), anyList(), anyString());
+  }
+}
diff --git a/apps/optimizer-scheduler/src/test/resources/application-test.properties b/apps/optimizer-scheduler/src/test/resources/application-test.properties new file mode 100644 index 000000000..35233829b --- /dev/null +++ b/apps/optimizer-scheduler/src/test/resources/application-test.properties @@ -0,0 +1,10 @@ +spring.datasource.url=jdbc:h2:mem:schedulertestdb;DB_CLOSE_DELAY=-1;MODE=MySQL +spring.datasource.username=sa +spring.datasource.password= 
+spring.jpa.hibernate.ddl-auto=none +spring.sql.init.mode=always +spring.sql.init.schema-locations=classpath:schema.sql +jobs.base-uri=http://localhost:9999 +scheduler.ofd.max-files-per-bin=1000000 +scheduler.results-endpoint=http://localhost:8080/v1/optimizer/operations +scheduler.cluster-id=test-cluster diff --git a/apps/optimizer-scheduler/src/test/resources/schema.sql b/apps/optimizer-scheduler/src/test/resources/schema.sql new file mode 100644 index 000000000..75de24d3d --- /dev/null +++ b/apps/optimizer-scheduler/src/test/resources/schema.sql @@ -0,0 +1,23 @@ +CREATE TABLE IF NOT EXISTS table_operations ( + id VARCHAR(36) NOT NULL, + table_uuid VARCHAR(36) NOT NULL, + database_name VARCHAR(255) NOT NULL, + table_name VARCHAR(255) NOT NULL, + operation_type VARCHAR(50) NOT NULL, + status VARCHAR(20) NOT NULL, + created_at TIMESTAMP(6) NOT NULL, + scheduled_at TIMESTAMP(6), + job_id VARCHAR(255), + version BIGINT, + PRIMARY KEY (id) +); + +CREATE TABLE IF NOT EXISTS table_stats ( + table_uuid VARCHAR(36) NOT NULL, + database_id VARCHAR(255) NOT NULL, + table_name VARCHAR(255) NOT NULL, + stats TEXT, + table_properties TEXT, + updated_at TIMESTAMP(6) NOT NULL, + PRIMARY KEY (table_uuid) +); diff --git a/settings.gradle b/settings.gradle index 7942f44d3..8b9593465 100644 --- a/settings.gradle +++ b/settings.gradle @@ -51,6 +51,7 @@ include ':services:housetables' include ':services:jobs' include ':services:optimizer' include ':apps:optimizer-analyzer' +include ':apps:optimizer-scheduler' include ':services:tables' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.2' include ':tables-test-fixtures:tables-test-fixtures-iceberg-1.5'