fix: cleanup wal on eof in accumulator#3461
Conversation
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3461 +/- ##
==========================================
+ Coverage 82.86% 82.88% +0.01%
==========================================
Files 308 308
Lines 78820 79024 +204
==========================================
+ Hits 65318 65497 +179
- Misses 12932 12960 +28
+ Partials 570 567 -3 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
should we merge to dev-xxx branch and give a test image for @tmenjo to test? |
Also, in the accumulator can you make the below change to drop the unused messages once the stream has been ended(timeout), so that the watermark progresses and the WAL gets cleaned up. self.logger.info('out of datums loop')
# The datums stream has ended (timeout or stream close). Any buffered source
# frames or inference results left here will never find a match, so drop them
# while carrying their watermark forward to progress the output watermark and
# release the remaining WAL state.
while self.sorted_source_frames:
_, stale_datum = self.sorted_source_frames.popitem(index=0)
await output.put(Message.to_drop(stale_datum))
while self.sorted_inference_results:
_, stale_datum = self.sorted_inference_results.popitem(index=0)
await output.put(Message.to_drop(stale_datum))
self.logger.info('dropped all unmatched buffered datums after datums loop')You can use this numaflow-python branch to get the drop semantics for accumulator. |
…s from accum Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com>
|
@yhl25 Thank you for your help. I've run my pipeline fixed as yhl25 told me for 17 hours on Numaflow 685a84b and pynumaflow numaproj/numaflow-python@0b8e3e1 . However, the memory usage of the numa container (yellow line) have not reached a ceiling, as with the issue #3262.
I don't know if it's related, but the memory usage looks to have periodicity. It bumps in every 140-150 minutes as far as I can see. The input video stream of my pipeline also has periodicity, but its cycle is different. It repeats a 46-second video file forever. Debug logs are as follows. Sorry I don't know but they ended at 08:41:24. |
|
I might have found a solution. My reduce merges a source frame and a inference result and outputs a merged datum. Here, two input datums but only one output. I think I should drop the other. I'm trying the following changes: # Output a message with fields (except value) restored from source
await output.put(
Message(
keys=datum_source.keys,
value=msgspec.msgpack.encode(payload_out),
watermark=datum_source.watermark,
event_time=datum_source.event_time,
headers=datum_source.headers,
id=datum_source.id,
)
)
# Drop the leftover (inference datum) for watermark and WAL
await output.put(Message.to_drop(datum_inference)) # <===== here |
Ideally the timeout should reap it via GC, @yhl25 wdyt? |
Can you share the new logs? |
Sorry but I forgot to collect log... I will run the test again today. Please stay tuned. |
| timeout: Duration, | ||
| /// Active windows mapped by combined key. | ||
| active_windows: Arc<RwLock<HashMap<String, WindowState>>>, | ||
| /// Closed windows mapped by combined key. These windows have been closed (sent to the SDK) but |
There was a problem hiding this comment.
These windows have been closed (sent to the SDK)
nit: this should to be reworded.
This makes it sounds like only closed windows are sent to the SDK
|
@tmenjo Let me create a similar fix for the python SDK as well to help you test the with the correct state that the client expects (echo back of closed keyed window). |
|
@vigith Thank you for waiting. I've run my pipeline again and collected debug logs.
Thank you so much! Please let me know when you update the Python SDK. ChartThe yellow line is for the memory usage of the numa container.
LogsSorry but there are logs only for the first 4 and a half hours. (How I collect logs may be incorrect. I'll fix it next time.) |
@tmenjo I've created a PR for Python SDK: numaproj/numaflow-python#360 |
|
@vaibhavtiwari33 Thanks! I'll fetch your pull-request branch and build my own pynumaflow image for my testing, but I have a question: Is the PR numaproj/numaflow-python#358 that @yhl25 told me on this thread also required for fixing the issue? |
No, you don't need to add explicit drops when running numaproj/numaflow-python#360 SDK on #3461 (this PR) |
|
Changes verified against both numaproj/numaflow-python#360 and numaproj/numaflow-rs#177 I see consistent memory/disk usage across 2-3 hours when the user code doesn't explicitly returns mapped responses and windows are closed after timeout. |
Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
Signed-off-by: Yashash Lokesh <yashashhl25@gmail.com> Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com> Co-authored-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>




Uh oh!
There was an error while loading. Please reload this page.