Skip to content

Conversation

@fresh-borzoni
Copy link
Contributor

@fresh-borzoni fresh-borzoni commented Jan 21, 2026

Linked issue: close #140

This PR implements remote log segment downloading for the Rust client with priority-based scheduling and concurrency limiting, matching the Java client's behavior.

Change log

  • Added RemoteLogDownloader: Background coordinator that manages concurrent downloads of remote log segments from S3/filesystem storage
  • Priority-based scheduling: Downloads prioritized by segment timestamp (oldest first), then offset, matching Java's PriorityBlockingQueue ordering
  • Two-layer concurrency control:
    • Concurrency limit (default: 3) - limits simultaneous downloads
    • Prefetch limit (default: 4) - limits memory usage from downloaded-but-not-consumed segments
  • Exponential backoff retry: Failed downloads retry with exponential backoff (100ms to 5s) and jitter
  • RAII resource management: Temp files and semaphore permits automatically cleaned up via Drop
  • Configuration: Added scanner_remote_log_prefetch_num and scanner_remote_log_download_threads config options matching Java client defaults
  • Protocol changes: Extended FetchLogRequest to include remote_log_fetch_info for remote segment metadata

API and Format

Configuration API additions:

  • Config::scanner_remote_log_prefetch_num (default: 4)
  • Config::scanner_remote_log_download_threads (default: 3)

No breaking changes to existing public APIs.

Note:
The Rust implementation uses an event-driven actor model with a single coordinator task that owns all download state, eliminating the need for locks.
Unlike Java's thread pool approach with blocking threads and synchronization, Rust uses
async tasks and a select! loop that reacts to events (NewRequest, DownloadFinished, RecycleNotification). Additionally, Rust implements automatic retry with exponential backoff (100ms to 5s with jitter) for failed downloads, it's different from Java, but we need it to not thrash the PQ as it would be stuck on the same prioritised failed task and try to make progress. Worth discussing though!

@fresh-borzoni fresh-borzoni force-pushed the prioriry-prefetch-remote-log branch 2 times, most recently from f183f8a to 39733e9 Compare January 21, 2026 00:35
@fresh-borzoni
Copy link
Contributor Author

@luoyuxia @leekeiabstraction @zhaohaidao
PTAL 🙏

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements remote log segment downloading with priority-based scheduling for the Rust client, matching the Java client's behavior. The implementation addresses issue #140 by adding a priority queue for downloading remote segments with concurrency controls.

Changes:

  • Added RemoteLogDownloader coordinator with event-driven architecture using priority queue (BinaryHeap) for download scheduling
  • Implemented two-layer concurrency control: download threads limit (default: 3) and prefetch limit (default: 4)
  • Added file-backed streaming for remote log segments to avoid loading entire files into memory, using RAII guards for cleanup
  • Extended protobuf schema to include max_timestamp field for priority ordering
  • Added exponential backoff retry logic for failed downloads (100ms to 5s with jitter)

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
crates/fluss/src/client/table/remote_log.rs Core implementation of RemoteLogDownloader with priority queue, coordinator loop, and retry logic
crates/fluss/src/record/arrow.rs Added LogRecordsSource trait with MemorySource and FileSource implementations for file-backed streaming
crates/fluss/src/client/table/log_fetch_buffer.rs Added RemoteCompletedFetch and RemotePendingFetch implementations
crates/fluss/src/proto/fluss_api.proto Added optional max_timestamp field to PbRemoteLogSegment
crates/fluss/src/config.rs Added scanner_remote_log_prefetch_num and scanner_remote_log_download_threads config options
crates/fluss/src/client/table/scanner.rs Updated LogFetcher initialization to pass config parameters
crates/fluss/src/client/connection.rs Added config() getter method
crates/fluss/src/client/table/mod.rs Exported new constants
crates/fluss/src/util/mod.rs Removed delete_file() function (replaced by RAII)

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@fresh-borzoni fresh-borzoni force-pushed the prioriry-prefetch-remote-log branch from 39733e9 to bec7bd2 Compare January 21, 2026 02:31
@fresh-borzoni
Copy link
Contributor Author

rebased and fixed failing test

Cancelled,
}

/// Production implementation of RemoteLogFetcher that downloads from actual storage
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exactly does "Production" mean here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it means that it has counterpart struct FakeFetcher used in testing

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add PriorityQueue/control backpressure for Scanner remote segments download logic like in Java Client

2 participants