perf: Reduce overhead of flushing stdout stream for every tap record#3674
perf: Reduce overhead of flushing stdout stream for every tap record#3674edgarrmondragon wants to merge 1 commit into
Conversation
Reviewer's guide (collapsed on small PRs)Reviewer's GuideOptimizes stdout flushing in Singer SDK message writers by skipping flushes for high-volume RecordMessage instances while retaining flush behavior for other message types. Sequence diagram for conditional stdout flushing in write_messagesequenceDiagram
participant Writer
participant sys_stdout
Writer->>sys_stdout: write
alt [message is RecordMessage]
Note over Writer,sys_stdout: No flush for high-volume RecordMessage
else [message is not RecordMessage]
Writer->>sys_stdout: flush
end
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey - I've found 1 issue, and left some high level feedback:
- Since
RecordMessagewrites are now buffered without an immediate flush, consider adding a clear comment or helper function documenting where/when the final flush is expected to occur to avoid confusion about output ordering and buffering behavior. - The conditional flush logic is duplicated in both
msgspecandsimplewriters; consider centralizing this behavior in a shared helper or base class method to keep the flush semantics consistent and easier to maintain.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Since `RecordMessage` writes are now buffered without an immediate flush, consider adding a clear comment or helper function documenting where/when the final flush is expected to occur to avoid confusion about output ordering and buffering behavior.
- The conditional flush logic is duplicated in both `msgspec` and `simple` writers; consider centralizing this behavior in a shared helper or base class method to keep the flush semantics consistent and easier to maintain.
## Individual Comments
### Comment 1
<location path="singer_sdk/contrib/msgspec.py" line_range="120" />
<code_context>
"""
sys.stdout.buffer.write(self.format_message(message))
- sys.stdout.flush()
+ if not isinstance(message, RecordMessage):
+ sys.stdout.flush()
</code_context>
<issue_to_address>
**issue (bug_risk):** Changing flush behavior only for non-record messages may impact streaming semantics for some consumers.
Previously, all messages were flushed immediately; now `RecordMessage` output may sit in the buffer indefinitely unless another (non-record) message is written. For long-running processes or consumers that expect records to appear as soon as they’re written, this can change behavior (e.g., increased latency or seemingly stuck pipelines). If the goal is to batch records for performance, consider adding a configurable flush policy (e.g., count- or time-based) or an explicit API to force flushing so callers can opt into immediate delivery when needed.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| """ | ||
| sys.stdout.buffer.write(self.format_message(message)) | ||
| sys.stdout.flush() | ||
| if not isinstance(message, RecordMessage): |
There was a problem hiding this comment.
issue (bug_risk): Changing flush behavior only for non-record messages may impact streaming semantics for some consumers.
Previously, all messages were flushed immediately; now RecordMessage output may sit in the buffer indefinitely unless another (non-record) message is written. For long-running processes or consumers that expect records to appear as soon as they’re written, this can change behavior (e.g., increased latency or seemingly stuck pipelines). If the goal is to batch records for performance, consider adding a configurable flush policy (e.g., count- or time-based) or an explicit API to force flushing so callers can opt into immediate delivery when needed.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## test/measure-perf-singer-stream-write #3674 +/- ##
======================================================================
Coverage 94.12% 94.13%
======================================================================
Files 73 73
Lines 6200 6202 +2
Branches 762 764 +2
======================================================================
+ Hits 5836 5838 +2
Misses 270 270
Partials 94 94
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
Merging this PR will improve performance by 42.78%
Performance Changes
Tip Curious why this is faster? Comment Comparing |
358e96f to
f2b5a3e
Compare
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
0fcc284 to
a66352b
Compare
SSIA
Related
Summary by Sourcery
Enhancements: