Skip to content

US1124088: Payload Batching job-service enhancement #567

Open
rajasekar-balasubramani wants to merge 10 commits into
developfrom
US1124088
Open

US1124088: Payload Batching job-service enhancement #567
rajasekar-balasubramani wants to merge 10 commits into
developfrom
US1124088

Conversation

@rajasekar-balasubramani

@rajasekar-balasubramani rajasekar-balasubramani commented May 13, 2026

Copy link
Copy Markdown

@buildmachine-sou-jenkins2

Copy link
Copy Markdown
Contributor

@buildmachine-sou-jenkins2

Copy link
Copy Markdown
Contributor

The Documentation QA site for this branch has been built:
https://jobservice-ci-job-service-us1124088-0775db.glpages.otxlab.net

@rajasekar-balasubramani rajasekar-balasubramani marked this pull request as draft May 14, 2026 09:56
@dermot-hardy

Copy link
Copy Markdown
Member

These are comments from Copilot. Feel free to disagree with them because I have not had a chance to go through them myself.


PR: #567 — US1124088: Payload Batching job-service enhancement
Author: rajasekar-balasubramani
Branch: US1124088develop
Base SHA: 37146e5cf5c6f1ed592ab808241729ae87fc623b
Head SHA: efa7c5859af8f6e4b3a79f91abbaff664ca15ca7
State: open, mergeable_state=dirty (rebase needed against develop).
Stats: +2241 / -5 across 13 files. No prior PR review comments.
ValueEdge: not directly linked; PR body points to job-service-payload-batching design notes on GitLab. Story ID US1124088 is in the title.

What it does: Adds an opt-in payload-batching pipeline for DocumentWorkerTask jobs whose taskPipe is prefixed with DocumentWorkerSubdocumentBatcher() . When such a job's document.subdocuments array exceeds BATCH_SIZE (hardcoded 200), QueueServices.sendMessage splits it into N RabbitMQ messages with subtask IDs {baseTaskId}.{n} and {baseTaskId}.{N}* for the final one (the * marker is what job tracking uses to detect "this is the last subtask"). New utility classes:

  • PayloadBatchingService — gate ("should we batch?")
  • SubdocumentBatchSplitter — sublist views + per-batch taskData cloning
  • SubtaskIdGenerator — subtask ID + final-marker formatting
  • TaskPipeUtilhasSubdocumentBatcherPrefix / stripBatcherPrefix

Verdict: Request changes. The design is sound and the test coverage (1015 lines of unit + container + acceptance) is impressive, but there's a correctness gap around partial-batch failure that needs to be addressed before this lands, plus a couple of preexisting bugs in the modified path that this PR could clean up while it's there.


Must-fix

1. sendBatchedMessages is not atomic — partial failures leave orphaned messages and break job-tracking

QueueServices.java sendBatchedMessages loops over batches and publishes each one in turn, throwing on the first failure. Consider this sequence:

  1. Job has 1,000 subdocuments → totalBatches = 5.
  2. baseTaskId = UUID.random() (generated fresh inside this method, not persisted anywhere).
  3. Batch 1 published with subtask ID {base}.1.
  4. Batch 2 published with subtask ID {base}.2.
  5. Batch 3 publish fails with TimeoutException.
  6. Method throws, caller is DatabasePoller.sendMessageToQueueMessaging.

Now look at the caller's error path:

  • If ExceptionAnalyzer.analyzeException classifies the timeout as TRANSIENT, the dependent job stays in the retry table and the WHOLE thing is retried later — which generates a fresh baseTaskId and republishes batches 1, 2, 3, 4, 5. The downstream worker now sees {baseA}.1, {baseA}.2, and {baseB}.1..5 — two competing subtask families for the same parent job. The final-subtask * marker is on {baseB}.5*, so job tracking eventually completes the parent. But {baseA}.1 and {baseA}.2 are processed as ghost subtasks and may corrupt progress counters that key off baseTaskId.
  • If it's classified PERMANENT, the job is marked failed and deleted from the retry table — but batches 1 and 2 are still in RabbitMQ and will be picked up by the document worker, which will then try to report progress for a job that was just deleted.

