fix: cleanup wal on eof in accumulator #3461

Merged
vaibhavtiwari33 merged 6 commits into main from fix-eof-accum
Jun 17, 2026

Conversation

yhl25 commented Jun 8, 2026

Contributor
  • Track closed windows that are yet to be GC'd, for watermark propagation.
  • Send closed window information in the close request so that the SDKs can send the window information in the EOF response.

Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
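
The two bullets above can be pictured with a small sketch. This is not the Rust code from this PR: `WindowTracker`, `WindowState`, and every method name here are hypothetical, and the sketch only assumes what the description states, namely that a closed window keeps holding back the watermark until an EOF response echoing that window arrives.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class WindowState:
    """Hypothetical per-key window state; not the actual Numaflow struct."""
    key: str
    oldest_event_time: int  # epoch millis of the oldest message still held


@dataclass
class WindowTracker:
    """Tracks active windows and closed windows that still await GC."""
    active: Dict[str, WindowState] = field(default_factory=dict)
    closed: Dict[str, WindowState] = field(default_factory=dict)

    def open(self, key: str, event_time: int) -> None:
        # Keep the first (oldest) event time seen for the key.
        self.active.setdefault(key, WindowState(key, event_time))

    def close(self, key: str) -> Optional[WindowState]:
        """Move a window to the closed set and return it, so the caller
        can attach the window information to the close request."""
        state = self.active.pop(key, None)
        if state is not None:
            self.closed[key] = state
        return state

    def on_eof(self, key: str) -> bool:
        """Handle an EOF response that echoes a closed window's key.
        Returns True if state was released, False if the key was unknown."""
        return self.closed.pop(key, None) is not None

    def oldest_event_time(self) -> Optional[int]:
        """Lower bound for watermark propagation across both active
        windows and closed windows that have not been GC'd yet."""
        windows = (*self.active.values(), *self.closed.values())
        return min((w.oldest_event_time for w in windows), default=None)
```

In this sketch, a window that was closed but never acknowledged stays in `closed` and keeps pinning `oldest_event_time()`, which is one way to picture why a missing EOF echo could stall watermark progress and WAL cleanup.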

codecov Bot commented Jun 8, 2026

Codecov Report

❌ Patch coverage is 98.80952% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 82.88%. Comparing base (02489d3) to head (1072f39).

Files with missing lines | Patch % | Lines
...aflow-core/src/reduce/reducer/unaligned/reducer.rs | 98.42% | 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3461      +/-   ##
==========================================
+ Coverage   82.86%   82.88%   +0.01%     
==========================================
  Files         308      308              
  Lines       78820    79024     +204     
==========================================
+ Hits        65318    65497     +179     
- Misses      12932    12960      +28     
+ Partials      570      567       -3     


vigith commented Jun 8, 2026

Member

should we merge to dev-xxx branch and give a test image for @tmenjo to test?

tmenjo commented Jun 9, 2026

Contributor

@yhl25 @vigith Thank you for creating a patch!

> should we merge to dev-xxx branch and give a test image for @tmenjo to test?

Thank you for being considerate. That won't be necessary. I've fetched the fix-eof-accum branch and built my own image.

yhl25 commented Jun 9, 2026

Contributor Author

> @yhl25 @vigith Thank you for creating a patch!
>
> > should we merge to dev-xxx branch and give a test image for @tmenjo to test?
>
> Thank you for being considerate. That won't be necessary. I've fetched the fix-eof-accum branch and built my own image.

Also, in the accumulator, can you make the change below to drop the unused messages once the stream has ended (timeout), so that the watermark progresses and the WAL gets cleaned up?

self.logger.info('out of datums loop')

# The datums stream has ended (timeout or stream close). Any buffered source
# frames or inference results left here will never find a match, so drop them
# while carrying their watermark forward to progress the output watermark and
# release the remaining WAL state.
while self.sorted_source_frames:
    _, stale_datum = self.sorted_source_frames.popitem(index=0)
    await output.put(Message.to_drop(stale_datum))
while self.sorted_inference_results:
    _, stale_datum = self.sorted_inference_results.popitem(index=0)
    await output.put(Message.to_drop(stale_datum))
self.logger.info('dropped all unmatched buffered datums after datums loop')

You can use this numaflow-python branch to get the drop semantics for accumulator.
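
For readers who want to try the drain pattern outside a pipeline, here is a minimal standalone sketch. It makes several assumptions: `Datum` and `Message` are stand-ins written for this example, not the pynumaflow classes; a plain `dict` iterated in sorted key order replaces the sorted buffers used in the snippet above; and `drain_unmatched` and `run_demo` are hypothetical helper names.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Datum:
    """Stand-in for the SDK datum; only the fields this sketch needs."""
    id: str
    watermark: int


@dataclass
class Message:
    """Stand-in for the SDK message type, not the real pynumaflow class."""
    datum_id: str
    watermark: int
    dropped: bool = False

    @classmethod
    def to_drop(cls, datum: Datum) -> "Message":
        # The payload is discarded, but the watermark is carried forward.
        return cls(datum_id=datum.id, watermark=datum.watermark, dropped=True)


async def drain_unmatched(buffer: Dict[int, Datum], output: asyncio.Queue) -> int:
    """Emit a drop message for every buffered datum, lowest key first,
    and leave the buffer empty. Returns the number of dropped datums."""
    count = 0
    for key in sorted(buffer):  # sorted() snapshots the keys, so pop is safe
        await output.put(Message.to_drop(buffer.pop(key)))
        count += 1
    return count


def run_demo() -> List[Message]:
    """Drain a two-element buffer and return the emitted messages."""
    async def _main() -> List[Message]:
        output: asyncio.Queue = asyncio.Queue()
        frames = {30: Datum("f3", 30), 10: Datum("f1", 10)}
        await drain_unmatched(frames, output)
        return [output.get_nowait() for _ in range(output.qsize())]

    return asyncio.run(_main())
```

Calling `run_demo()` yields two drop messages in key order, so the oldest buffered datum is released first.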

tmenjo commented Jun 11, 2026

Contributor

@yhl25 Thank you for your help.

I've run my pipeline, fixed as yhl25 suggested, for 17 hours on Numaflow 685a84b and pynumaflow numaproj/numaflow-python@0b8e3e1. However, the memory usage of the numa container (yellow line) has not reached a ceiling, as in issue #3262.

image-20260611-001855

I don't know if it's related, but the memory usage appears to be periodic. As far as I can see, it jumps every 140-150 minutes. The input video stream of my pipeline is also periodic, but its cycle is different: it repeats a 46-second video file forever.

Debug logs are as follows. Sorry, I don't know why, but they end at 08:41:24.

tmenjo commented Jun 12, 2026

Contributor

I might have found a solution. My reduce merges a source frame and an inference result and outputs a merged datum. That means two input datums but only one output, so I think I should drop the other. I'm trying the following changes:

        # Output a message with fields (except value) restored from source
        await output.put(
            Message(
                keys=datum_source.keys,
                value=msgspec.msgpack.encode(payload_out),
                watermark=datum_source.watermark,
                event_time=datum_source.event_time,
                headers=datum_source.headers,
                id=datum_source.id,
            )
        )

        # Drop the leftover (inference datum) for watermark and WAL
        await output.put(Message.to_drop(datum_inference))  # <===== here
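
The accounting idea in this comment (every input datum gets exactly one outcome, either an emitted message or a drop) can be sketched on its own. Everything below is hypothetical: `Datum` is a stand-in, `merge_pair` is an illustrative helper, and the byte concatenation is only a placeholder for the real merge. A later comment in this thread states that explicit drops are not needed with numaproj/numaflow-python#360, so treat this as an illustration of the idea rather than a recommended pattern.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Datum:
    """Stand-in datum with only an id and a payload."""
    id: str
    value: bytes


def merge_pair(source: Datum, inference: Datum) -> List[Tuple[str, str, bytes]]:
    """Return one ("emit" or "drop", datum id, payload) record per input
    datum, so that two inputs always produce two outcomes."""
    merged = source.value + b"|" + inference.value  # placeholder merge
    return [
        ("emit", source.id, merged),   # merged output keeps the source identity
        ("drop", inference.id, b""),   # leftover input is released explicitly
    ]
```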

vigith commented Jun 12, 2026

Member

> I might have found a solution. My reduce merges a source frame and an inference result and outputs a merged datum. Here, two input datums but only one output. I think I should drop the other.

Ideally the timeout should reap it via GC, @yhl25 wdyt?

tmenjo commented Jun 15, 2026

Contributor

I still have the issue. I have tried the changes to my accumulator described above, but the memory usage of the numa container has not reached a ceiling. Here is the chart for the last 24 hours:

Image

vigith commented Jun 15, 2026

Member

> I still have the issue. I have tried the changes to my accumulator described above, but the memory usage of the numa container has not reached a ceiling. Here is the chart for the last 24 hours:

Can you share the new logs?

tmenjo commented Jun 15, 2026

Contributor

> Can you share the new logs?

Sorry, but I forgot to collect the logs. I will run the test again today. Please stay tuned.

timeout: Duration,
/// Active windows mapped by combined key.
active_windows: Arc<RwLock<HashMap<String, WindowState>>>,
/// Closed windows mapped by combined key. These windows have been closed (sent to the SDK) but

> These windows have been closed (sent to the SDK)

nit: this should be reworded.
This makes it sound like only closed windows are sent to the SDK.

vaibhavtiwari33 dismissed their stale review June 15, 2026 14:40

User's validation pending

@vaibhavtiwari33

Contributor

@tmenjo Let me create a similar fix for the Python SDK as well, to help you test with the correct state that the client expects (echo back of the closed keyed window).

tmenjo commented Jun 16, 2026

Contributor

@vigith Thank you for waiting. I've run my pipeline again and collected debug logs.

> @tmenjo Let me create a similar fix for the Python SDK as well, to help you test with the correct state that the client expects (echo back of the closed keyed window).

Thank you so much! Please let me know when you update the Python SDK.

Chart

The yellow line is for the memory usage of the numa container.

image

Logs

Sorry but there are logs only for the first 4 and a half hours. (How I collect logs may be incorrect. I'll fix it next time.)

vaibhavtiwari33 commented Jun 16, 2026

Contributor

> Please let me know when you update the Python SDK.

@tmenjo I've created a PR for Python SDK: numaproj/numaflow-python#360
Please let me know if you're able to utilize this for your testing.

tmenjo commented Jun 16, 2026

Contributor

@vaibhavtiwari33 Thanks! I'll fetch your pull-request branch and build my own pynumaflow image for my testing, but I have a question: is the PR numaproj/numaflow-python#358 that @yhl25 mentioned in this thread also required to fix the issue?

vaibhavtiwari33 commented Jun 16, 2026

Contributor

> Is the PR numaproj/numaflow-python#358 that @yhl25 mentioned in this thread also required to fix the issue?

No, you don't need to add explicit drops when running the numaproj/numaflow-python#360 SDK on #3461 (this PR).

yhl25 mentioned this pull request Jun 17, 2026
3 tasks
yhl25 marked this pull request as ready for review June 17, 2026 15:27
yhl25 requested review from vigith and whynowy as code owners June 17, 2026 15:27
@vaibhavtiwari33

Contributor

Changes verified against both numaproj/numaflow-python#360 and numaproj/numaflow-rs#177

I see consistent memory/disk usage across 2-3 hours when the user code doesn't explicitly return mapped responses and windows are closed after timeout.

Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
vaibhavtiwari33 enabled auto-merge (squash) June 17, 2026 21:47
vaibhavtiwari33 merged commit 51747b2 into main Jun 17, 2026
27 checks passed
vaibhavtiwari33 deleted the fix-eof-accum branch June 17, 2026 22:21
vaibhavtiwari33 added a commit that referenced this pull request Jun 18, 2026
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Co-authored-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>

tmenjo commented Jun 24, 2026

Contributor

@vigith @vaibhavtiwari33 I've run my pipeline for 24 hours on Numaflow nightly-20260618 (51747b2) and numaflow-python numaproj/numaflow-python@e6434ba. The memory usage of the numa container has improved drastically, but unfortunately it has not reached a ceiling yet.

Screenshot

The yellow line is the memory usage of the numa container. It looks like a gradually increasing sawtooth wave. I'd say this tendency shows up in long-running tests.

image

(The Time (X axis) is in UTC.)
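
One way to put a number on "a sawtooth that increases gradually" is to ignore the peaks and fit a line through the troughs only. The sketch below does that in plain Python. The helper names `troughs` and `slope` are made up for this example, and the sample values in the usage note are synthetic, not taken from the chart above.

```python
from typing import List, Sequence, Tuple


def troughs(samples: Sequence[float]) -> List[Tuple[int, float]]:
    """Return (index, value) for each strict local minimum."""
    return [
        (i, samples[i])
        for i in range(1, len(samples) - 1)
        if samples[i - 1] > samples[i] < samples[i + 1]
    ]


def slope(points: Sequence[Tuple[int, float]]) -> float:
    """Least-squares slope of value over index.
    Returns 0.0 when there are fewer than two points."""
    n = len(points)
    if n < 2:
        return 0.0
    mean_x = sum(x for x, _ in points) / n
    mean_y = sum(y for _, y in points) / n
    num = sum((x - mean_x) * (y - mean_y) for x, y in points)
    den = sum((x - mean_x) ** 2 for x, _ in points)
    return num / den


# Synthetic memory samples (made up for illustration): a sawtooth
# whose troughs rise by 10 units every 3 samples.
rising = [100, 140, 180, 110, 150, 190, 120, 160, 200, 130, 170]
trend = slope(troughs(rising))
```

A trough slope near zero would suggest memory returns to the same floor after each cycle, while a clearly positive slope would be consistent with the slow growth described in this comment.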

Debug logs

Each timestamp is in UTC+9.

Environment
