
feat(infra): add SharedMemory IPC fast-path for RTensor#1133

Closed

guozhihao-224 wants to merge 2 commits into inclusionAI:main from guozhihao-224:feat/rtensor-shm-ipc


Conversation

@guozhihao-224
Contributor

@guozhihao-224 guozhihao-224 commented Apr 2, 2026

Description

Add zero-copy inter-process tensor transfer using POSIX SharedMemory
for same-node RTensor shards. This bypasses HTTP overhead when the
client and server are on the same machine.

The IPC path is automatically used when shard.node_addr points to
localhost, 127.0.0.1, or the machine's hostname/IP, falling back to
HTTP for remote shards.
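
The local-address routing described above could look roughly like the following sketch; the actual is_local_addr() in areal/utils/network.py may differ (IPv6 handling is omitted here for brevity):

```python
import socket


def is_local_addr(addr: str) -> bool:
    """Sketch of the routing check: True if addr refers to this machine.

    Hypothetical illustration, not the actual implementation in
    areal/utils/network.py.
    """
    host = addr.split(":")[0]  # strip an optional ":port" (IPv4/hostname only)
    if host in ("localhost", "127.0.0.1"):
        return True
    try:
        hostname = socket.gethostname()
        local_names = {hostname, socket.gethostbyname(hostname)}
    except OSError:
        # Name resolution unavailable: be conservative and use HTTP.
        return False
    return host in local_names
```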

Key changes:

  • Add SharedMemory segment creation in _store_local() with header encoding (dtype, shape)
  • Add _fetch_local() to read tensors from shared memory segments
  • Add is_local_addr() utility to detect local node addresses
  • HttpRTensorBackend.fetch() now routes local shards via IPC, remote via HTTP
  • Add atexit cleanup to prevent shared memory leaks
  • Add comprehensive tests for IPC path and edge cases
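
The header encoding mentioned in the first bullet (dtype, shape) could be sketched with struct as below; this layout is a hypothetical illustration, not the actual format in rtensor.py:

```python
import struct


# Hypothetical header layout (little-endian, no padding):
#   uint16 dtype-name length, dtype-name bytes, uint16 ndim, int64 dims...
def encode_header(dtype_name: str, shape: tuple) -> bytes:
    name = dtype_name.encode()
    return struct.pack(
        f"<H{len(name)}sH{len(shape)}q", len(name), name, len(shape), *shape
    )


def decode_header(buf: bytes):
    (name_len,) = struct.unpack_from("<H", buf, 0)
    name = struct.unpack_from(f"<{name_len}s", buf, 2)[0].decode()
    (ndim,) = struct.unpack_from("<H", buf, 2 + name_len)
    shape = struct.unpack_from(f"<{ndim}q", buf, 4 + name_len)
    return name, shape
```

The tensor bytes would then follow the header inside the shared memory segment, so a reader can reconstruct the tensor without any side channel.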

Related Issue

#1117

Type of Change

  • Bug fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Code refactoring (no functional changes)
  • Performance improvement
  • Test coverage improvement

Checklist

  • I have read the Contributing Guide
  • I have run formatting tools (pre-commit or manual)
  • I have run relevant unit tests and they pass
  • I have added tests for new functionality
  • I have updated documentation if needed
  • My branch is up to date with main
  • This PR introduces breaking changes (if yes, fill out details below)
  • If this PR changes documentation, I have built and previewed it locally with jb build docs
  • No critical issues raised by AI reviewers (/gemini review)

Breaking Change Details (if applicable):

N/A

Additional Context

Files changed:

  • areal/infra/rpc/rtensor.py: SharedMemory IPC implementation (+334 lines)
  • areal/utils/network.py: is_local_addr() utility (+50 lines)
  • tests/test_rtensor.py: Comprehensive IPC tests (+453 lines)

Total: +802 lines, -35 lines

Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a SharedMemory-based IPC fast-path for RTensor to optimize same-node tensor transfers, significantly reducing network overhead. Key changes include the addition of IPC helpers for encoding/decoding tensor metadata, partitioning fetch requests into local and remote groups, and implementing shared memory segment creation and cleanup. Feedback focuses on further optimizing memory usage by avoiding unnecessary copies during serialization and deserialization, addressing potential race conditions during shared memory segment creation, and clarifying the 'zero-copy' claim given the use of .clone() in the local fetch path.

Comment thread areal/infra/rpc/rtensor.py Outdated
Comment on lines +718 to +728
try:
    shm = _SharedMemory(name=name, create=True, size=total_size)
except FileExistsError:
    # Leftover from a previous run — unlink and recreate.
    try:
        old = _SharedMemory(name=name, create=False)
        old.close()
        old.unlink()
    except FileNotFoundError:
        pass
    shm = _SharedMemory(name=name, create=True, size=total_size)
Contributor


medium

There is a potential race condition if multiple processes attempt to create a SharedMemory segment for the same shard_id simultaneously. Both might receive FileExistsError, and both might attempt to unlink() the segment that the other just created, leading to a failure on the subsequent create=True call. While shard_id is typically a UUID, this could be an issue if deterministic IDs are used or in high-concurrency scenarios. Consider handling FileExistsError more robustly or attempting to open the existing segment if creation fails.
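
A more robust handling of that race could attach to the existing segment instead of unlinking it; a minimal sketch (hypothetical helper, not the code in this PR):

```python
from multiprocessing.shared_memory import SharedMemory


def create_or_attach(name: str, size: int) -> SharedMemory:
    """Sketch of a less destructive creation path: if another process wins
    the race to create the segment, attach to it rather than unlinking it.
    """
    try:
        return SharedMemory(name=name, create=True, size=size)
    except FileExistsError:
        # Loser of the race (or stale leftover): attach instead of destroy.
        return SharedMemory(name=name, create=False)
```

A caller that needs a guaranteed-fresh segment would still have to validate the size of the attached segment, since create=False ignores the size argument.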

Comment thread areal/infra/rpc/rtensor.py Outdated
# which becomes invalid after shm.close() below
raw = torch.frombuffer(
    shm.buf[header_size:], dtype=torch.uint8, count=nbytes
).clone()
Contributor


medium

The PR description claims "zero-copy inter-process tensor transfer", but the use of .clone() here introduces a full memory copy. While this simplifies lifecycle management by allowing the SharedMemory segment to be closed immediately, it negates the performance benefits of zero-copy for large tensors. If true zero-copy is a goal, consider keeping the SharedMemory handle alive (e.g., in the _fetch_buffer) and returning a view. If the copy is intentional for safety, please update the PR description to reflect that it is a low-overhead IPC path rather than zero-copy.
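
The view-keeping alternative suggested here can be sketched with the stdlib alone: hold the SharedMemory handle alongside the exported view so the memory stays mapped, and release both explicitly when done (hypothetical wrapper, not the PR's code):

```python
from multiprocessing.shared_memory import SharedMemory


class ShmView:
    """Sketch: keeps the SharedMemory handle alive so the returned slice
    stays valid (zero-copy), instead of cloning and closing immediately."""

    def __init__(self, name: str, offset: int, nbytes: int):
        self._shm = SharedMemory(name=name, create=False)
        self.view = self._shm.buf[offset : offset + nbytes]  # zero-copy slice

    def close(self) -> None:
        # Release the exported view before closing the mapping; closing
        # a SharedMemory with live memoryview exports raises BufferError.
        self.view.release()
        self._shm.close()
```

The trade-off is exactly the one named above: the caller now owns the segment's lifetime instead of getting a self-contained tensor.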

Comment thread areal/infra/rpc/rtensor.py Outdated
Comment on lines +689 to +691
# Create SharedMemory segment for IPC access
_create_shm(shard_id, tensor)

Collaborator


Creating a shm segment for each tensor will incur huge overhead. A better solution would be to allocate a large buffer in advance for all tensors, not just tensors with the same shard id. We can use HTTP to communicate the buffer indices to read.

Comment thread areal/infra/rpc/rtensor.py Outdated
Comment on lines +678 to +679
# SharedMemory IPC handles: shard_id -> SharedMemory object
_shm_handles: dict[str, _SharedMemory] = {}
Collaborator


It's somewhat ad hoc to manage shm with global variables. We could abstract these functions and variables into a dedicated RTensorShmBuffer class and call the buffer's methods. The buffer should self-manage its resources, such as exceptions, the shm handle, and the lock.

Comment thread areal/infra/rpc/rtensor.py Outdated
Comment on lines +716 to +726
try:
    shm = _SharedMemory(name=name, create=True, size=total_size)
except FileExistsError:
    # Leftover from a previous run — unlink and recreate.
    try:
        old = _SharedMemory(name=name, create=False)
        old.close()
        old.unlink()
    except FileNotFoundError:
        pass
    shm = _SharedMemory(name=name, create=True, size=total_size)
Collaborator


This exception handling is weird because the shard id (the shm name) is created from uuid.uuid4(), which should not duplicate across different runs. The current approach unlinks the previous shm and creates a new one, which is too brute-force.

Comment thread areal/infra/rpc/rtensor.py Outdated
Comment on lines +829 to +830
if not _storage_lock.acquire(timeout=2.0):
    return
Collaborator


Use a separate lock to manage shm.

Comment thread areal/infra/rpc/rtensor.py Outdated
Comment on lines +322 to +333
try:
    local_tensors = _fetch_local(local_shard_ids)
    for (original_index, _), tensor in zip(
        local_grouped, local_tensors, strict=True
    ):
        results[original_index] = tensor
except (KeyError, ValueError, struct.error):
    # SharedMemory segment not found, corrupted header, or
    # truncated buffer. Fall back to HTTP for these shards.
    logger.debug("IPC fetch failed for local shards, falling back to HTTP")
    for idx, shard in local_grouped:
        remote_by_node[shard.node_addr].append((idx, shard))
Collaborator


This should skip the tensors that were already fetched and fall back to HTTP only for the remaining shards.
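
That partial-fallback idea could be sketched by attempting the IPC path shard-by-shard, so successful fetches are kept and only the failures are re-routed; all helper names here (fetch_one_local, schedule_http) are hypothetical:

```python
def fetch_with_fallback(local_grouped, fetch_one_local, schedule_http):
    """Sketch: try IPC per shard; only the shards that actually fail
    are re-routed to HTTP. Helper names are hypothetical."""
    results = {}
    failed = []
    for original_index, shard in local_grouped:
        try:
            results[original_index] = fetch_one_local(shard)
        except (KeyError, ValueError):
            failed.append((original_index, shard))
    for idx, shard in failed:
        schedule_http(idx, shard)
    return results
```

Per-shard fetching trades one batched read for finer-grained error handling; whether that cost matters depends on how often the IPC path fails in practice.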

@guozhihao-224
Contributor Author

@garrett4wade Thank you for your review. I have two questions here:

  1. Can we use the existing RPC serialization mechanism of the TensorShardInfo dataclass to piggyback the pool metadata on RPC, instead of the current HTTP-based index transmission, to improve overall efficiency and consistency?

  2. Does allocating a large buffer for all tensors beforehand mean implementing a partitioning algorithm? I'm not sure if I've misunderstood you. For example:

slab_quotas = {
    16 * MB:   16,  # 256 MB
    64 * MB:    8,  # 512 MB
    256 * MB:   2,  # 512 MB
    1024 * MB:  1,  # 1 GB
}

@garrett4wade
Collaborator

@guozhihao-224 Those are great questions!

  1. I think your proposal is better—let’s go with that.

  2. I suggest we linearly scan the buffer when allocating tensors, and reset the pointer to zero if the next tensor would exceed the buffer size limit. Since tensors are released after each step, allocations within a batch should remain contiguous as long as the buffer is large enough. We can also add assertions to ensure that live tensors do not have overlapping slices.
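
The linear-scan scheme described above can be sketched as a small bump allocator with a wrap-to-zero reset and an overlap assertion (hypothetical class, not the final RTensorShmPool):

```python
class BumpBuffer:
    """Sketch of the suggested scheme: linear bump allocation over a fixed
    buffer, resetting the pointer to zero when the next tensor would not
    fit. Overlap with live allocations is asserted, not prevented."""

    def __init__(self, size: int):
        self.size = size
        self.next = 0
        self.live = []  # (offset, nbytes) of live allocations

    def alloc(self, nbytes: int) -> int:
        if nbytes > self.size:
            raise ValueError("tensor larger than buffer")
        if self.next + nbytes > self.size:
            self.next = 0  # wrap: assumes earlier allocations were released
        offset = self.next
        # Assert no live allocation overlaps the new slice.
        for off, n in self.live:
            assert offset + nbytes <= off or off + n <= offset, "overlap"
        self.live.append((offset, nbytes))
        self.next = offset + nbytes
        return offset

    def free(self, offset: int) -> None:
        self.live = [(o, n) for o, n in self.live if o != offset]
```

This matches the suggestion: within a step, allocations stay contiguous as long as the buffer is large enough, and the assertion catches the case where a wrapped pointer would clobber a tensor that was never released.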

Add RTensorShmPool with bump allocation for same-node tensor IPC,
bypassing HTTP overhead when client and server share a machine.
The pool is writer-owned with a reserve-write-publish protocol
and automatic reclamation via try_reset() at shard removal.

Key changes:
- RTensorShmPool: pre-allocated shm segment with 64-byte aligned
  bump allocator, in-flight tracking, and _closing flag for safe
  concurrent shutdown
- TensorShardInfo: pool metadata fields for direct IPC routing
- HttpRTensorBackend: pool-aware store/fetch with fallback to HTTP
- rpc_server: pool lifecycle with atexit + cleanup-hook dual safety
- is_local_addr: cached local-address detection for IPC routing
- Comprehensive unit and integration tests for pool lifecycle

Refs: inclusionAI#1117
@guozhihao-224
Contributor Author

@garrett4wade Hi, this PR has been updated based on the previous review feedback. I replaced the per-tensor SharedMemory segments with a single pre-allocated pool (RTensorShmPool) using a bump allocator, as discussed.

Collaborator

@garrett4wade garrett4wade left a comment


I think the current implementation unnecessarily complicates the data transfer logic. We may defer this optimization until it really becomes a bottleneck in training.

Comment on lines +187 to +195
# Step 1: reserve space (under lock)
with self._lock:
    if self._closing:
        return False
    aligned = (self._next_offset + 63) & ~63
    if aligned + nbytes > self._pool_size:
        return False
    self._next_offset = aligned + nbytes
    self._in_flight += 1
Collaborator


Is writing properly locked across different processes?
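
For context on this question: a threading.Lock (which self._lock appears to be) only serializes threads within one process. If multiple writer processes shared the pool, a cross-process mechanism would be needed; one minimal option is an advisory file lock, sketched below (Unix-only; the lock-file path is hypothetical):

```python
import fcntl
import os


class CrossProcessLock:
    """Sketch: flock-based advisory lock that serializes writers across
    processes. Unix-only; the default path is a hypothetical example."""

    def __init__(self, path: str = "/tmp/rtensor_pool.lock"):
        self._fd = os.open(path, os.O_CREAT | os.O_RDWR, 0o600)

    def __enter__(self):
        fcntl.flock(self._fd, fcntl.LOCK_EX)  # blocks until exclusive
        return self

    def __exit__(self, *exc):
        fcntl.flock(self._fd, fcntl.LOCK_UN)
```

In this PR the pool is writer-owned, so a single-process lock may in fact be sufficient; the sketch only illustrates what the multi-writer case would require.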

@guozhihao-224
Contributor Author

I think the current implementation unnecessarily complicates the data transfer logic. We may defer this optimization until it really becomes a bottleneck in training.

@garrett4wade Thank you for your review. I agree that doing so would increase complexity; let's postpone the optimization for now.
