[BDP-102028] feat(optimizer): [4/N] Scheduler app #534
Conversation
…park jobs

Introduces apps/optimizer-scheduler, a Spring Boot CommandLineRunner that:

1. Loads PENDING operations from the optimizer DB
2. Bin-packs them by file count (first-fit descending)
3. Claims rows via CAS (PENDING → SCHEDULING → SCHEDULED)
4. Submits one batched Spark job per bin via the Jobs Service

Adds three @Modifying CAS methods to TableOperationsRepository: cancelDuplicatePending, markScheduling, markScheduled — required for safe concurrent scheduling without double-submitting.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@@ -0,0 +1,74 @@
package com.linkedin.openhouse.scheduler;

import com.linkedin.openhouse.optimizer.entity.TableOperationRow;
Every operation needs its own bin packer. It's different for each operation, as the constraints differ. Rather than generic bin packing, we should have an OFD scheduler that attempts to bin-pack pending OFD jobs. It should follow a similar pattern to the analyzer, where running the scheduler requires an operation type but database and table name are optional.
So we should have a BinPacker interface and a mapping from operation to bin packer.
Done. BinPacker is now an interface with a single pack(List<TableOperation>) method. SchedulerConfig defines a Map<OperationType, BinPacker> bean — ORPHAN_FILES_DELETION is wired to a FileCountBinPacker instance. SchedulerApplication's CommandLineRunner loops over the map's keys and calls runner.schedule(opType) per entry, mirroring AnalyzerApplication. SchedulerRunner.schedule(OperationType, Optional<String> db, Optional<String> tableName) is the new signature.
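The shape described in the reply can be sketched as follows. All names here (BinPacker, the registry map, the placeholder strategy) are assumptions reconstructed from the thread, not the PR's literal code:

```java
// Hypothetical sketch of the BinPacker interface plus the per-operation
// registry described above; the real wiring lives in a Spring @Configuration.
import java.util.List;
import java.util.Map;

public class BinPackerRegistrySketch {

    enum OperationType { ORPHAN_FILES_DELETION, SNAPSHOT_EXPIRATION }

    // Minimal stand-in for the shared TableOperation domain object.
    record TableOperation(String uuid, long fileCount) {}

    // One strategy per operation type; each bin becomes one batched Spark job.
    interface BinPacker {
        List<List<TableOperation>> pack(List<TableOperation> pending);
    }

    // Mirrors the Map<OperationType, BinPacker> bean: only OFD is wired.
    static final Map<OperationType, BinPacker> REGISTRY =
        Map.of(OperationType.ORPHAN_FILES_DELETION,
               pending -> List.of(pending)); // placeholder: one bin for everything

    // Mirrors the CommandLineRunner loop: schedule each registered op type.
    static int scheduledOpTypes() {
        int count = 0;
        for (OperationType op : REGISTRY.keySet()) {
            REGISTRY.get(op).pack(List.of());
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(scheduledOpTypes()); // only OFD is registered
    }
}
```

Operation types without a registered strategy simply never get scheduled, which matches the unknown-operation-type no-op behavior mentioned later in the thread.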
 * @return list of bins, each bin being a non-empty list of rows
 */
public static List<List<TableOperationRow>> pack(
    List<TableOperationRow> pending, Map<String, Long> fileCountByUuid, long maxFilesPerBin) {
This interface is bad and inflexible. TableOperationRow should contain all the necessary information: fileCount should be a field. maxFilesPerBin is specific to OFD and should not be a parameter, but rather something set when constructing the bin packer instance (or a conf).
Done. fileCount is now a nullable field on the shared TableOperation domain object — populated by the scheduler at read time from table_stats. FileCountBinPacker.pack(...) reads it off each TableOperation directly; no more Map<uuid, fileCount> parameter. maxFilesPerBin is constructor-injected on FileCountBinPacker via the scheduler.ofd.max-files-per-bin config; not on pack().
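A minimal sketch of what that FileCountBinPacker shape could look like, assuming a first-fit-descending strategy with a constructor-injected cap (in the real app bound from scheduler.ofd.max-files-per-bin) and a nullable fileCount read off each operation. Names and details are assumptions, not the PR's code:

```java
// Hypothetical FileCountBinPacker: maxFilesPerBin comes in via the
// constructor, cost comes off the TableOperation's fileCount field.
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class FileCountBinPackerSketch {

    record TableOperation(String uuid, Long fileCount) {}

    static class FileCountBinPacker {
        private final long maxFilesPerBin;

        FileCountBinPacker(long maxFilesPerBin) { // from scheduler.ofd.max-files-per-bin
            this.maxFilesPerBin = maxFilesPerBin;
        }

        List<List<TableOperation>> pack(List<TableOperation> pending) {
            // First-fit descending: sort by cost, place each op in the first
            // bin with room; ops too big for any bin get their own bin.
            List<TableOperation> sorted = new ArrayList<>(pending);
            sorted.sort(Comparator.comparingLong(FileCountBinPacker::cost).reversed());
            List<List<TableOperation>> bins = new ArrayList<>();
            List<Long> loads = new ArrayList<>();
            for (TableOperation op : sorted) {
                long c = cost(op);
                int target = -1;
                for (int i = 0; i < bins.size(); i++) {
                    if (loads.get(i) + c <= maxFilesPerBin) { target = i; break; }
                }
                if (target < 0) { // no bin fits, or op alone exceeds the cap
                    bins.add(new ArrayList<>(List.of(op)));
                    loads.add(c);
                } else {
                    bins.get(target).add(op);
                    loads.set(target, loads.get(target) + c);
                }
            }
            return bins;
        }

        private static long cost(TableOperation op) {
            return op.fileCount() == null ? 0L : op.fileCount(); // no stats: cost 0
        }
    }

    public static void main(String[] args) {
        FileCountBinPacker packer = new FileCountBinPacker(100);
        System.out.println(packer.pack(List.of(
            new TableOperation("a", 70L),
            new TableOperation("b", 40L),
            new TableOperation("c", 250L),    // oversized: its own bin
            new TableOperation("d", null)))); // no stats: rides along at cost 0
    }
}
```

Sorting descending before placement is what makes first-fit behave well here: large operations claim bins first, and small or stat-less ones fill the gaps.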
private String resultsEndpoint;

@Transactional
public void schedule() {
operation type required, database and table names optional.
Done. New signature: schedule(OperationType op) and schedule(OperationType op, Optional<String> databaseName, Optional<String> tableName) (SchedulerRunner.java:43,50). Operation type is required; database and table name are Optional<String>. Mirrors AnalyzerRunner.analyze.
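The described signature pair can be sketched like this; the bodies are placeholders and the names are assumptions based on the reply:

```java
// Hypothetical sketch of the required-op / optional-db-and-table overloads.
import java.util.Optional;

public class SchedulerSignatureSketch {

    enum OperationType { ORPHAN_FILES_DELETION }

    // Convenience overload: all pending ops of this type, across all tables.
    static String schedule(OperationType op) {
        return schedule(op, Optional.empty(), Optional.empty());
    }

    static String schedule(
        OperationType op, Optional<String> databaseName, Optional<String> tableName) {
        // The real runner would narrow the PENDING query by db/table when present;
        // here we just render the effective scope.
        return op + ":" + databaseName.orElse("*") + "." + tableName.orElse("*");
    }

    public static void main(String[] args) {
        System.out.println(schedule(OperationType.ORPHAN_FILES_DELETION)); // widest scope
    }
}
```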
      return count != null ? count : 0L;
    }));

List<List<TableOperationRow>> bins = BinPacker.pack(pending, fileCountByUuid, maxFiles);
We should not be handling rows; that is the database model. Use TableOperation from the analyzer PR.
Done. TableOperation (and its companion enums) moved from the analyzer-internal package to apps/optimizer/.../model/ so both the analyzer and the scheduler consume the same domain type. SchedulerRunner now loads rows, converts them via TableOperation.from(row), enriches each with fileCount from table_stats, and hands List<TableOperation> to the BinPacker. Per-bin logic and the BinPacker interface work on TableOperation, not the JPA row.
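The row-to-domain boundary described above can be sketched as below. The stand-in types and the enrichment helper are assumptions; only the overall shape (row in, enriched TableOperation out, rows never reaching per-bin logic) comes from the thread:

```java
// Hypothetical sketch: JPA rows are converted to the shared domain type and
// enriched with fileCount from table stats before any bin-packing happens.
import java.util.List;
import java.util.Map;

public class RowConversionSketch {

    // Stand-ins for the JPA row and the shared domain object.
    record TableOperationRow(String id, String tableUuid) {}
    record TableOperation(String id, String tableUuid, Long fileCount) {
        static TableOperation from(TableOperationRow row, Long fileCount) {
            return new TableOperation(row.id(), row.tableUuid(), fileCount);
        }
    }

    // Downstream per-bin logic sees only TableOperation, never the row.
    static List<TableOperation> toDomain(
        List<TableOperationRow> rows, Map<String, Long> fileCountByUuid) {
        return rows.stream()
            .map(r -> TableOperation.from(r, fileCountByUuid.getOrDefault(r.tableUuid(), 0L)))
            .toList();
    }

    public static void main(String[] args) {
        List<TableOperation> ops = toDomain(
            List.of(new TableOperationRow("op1", "t1")),
            Map.of("t1", 42L));
        System.out.println(ops.get(0).fileCount()); // 42
    }
}
```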
List<TableOperationRow> claimed =
    bin.stream()
        .filter(
            r -> operationsRepo.markScheduling(r.getId(), r.getVersion(), Instant.now()) == 1)
Can we batch the writes to limit round trips?
Done. Added cancelDuplicatePendingBatch, markSchedulingBatch, markScheduledBatch, and markPendingBatch @Modifying @Query methods on TableOperationsRepository. Each issues one UPDATE/DELETE for a list of IDs with the status-guarded WHERE clause preserved. SchedulerRunner.submitBin now uses these instead of the per-row variants.
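The semantics of a status-guarded batch claim can be modeled without Spring. The real method would be a @Modifying @Query issuing roughly "UPDATE ... SET status = SCHEDULING WHERE id IN :ids AND status = PENDING" in one round trip; the in-memory sketch below (all names assumed) shows the same guarded-transition behavior:

```java
// In-memory model of markSchedulingBatch: one guarded bulk transition,
// skipping rows a concurrent scheduler has already claimed.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchCasSketch {

    enum Status { PENDING, SCHEDULING, SCHEDULED }

    // 'table' maps operation id -> status, standing in for table_operations.
    static int markSchedulingBatch(Map<String, Status> table, List<String> ids) {
        int updated = 0;
        for (String id : ids) {
            // Guarded transition: only PENDING rows are claimed; anything
            // already SCHEDULING/SCHEDULED (or missing) is skipped.
            if (table.replace(id, Status.PENDING, Status.SCHEDULING)) updated++;
        }
        return updated; // mirrors the @Modifying method's affected-row count
    }

    public static void main(String[] args) {
        Map<String, Status> table = new HashMap<>(Map.of(
            "a", Status.PENDING, "b", Status.SCHEDULING, "c", Status.PENDING));
        System.out.println(markSchedulingBatch(table, List.of("a", "b", "c"))); // claims a and c
    }
}
```

Returning the affected-row count lets the caller compare it against the bin size and detect partially claimed bins.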
Optional<String> jobId =
    jobsClient.launch(jobName, operationType, tableNames, opIds, resultsEndpoint);

if (jobId.isPresent()) {
else retry?
Done. On job-launch failure (jobsClient.launch returns Optional.empty()), SchedulerRunner now calls markPendingBatch(claimed-ids) to revert the claimed rows from SCHEDULING back to PENDING. The next scheduler pass picks them up. Replaces the previous log-and-leak that relied on the analyzer's now-removed scheduledTimeout.
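The success/failure branch described above can be sketched as a tiny state transition; the names (submitBin, the batch mark methods they stand in for) are assumptions:

```java
// Hypothetical sketch of submitBin's outcome handling: SCHEDULED on launch
// success, back to PENDING on failure so the next pass retries.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class LaunchRevertSketch {

    enum Status { PENDING, SCHEDULING, SCHEDULED }

    static void submitBin(Map<String, Status> table, List<String> claimedIds,
                          Optional<String> jobId) {
        if (jobId.isPresent()) {
            claimedIds.forEach(id -> table.put(id, Status.SCHEDULED)); // markScheduledBatch
        } else {
            claimedIds.forEach(id -> table.put(id, Status.PENDING));   // markPendingBatch
        }
    }

    public static void main(String[] args) {
        Map<String, Status> table = new HashMap<>(Map.of("a", Status.SCHEDULING));
        submitBin(table, List.of("a"), Optional.empty()); // launch failed
        System.out.println(table.get("a")); // PENDING again, retried next pass
    }
}
```

Reverting to PENDING instead of leaving rows stuck in SCHEDULING is what removes the need for a timeout-based sweeper.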
…tion domain, batched writes, retry

Addresses unresolved review threads on #534:

- BinPacker becomes an interface with a single pack(...) method. Strategy is decoupled from operation type; binding to OFD lives in SchedulerConfig as a Map<OperationType, BinPacker> bean.
- FileCountBinPacker is the first strategy — named after the dimension it packs on, not the op it serves. maxFilesPerBin is constructor-injected via scheduler.ofd.max-files-per-bin; pack(...) reads fileCount off TableOperation rather than taking a side map.
- SchedulerRunner.schedule now takes a required OperationType plus optional databaseName/tableName (mirror of AnalyzerRunner.analyze). It loads PENDING rows, converts row → TableOperation, fetches stats for fileCount enrichment, and dispatches to the registered BinPacker. No raw TableOperationRow flows through the per-bin logic.
- Writes are batched: cancelDuplicatePendingBatch, markSchedulingBatch, markScheduledBatch, markPendingBatch on TableOperationsRepository each issue one UPDATE/DELETE.
- On job-launch failure the scheduler reverts claimed rows from SCHEDULING back to PENDING via markPendingBatch so the next pass retries (replaces the previous log-and-leak that relied on the analyzer's removed scheduledTimeout).
- SchedulerApplication's CommandLineRunner loops over the configured Map<OperationType, BinPacker> and calls schedule(opType) per entry.

Tests: FileCountBinPackerTest covers the packing strategy; SchedulerRunnerTest covers per-op dispatch, batched claim/mark-scheduled on success, batch-revert on submit failure, duplicate-PENDING dedup, and the unknown-operation-type no-op path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ad CAS methods

- apps/optimizer-scheduler/build.gradle: swap the now-removed `:apps:optimizer-data` dep for `:services:optimizer`.
- TableOperationsRepository: remove the single-row `markScheduling`, `markScheduled`, and `cancelDuplicatePending` methods. They have no production callers; the scheduler exclusively uses the `*Batch` variants. Update the javadoc on TableOperationsRow.version accordingly.
Adapt the scheduler to the entity→db rename, the removal of factory methods on model/ data types, and the db/-typed repository signatures.

- SchedulerRunner: inject ModelDbMapper. Imports switch from entity/ to db/. operationsRepo.find now takes (db.OperationType, db.OperationStatus, ...). Row → model conversion via dbMapper.toOperation. extractFileCount reads row.getSnapshot() directly — the row no longer wraps a TableStats envelope. cancelDuplicatePendingBatch takes a db.OperationType.
- SchedulerRunnerTest rewritten: db-typed enums + new ModelDbMapper.
TableOperationsRow no longer carries a `version` field (the batch CAS pattern's atomicity is on status alone). The mock-built test row was still calling .version(0L), which now refers to a removed field.
…Mapper

SchedulerRunner no longer injects ModelDbMapper. Conversion goes through the model types' static factories and instance methods:

- Row → model: TableOperation.fromRow.
- model.OperationType → db.OperationType via operationType.toDb().

Test: drop the ModelDbMapper field and the extra constructor argument.
Optimizer Stack
Summary
PR 4 of N in the optimizer stack.
Overall Project
Service Design doc.
Introduces apps/optimizer-scheduler, a Spring Boot CommandLineRunner that claims PENDING operations and submits batched Spark jobs via the Jobs Service.

Changes

Scheduler runner: Loads PENDING ops, bin-packs by file count, claims via two-step CAS (PENDING → SCHEDULING → SCHEDULED), submits one Spark job per bin.

Bin packer: Greedy first-fit descending algorithm. Oversized tables get their own bin (never dropped). Tables with no stats default to cost 0.

Jobs client: WebClient-based REST client submitting POST /jobs to the Jobs Service with table names, operation IDs, and results endpoint.

Repository additions: Three @Modifying CAS methods on TableOperationsRepository — cancelDuplicatePending, markScheduling, markScheduled — required for safe concurrent scheduling.

Testing Done

13 unit tests:

BinPackerTest (7 tests) — empty input, single table, under/over limit, oversized table, no stats, descending sort

SchedulerRunnerTest (6 tests) — no pending ops, two-step claim + schedule, launch failure, already-claimed skip, duplicate cancellation, multi-row bin claim

Additional Information
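For the jobs client described in the Changes section, the POST /jobs payload shape can be sketched without Spring's WebClient. The field names below are assumptions, not the Jobs Service's actual schema; the real client would serialize a request object rather than build JSON by hand:

```java
// Hand-assembled sketch of the launch request body the jobs client POSTs
// to /jobs: table names, operation IDs, and the results callback endpoint.
import java.util.List;

public class JobsClientSketch {

    static String buildLaunchPayload(String jobName, String operationType,
                                     List<String> tableNames, List<String> opIds,
                                     String resultsEndpoint) {
        return "{"
            + "\"jobName\":\"" + jobName + "\","
            + "\"operationType\":\"" + operationType + "\","
            + "\"tableNames\":" + toJsonArray(tableNames) + ","
            + "\"operationIds\":" + toJsonArray(opIds) + ","
            + "\"resultsEndpoint\":\"" + resultsEndpoint + "\""
            + "}";
    }

    private static String toJsonArray(List<String> values) {
        return "[" + String.join(",", values.stream().map(v -> "\"" + v + "\"").toList()) + "]";
    }

    public static void main(String[] args) {
        System.out.println(buildLaunchPayload(
            "ofd-bin-1", "ORPHAN_FILES_DELETION",
            List.of("db1.t1", "db1.t2"), List.of("op-1", "op-2"),
            "http://optimizer/results"));
    }
}
```

Carrying the operation IDs in the launch request is what lets the Spark job report per-operation results back to the given endpoint.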