feat(infra): add SharedMemory IPC fast-path for RTensor#1133
guozhihao-224 wants to merge 2 commits into inclusionAI:main
Conversation
Code Review
This pull request introduces a SharedMemory-based IPC fast-path for RTensor to optimize same-node tensor transfers, significantly reducing network overhead. Key changes include the addition of IPC helpers for encoding/decoding tensor metadata, partitioning fetch requests into local and remote groups, and implementing shared memory segment creation and cleanup. Feedback focuses on further optimizing memory usage by avoiding unnecessary copies during serialization and deserialization, addressing potential race conditions during shared memory segment creation, and clarifying the 'zero-copy' claim given the use of .clone() in the local fetch path.
```python
try:
    shm = _SharedMemory(name=name, create=True, size=total_size)
except FileExistsError:
    # Leftover from a previous run — unlink and recreate.
    try:
        old = _SharedMemory(name=name, create=False)
        old.close()
        old.unlink()
    except FileNotFoundError:
        pass
    shm = _SharedMemory(name=name, create=True, size=total_size)
```
There is a potential race condition if multiple processes attempt to create a SharedMemory segment for the same shard_id simultaneously. Both might receive FileExistsError, and both might attempt to unlink() the segment that the other just created, leading to a failure on the subsequent create=True call. While shard_id is typically a UUID, this could be an issue if deterministic IDs are used or in high-concurrency scenarios. Consider handling FileExistsError more robustly or attempting to open the existing segment if creation fails.
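One way to avoid the unlink race is a create-or-attach loop: on `FileExistsError`, attach to the existing segment instead of unlinking it out from under the other process. This is a hypothetical sketch (the helper name `create_or_attach` is not from the PR), using the standard-library `multiprocessing.shared_memory` API:

```python
from multiprocessing.shared_memory import SharedMemory

def create_or_attach(name: str, size: int) -> SharedMemory:
    """Create the segment, or attach to an existing one of sufficient size.

    If two processes race on the same name, the loser attaches to the
    winner's segment rather than unlinking it. Only a too-small stale
    segment (e.g. from a previous run) is unlinked and recreated.
    """
    while True:
        try:
            return SharedMemory(name=name, create=True, size=size)
        except FileExistsError:
            try:
                existing = SharedMemory(name=name, create=False)
            except FileNotFoundError:
                continue  # the other process unlinked it; retry creation
            if existing.size >= size:
                return existing
            existing.close()
            existing.unlink()
```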
```python
# which becomes invalid after shm.close() below
raw = torch.frombuffer(
    shm.buf[header_size:], dtype=torch.uint8, count=nbytes
).clone()
```
The PR description claims "zero-copy inter-process tensor transfer", but the use of .clone() here introduces a full memory copy. While this simplifies lifecycle management by allowing the SharedMemory segment to be closed immediately, it negates the performance benefits of zero-copy for large tensors. If true zero-copy is a goal, consider keeping the SharedMemory handle alive (e.g., in the _fetch_buffer) and returning a view. If the copy is intentional for safety, please update the PR description to reflect that it is a low-overhead IPC path rather than zero-copy.
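A true zero-copy variant would keep the `SharedMemory` handle alive in a registry and hand out a view into the buffer. The sketch below is hypothetical (`_open_segments`, `fetch_view`, and `release` are illustrative names, and a raw `memoryview` stands in for the `torch.frombuffer` wrapper the real code would use):

```python
from multiprocessing.shared_memory import SharedMemory

# Registry keeping segments mapped while views into them exist.
_open_segments: dict[str, SharedMemory] = {}

def fetch_view(name: str, header_size: int, nbytes: int) -> memoryview:
    shm = SharedMemory(name=name, create=False)
    _open_segments[name] = shm  # keep the mapping alive for the view
    return shm.buf[header_size:header_size + nbytes]  # zero-copy view

def release(name: str) -> None:
    # Caller must release all views into the segment first.
    _open_segments.pop(name).close()
```

The trade-off is explicit lifecycle management: the segment cannot be closed until every consumer has dropped its view, which is exactly what `.clone()` sidesteps.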
```python
# Create SharedMemory segment for IPC access
_create_shm(shard_id, tensor)
```
Creating a shm segment for each tensor will incur a huge overhead. The better solution should be allocating a large buffer in advance for all tensors, not just tensors with the same shard id. We can use HTTP to communicate the buffer indices to read.
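The suggestion amounts to a single pre-allocated arena plus an offset table that readers can be pointed at over HTTP. A minimal sketch (the `TensorArena` class and its method names are hypothetical, not part of the PR):

```python
import threading
from multiprocessing.shared_memory import SharedMemory

class TensorArena:
    """One pre-allocated shm segment shared by all tensors.

    Each write records (offset, nbytes) for its shard; only that small
    index tuple needs to travel over HTTP, not the tensor bytes.
    """

    def __init__(self, name: str, size: int) -> None:
        self.shm = SharedMemory(name=name, create=True, size=size)
        self._offsets: dict[str, tuple[int, int]] = {}
        self._next = 0
        self._lock = threading.Lock()

    def write(self, shard_id: str, payload: bytes) -> tuple[int, int]:
        with self._lock:
            start = self._next
            if start + len(payload) > self.shm.size:
                raise MemoryError("arena exhausted")
            self.shm.buf[start:start + len(payload)] = payload
            self._next = start + len(payload)
            self._offsets[shard_id] = (start, len(payload))
            return self._offsets[shard_id]

    def read(self, shard_id: str) -> bytes:
        start, nbytes = self._offsets[shard_id]
        return bytes(self.shm.buf[start:start + nbytes])

    def close(self) -> None:
        self.shm.close()
        self.shm.unlink()
```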
```python
# SharedMemory IPC handles: shard_id -> SharedMemory object
_shm_handles: dict[str, _SharedMemory] = {}
```
It's kind of ad-hoc if we use global variables to manage shm. We can abstract the functions and variables in a dedicated RTensorShmBuffer class and call the methods of the buffer. The buffer should self-manage its resources like exceptions, shm handle, and the lock.
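A skeleton of that abstraction could look like the following. This is a hypothetical sketch of the reviewer's suggestion (the class name comes from the comment; the methods are illustrative): the module-level dict and lock move into one object that owns its handles, lock, and error handling.

```python
import threading
from typing import Optional
from multiprocessing.shared_memory import SharedMemory

class RTensorShmBuffer:
    """Self-managing container for shm segments, their lock, and cleanup."""

    def __init__(self) -> None:
        self._handles: dict[str, SharedMemory] = {}
        self._lock = threading.Lock()

    def create(self, shard_id: str, size: int) -> SharedMemory:
        with self._lock:
            shm = SharedMemory(name=f"rtensor_{shard_id}", create=True, size=size)
            self._handles[shard_id] = shm
            return shm

    def get(self, shard_id: str) -> Optional[SharedMemory]:
        with self._lock:
            return self._handles.get(shard_id)

    def close(self) -> None:
        with self._lock:
            for shm in self._handles.values():
                try:
                    shm.close()
                    shm.unlink()
                except FileNotFoundError:
                    pass  # already removed by another cleanup path
            self._handles.clear()
```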
```python
try:
    shm = _SharedMemory(name=name, create=True, size=total_size)
except FileExistsError:
    # Leftover from a previous run — unlink and recreate.
    try:
        old = _SharedMemory(name=name, create=False)
        old.close()
        old.unlink()
    except FileNotFoundError:
        pass
    shm = _SharedMemory(name=name, create=True, size=total_size)
```
This exception handling is weird because the shard id (and hence the shm name) is created from uuid.uuid4(), which should not duplicate across different runs. The current approach of unlinking the previous shm and creating a new one is too brute-force.
```python
if not _storage_lock.acquire(timeout=2.0):
    return
```
Use a separate lock to manage shm.
```python
try:
    local_tensors = _fetch_local(local_shard_ids)
    for (original_index, _), tensor in zip(
        local_grouped, local_tensors, strict=True
    ):
        results[original_index] = tensor
except (KeyError, ValueError, struct.error):
    # SharedMemory segment not found, corrupted header, or
    # truncated buffer. Fall back to HTTP for these shards.
    logger.debug("IPC fetch failed for local shards, falling back to HTTP")
    for idx, shard in local_grouped:
        remote_by_node[shard.node_addr].append((idx, shard))
```
The fallback should skip the tensors that were already fetched and use HTTP only for the remaining shards.
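A per-shard version of the fallback could look like this. Hypothetical sketch: the names mirror the quoted snippet (`local_grouped`, `results`, `remote_by_node`), but `fetch_one` is an illustrative stand-in for a single-shard IPC fetch, and `struct.error` is omitted for self-containment.

```python
def fetch_with_fallback(local_grouped, results, remote_by_node, fetch_one):
    """Fetch shards one at a time so a failure reroutes only the
    failing shard (and those after it) to HTTP, keeping earlier results."""
    for idx, shard in local_grouped:
        try:
            results[idx] = fetch_one(shard)
        except (KeyError, ValueError):
            # Only this shard falls back; already-fetched tensors are kept.
            remote_by_node.setdefault(shard.node_addr, []).append((idx, shard))
```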
@garrett4wade Thank you for your review. I have two questions here:
```python
slab_quotas = {
    16 * MB: 16,    # 256 MB
    64 * MB: 8,     # 512 MB
    256 * MB: 2,    # 512 MB
    1024 * MB: 1,   # 1 GB
}
```
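For reference, assuming `MB = 1 << 20` (the constant is not shown in the quoted snippet), the quota table above pre-allocates 2.25 GB in total, matching the per-slab comments:

```python
MB = 1 << 20  # assumed definition; not shown in the quoted snippet

slab_quotas = {
    16 * MB: 16,    # 16 slabs of 16 MB  -> 256 MB
    64 * MB: 8,     # 8 slabs of 64 MB   -> 512 MB
    256 * MB: 2,    # 2 slabs of 256 MB  -> 512 MB
    1024 * MB: 1,   # 1 slab of 1 GB     -> 1 GB
}

total = sum(size * count for size, count in slab_quotas.items())
assert total == 2304 * MB  # 256 + 512 + 512 + 1024 MB = 2.25 GB
```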
@guozhihao-224 Those are great questions!
Add RTensorShmPool with bump allocation for same-node tensor IPC, bypassing HTTP overhead when client and server share a machine. The pool is writer-owned with a reserve-write-publish protocol and automatic reclamation via try_reset() at shard removal.

Key changes:
- RTensorShmPool: pre-allocated shm segment with 64-byte aligned bump allocator, in-flight tracking, and _closing flag for safe concurrent shutdown
- TensorShardInfo: pool metadata fields for direct IPC routing
- HttpRTensorBackend: pool-aware store/fetch with fallback to HTTP
- rpc_server: pool lifecycle with atexit + cleanup-hook dual safety
- is_local_addr: cached local-address detection for IPC routing
- Comprehensive unit and integration tests for pool lifecycle

Refs: inclusionAI#1117
Force-pushed from 7bd8d3a to 4b43edf
@garrett4wade Hi, this PR has been updated based on the previous review feedback. Replaced per-tensor SharedMemory segments with a single pre-allocated pool (RTensorShmPool) using a bump allocator, as discussed.
garrett4wade left a comment
I think the current implementation unnecessarily complicates the data transfer logic. We may defer this optimization until it really becomes a bottleneck in training.
```python
# Step 1: reserve space (under lock)
with self._lock:
    if self._closing:
        return False
    aligned = (self._next_offset + 63) & ~63
    if aligned + nbytes > self._pool_size:
        return False
    self._next_offset = aligned + nbytes
    self._in_flight += 1
```
Is writing properly locked across different processes?
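As a side note on the reservation arithmetic in the snippet above: `(offset + 63) & ~63` rounds an offset up to the next 64-byte boundary. A quick standalone check:

```python
def align64(offset: int) -> int:
    # Add 63, then clear the low six bits: rounds up to a multiple of 64.
    return (offset + 63) & ~63

assert align64(0) == 0
assert align64(1) == 64
assert align64(64) == 64
assert align64(100) == 128
```

Note that a `threading.Lock` like `self._lock` only serializes threads within one process; the commit message describes the pool as writer-owned, which would make a single-process lock sufficient for the write path.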
@garrett4wade Thank you for your review. I agree that doing so would increase complexity; let's postpone the optimization for now.
Description
Add zero-copy inter-process tensor transfer using POSIX SharedMemory
for same-node RTensor shards. This bypasses HTTP overhead when the
client and server are on the same machine.
The IPC path is automatically used when shard.node_addr points to localhost, 127.0.0.1, or the machine's hostname/IP, falling back to HTTP for remote shards.
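The local-address check described above could be sketched as follows. This is a hypothetical implementation of the `is_local_addr()` utility, not the PR's actual code; it assumes a plain `host` or `host:port` string and uses `lru_cache` for the cached detection the commit message mentions.

```python
import socket
from functools import lru_cache

@lru_cache(maxsize=None)
def is_local_addr(addr: str) -> bool:
    """Return True if addr refers to this machine (loopback or own IP)."""
    # Strip a single ":port" suffix; leave bare hosts and IPv6 alone.
    host = addr.rsplit(":", 1)[0] if addr.count(":") == 1 else addr
    if host in ("localhost", "127.0.0.1", "::1"):
        return True
    try:
        return socket.gethostbyname(host) == socket.gethostbyname(
            socket.gethostname()
        )
    except OSError:
        return False
```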
Key changes:
- _store_local() with header encoding (dtype, shape)
- _fetch_local() to read tensors from shared memory segments
- is_local_addr() utility to detect local node addresses
- HttpRTensorBackend.fetch() now routes local shards via IPC, remote via HTTP

Related Issue
#1117
Type of Change
Checklist
Breaking Change Details (if applicable):
N/A
Additional Context
Files changed:
- areal/infra/rpc/rtensor.py: SharedMemory IPC implementation (+334 lines)
- areal/utils/network.py: is_local_addr() utility (+50 lines)
- tests/test_rtensor.py: comprehensive IPC tests (+453 lines)

Total: +802 lines, -35 lines