From b9cca1eddfeeaec09a99cd51d594839cd551b0f4 Mon Sep 17 00:00:00 2001 From: glukas Date: Mon, 30 Mar 2026 18:12:48 +0200 Subject: [PATCH 1/3] Allreduce --- samples/spatial/collectives/allreduce_1D.sptl | 89 ++++++++++ samples/spatial/collectives/allreduce_2D.sptl | 153 ++++++++++++++++++ 2 files changed, 242 insertions(+) create mode 100644 samples/spatial/collectives/allreduce_1D.sptl create mode 100644 samples/spatial/collectives/allreduce_2D.sptl diff --git a/samples/spatial/collectives/allreduce_1D.sptl b/samples/spatial/collectives/allreduce_1D.sptl new file mode 100644 index 00000000..dbdc1a72 --- /dev/null +++ b/samples/spatial/collectives/allreduce_1D.sptl @@ -0,0 +1,89 @@ +/** + * 1D Allreduce over N PEs with K elements per PE. + * Root is at (0,0). Reduce = pipelined chain westward; broadcast = direct multicast. + * + * Communication Pattern (N=4): + * Reduce: [0,0] <--1-- [1,0] <--0-- [2,0] <--1-- [3,0] + * Broadcast: [0,0] ----multicast----> {[1,0], [2,0], [3,0]} + * + * (!) Assumes N >= 2 (!) + **/ +kernel @allreduce_1d(stream[N,1] readonly a_in, stream[N,1] writeonly out) { + + place i16 i, i16 j in [0:N, 0] { + f32[K] a + } + + // Phase 1: All PEs load their input + phase { + compute i16 i, i16 j in [0:N, 0] { + await receive(a, a_in[i, j]) + } + } + + // Phase 2: Chain reduce westward into root (0,0) + phase { + dataflow i16 i, i16 j in [0:N, 0] { + stream red = relative_stream(-1, 0) { + hops = [(-1, 0)], + channel = 0 + } + stream blue = relative_stream(-1, 0) { + hops = [(-1, 0)], + channel = 1 + } + } + + // East corner: only send + compute i16 i, i16 j in [N-1, 0] { + await send(a, red if (N-1) % 2 == 0 else blue) + } + // Odd PEs: receive red, accumulate, forward blue + compute i16 i, i16 j in [1:N-1:2, 0] { + await foreach i16 k, f32 x in [0:K], receive(red) { + a[k] = a[k] + x + await send(a[k], blue) + } + } + // Even PEs (middle): receive blue, accumulate, forward red + compute i16 i, i16 j in [2:N-1:2, 0] { + await foreach i16 k, f32 x in [0:K], receive(blue) { + a[k] = a[k] + x + await send(a[k], red) + } + } + // West corner (root): accumulate final sum; always receives blue + compute i16 i, i16 j in [0, 0] { + await foreach i16 k, f32 x in [0:K], receive(blue) { + a[k] = a[k] + x + } + } + } + + // Phase 3: Multicast from root (0,0) to all other PEs simultaneously + phase { + dataflow i16 i, i16 j in [0:N, 0] { + stream s = relative_stream([1:N], 0) { + hops = auto, + channel = 2 + } + } + + // Root: multicast the reduced value + compute i16 i, i16 j in [0, 0] { + await send(a, s) + } + + // All other PEs: receive the broadcast value + compute i16 i, i16 j in [1:N, 0] { + await receive(a, s) + } + } + + // Phase 4: All PEs write output + phase { + compute i16 i, i16 j in [0:N, 0] { + await send(a, out[i, j]) + } + } +} diff --git a/samples/spatial/collectives/allreduce_2D.sptl b/samples/spatial/collectives/allreduce_2D.sptl new file mode 100644 index 00000000..8b493831 --- /dev/null +++ b/samples/spatial/collectives/allreduce_2D.sptl @@ -0,0 +1,153 @@ +/** + * 2D Allreduce over NX*NY PEs with K elements per PE. + * Root is at (0,0). Reduce = pipelined chain (X then Y); broadcast = direct multicast (X then Y). + * + * Communication Pattern: + * Reduce Phase X: each row reduces westward → (0, j) for all j + * Reduce Phase Y: column 0 reduces northward → root (0,0) + * Broadcast Phase X: root (0,0) multicasts to {(1,0)…(NX-1,0)} + * Broadcast Phase Y: every (i,0) multicasts southward to {(i,1)…(i,NY-1)} + * + * (!) Assumes NX >= 2 and NY >= 2 (!) + **/ +kernel @allreduce_2d(stream[NX,NY] readonly a_in, stream[NX,NY] writeonly out) { + + place i16 i, i16 j in [0:NX, 0:NY] { + f32[K] a + } + + // Phase 1: All PEs load their input + phase { + compute i16 i, i16 j in [0:NX, 0:NY] { + await receive(a, a_in[i, j]) + } + } + + // Reduce Phase X: every row reduces westward into its (i=0, j) PE. + // All NY rows execute in parallel. Mirrors chain_reduce_2D Phase X. + phase { + dataflow i16 i, i16 j in [0:NX, 0:NY] { + stream red = relative_stream(-1, 0) { + hops = [(-1, 0)], + channel = 0 + } + stream blue = relative_stream(-1, 0) { + hops = [(-1, 0)], + channel = 1 + } + } + + // East corners (i=NX-1): only send + compute i16 i, i16 j in [NX-1, 0:NY] { + await send(a, red if (NX-1) % 2 == 0 else blue) + } + // Odd i (middle): receive red, accumulate, forward blue + compute i16 i, i16 j in [1:NX-1:2, 0:NY] { + await foreach i16 k, f32 x in [0:K], receive(red) { + a[k] = a[k] + x + await send(a[k], blue) + } + } + // Even i (middle): receive blue, accumulate, forward red + compute i16 i, i16 j in [2:NX-1:2, 0:NY] { + await foreach i16 k, f32 x in [0:K], receive(blue) { + a[k] = a[k] + x + await send(a[k], red) + } + } + // West corners (i=0): accumulate row sum; always receive blue + compute i16 i, i16 j in [0, 0:NY] { + await foreach i16 k, f32 x in [0:K], receive(blue) { + a[k] = a[k] + x + } + } + } + + // Reduce Phase Y: column 0 reduces northward into root (0,0). + // Mirrors chain_reduce_2D Phase Y. + phase { + dataflow i16 i, i16 j in [0, 0:NY] { + stream red = relative_stream(0, -1) { + hops = [(0, -1)], + channel = 2 + } + stream blue = relative_stream(0, -1) { + hops = [(0, -1)], + channel = 3 + } + } + + // South corner (j=NY-1): only send + compute i16 i, i16 j in [0, NY-1] { + await send(a, red if (NY-1) % 2 == 0 else blue) + } + // Odd j (middle): receive red, accumulate, forward blue + compute i16 i, i16 j in [0, 1:NY-1:2] { + await foreach i16 k, f32 x in [0:K], receive(red) { + a[k] = a[k] + x + await send(a[k], blue) + } + } + // Even j (middle): receive blue, accumulate, forward red + compute i16 i, i16 j in [0, 2:NY-1:2] { + await foreach i16 k, f32 x in [0:K], receive(blue) { + a[k] = a[k] + x + await send(a[k], red) + } + } + // Root (j=0): accumulate final column sum; always receives blue + compute i16 i, i16 j in [0, 0] { + await foreach i16 k, f32 x in [0:K], receive(blue) { + a[k] = a[k] + x + } + } + } + + // Broadcast Phase X: root (0,0) multicasts eastward to all PEs in row j=0. + phase { + dataflow i16 i, i16 j in [0:NX, 0:1] { + stream sx = relative_stream([1:NX], 0) { + hops = auto, + channel = 4 + } + } + + // Root: multicast to all other row-0 PEs + compute i16 i, i16 j in [0:1, 0:1] { + await send(a, sx) + } + + // Row-0 receivers: collect from root + compute i16 i, i16 j in [1:NX, 0:1] { + await receive(a, sx) + } + } + + // Broadcast Phase Y: every (i,0) PE multicasts southward through its column. + // All NX columns execute in parallel. + phase { + dataflow i16 i, i16 j in [0:NX, 0:NY] { + stream sy = relative_stream(0, [1:NY]) { + hops = auto, + channel = 5 + } + } + + // Column roots (j=0): multicast southward + compute i16 i, i16 j in [0:NX, 0:1] { + await send(a, sy) + } + + // Column receivers (j>0): collect from their column root + compute i16 i, i16 j in [0:NX, 1:NY] { + await receive(a, sy) + } + } + + // Phase 6: All PEs write output + phase { + compute i16 i, i16 j in [0:NX, 0:NY] { + await send(a, out[i, j]) + } + } +} From 0d6aa944867cd4987c92b4ccc517255a410136c6 Mon Sep 17 00:00:00 2001 From: glukas Date: Mon, 30 Mar 2026 18:16:09 +0200 Subject: [PATCH 2/3] Add twophase allreduce, tests --- .../collectives/twophase_allreduce_1D.sptl | 143 ++++++++++ .../collectives/twophase_allreduce_2D.sptl | 264 ++++++++++++++++++ tests/csl_runtime/_lib.sh | 38 +++ tests/csl_runtime/test_allreduce_1d.sh | 35 +++ tests/csl_runtime/test_allreduce_2d.sh | 36 +++ .../csl_runtime/test_twophase_allreduce_1d.sh | 38 +++ .../csl_runtime/test_twophase_allreduce_2d.sh | 42 +++ 7 files changed, 596 insertions(+) create mode 100644 samples/spatial/collectives/twophase_allreduce_1D.sptl create mode 100644 samples/spatial/collectives/twophase_allreduce_2D.sptl create mode 100755 tests/csl_runtime/test_allreduce_1d.sh create mode 100755 tests/csl_runtime/test_allreduce_2d.sh create mode 100755 tests/csl_runtime/test_twophase_allreduce_1d.sh create mode 100755 tests/csl_runtime/test_twophase_allreduce_2d.sh diff --git a/samples/spatial/collectives/twophase_allreduce_1D.sptl b/samples/spatial/collectives/twophase_allreduce_1D.sptl new file mode 100644 index 00000000..7a258d5f --- /dev/null +++ b/samples/spatial/collectives/twophase_allreduce_1D.sptl @@ -0,0 +1,143 @@ +/** + * 1D Allreduce over G*S PEs with K elements per PE. + * Reduce = two-phase chain (within-group then cross-group); broadcast = direct multicast. + * Root is at (0,0). + * + * Communication Pattern (S=4, G=3, P=12): + * + * Phase 1 — within-group chain reduces (channels 0,1): + * [0,0] <--1-- [1,0] <--0-- [2,0] <--1-- [3,0] [4,0] <--... [8,0] <--... + * + * Phase 2 — cross-group chain reduce over {0,4,8} (channels 2,3): + * [0,0] <--3---- [4,0] <--2---- [8,0] + * + * Phase 3 — multicast from root to all PEs (channel 0 reused): + * [0,0] ----multicast----> {[1,0], [2,0], ..., [G*S-1,0]} + * + * (!) Assumes S is even (!) + **/ +kernel @twophase_allreduce_1d( + stream[G*S, 1] readonly a_in, + stream[G*S, 1] writeonly out +) { + + place i16 i, i16 j in [0:G*S, 0] { + f32[K] a + } + + // Phase 1: All PEs load input + phase { + compute i16 i, i16 j in [0:G*S, 0] { + await receive(a, a_in[i, j]) + } + } + + // Phase 2: Within-group reduce — each group [g*S .. (g+1)*S-1] reduces westward. + // SX even → east corner of every group is always at an odd local index → sends blue. + phase { + dataflow i16 i, i16 j in [0:S*G, 0] { + stream red = relative_stream(-1, 0) { + hops = [(-1, 0)], + channel = 0 + } + stream blue = relative_stream(-1, 0) { + hops = [(-1, 0)], + channel = 1 + } + } + + for i16 group in [0:G] { + // West corner of group: accumulate, no send + compute i16 i, i16 j in [group * S, 0] { + await foreach i16 k, f32 x in [0:K], receive(blue) { + a[k] = a[k] + x + } + } + // Middle odd + compute i16 i, i16 j in [group * S + 1 : (group+1) * S - 1 : 2, 0] { + await foreach i16 k, f32 x in [0:K], receive(red) { + a[k] = a[k] + x + await send(a[k], blue) + } + } + // Middle even + compute i16 i, i16 j in [group * S + 2 : (group+1) * S - 1 : 2, 0] { + await foreach i16 k, f32 x in [0:K], receive(blue) { + a[k] = a[k] + x + await send(a[k], red) + } + } + // East corner: S even → local index S-1 is odd → send blue + compute i16 i, i16 j in [(group+1) * S - 1, 0] { + await send(a, blue) + } + } + } + + // Phase 3: Cross-group reduce — G representatives at {0, S, 2S, ...} reduce westward. + // Multi-hop streams span S hops each. Channels 2,3 avoid conflict with Phase 2. + phase { + dataflow i16 i, i16 j in [0:S*G:S, 0] { + stream green = relative_stream(-S, 0) { + hops = auto, + channel = 2 + } + stream yellow = relative_stream(-S, 0) { + hops = auto, + channel = 3 + } + } + + // West corner (root): accumulate final sum + compute i16 i, i16 j in [0, 0] { + await foreach i16 k, f32 x in [0:K], receive(green) { + a[k] = a[k] + x + } + } + // Middle odd group representatives + compute i16 i, i16 j in [S:(G-1)*S:2*S, 0] { + await foreach i16 k, f32 x in [0:K], receive(yellow) { + a[k] = a[k] + x + await send(a[k], green) + } + } + // Middle even group representatives + compute i16 i, i16 j in [2*S:(G-1)*S:2*S, 0] { + await foreach i16 k, f32 x in [0:K], receive(green) { + a[k] = a[k] + x + await send(a[k], yellow) + } + } + // East corner: group-level index G-1; even → red (green), odd → blue (yellow) + compute i16 i, i16 j in [(G-1)*S, 0] { + await send(a, yellow if (G-1) % 2 == 0 else green) + } + } + + // Phase 4: Multicast from root (0,0) to all other PEs simultaneously + phase { + dataflow i16 i, i16 j in [0:G*S, 0] { + stream s = relative_stream([1:G*S], 0) { + hops = auto, + channel = 0 + } + } + + // Root: multicast the reduced value + compute i16 i, i16 j in [0, 0] { + await send(a, s) + } + + // All other PEs: receive the broadcast value + compute i16 i, i16 j in [1:G*S, 0] { + await receive(a, s) + } + } + + // Phase 5: All PEs write output + phase { + compute i16 i, i16 j in [0:G*S, 0] { + await send(a, out[i, j]) + } + } +} diff --git a/samples/spatial/collectives/twophase_allreduce_2D.sptl b/samples/spatial/collectives/twophase_allreduce_2D.sptl new file mode 100644 index 00000000..be545641 --- /dev/null +++ b/samples/spatial/collectives/twophase_allreduce_2D.sptl @@ -0,0 +1,264 @@ +/** + * 2D Allreduce over (GX*SX) x (GY*SY) PEs with K elements per PE. + * Reduce = two-phase chain (X then Y, each two-phase); broadcast = direct multicast (X then Y). + * Root is at (0,0). + * + * Reduce phases: + * X1: within-group X reduce → each group's west corner (channels 0,1) + * X2: cross-group X reduce → column 0 (channels 2,3) + * Y1: within-group Y reduce → each group's north corner (channels 4,5) + * Y2: cross-group Y reduce → root (0,0) (channels 6,7) + * + * Broadcast phases (channels reused; phases are sequential): + * BX: root (0,0) multicasts eastward along row j=0 (channel 0) + * BY: every (i,0) multicasts southward through column i (channel 1) + * + * (!) Assumes SX and SY are even (!) + **/ +kernel @twophase_allreduce_2d( + stream[GX*SX, GY*SY] readonly a_in, + stream[GX*SX, GY*SY] writeonly out +) { + + place i16 i, i16 j in [0:GX*SX, 0:GY*SY] { + f32[K] a + } + + // Phase 1: All PEs load input + phase { + compute i16 i, i16 j in [0:GX*SX, 0:GY*SY] { + await receive(a, a_in[i, j]) + } + } + + // ------------------------------------------------------------------------- + // Phase X1: Within-group reduce in X, for every row. + // Each group [gx*SX .. (gx+1)*SX-1] reduces westward into gx*SX. + // SX even → east corner always at an odd local index → sends blue. + // All GY*SY rows participate in parallel. + // ------------------------------------------------------------------------- + phase { + dataflow i16 i, i16 j in [0:GX*SX, 0:GY*SY] { + stream red = relative_stream(-1, 0) { + hops = [(-1, 0)], + channel = 0 + } + stream blue = relative_stream(-1, 0) { + hops = [(-1, 0)], + channel = 1 + } + } + + for i16 gx in [0:GX] { + // West corner of group: accumulate, no send + compute i16 i, i16 j in [gx * SX, 0:GY*SY] { + await foreach i16 k, f32 x in [0:K], receive(blue) { + a[k] = a[k] + x + } + } + // Middle odd + compute i16 i, i16 j in [gx * SX + 1 : (gx+1) * SX - 1 : 2, 0:GY*SY] { + await foreach i16 k, f32 x in [0:K], receive(red) { + a[k] = a[k] + x + await send(a[k], blue) + } + } + // Middle even + compute i16 i, i16 j in [gx * SX + 2 : (gx+1) * SX - 1 : 2, 0:GY*SY] { + await foreach i16 k, f32 x in [0:K], receive(blue) { + a[k] = a[k] + x + await send(a[k], red) + } + } + // East corner: SX even → local index SX-1 is odd → send blue + compute i16 i, i16 j in [(gx+1) * SX - 1, 0:GY*SY] { + await send(a, blue) + } + } + } + + // ------------------------------------------------------------------------- + // Phase X2: Cross-group reduce in X, for every row. + // GX representatives at i = 0, SX, 2*SX, ... reduce westward into column 0. + // Multi-hop streams span SX hops. Channels 2,3 avoid conflict with X1. + // ------------------------------------------------------------------------- + phase { + dataflow i16 i, i16 j in [0:GX*SX:SX, 0:GY*SY] { + stream red = relative_stream(-SX, 0) { + hops = auto, + channel = 2 + } + stream blue = relative_stream(-SX, 0) { + hops = auto, + channel = 3 + } + } + + // West corner (column 0): accumulate row sum + compute i16 i, i16 j in [0, 0:GY*SY] { + await foreach i16 k, f32 x in [0:K], receive(blue) { + a[k] = a[k] + x + } + } + // Middle odd group representatives + compute i16 i, i16 j in [SX:(GX-1)*SX:2*SX, 0:GY*SY] { + await foreach i16 k, f32 x in [0:K], receive(red) { + a[k] = a[k] + x + await send(a[k], blue) + } + } + // Middle even group representatives + compute i16 i, i16 j in [2*SX:(GX-1)*SX:2*SX, 0:GY*SY] { + await foreach i16 k, f32 x in [0:K], receive(blue) { + a[k] = a[k] + x + await send(a[k], red) + } + } + // East corner: group-level index GX-1; even → red, odd → blue + compute i16 i, i16 j in [(GX-1)*SX, 0:GY*SY] { + await send(a, red if (GX-1) % 2 == 0 else blue) + } + } + + // ------------------------------------------------------------------------- + // Phase Y1: Within-group reduce in Y, for column 0 only. + // Each group [gy*SY .. (gy+1)*SY-1] reduces northward into gy*SY. + // SY even → south corner always at an odd local index → sends blue. + // Channels 4,5 avoid conflict with X1 (0,1) and X2 (2,3). + // ------------------------------------------------------------------------- + phase { + dataflow i16 i, i16 j in [0, 0:GY*SY] { + stream red = relative_stream(0, -1) { + hops = [(0, -1)], + channel = 4 + } + stream blue = relative_stream(0, -1) { + hops = [(0, -1)], + channel = 5 + } + } + + for i16 gy in [0:GY] { + // North corner of group: accumulate, no send + compute i16 i, i16 j in [0, gy * SY] { + await foreach i16 k, f32 x in [0:K], receive(blue) { + a[k] = a[k] + x + } + } + // Middle odd + compute i16 i, i16 j in [0, gy * SY + 1 : (gy+1) * SY - 1 : 2] { + await foreach i16 k, f32 x in [0:K], receive(red) { + a[k] = a[k] + x + await send(a[k], blue) + } + } + // Middle even + compute i16 i, i16 j in [0, gy * SY + 2 : (gy+1) * SY - 1 : 2] { + await foreach i16 k, f32 x in [0:K], receive(blue) { + a[k] = a[k] + x + await send(a[k], red) + } + } + // South corner: SY even → local index SY-1 is odd → send blue + compute i16 i, i16 j in [0, (gy+1) * SY - 1] { + await send(a, blue) + } + } + } + + // ------------------------------------------------------------------------- + // Phase Y2: Cross-group reduce in Y, for column 0 only. + // GY representatives at j = 0, SY, 2*SY, ... reduce northward into root (0,0). + // Multi-hop streams span SY hops. Channels 6,7 avoid conflict with Y1 (4,5). + // ------------------------------------------------------------------------- + phase { + dataflow i16 i, i16 j in [0, 0:GY*SY:SY] { + stream red = relative_stream(0, -SY) { + hops = auto, + channel = 6 + } + stream blue = relative_stream(0, -SY) { + hops = auto, + channel = 7 + } + } + + // Root (0,0): accumulate final sum + compute i16 i, i16 j in [0, 0] { + await foreach i16 k, f32 x in [0:K], receive(blue) { + a[k] = a[k] + x + } + } + // Middle odd group representatives + compute i16 i, i16 j in [0, SY:(GY-1)*SY:2*SY] { + await foreach i16 k, f32 x in [0:K], receive(red) { + a[k] = a[k] + x + await send(a[k], blue) + } + } + // Middle even group representatives + compute i16 i, i16 j in [0, 2*SY:(GY-1)*SY:2*SY] { + await foreach i16 k, f32 x in [0:K], receive(blue) { + a[k] = a[k] + x + await send(a[k], red) + } + } + // South corner: group-level index GY-1; even → red, odd → blue + compute i16 i, i16 j in [0, (GY-1)*SY] { + await send(a, red if (GY-1) % 2 == 0 else blue) + } + } + + // ------------------------------------------------------------------------- + // Broadcast Phase X: root (0,0) multicasts eastward to all PEs in row j=0. + // Channels reused (phases are sequential). + // ------------------------------------------------------------------------- + phase { + dataflow i16 i, i16 j in [0:GX*SX, 0:1] { + stream sx = relative_stream([1:GX*SX], 0) { + hops = auto, + channel = 0 + } + } + + // Root: multicast to all other row-0 PEs + compute i16 i, i16 j in [0:1, 0:1] { + await send(a, sx) + } + + // Row-0 receivers: collect from root + compute i16 i, i16 j in [1:GX*SX, 0:1] { + await receive(a, sx) + } + } + + // ------------------------------------------------------------------------- + // Broadcast Phase Y: every (i,0) PE multicasts southward through its column. + // All GX*SX columns execute in parallel. + // ------------------------------------------------------------------------- + phase { + dataflow i16 i, i16 j in [0:GX*SX, 0:GY*SY] { + stream sy = relative_stream(0, [1:GY*SY]) { + hops = auto, + channel = 1 + } + } + + // Column roots (j=0): multicast southward + compute i16 i, i16 j in [0:GX*SX, 0:1] { + await send(a, sy) + } + + // Column receivers (j>0): collect from their column root + compute i16 i, i16 j in [0:GX*SX, 1:GY*SY] { + await receive(a, sy) + } + } + + // Phase 8: All PEs write output + phase { + compute i16 i, i16 j in [0:GX*SX, 0:GY*SY] { + await send(a, out[i, j]) + } + } +} diff --git a/tests/csl_runtime/_lib.sh b/tests/csl_runtime/_lib.sh index a5e24a96..b5381bc4 100755 --- a/tests/csl_runtime/_lib.sh +++ b/tests/csl_runtime/_lib.sh @@ -176,6 +176,44 @@ print(f"Test passed: x-neg-multicast [-{START}:-{N}], {N-START} receivers.") PYEOF } +# verify_allreduce +# Loads a_in.npy (shape N×1×K), expects OUT_out.npy (shape N×1×K) +# where every PE holds the global sum: OUT_out[i,0,:] == sum(a_in[:,0,:]). +verify_allreduce() { + python3 - <<'PYEOF' +import numpy as np, sys +a = np.load('a_in.npy') +ref = np.sum(a, axis=0, keepdims=True) # (1, 1, K) +out = np.load('OUT_out.npy') # (N, 1, K) +rep = np.broadcast_to(ref, out.shape) +if not np.allclose(out, rep, atol=1e-5): + print(f"Test failed: max abs diff = {float(np.max(np.abs(out - rep))):.3e}") + print(f" expected: {rep.flatten()[:8]}") + print(f" got: {out.flatten()[:8]}") + sys.exit(1) +print("Test passed: allreduce output matches expected sum at every PE.") +PYEOF +} + +# verify_allreduce_2d +# Loads a_in.npy (shape NX×NY×K), expects OUT_out.npy (shape NX×NY×K) +# where every PE holds the global sum: OUT_out[i,j,:] == sum(a_in[:,:,:]). +verify_allreduce_2d() { + python3 - <<'PYEOF' +import numpy as np, sys +a = np.load('a_in.npy') +ref = np.sum(a, axis=(0, 1), keepdims=True) # (1, 1, K) +out = np.load('OUT_out.npy') # (NX, NY, K) +rep = np.broadcast_to(ref, out.shape) +if not np.allclose(out, rep, atol=1e-5): + print(f"Test failed: max abs diff = {float(np.max(np.abs(out - rep))):.3e}") + print(f" expected: {rep.flatten()[:8]}") + print(f" got: {out.flatten()[:8]}") + sys.exit(1) +print("Test passed: 2D allreduce output matches expected sum at every PE.") +PYEOF +} + # cleanup FOLDER # Removes compiled folder and temporary npy files. cleanup() { diff --git a/tests/csl_runtime/test_allreduce_1d.sh b/tests/csl_runtime/test_allreduce_1d.sh new file mode 100755 index 00000000..95d832fa --- /dev/null +++ b/tests/csl_runtime/test_allreduce_1d.sh @@ -0,0 +1,35 @@ +#!/bin/sh +# E2E test: 1-D allreduce (chain reduce + multicast broadcast, N PEs, K elements per PE). +# Kernel: allreduce_1D.sptl params: N K +# Reference: OUT_out[i,0,:] == sum(a_in[:,0,:]) for all i in 0..N-1 + +set -e +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +. "$SCRIPT_DIR/_lib.sh" + +FOLDER="allreduce_1d_sptl" + +run_allreduce_1d() { + N=$1 + K=$2 + echo "--- allreduce_1d N=$N K=$K ---" + + sptlc "$COLLECTIVES_DIR/allreduce_1D.sptl" "$FOLDER" -p N=$N -p K=$K + + python3 - < Date: Mon, 30 Mar 2026 18:17:51 +0200 Subject: [PATCH 3/3] Fix colors --- samples/spatial/collectives/twophase_allreduce_1D.sptl | 2 +- samples/spatial/collectives/twophase_allreduce_2D.sptl | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/samples/spatial/collectives/twophase_allreduce_1D.sptl b/samples/spatial/collectives/twophase_allreduce_1D.sptl index 7a258d5f..5ac2f607 100644 --- a/samples/spatial/collectives/twophase_allreduce_1D.sptl +++ b/samples/spatial/collectives/twophase_allreduce_1D.sptl @@ -119,7 +119,7 @@ kernel @twophase_allreduce_1d( dataflow i16 i, i16 j in [0:G*S, 0] { stream s = relative_stream([1:G*S], 0) { hops = auto, - channel = 0 + channel = 4 } } diff --git a/samples/spatial/collectives/twophase_allreduce_2D.sptl b/samples/spatial/collectives/twophase_allreduce_2D.sptl index be545641..ce1b077f 100644 --- a/samples/spatial/collectives/twophase_allreduce_2D.sptl +++ b/samples/spatial/collectives/twophase_allreduce_2D.sptl @@ -217,7 +217,7 @@ kernel @twophase_allreduce_2d( dataflow i16 i, i16 j in [0:GX*SX, 0:1] { stream sx = relative_stream([1:GX*SX], 0) { hops = auto, - channel = 0 + channel = 8 } } @@ -240,7 +240,7 @@ kernel @twophase_allreduce_2d( dataflow i16 i, i16 j in [0:GX*SX, 0:GY*SY] { stream sy = relative_stream(0, [1:GY*SY]) { hops = auto, - channel = 1 + channel = 9 } }