fix: Use atomic for degree_list and use memory fence to ensure thread safety#386
Open
zhanglei1949 wants to merge 4 commits into
Open
fix: Use atomic for degree_list and use memory fence to ensure thread safety#386zhanglei1949 wants to merge 4 commits into
zhanglei1949 wants to merge 4 commits into
Conversation
…sistency in generic view
Contributor
There was a problem hiding this comment.
Pull request overview
This PR aims to fix the reported concurrent mutable-CSR reallocation race (Issue #381) by making per-vertex degree reads/writes atomic and using acquire/release ordering so lock-free readers can safely traverse adjacency buffers while writers insert edges and trigger reallocations.
Changes:
- Refactors mutable CSR degree storage access to use atomic operations in writer paths and reader views.
- Updates
GenericView/TypedViewneighbor iteration to snapshot degree with acquire loads. - Adds concurrent read/write regression tests for
get_edges()andforeach_nbr_lt().
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
tests/storage/test_mutable_csr.cc |
Adds multithreaded regression tests intended to catch torn degree/buffer snapshot races. |
src/storages/csr/mutable_csr.cc |
Converts degree array accesses to atomic loads/stores in multiple mutation/maintenance code paths. |
include/neug/storages/csr/mutable_csr.h |
Updates put_edge and related helpers to use atomic degree operations and adjusts vertex capacity computation. |
include/neug/storages/csr/generic_view.h |
Uses atomic degree loads in get_edges() and typed neighbor iteration to avoid torn reads during concurrent writes. |
Comments suppressed due to low confidence (1)
include/neug/storages/csr/mutable_csr.h:146
- The degree is incremented via fetch_add(memory_order_release) before the new neighbor entry is fully written. A concurrent reader that bounds iteration by degree (acquire load) can observe the increased degree and read an uninitialized/partially-written slot. Reserve an index, write neighbor/data/timestamp, then publish the new degree with a release store (or otherwise ensure the release operation occurs after the entry is initialized).
int32_t prev_size = sizes[src].fetch_add(1, std::memory_order_release);
auto& nbr = buffers[src][prev_size];
nbr.neighbor = dst;
nbr.data = data;
nbr.timestamp.store(ts);
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| return 0; | ||
| } | ||
| return degree_list_->GetDataSize() / sizeof(int); | ||
| return degree_list_->GetDataSize() / sizeof(std::atomic<int>); |
Comment on lines
220
to
+224
| adj_list_buffer_->Resize(vnum * sizeof(nbr_t*)); | ||
| degree_list_->Resize(vnum * sizeof(int)); | ||
| cap_list_->Resize(vnum * sizeof(int)); | ||
| auto** buf_arr = reinterpret_cast<nbr_t**>(adj_list_buffer_->GetData()); | ||
| auto* sz_arr = reinterpret_cast<int*>(degree_list_->GetData()); | ||
| auto* sz_arr = reinterpret_cast<std::atomic<int>*>(degree_list_->GetData()); |
Comment on lines
+515
to
518
| auto& nbr = buf_arr[src][sz_arr[src].fetch_add(1, std::memory_order_relaxed)]; | ||
| nbr.neighbor = dst; | ||
| nbr.data = data; | ||
| nbr.timestamp.store(ts); |
Comment on lines
+409
to
+414
| // Atomic loads for torn-read safety: snapshot degree and buffer pointer | ||
| const int deg = reinterpret_cast<const std::atomic<int>*>(degrees)[v].load( | ||
| std::memory_order_acquire); | ||
| const MutableNbr<T>* base = adjlists[v]; | ||
| const MutableNbr<T>* ptr = base + deg - 1; | ||
| const MutableNbr<T>* end = base - 1; |
| // Atomic load for torn-read safety: snapshot degree | ||
| const int deg = reinterpret_cast<const std::atomic<int>*>(degrees)[v].load( | ||
| std::memory_order_acquire); | ||
| const MutableNbr<T>* base = adjlists[v]; |
Comment on lines
616
to
620
| const char* start_ptr = reinterpret_cast<const char*>( | ||
| reinterpret_cast<const int64_t*>(adjlists_)[v]); | ||
| ret.start_ptr = start_ptr; | ||
| ret.end_ptr = start_ptr + degrees_[v] * cfg_.stride; | ||
| ret.end_ptr = start_ptr + deg * cfg_.stride; | ||
| } |
Comment on lines
+694
to
+695
| // This is a regression test for the torn-read bug fixed by using | ||
| // std::atomic_ref in put_edge / GenericView / TypedView. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
FIx #381
This pull request refactors the mutable CSR (Compressed Sparse Row) graph storage implementation to use
std::atomic<int>for vertex degree storage, ensuring thread safety and preventing torn reads/writes in concurrent environments. The changes update all relevant code paths to use atomic operations, with appropriate memory orderings, and adjust buffer management accordingly.The most important changes are:
Thread Safety and Atomic Degree Management:
intdegree arrays withstd::atomic<int>in both the in-memory representation and all access/manipulation code, ensuring atomicity and visibility guarantees for concurrent readers and writers. All reads and writes to degrees now use explicit memory orderings (acquirefor reads,releasefor writes, andrelaxedwhere appropriate). [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] [12] [13] [14] [15] [16] [17] [18]Reader/Writer Synchronization:
Buffer Management Adjustments:
Test and Include Updates:
<atomic>,<thread>, and<vector>in test files to support atomic operations and potential multithreaded tests.Overall, these changes make the mutable CSR implementation safe for concurrent modifications and queries, which is crucial for high-performance, multi-threaded graph processing.