
[BDP-102028] feat(optimizer): [4/N] Scheduler app #534

Draft
mkuchenbecker wants to merge 33 commits into mkuchenb/optimizer-3 from mkuchenb/optimizer-4

Conversation


@mkuchenbecker mkuchenbecker commented Apr 7, 2026

Optimizer Stack

PR Content
#527 Data Model
#530 Database Repos
#531 REST service
#533 Analyzer app
#534 (this) Scheduler app
#tbd Spark BatchedOFD app
#tbd Infra, docker-compose, smoke test

Summary

PR 4 of N in the optimizer stack.
Overall Project
Service Design doc.

Introduces apps/optimizer-scheduler, a Spring Boot CommandLineRunner that claims PENDING operations and submits batched Spark jobs via the Jobs Service.

Changes

  • Client-facing API Changes
  • Internal API Changes
  • Bug Fixes
  • New Features
  • Performance Improvements
  • Code Style
  • Refactoring
  • Documentation
  • Tests

Scheduler runner: Loads PENDING ops, bin-packs by file count, claims via two-step CAS (PENDING → SCHEDULING → SCHEDULED), submits one Spark job per bin.

Bin packer: Greedy first-fit descending algorithm. Oversized tables get their own bin (never dropped). Tables with no stats default to cost 0.
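The greedy strategy described above can be sketched in a few lines. This is a minimal standalone sketch, not the PR's actual class: `Op` stands in for the real row type, and the method name `pack` plus its parameters are illustrative.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustrative first-fit-descending bin packing on file count. */
class FfdBinPacker {
    record Op(String table, long fileCount) {}

    static List<List<Op>> pack(List<Op> pending, long maxFilesPerBin) {
        // Descending sort so the largest tables are placed first.
        List<Op> sorted = new ArrayList<>(pending);
        sorted.sort(Comparator.comparingLong(Op::fileCount).reversed());

        List<List<Op>> bins = new ArrayList<>();
        List<Long> binLoads = new ArrayList<>();
        for (Op op : sorted) {
            long cost = Math.max(op.fileCount(), 0); // missing stats default to 0
            boolean placed = false;
            for (int i = 0; i < bins.size(); i++) {
                if (binLoads.get(i) + cost <= maxFilesPerBin) { // first fit
                    bins.get(i).add(op);
                    binLoads.set(i, binLoads.get(i) + cost);
                    placed = true;
                    break;
                }
            }
            if (!placed) { // open a new bin; an oversized table is never dropped
                List<Op> bin = new ArrayList<>();
                bin.add(op);
                bins.add(bin);
                binLoads.add(cost);
            }
        }
        return bins;
    }
}
```

An oversized table falls through the first-fit loop and opens its own bin; since that bin's load already exceeds the limit, nothing else is packed with it.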

Jobs client: WebClient-based REST client submitting POST /jobs to the Jobs Service with table names, operation IDs, and results endpoint.
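The PR's client is Spring WebClient-based; as a framework-free sketch of the same request shape, the JDK's `java.net.http` types can express it. The JSON body's field names here are assumptions, not the Jobs Service's actual schema.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Framework-free sketch of the job-submission request (the PR uses Spring WebClient). */
class JobsRequestSketch {
    static HttpRequest buildLaunchRequest(String baseUrl, String jsonBody) {
        // POST /jobs carrying table names, operation IDs, and the results endpoint.
        return HttpRequest.newBuilder(URI.create(baseUrl + "/jobs"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }
}
```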

Repository additions: Three @Modifying CAS methods on TableOperationsRepository — cancelDuplicatePending, markScheduling, markScheduled — required for safe concurrent scheduling.

Testing Done

  • Manually tested on local docker setup. Please include commands run, and their output.
  • Added new tests for the changes made.
  • Updated existing tests to reflect the changes made.
  • No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
  • Some other form of testing like staging or soak time in production. Please explain.

13 unit tests:

  • BinPackerTest (7 tests) — empty input, single table, under/over limit, oversized table, no stats, descending sort
  • SchedulerRunnerTest (6 tests) — no pending ops, two-step claim + schedule, launch failure, already-claimed skip, duplicate cancellation, multi-row bin claim
./gradlew :apps:optimizer-scheduler:test
# BUILD SUCCESSFUL — 13 tests pass

Additional Information

  • Breaking Changes
  • Deprecations
  • Large PR broken into smaller PRs, and PR plan linked in the description.

…park jobs

Introduces apps/optimizer-scheduler, a Spring Boot CommandLineRunner that:
1. Loads PENDING operations from the optimizer DB
2. Bin-packs them by file count (first-fit descending)
3. Claims rows via CAS (PENDING → SCHEDULING → SCHEDULED)
4. Submits one batched Spark job per bin via the Jobs Service

Adds three @Modifying CAS methods to TableOperationsRepository:
cancelDuplicatePending, markScheduling, markScheduled — required for
safe concurrent scheduling without double-submitting.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@mkuchenbecker mkuchenbecker changed the title feat(optimizer): [4/N] Scheduler app [BDP-102028] feat(optimizer): [4/N] Scheduler app May 13, 2026
@@ -0,0 +1,74 @@
package com.linkedin.openhouse.scheduler;

import com.linkedin.openhouse.optimizer.entity.TableOperationRow;

Every operation needs its own bin packer. It differs per operation, since the constraints are different. Rather than generic bin packing, we should have an OFD scheduler that attempts to bin-pack pending OFD jobs. It should follow a similar pattern to the analyzer, where the scheduler requires an operation type but database and table name are optional.

So we should have a Binpacker interface and a mapping from operation to binpacker.


Done. BinPacker is now an interface with a single pack(List<TableOperation>) method. SchedulerConfig defines a Map<OperationType, BinPacker> bean — ORPHAN_FILES_DELETION is wired to a FileCountBinPacker instance. SchedulerApplication's CommandLineRunner loops over the map's keys and calls runner.schedule(opType) per entry, mirroring AnalyzerApplication. SchedulerRunner.schedule(OperationType, Optional<String> db, Optional<String> tableName) is the new signature.
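The refactor described in the reply can be sketched as follows. The names mirror the thread (`BinPacker`, `FileCountBinPacker`, `OperationType`), but the enum values, the fixed limit, and the simplified sequential fill inside `pack` are illustrative assumptions; the real implementation is first-fit descending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class BinPackerDispatchSketch {
    enum OperationType { ORPHAN_FILES_DELETION }
    record TableOperation(String table, Long fileCount) {}

    /** One strategy per operation type; constraints differ per operation. */
    interface BinPacker {
        List<List<TableOperation>> pack(List<TableOperation> pending);
    }

    /** Packs on the fileCount field; the size limit is fixed at construction, not per call. */
    static class FileCountBinPacker implements BinPacker {
        private final long maxFilesPerBin;
        FileCountBinPacker(long maxFilesPerBin) { this.maxFilesPerBin = maxFilesPerBin; }

        @Override
        public List<List<TableOperation>> pack(List<TableOperation> pending) {
            // Simplified sequential fill for brevity (the real packer sorts descending).
            List<List<TableOperation>> bins = new ArrayList<>();
            List<TableOperation> current = new ArrayList<>();
            long load = 0;
            for (TableOperation op : pending) {
                long cost = op.fileCount() == null ? 0 : op.fileCount(); // no stats => cost 0
                if (!current.isEmpty() && load + cost > maxFilesPerBin) {
                    bins.add(current);
                    current = new ArrayList<>();
                    load = 0;
                }
                current.add(op); // an oversized op still lands in a bin of its own
                load += cost;
            }
            if (!current.isEmpty()) bins.add(current);
            return bins;
        }
    }

    // Mirrors the Map<OperationType, BinPacker> bean wiring; the limit is hypothetical.
    static final Map<OperationType, BinPacker> PACKERS =
            Map.of(OperationType.ORPHAN_FILES_DELETION, new FileCountBinPacker(100_000));

    static List<List<TableOperation>> scheduleAll(List<TableOperation> pending, OperationType op) {
        BinPacker packer = PACKERS.get(op);
        return packer == null ? List.of() : packer.pack(pending);
    }
}
```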

* @return list of bins, each bin being a non-empty list of rows
*/
public static List<List<TableOperationRow>> pack(
List<TableOperationRow> pending, Map<String, Long> fileCountByUuid, long maxFilesPerBin) {

This interface is bad and inflexible. TableOperationRow should contain all the necessary information; FileCount should be a field. maxFilesPerBin is specific to OFD and should not be a parameter, but rather something set when constructing the bin packer instance (or a conf).


Done. fileCount is now a nullable field on the shared TableOperation domain object — populated by the scheduler at read time from table_stats. FileCountBinPacker.pack(...) reads it off each TableOperation directly; no more Map<uuid, fileCount> parameter. maxFilesPerBin is constructor-injected on FileCountBinPacker via the scheduler.ofd.max-files-per-bin config; not on pack().

private String resultsEndpoint;

@Transactional
public void schedule() {

operation type required, database and table names optional.


Done. New signature: schedule(OperationType op) and schedule(OperationType op, Optional<String> databaseName, Optional<String> tableName) (SchedulerRunner.java:43,50). Operation type is required; database and table name are Optional<String>. Mirrors AnalyzerRunner.analyze.

return count != null ? count : 0L;
}));

List<List<TableOperationRow>> bins = BinPacker.pack(pending, fileCountByUuid, maxFiles);

We should not be handling rows; that is the database model. Use TableOperation from the analyzer PR.


Done. TableOperation (and its companion enums) moved from the analyzer-internal package to apps/optimizer/.../model/ so both the analyzer and the scheduler consume the same domain type. SchedulerRunner now loads rows, converts them via TableOperation.from(row), enriches each with fileCount from table_stats, and hands List<TableOperation> to the BinPacker. Per-bin logic and the BinPacker interface work on TableOperation, not the JPA row.

List<TableOperationRow> claimed =
bin.stream()
.filter(
r -> operationsRepo.markScheduling(r.getId(), r.getVersion(), Instant.now()) == 1)

Can we batch the writes to limit round trips?


Done. Added cancelDuplicatePendingBatch, markSchedulingBatch, markScheduledBatch, and markPendingBatch @Modifying @Query methods on TableOperationsRepository. Each issues one UPDATE/DELETE for a list of IDs with the status-guarded WHERE clause preserved. SchedulerRunner.submitBin now uses these instead of the per-row variants.
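The effect of a status-guarded batch update can be sketched without JPA. The in-memory map below stands in for the table, and `markSchedulingBatch` mirrors the repository method's semantics as described (one UPDATE over a list of IDs, guarded by a status check in the WHERE clause); the method name matches the thread, but the body is an assumption about that behavior.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory sketch of a status-guarded batch CAS update (stands in for @Modifying JPQL). */
class BatchCasSketch {
    enum Status { PENDING, SCHEDULING, SCHEDULED }

    private final Map<Long, Status> rows = new ConcurrentHashMap<>();

    void insertPending(long id) { rows.put(id, Status.PENDING); }

    /**
     * One logical round trip, analogous to:
     *   UPDATE table_operations SET status = 'SCHEDULING'
     *   WHERE id IN (:ids) AND status = 'PENDING'
     * Returns the number of rows changed, so the caller can tell
     * how many claims actually succeeded.
     */
    int markSchedulingBatch(List<Long> ids) {
        int updated = 0;
        for (long id : ids) {
            // replace() only succeeds if the row is still PENDING — the CAS guard.
            if (rows.replace(id, Status.PENDING, Status.SCHEDULING)) updated++;
        }
        return updated;
    }

    Status statusOf(long id) { return rows.get(id); }
}
```

A second scheduler instance racing on the same IDs simply sees an update count of zero for the rows it lost, instead of double-submitting them.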

Optional<String> jobId =
jobsClient.launch(jobName, operationType, tableNames, opIds, resultsEndpoint);

if (jobId.isPresent()) {

else retry?


Done. On job-launch failure (jobsClient.launch returns Optional.empty()), SchedulerRunner now calls markPendingBatch(claimed-ids) to revert the claimed rows from SCHEDULING back to PENDING. The next scheduler pass picks them up. Replaces the previous log-and-leak that relied on the analyzer's now-removed scheduledTimeout.
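The claim → launch → revert-on-failure flow can be sketched as below. `submitBin` and the `Repo` interface are hypothetical stand-ins for `SchedulerRunner.submitBin` and the repository; only the batch method names come from the thread.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/** Sketch of the claim -> launch -> revert-on-failure flow (illustrative names). */
class SubmitWithRevertSketch {
    interface Repo {
        int markSchedulingBatch(List<Long> ids);
        int markScheduledBatch(List<Long> ids);
        int markPendingBatch(List<Long> ids);
    }

    /** Returns true if the bin's job was launched and its rows marked SCHEDULED. */
    static boolean submitBin(Repo repo, List<Long> ids,
                             Function<List<Long>, Optional<String>> launch) {
        if (repo.markSchedulingBatch(ids) == 0) return false; // nothing claimed
        Optional<String> jobId = launch.apply(ids);
        if (jobId.isEmpty()) {
            repo.markPendingBatch(ids); // revert: the next scheduler pass retries
            return false;
        }
        repo.markScheduledBatch(ids);
        return true;
    }
}
```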

mkuchenbecker and others added 3 commits May 13, 2026 17:04
…tion domain, batched writes, retry

Addresses unresolved review threads on #534:

- BinPacker becomes an interface with a single pack(...) method. Strategy
  is decoupled from operation type; binding to OFD lives in
  SchedulerConfig as a Map<OperationType, BinPacker> bean.
- FileCountBinPacker is the first strategy — named after the dimension it
  packs on, not the op it serves. maxFilesPerBin is constructor-injected
  via scheduler.ofd.max-files-per-bin; pack(...) reads fileCount off
  TableOperation rather than taking a side map.
- SchedulerRunner.schedule now takes required OperationType plus optional
  databaseName/tableName (mirror of AnalyzerRunner.analyze). It loads
  PENDING rows, converts row → TableOperation, fetches stats for
  fileCount enrichment, and dispatches to the registered BinPacker. No
  raw TableOperationRow flows through the per-bin logic.
- Writes are batched: cancelDuplicatePendingBatch, markSchedulingBatch,
  markScheduledBatch, markPendingBatch on TableOperationsRepository each
  issue one UPDATE/DELETE.
- On job-launch failure the scheduler reverts claimed rows from SCHEDULING
  back to PENDING via markPendingBatch so the next pass retries (replaces
  the previous log-and-leak that relied on the analyzer's removed
  scheduledTimeout).
- SchedulerApplication's CommandLineRunner loops over the configured
  Map<OperationType, BinPacker> and calls schedule(opType) per entry.

Tests: FileCountBinPackerTest covers the packing strategy;
SchedulerRunnerTest covers per-op dispatch, batched claim/mark-scheduled
on success, batch-revert on submit failure, duplicate-PENDING dedup, and
the unknown-operation-type no-op path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…ad CAS methods

- apps/optimizer-scheduler/build.gradle: swap the now-removed
  `:apps:optimizer-data` dep for `:services:optimizer`.
- TableOperationsRepository: remove the single-row `markScheduling`,
  `markScheduled`, and `cancelDuplicatePending` methods. They have no
  production callers; the scheduler exclusively uses the `*Batch`
  variants. Update the javadoc on TableOperationsRow.version
  accordingly.
Adapt the scheduler to the entity→db rename, the removal of factory
methods on model/ data types, and the db/-typed repository signatures.

- SchedulerRunner: inject ModelDbMapper. Imports switch from entity/
  to db/. operationsRepo.find now takes (db.OperationType,
  db.OperationStatus, ...). Row → model conversion via
  dbMapper.toOperation. extractFileCount reads row.getSnapshot()
  directly — the row no longer wraps a TableStats envelope.
  cancelDuplicatePendingBatch takes a db.OperationType.
- SchedulerRunnerTest rewritten: db-typed enums + new ModelDbMapper.
TableOperationsRow no longer carries a `version` field (the batch
CAS pattern's atomicity is on status alone). The mock-built test
row was still calling .version(0L), which now refers to a removed
field.
…Mapper

SchedulerRunner no longer injects ModelDbMapper. Conversion goes
through the model types' static factories and instance methods:
- Row → model: TableOperation.fromRow.
- model.OperationType → db.OperationType via operationType.toDb().

Test: drop the ModelDbMapper field and the extra constructor argument.