Skip to content

perf: Reduce overhead of flushing stdout stream for every tap record#3674

Draft
edgarrmondragon wants to merge 1 commit into
test/measure-perf-singer-stream-writefrom
perf/record-flush-overhead
Draft

perf: Reduce overhead of flushing stdout stream for every tap record#3674
edgarrmondragon wants to merge 1 commit into
test/measure-perf-singer-stream-writefrom
perf/record-flush-overhead

Conversation

@edgarrmondragon

@edgarrmondragon edgarrmondragon commented Jun 11, 2026

Copy link
Copy Markdown
Collaborator

SSIA

Related

Summary by Sourcery

Enhancements:

  • Avoid flushing stdout after each record message in both msgspec and simple encoders, flushing only for non-record messages instead.

@edgarrmondragon edgarrmondragon self-assigned this Jun 11, 2026
@sourcery-ai

sourcery-ai Bot commented Jun 11, 2026

Copy link
Copy Markdown
Contributor
Reviewer's guide (collapsed on small PRs)

Reviewer's Guide

Optimizes stdout flushing in Singer SDK message writers by skipping flushes for high-volume RecordMessage instances while retaining flush behavior for other message types.

Sequence diagram for conditional stdout flushing in write_message

sequenceDiagram
    participant Writer
    participant sys_stdout

    Writer->>sys_stdout: write
    alt [message is RecordMessage]
        Note over Writer,sys_stdout: No flush for high-volume RecordMessage
    else [message is not RecordMessage]
        Writer->>sys_stdout: flush
    end
Loading

File-Level Changes

Change Details Files
Avoid flushing stdout on every record message to reduce I/O overhead while preserving flushes for non-record messages.
  • Extend msgspec encoding module to import RecordMessage type along with Message.
  • Update msgspec-based writer to write bytes to sys.stdout.buffer and only flush when the message is not a RecordMessage.
  • Update simple writer implementation to append a newline, write to sys.stdout, and only flush when the message is not a RecordMessage.
singer_sdk/contrib/msgspec.py
singer_sdk/singerlib/encoding/simple.py

Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

@sourcery-ai sourcery-ai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - I've found 1 issue, and left some high level feedback:

  • Since RecordMessage writes are now buffered without an immediate flush, consider adding a clear comment or helper function documenting where/when the final flush is expected to occur to avoid confusion about output ordering and buffering behavior.
  • The conditional flush logic is duplicated in both msgspec and simple writers; consider centralizing this behavior in a shared helper or base class method to keep the flush semantics consistent and easier to maintain.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- Since `RecordMessage` writes are now buffered without an immediate flush, consider adding a clear comment or helper function documenting where/when the final flush is expected to occur to avoid confusion about output ordering and buffering behavior.
- The conditional flush logic is duplicated in both `msgspec` and `simple` writers; consider centralizing this behavior in a shared helper or base class method to keep the flush semantics consistent and easier to maintain.

## Individual Comments

### Comment 1
<location path="singer_sdk/contrib/msgspec.py" line_range="120" />
<code_context>
         """
         sys.stdout.buffer.write(self.format_message(message))
-        sys.stdout.flush()
+        if not isinstance(message, RecordMessage):
+            sys.stdout.flush()
</code_context>
<issue_to_address>
**issue (bug_risk):** Changing flush behavior only for non-record messages may impact streaming semantics for some consumers.

Previously, all messages were flushed immediately; now `RecordMessage` output may sit in the buffer indefinitely unless another (non-record) message is written. For long-running processes or consumers that expect records to appear as soon as they’re written, this can change behavior (e.g., increased latency or seemingly stuck pipelines). If the goal is to batch records for performance, consider adding a configurable flush policy (e.g., count- or time-based) or an explicit API to force flushing so callers can opt into immediate delivery when needed.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

"""
sys.stdout.buffer.write(self.format_message(message))
sys.stdout.flush()
if not isinstance(message, RecordMessage):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Changing flush behavior only for non-record messages may impact streaming semantics for some consumers.

Previously, all messages were flushed immediately; now RecordMessage output may sit in the buffer indefinitely unless another (non-record) message is written. For long-running processes or consumers that expect records to appear as soon as they’re written, this can change behavior (e.g., increased latency or seemingly stuck pipelines). If the goal is to batch records for performance, consider adding a configurable flush policy (e.g., count- or time-based) or an explicit API to force flushing so callers can opt into immediate delivery when needed.

@codecov

codecov Bot commented Jun 11, 2026

Copy link
Copy Markdown

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 94.13%. Comparing base (f2b5a3e) to head (a66352b).

Additional details and impacted files
@@                          Coverage Diff                           @@
##           test/measure-perf-singer-stream-write    #3674   +/-   ##
======================================================================
  Coverage                                  94.12%   94.13%           
======================================================================
  Files                                         73       73           
  Lines                                       6200     6202    +2     
  Branches                                     762      764    +2     
======================================================================
+ Hits                                        5836     5838    +2     
  Misses                                       270      270           
  Partials                                      94       94           
Flag Coverage Δ
core 82.94% <40.00%> (-0.02%) ⬇️
end-to-end 76.03% <100.00%> (+<0.01%) ⬆️
optional-components 44.79% <20.00%> (-0.04%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@codspeed-hq

codspeed-hq Bot commented Jun 11, 2026

Copy link
Copy Markdown

Merging this PR will improve performance by 42.78%

⚡ 1 improved benchmark
✅ 10 untouched benchmarks

Performance Changes

Benchmark BASE HEAD Efficiency
test_bench_write_record_messages_msgspec 7.2 ms 5.1 ms +42.78%

Tip

Curious why this is faster? Comment @codspeedbot explain why this is faster on this PR, or directly use the CodSpeed MCP with your agent.


Comparing perf/record-flush-overhead (a66352b) with test/measure-perf-singer-stream-write (f2b5a3e)

Open in CodSpeed

@edgarrmondragon edgarrmondragon marked this pull request as draft June 11, 2026 05:18
@edgarrmondragon edgarrmondragon force-pushed the test/measure-perf-singer-stream-write branch from 358e96f to f2b5a3e Compare June 16, 2026 01:28
Signed-off-by: Edgar Ramírez Mondragón <edgarrm358@gmail.com>
@edgarrmondragon edgarrmondragon force-pushed the perf/record-flush-overhead branch from 0fcc284 to a66352b Compare June 16, 2026 01:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant