-
Notifications
You must be signed in to change notification settings - Fork 22
[TASK-140] Priority Queue for downloading remote segments #187
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[TASK-140] Priority Queue for downloading remote segments #187
Conversation
f183f8a to
39733e9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR implements remote log segment downloading with priority-based scheduling for the Rust client, matching the Java client's behavior. The implementation addresses issue #140 by adding a priority queue for downloading remote segments with concurrency controls.
Changes:
- Added RemoteLogDownloader coordinator with event-driven architecture using priority queue (BinaryHeap) for download scheduling
- Implemented two-layer concurrency control: download threads limit (default: 3) and prefetch limit (default: 4)
- Added file-backed streaming for remote log segments to avoid loading entire files into memory, using RAII guards for cleanup
- Extended protobuf schema to include max_timestamp field for priority ordering
- Added exponential backoff retry logic for failed downloads (100ms to 5s with jitter)
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| crates/fluss/src/client/table/remote_log.rs | Core implementation of RemoteLogDownloader with priority queue, coordinator loop, and retry logic |
| crates/fluss/src/record/arrow.rs | Added LogRecordsSource trait with MemorySource and FileSource implementations for file-backed streaming |
| crates/fluss/src/client/table/log_fetch_buffer.rs | Added RemoteCompletedFetch and RemotePendingFetch implementations |
| crates/fluss/src/proto/fluss_api.proto | Added optional max_timestamp field to PbRemoteLogSegment |
| crates/fluss/src/config.rs | Added scanner_remote_log_prefetch_num and scanner_remote_log_download_threads config options |
| crates/fluss/src/client/table/scanner.rs | Updated LogFetcher initialization to pass config parameters |
| crates/fluss/src/client/connection.rs | Added config() getter method |
| crates/fluss/src/client/table/mod.rs | Exported new constants |
| crates/fluss/src/util/mod.rs | Removed delete_file() function (replaced by RAII) |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
39733e9 to
bec7bd2
Compare
|
rebased and fixed failing test |
| Cancelled, | ||
| } | ||
|
|
||
| /// Production implementation of RemoteLogFetcher that downloads from actual storage |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What exactly does "Production" mean here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it means that it has counterpart struct FakeFetcher used in testing
Linked issue: close #140
This PR implements remote log segment downloading for the Rust client with priority-based scheduling and concurrency limiting, matching the Java client's behavior.
Change log
RemoteLogDownloader: Background coordinator that manages concurrent downloads of remote log segments from S3/filesystem storagePriorityBlockingQueueorderingscanner_remote_log_prefetch_numandscanner_remote_log_download_threadsconfig options matching Java client defaultsFetchLogRequestto includeremote_log_fetch_infofor remote segment metadataAPI and Format
Configuration API additions:
Config::scanner_remote_log_prefetch_num(default: 4)Config::scanner_remote_log_download_threads(default: 3)No breaking changes to existing public APIs.
Note:
The Rust implementation uses an event-driven actor model with a single coordinator task that owns all download state, eliminating the need for locks.
Unlike Java's thread pool approach with blocking threads and synchronization, Rust uses
async tasks and a
select!loop that reacts to events (NewRequest, DownloadFinished, RecycleNotification). Additionally, Rust implements automatic retry with exponential backoff (100ms to 5s with jitter) for failed downloads, it's different from Java, but we need it to not thrash the PQ as it would be stuck on the same prioritised failed task and try to make progress. Worth discussing though!