Skip to content
Draft

1 #168

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5e005f7
add case
sunkaixuan2018 Mar 2, 2026
0d9c9f6
fix(gemm_gather): inline gather logic in kernel_entry for a2a3 set_fl…
sunkaixuan2018 Mar 2, 2026
d80a83d
fix(gemm_gather): golden.py use numel/item for torch tensors from cod…
sunkaixuan2018 Mar 2, 2026
59f714e
创建多卡计算的case和修改启动入口
sunkaixuan2018 Mar 4, 2026
8f12f18
fix: 多卡子进程传入 --n-devices 1 避免递归 spawn,并添加批量结束 Python 进程脚本
sunkaixuan2018 Mar 4, 2026
97de768
feat: 编译与运行分离,多卡 multi_bgemm 编译一次并行运行
sunkaixuan2018 Mar 4, 2026
48c0bc2
refactor: 多卡改用 ProcessPoolExecutor + CodeRunner.run() 替代 subprocess
sunkaixuan2018 Mar 4, 2026
05ecec7
feat(cpt_and_comm): add compute-then-comm multi-card example
sunkaixuan2018 Mar 5, 2026
e7b22cb
feat(hccl): C++ helper lib for HCCL, Python calls via ctypes
sunkaixuan2018 Mar 5, 2026
0447fdd
docs: use setenv.bash path for CANN env in hccl_helper
sunkaixuan2018 Mar 5, 2026
05964ba
fix(hccl_helper): align types with HcclRootInfo/HcclCommInitRootInfo
sunkaixuan2018 Mar 5, 2026
449a89d
fix(hccl_bindings): copy root_info bytes via memmove
sunkaixuan2018 Mar 5, 2026
8317279
fix(multi_card_run_example): use unsigned char for shared HcclRootInfo
sunkaixuan2018 Mar 5, 2026
e4dfcf3
fix(cpt_and_comm/golden): write gather result via numpy view on torch…
sunkaixuan2018 Mar 5, 2026
8379f36
chore(debug): log first values and dump actual/golden npy for cpt_and…
sunkaixuan2018 Mar 5, 2026
b73be79
feat(hccl_helper): support RING topology like pto-comm-isa
sunkaixuan2018 Mar 5, 2026
4159e1d
fix(hccl_helper): use HcclDeviceContext.rankId for local window base
sunkaixuan2018 Mar 5, 2026
c8ed4b4
chore(cpt_and_comm): log gather mismatch but do not fail case
sunkaixuan2018 Mar 5, 2026
3ccca30
feat(cpt_and_comm): split win_base into win_in/out_base, add dcci cac…
sunkaixuan2018 Mar 9, 2026
ac4f7fa
fix(compare): print actual and expected as chunked lists on mismatch …
sunkaixuan2018 Mar 9, 2026
0dac5b3
feat(cpt_and_comm): split compute/comm into two phases with HcclBarri…
sunkaixuan2018 Mar 9, 2026
93d13b0
feat(cpt_and_comm): replace two-phase host barrier with single-launch…
sunkaixuan2018 Mar 9, 2026
6e3b63b
feat: add mega_kernel_comm example (paged attention + TGATHER in sing…
sunkaixuan2018 Mar 9, 2026
7edaaf5
Allgather
Crane-Liu Mar 10, 2026
fe73ad4
tensormap_comm
Crane-Liu Mar 11, 2026
9a48301
modify_allgather
Crane-Liu Mar 12, 2026
4218de5
Merge pull request #1 from sunkaixuan2018/Kernel_Allgather
Crane-Liu Mar 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ outputs

# Mid-work documentation
.docs

# Debug dumps (generated locally)
examples/host_build_graph/cpt_and_comm/debug/
18 changes: 18 additions & 0 deletions examples/host_build_graph/allgather_Manual/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# AllGather (Manual RDMA 实现)

多卡 AllGather 通信算子:使用 **直接 RDMA 读取**(HcclRemotePtr + TLOAD/TSTORE)实现。每个 rank 获得所有 rank 数据的拼接结果。

**实现方式**:`WindowMemCopyIn -> CommBarrier -> AllGatherManual -> WindowMemCopyOut -> CommBarrier(post)`

- 无 TGATHER 集体调用,所有 rank 并行执行
- 适用于性能对比测试

## 运行

```bash
python examples/scripts/multi_card_run_example.py \
-k examples/host_build_graph/allgather_Manual/kernels \
-g examples/host_build_graph/allgather_Manual/golden.py
```

需要设置 `PTO_COMM_ISA_ROOT` 指向 pto-comm-isa 根目录,以及多卡 HCCL 环境。
59 changes: 59 additions & 0 deletions examples/host_build_graph/allgather_Manual/golden.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""
Golden reference for AllGather (Manual RDMA variant, no compute).

Each rank contributes GATHER_COUNT float32 elements.
After AllGather, EVERY rank holds the concatenation of all ranks' data.
"""

import ctypes
import numpy as np

# Number of float32 elements each rank contributes to the AllGather.
GATHER_COUNT = 64

# Single default case; the harness selects cases by name.
ALL_CASES = {"Default": {}}
DEFAULT_CASE = "Default"
__outputs__ = ["out"]
# Float comparison tolerances used by the result checker.
RTOL = 1e-4
ATOL = 1e-4


def generate_inputs(params: dict) -> list:
    """Build the flat (name, value) argument list for one rank.

    For requires_comm runs, ``params`` additionally carries
    ``device_ctx_ptr``, ``win_in_base``, ``win_out_base``, ``n_ranks``,
    ``root`` and ``rank_id``; those are appended as ctypes scalars.
    """
    rank_id = params.get("rank_id", 0)
    n_ranks = params.get("n_ranks", 2)
    root = params.get("root", 0)

    # Per-rank deterministic input; compute_golden re-derives it from the seed.
    np.random.seed(42 + rank_id)
    src = np.random.randn(GATHER_COUNT).astype(np.float32) * 0.1
    out = np.zeros(n_ranks * GATHER_COUNT, dtype=np.float32)

    args = [
        ("src", src),
        ("out", out),
        ("size_src", ctypes.c_int64(src.nbytes)),
        ("size_out", ctypes.c_int64(out.nbytes)),
    ]

    comm_keys = ("device_ctx_ptr", "win_in_base", "win_out_base")
    if all(key in params for key in comm_keys):
        # Communication context: window bases and rank topology.
        args.append(("device_ctx_ptr", ctypes.c_uint64(params["device_ctx_ptr"])))
        args.append(("win_in_base", ctypes.c_uint64(params["win_in_base"])))
        args.append(("win_out_base", ctypes.c_uint64(params["win_out_base"])))
        args.append(("n_ranks", ctypes.c_int32(n_ranks)))
        args.append(("root", ctypes.c_int32(root)))
        args.append(("rank_id", ctypes.c_int32(rank_id)))

    return args


def compute_golden(tensors: dict, params: dict) -> None:
    """Fill tensors["out"] in place with the expected AllGather result.

    Every rank should hold the concatenation of all ranks' src buffers;
    each rank's contribution is re-derived from the same seed used in
    generate_inputs.
    """
    n_ranks = params.get("n_ranks", 2)
    out = tensors["out"]

    # Write through a numpy view so torch tensors are updated in place.
    # NOTE(review): for a CUDA tensor, .cpu() copies and the writes would
    # not propagate — confirm `out` is host-resident at this point.
    view = out.cpu().numpy() if hasattr(out, 'cpu') else np.asarray(out)
    for rank in range(n_ranks):
        np.random.seed(42 + rank)
        chunk = np.random.randn(GATHER_COUNT).astype(np.float32) * 0.1
        view[rank * GATHER_COUNT : (rank + 1) * GATHER_COUNT] = chunk[:GATHER_COUNT]
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Manual AllGather kernel - direct RDMA reads, no TGATHER.
*
* Each rank independently reads from all ranks' win_src via HcclRemotePtr
* and writes to local dst. No collective TGATHER call, so no deadlock.
* All ranks can run in parallel (single kernel, single barrier).
*
* Args: dst, src, ctx, nranks, rank_id (rank_id unused, for API compatibility)
*/

#include <cstdint>
#include <pto/pto-inst.hpp>
#include <pto/common/pto_tile.hpp>
#include "hccl_context.h"
#include "hccl_helpers.h"

using namespace pto;

#ifndef __gm__
#define __gm__
#endif

#ifndef __aicore__
#define __aicore__ [aicore]
#endif

static constexpr size_t GATHER_COUNT = 64;

// Entry point. args packs: [0]=dst, [1]=src (window base), [2]=HcclDeviceContext*,
// [3]=nranks, [4]=rank_id (unused, kept for API compatibility).
extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t* args) {
__gm__ float* dst = reinterpret_cast<__gm__ float*>(args[0]);
__gm__ float* src = reinterpret_cast<__gm__ float*>(args[1]);
__gm__ HcclDeviceContext* hcclCtx = reinterpret_cast<__gm__ HcclDeviceContext*>(args[2]);
int nranks = static_cast<int>(args[3]);
(void)args[4]; /* rank_id unused */

// Fully-dynamic 5-D shape/stride describing a single 1 x GATHER_COUNT row.
using ShapeDyn = pto::Shape<pto::DYNAMIC, pto::DYNAMIC, pto::DYNAMIC, pto::DYNAMIC, pto::DYNAMIC>;
using StrideDyn = pto::Stride<pto::DYNAMIC, pto::DYNAMIC, pto::DYNAMIC, pto::DYNAMIC, pto::DYNAMIC>;
using Global = pto::GlobalTensor<float, ShapeDyn, StrideDyn, pto::Layout::ND>;
using TileData = pto::Tile<pto::TileType::Vec, float, 1, GATHER_COUNT, pto::BLayout::RowMajor, -1, -1>;

ShapeDyn sliceShape(1, 1, 1, 1, GATHER_COUNT);
StrideDyn sliceStride(GATHER_COUNT, GATHER_COUNT, GATHER_COUNT, GATHER_COUNT, 1);

// Staging tile, zero-initialized before the copy loop.
TileData ubTile(1, GATHER_COUNT);
TASSIGN(ubTile, 0x0);

// Clamp rank count to 16 — presumably the max supported topology size; TODO confirm.
int actual_nranks = (nranks > 16) ? 16 : nranks;
for (int r = 0; r < actual_nranks; ++r) {
// Resolve rank r's window address and copy its slice into local slot r.
__gm__ float* remote_src = HcclRemotePtr(hcclCtx, src, r);
__gm__ float* local_dst = dst + static_cast<ptrdiff_t>(r) * GATHER_COUNT;

Global srcG(remote_src, sliceShape, sliceStride);
Global dstG(local_dst, sliceShape, sliceStride);

TLOAD(ubTile, srcG);
// Serialize: the load into the tile must complete before the store reads it.
set_flag(PIPE_MTE2, PIPE_MTE3, EVENT_ID0);
wait_flag(PIPE_MTE2, PIPE_MTE3, EVENT_ID0);
TSTORE(dstG, ubTile);
// And the store must complete before the next iteration's load reuses the tile.
set_flag(PIPE_MTE3, PIPE_MTE2, EVENT_ID0);
wait_flag(PIPE_MTE3, PIPE_MTE2, EVENT_ID0);
}

pipe_barrier(PIPE_ALL);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* All-to-all barrier (多对多): every rank waits for every other rank.
*
* Used by AllGather where every rank reads from all ranks' windows.
* Unlike comm_barrier_kernel (many-to-one), ALL ranks do TWAIT here.
*
* Flow:
* 1. Each rank TNOTIFY to root's barrier slot[my_rank]
* 2. Each rank TWAIT on root's barrier until all n_ranks slots >= 1
*
* Args:
* args[0] = barrier_base (local barrier buffer; root's is used for sync)
* args[1] = device_ctx_ptr (HcclDeviceContext*)
* args[2] = n_ranks
* args[3] = root (whose barrier buffer is the sync point)
*/

#include <cstdint>
#include <pto/pto-inst.hpp>
#include <pto/comm/pto_comm_inst.hpp>
#include "hccl_context.h"
#include "hccl_helpers.h"

#ifndef __gm__
#define __gm__
#endif

#ifndef __aicore__
#define __aicore__ [aicore]
#endif

// Entry point. args packs: [0]=barrier_base, [1]=HcclDeviceContext*, [2]=n_ranks, [3]=root.
extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t* args) {
__gm__ int32_t* local_barrier = reinterpret_cast<__gm__ int32_t*>(args[0]);
__gm__ HcclDeviceContext* ctx = reinterpret_cast<__gm__ HcclDeviceContext*>(args[1]);
int n_ranks = static_cast<int>(args[2]);
int root = static_cast<int>(args[3]);
// This rank's id comes from the device context, not from args.
int my_rank = static_cast<int>(ctx->rankId);

// Step 1: Each rank writes flag=1 to root's barrier slot[my_rank] via RDMA.
__gm__ int32_t* remote_slot = HcclRemotePtr(ctx, local_barrier, root) + my_rank;
pto::comm::Signal sig(remote_slot);
pto::comm::TNOTIFY(sig, 1, pto::comm::NotifyOp::Set);

// Step 2: ALL ranks wait until every rank's flag is >= 1 (multi-to-multi).
// Each TWAIT spins on one slot of root's barrier buffer.
__gm__ int32_t* root_barrier = HcclRemotePtr(ctx, local_barrier, root);
for (int i = 0; i < n_ranks; ++i) {
pto::comm::Signal slot(root_barrier + i);
pto::comm::TWAIT(slot, 1, pto::comm::WaitCmp::GE);
}

pipe_barrier(PIPE_ALL);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* WindowMemCopyIn: Copy device buffer to HCCL window.
* Used before AllGather so remote ranks can read.
*/

#include <cstdint>
#include <pto/pto-inst.hpp>

#ifndef __gm__
#define __gm__
#endif

#ifndef __aicore__
#define __aicore__ [aicore]
#endif

// Stage a device buffer into the HCCL window so remote ranks can read it.
// args packs: [0]=win_dst (window), [1]=dev_src (device buffer), [2]=count.
extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t* args) {
    __gm__ float* win_dst = reinterpret_cast<__gm__ float*>(args[0]);
    __gm__ float* dev_src = reinterpret_cast<__gm__ float*>(args[1]);
    int count = static_cast<int>(args[2]);

    // Scalar element-by-element copy into the window.
    int idx = 0;
    while (idx < count) {
        win_dst[idx] = dev_src[idx];
        ++idx;
    }
    pipe_barrier(PIPE_ALL);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* WindowMemCopyOut: Copy HCCL window to device buffer.
* After AllGather, every rank copies gathered result to device.
*/

#include <cstdint>
#include <pto/pto-inst.hpp>

#ifndef __gm__
#define __gm__
#endif

#ifndef __aicore__
#define __aicore__ [aicore]
#endif

// Drain the HCCL window into the device output buffer after the AllGather.
// args packs: [0]=dev_dst (device buffer), [1]=win_src (window), [2]=count.
extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t* args) {
    __gm__ float* dev_dst = reinterpret_cast<__gm__ float*>(args[0]);
    __gm__ float* win_src = reinterpret_cast<__gm__ float*>(args[1]);
    int count = static_cast<int>(args[2]);

    // Scalar copy via walking pointers.
    __gm__ float* d = dev_dst;
    __gm__ float* s = win_src;
    for (int remaining = count; remaining > 0; --remaining) {
        *d++ = *s++;
    }
    pipe_barrier(PIPE_ALL);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
AllGather (Manual): direct RDMA reads for performance comparison.

Flow: WindowMemCopyIn -> CommBarrier -> AllGatherManual (HcclRemotePtr+TLOAD/TSTORE)
-> WindowMemCopyOut -> CommBarrier(post).
No TGATHER, all ranks run in parallel. Requires HCCL, PTO_COMM_ISA_ROOT.
"""

from pathlib import Path

# Directory containing this config; kernel sources are resolved relative to it.
_KERNELS_ROOT = Path(__file__).parent

# Host-side graph builder compiled from C++ orchestration source.
ORCHESTRATION = {
    "source": str(_KERNELS_ROOT / "orchestration" / "allgather_orch.cpp"),
    "function_name": "build_allgather_graph",
}

# Device kernels referenced by func_id from the orchestration graph.
# All run on vector cores ("aiv").
KERNELS = [
    {"func_id": 0, "name": "WindowMemCopyIn", "source": str(_KERNELS_ROOT / "aiv" / "window_memcopy_in.cpp"), "core_type": "aiv"},
    {"func_id": 1, "name": "AllGatherManual", "source": str(_KERNELS_ROOT / "aiv" / "allgather_manual_kernel.cpp"), "core_type": "aiv"},
    {"func_id": 2, "name": "WindowMemCopyOut", "source": str(_KERNELS_ROOT / "aiv" / "window_memcopy_out.cpp"), "core_type": "aiv"},
    {"func_id": 3, "name": "CommBarrierAll", "source": str(_KERNELS_ROOT / "aiv" / "comm_barrier_all_kernel.cpp"), "core_type": "aiv"},
]

# Multi-device launch settings; requires_comm enables HCCL window/context setup.
RUNTIME_CONFIG = {
    "runtime": "host_build_graph",
    "n_devices": 2,
    "first_device_id": 0,
    "requires_comm": True,
}
Loading