The MR's design assumes "publish either succeeds for all batches or fails before publishing any", which is not what the code does. Options to consider:

  • Idempotent baseTaskId: derive baseTaskId deterministically from (partitionId, jobId) rather than UUID.randomUUID() so retries produce the same subtask IDs and a downstream dedupe can drop the duplicates. The existing JobTaskId(partitionId, jobId).getMessageId() already gives you one for baseJobTaskId — using something parallel for baseTaskId keeps subtask IDs stable across retries.
  • Send-once semantics via tx/publisher-confirms in bulk: use a confirm batch on the channel, only mark the dependent job deleted if ALL batches confirm.
  • Mark partial-batch failures non-retryable so the parent job fails cleanly rather than spawning a second batch family.

This is the one item I'd genuinely block on — the rest of the PR is fine.

2. BATCH_SIZE = 200 is hardcoded; should be configurable

PayloadBatchingService.java:48. The whole point of payload batching is to fit under a memory/message-size limit, and that limit varies by deployment (worker heap, RabbitMQ frame-max, downstream OpenSearch bulk-size). 200 is a reasonable default but it must be tunable without a code change. Surface as an env var via ScheduledExecutorConfig, mirroring how RabbitMQPublishTimeoutSeconds and StatusCheckIntervalSeconds are already done in that config class. Default to 200, document the env var in the release note.

This is also necessary to test the batching behaviour itself — the integration test currently has to construct 200+ subdocuments to trigger a single batch boundary; with a tunable size it can run far cheaper.


Should-fix

3. NonBatchingWorkerAction violates Java naming conventions

QueueServices.java:116:

final WorkerAction NonBatchingWorkerAction;

Local variables start lower-case. Rename to nonBatchingWorkerAction (and the comment one line up explains why the rewrite happens — keep it).

4. The WorkerAction rebuild in sendSingleMessage and sendBatchedMessages duplicates 7 setter calls

QueueServices.java:118-125 and QueueServices.java:208-215 both copy every field of a WorkerAction into a new instance and then change one or two fields. Pull a private static WorkerAction copyWithTaskPipeAndData(WorkerAction src, String newTaskPipe, String newTaskData, TaskDataEncodingEnum newEncoding) helper. If WorkerAction is a Swagger-generated type and you don't want to maintain the helper next to it, even a 4-line method in QueueServices is better than two open-coded copies that can drift.

Or — since WorkerAction appears to be a mutable bean — sendSingleMessage could just call workerAction.setTaskPipe(strippedTaskPipe) directly and skip the whole copy. Check whether the caller passes a shared instance; if it's freshly constructed by DatabasePoller.sendMessageToQueueMessaging per call (it is — line 65 of DatabasePoller), mutating it in place is fine.

5. getTaskMessage has a preexisting unguarded cast that becomes more dangerous with batching

QueueServices.java:325:

final Object taskDataObj = workerAction.getTaskData();
final String taskDataStr = (String) taskDataObj;        // <-- unconditional cast
final WorkerAction.TaskDataEncodingEnum encoding = workerAction.getTaskDataEncoding();

if (taskDataObj instanceof String) {
    ...
} else if (taskDataObj instanceof Map<?, ?>) {           // <-- unreachable: cast above already threw
    ...
}

The Map<?, ?> branch is unreachable because the cast on line 325 throws ClassCastException for any non-String first. This is preexisting, not your bug. But since you're touching this method's caller path heavily, two options:

  • Leave it and file a follow-up issue.
  • Fix in this PR: move final String taskDataStr = (String) taskDataObj; inside the instanceof String branch where it's actually used.

I'd take the second option — it's a 2-line change with no behavior change for the working path and removes a footgun in a method whose caller surface just doubled.

6. shouldBatchPayload deserializes the full taskData JSON just to count subdocs, then sendBatchedMessages deserializes it again

For large payloads (the whole point of this feature) that's a non-trivial cost. PayloadBatchingService.shouldBatchPayload calls getSubdocumentsCountdeserializeTaskData (full Jackson parse) → extractSubdocuments.size(). Then sendBatchedMessages calls deserializeTaskData again. Either:

  • Have shouldBatchPayload return the deserialized Map (and a sentinel for "don't batch"), so the caller can reuse it; or
  • Add a BatchingDecision result object: {shouldBatch: boolean, taskDataMap: Map, subdocuments: List, totalBatches: int} populated once.

For a 10 MB payload this matters. For sub-MB it doesn't. Worth doing while the code is new.

7. SubdocumentBatchSplitter.createBatchedTaskData clones the entire taskData map for every batch

SubdocumentBatchSplitter.java:140 does new HashMap<>(originalTaskData) per batch. For a job with 100,000 subdocuments and BATCH_SIZE=200 that's 500 shallow copies of the parent taskData (which may carry custom-data, fields-template, etc.). The class doc explicitly says it avoids copying subdocuments by using subList views (great) — but the surrounding map still gets cloned 500 times. For one batched job at a time this is probably fine; under concurrent load on a small heap it's worth profiling.

Suggestion: build the per-batch document map with a Jackson ObjectNode shallow override of just subdocuments, or use a wrapper Map that delegates everything except document/subdocuments to the original.

8. BATCH_SIZE = 200 interaction with RabbitMQ frame_max

Batching solves the worker-heap problem but a batched message can still exceed RabbitMQ's frame_max (default 128 MB on modern brokers, but smaller in some Otext deployments) if 200 subdocuments are each multi-MB. Document the per-deployment guidance: BATCH_SIZE must be chosen such that 200 * max_subdoc_size < frame_max - taskData_overhead. Or, better: emit a warning log if the batched message bytes exceed a threshold.


Smaller / nits

9. PayloadBatchingService.calculateTotalBatches and getBatchSize() appear to be test-only API surface

QueueServices.sendBatchedMessages calls PayloadBatchingService.getBatchSize() and SubdocumentBatchSplitter.calculateBatchCount(...) directly — not calculateTotalBatches. Confirm calculateTotalBatches is actually used by something (it may be reached only through tests). If test-only, either delete it or move to a test helper.

10. SubdocumentBatchSplitter.getSubdocumentsBatchView exposes a live subList view

The Javadoc correctly calls out "subList VIEW" — good. But there's no guard against the caller modifying the view (which would propagate to the original list) or against the original being modified between view creation and use (ConcurrentModificationException risk if anything external touches it). Add a one-line note: "Must not be used after the underlying taskData map is mutated". The codec serializes the view immediately in sendBatchedMessages so this is fine in practice; document it for future maintainers.

11. release-notes-10.1.2.md could mention the unconfigurable batch size and the opt-in mechanism more visibly

Right now it says "Enabled via DocumentWorkerSubdocumentBatcher() prefix" — good — but doesn't mention that batch size is 200 (and per item #2 above, should mention the configuration env var once added). For operators reading release notes, "messages are split at 200 subdocuments per batch (configurable via CAF_JS_PAYLOAD_BATCH_SIZE)" would be more actionable than the current text.

12. TaskPipeUtilTest.testStripBatcherPrefix_OnlyPrefix returning ""

TaskPipeUtilTest.java:99 verifies that stripping the prefix from a string that's exactly the prefix yields "". That's a valid input semantics test, but worth asking: should shouldBatchPayload (or job creation upstream in job-service-core) reject job definitions whose taskPipe is exactly the prefix (i.e., empty target queue)? Currently this would silently publish to the empty-string queue. Defense-in-depth: log+skip or fail at job-definition time.

13. Imports in QueueServices.java are tacked on after the original alphabetical block

QueueServices.java:34-37: the four new com.github.jobservice.scheduledexecutor.batching.* + com.github.jobservice.util.TaskPipeUtil imports come after java.util.concurrent.TimeoutException rather than being merged into the existing alphabetical sort. Cosmetic; an IDE auto-organize would handle it.

14. DatabasePoller.sendMessageToQueueMessaging already strips prefix to compute the queue name, then QueueServices.sendSingleMessage strips it again from the WorkerAction

DatabasePoller.java:92 does:

final String actualTargetQueue = TaskPipeUtil.stripBatcherPrefix(workerAction.getTaskPipe());

…then passes workerAction (with the prefixed taskPipe still in it) into queueServices.sendMessage. sendSingleMessage strips it again from the WorkerAction to keep the message body clean. Both strips are correct (one for the queue name, one for the message contents), but they could be done once at the DatabasePoller boundary: strip in DatabasePoller, set the stripped value back on workerAction, and pass on. Then QueueServices doesn't need to know about the prefix at all (except sendBatchedMessages which gates on it). This separation of concerns better matches what the package boundaries imply.

15. JobTrackingWorkerUtil.createDependentJobTaskMessage also strips the prefix

JobTrackingWorkerUtil.java:67. Good — this is the path for dependent jobs that don't go through QueueServices.sendBatchedMessages (they go straight to RabbitMQ via the task message returned here). One thing to confirm: dependent jobs are not subject to payload batching by this code path. So if a dependent job has 10,000 subdocuments and the DocumentWorkerSubdocumentBatcher() prefix, it will get the prefix stripped but published as a single huge message — silently bypassing the batching that the prefix is supposed to opt into. Is that intentional? If so, document it in the release note ("batching applies only to top-level jobs, not dependent jobs"). If not, this path needs the same fan-out logic.

16. The PR is in dirty merge state

mergeable_state=dirty means it needs a rebase against develop. Mention this so it doesn't get lost.

17. Hardcoded JSON keys

PayloadBatchingService.DOCUMENT_WORKER_TASK_CLASSIFIER = "DocumentWorkerTask", SubdocumentBatchSplitter.DOCUMENT_KEY = "document", SUBDOCUMENTS_KEY = "subdocuments". These are protocol-level constants shared with worker-document. If worker-document-interface (which is already a <scope>provided</scope> dep in your POM — see job-service-scheduled-executor/pom.xml) defines constants for these, reuse them rather than re-declaring as string literals.

18. SubdocumentBatchSplitter.calculateBatchCount uses Math.ceil((double) totalItems / batchSize)

Works for the sizes in question. The idiom (totalItems + batchSize - 1) / batchSize is the same answer without floating-point. Cosmetic.


What's good

  • Index-based / sublist-view batching rather than allocating N split lists upfront — exactly the right approach for memory.
  • Opt-in via task-pipe prefix — clean backward-compatible activation that doesn't perturb existing jobs.
  • Subtask ID scheme with the * final-marker maps cleanly onto the existing job-tracking convention and required no schema changes.
  • Three small focused new classes (PayloadBatchingService, SubdocumentBatchSplitter, SubtaskIdGenerator) — each with a single responsibility, all final with private constructors, properly Javadoc'd.
  • Test coverage: ~1,470 lines of new test code (471 unit + 544 container + 455 acceptance + 125 util). Includes edge cases like the partial prefix not matching (testHasSubdocumentBatcherPrefix_PartialPrefix) and OnlyPrefix"".
  • All defensive null/instanceof checks in extractSubdocuments — good.
  • Subtask ID validation in SubtaskIdGenerator.validateInputs is thorough.
  • Release note is succinct and accurately summarises the feature for operators.
  • Bumped copyright headers to 2026 consistently across all touched/new files.

Summary

Item 1 (atomicity of batched publish + behaviour under transient/permanent failure) is the real blocker — the rest is straightforward. Items 2, 5, 6 are quick wins that significantly improve operability and remove footguns. The rest can land as follow-ups if you want to merge the core feature soon. Also rebase against develop (item 16).

@rajasekar-balasubramani

Copy link
Copy Markdown
Author

CODE REVIEW CHANGES:

  1. As discussed, skipping this comment.
  2. Made changes to read JOB_SERVICE_PAYLOAD_BATCH_SIZE as the batching size from the scheduled-executor config, if not found defaults to 200. And release notes is also updated.
  3. Renamed the variable according to Java naming convention.
  4. Created a helper method to create WorkerAction with stripped task pipe to reuse in both methods.
  5. Irrelevant to the ticket, so skipping it.
  6. Got the deserialized taskData beforehand and sent it to methods to avoid calling deserializeTaskData method twice. And deserialization is called on the taskData only if it is opted for payload batching which will be verified beforehand by checking the task pipe for batcher prefix.
  7. Created a reusable taskData template with document.subdocuments removed. This avoids re-copying unchanged top-level and document entries for every batch. And when whole taskData is needed we pass subdocument view to build it.
  8. Batch size is configurable now. Even if we face an issue with RabbitMQ frame, we can resolve it by configuring the appropriate batch size.
  9. It was used only in test. So, removed it from class and edited the test case to work without that method.
  10. Added the comment Must not be used after the underlying taskData map is mutated to avoid ConcurrentModificationException.
  11. Updated the release notes about the configurable batch size.
  12. Made changes to throw it as RunTimeException with message "Task pipe must not be empty after stripping batcher prefix."
  13. Imports are sorted in alphabetical order.
  14. The queue creation and sending message through queues are in different parts of code and we are deciding whether we want to split the payload or not in sending messages part, we need to propagate the task pipe as it is from DatabasePoller to QueueServices via WorkerAction object. I think it’s okay we have the stripping of prefix in both places.
  15. It is very unlikely the dependent jobs would have this problem. Updated the release notes that payload batching is for top-level jobs.
  16. Rebased the DEV branch.
  17. We don’t have constants for all these defined literals. So, it is better to have them as literals itself.
  18. Used Math.ceilDiv(totalItems, batchSize) to calculate batch size.

Also added a manual retry of three times for every batched message if the exception is transient. To avoid retrying batches from start.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants