Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions samples/spatial/collectives/allreduce_1D.sptl
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* 1D Allreduce over N PEs with K elements per PE.
* Root is at (0,0). Reduce = pipelined chain westward; broadcast = direct multicast.
*
* Communication Pattern (N=4), arrows labelled with the channel used:
* Reduce: [0,0] <--1-- [1,0] <--0-- [2,0] <--1-- [3,0]
* Broadcast: [0,0] ----multicast----> {[1,0], [2,0], [3,0]}
*
* Parameters:
* N    - number of PEs in the row; PEs sit at (i, 0) for i = 0 .. N-1.
* K    - number of f32 elements held by each PE.
* a_in - read-only input streams, one per PE; each delivers that PE's K-element operand.
* out  - write-only output streams, one per PE; each is sent the K-element reduced result.
*
* Channel usage: 0 ("red") and 1 ("blue") for the reduce chain, 2 for the broadcast.
*
* (!) Assumes N >= 2 (!)
* With N == 1 the east-corner and root compute blocks of the reduce phase would
* both select PE (0,0).
**/
kernel @allreduce_1d<N, K>(stream<f32, K>[N,1] readonly a_in, stream<f32, K>[N,1] writeonly out) {

// Per-PE storage: every PE owns a K-element buffer `a` that first holds its
// input, then its partial sum, and finally the broadcast result.
place i16 i, i16 j in [0:N, 0] {
f32[K] a
}

// Phase 1: All PEs load their input
phase {
compute i16 i, i16 j in [0:N, 0] {
await receive(a, a_in[i, j])
}
}

// Phase 2: Chain reduce westward into root (0,0)
phase {
// Two single-hop streams to the west neighbour (offset (-1, 0)), differing only
// in channel. Even-indexed PEs send on red and odd-indexed PEs send on blue, so
// each middle PE receives on one channel and forwards on the other.
// NOTE(review): the range includes i = 0, whose (-1, 0) target lies outside the
// grid; PE 0 never sends on these streams — confirm that declaring them is harmless.
dataflow i16 i, i16 j in [0:N, 0] {
stream<f32> red = relative_stream(-1, 0) {
hops = [(-1, 0)],
channel = 0
}
stream<f32> blue = relative_stream(-1, 0) {
hops = [(-1, 0)],
channel = 1
}
}

// East corner: only send. It has no eastern neighbour, so it just pushes its
// whole buffer west, picking the channel from the parity of its index N-1.
compute i16 i, i16 j in [N-1, 0] {
await send(a, red if (N-1) % 2 == 0 else blue)
}
// Odd PEs: receive red, accumulate, forward blue.
// Per element: add the incoming partial sum x into a[k], then forward the
// updated a[k] immediately (this is what makes the chain pipelined).
compute i16 i, i16 j in [1:N-1:2, 0] {
await foreach i16 k, f32 x in [0:K], receive(red) {
a[k] = a[k] + x
await send(a[k], blue)
}
}
// Even PEs (middle): receive blue, accumulate, forward red
compute i16 i, i16 j in [2:N-1:2, 0] {
await foreach i16 k, f32 x in [0:K], receive(blue) {
a[k] = a[k] + x
await send(a[k], red)
}
}
// West corner (root): accumulate final sum; always receives blue because its
// only sender is PE 1, which is odd. Nothing is forwarded.
compute i16 i, i16 j in [0, 0] {
await foreach i16 k, f32 x in [0:K], receive(blue) {
a[k] = a[k] + x
}
}
}

// Phase 3: Multicast from root (0,0) to all other PEs simultaneously
phase {
// Multicast stream targeting relative x-offsets 1 .. N-1, routing chosen by the
// compiler (hops = auto). Channel 2 is distinct from the reduce channels 0 and 1.
// NOTE(review): the stream is declared on every PE in [0:N, 0] but only the root
// sends on it; for i > 0 some targets fall outside the grid — confirm intended.
dataflow i16 i, i16 j in [0:N, 0] {
stream<f32> s = relative_stream([1:N], 0) {
hops = auto,
channel = 2
}
}

// Root: multicast the reduced value
compute i16 i, i16 j in [0, 0] {
await send(a, s)
}

// All other PEs: receive the broadcast value, overwriting their partial sum in `a`
compute i16 i, i16 j in [1:N, 0] {
await receive(a, s)
}
}

// Phase 4: All PEs write output
phase {
compute i16 i, i16 j in [0:N, 0] {
await send(a, out[i, j])
}
}
}
153 changes: 153 additions & 0 deletions samples/spatial/collectives/allreduce_2D.sptl
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/**
* 2D Allreduce over NX*NY PEs with K elements per PE.
* Root is at (0,0). Reduce = pipelined chain (X then Y); broadcast = direct multicast (X then Y).
*
* Communication Pattern:
* Reduce Phase X: each row reduces westward → (0, j) for all j
* Reduce Phase Y: column 0 reduces northward → root (0,0)
* Broadcast Phase X: root (0,0) multicasts to {(1,0)…(NX-1,0)}
* Broadcast Phase Y: every (i,0) multicasts southward to {(i,1)…(i,NY-1)}
*
* Parameters:
* NX, NY - grid extent; PEs sit at (i, j) for i = 0 .. NX-1, j = 0 .. NY-1.
* K      - number of f32 elements held by each PE.
* a_in   - read-only input streams, one per PE; each delivers that PE's K-element operand.
* out    - write-only output streams, one per PE; each is sent the K-element reduced result.
*
* Channel usage: 0/1 reduce X, 2/3 reduce Y, 4 broadcast X, 5 broadcast Y.
*
* (!) Assumes NX >= 2 and NY >= 2 (!)
* With an extent of 1 the "corner: only send" block and the accumulating
* west/root block of that reduce phase would select the same PEs.
**/
kernel @allreduce_2d<NX, NY, K>(stream<f32, K>[NX,NY] readonly a_in, stream<f32, K>[NX,NY] writeonly out) {

// Per-PE storage: every PE owns a K-element buffer `a` that first holds its
// input, then its partial sum, and finally the broadcast result.
place i16 i, i16 j in [0:NX, 0:NY] {
f32[K] a
}

// Phase 1: All PEs load their input
phase {
compute i16 i, i16 j in [0:NX, 0:NY] {
await receive(a, a_in[i, j])
}
}

// Reduce Phase X: every row reduces westward into its (i=0, j) PE.
// All NY rows execute in parallel. Mirrors chain_reduce_2D Phase X.
phase {
// Two single-hop streams to the west neighbour (offset (-1, 0)), differing only
// in channel. Even-i PEs send on red and odd-i PEs send on blue, so each middle
// PE receives on one channel and forwards on the other.
// NOTE(review): the range includes i = 0, whose (-1, 0) target lies outside the
// grid; those PEs never send here — confirm that declaring the streams is harmless.
dataflow i16 i, i16 j in [0:NX, 0:NY] {
stream<f32> red = relative_stream(-1, 0) {
hops = [(-1, 0)],
channel = 0
}
stream<f32> blue = relative_stream(-1, 0) {
hops = [(-1, 0)],
channel = 1
}
}

// East corners (i=NX-1): only send; channel chosen from the parity of NX-1
compute i16 i, i16 j in [NX-1, 0:NY] {
await send(a, red if (NX-1) % 2 == 0 else blue)
}
// Odd i (middle): receive red, accumulate, forward blue.
// Per element: add the incoming partial sum x into a[k], then forward the
// updated a[k] immediately.
compute i16 i, i16 j in [1:NX-1:2, 0:NY] {
await foreach i16 k, f32 x in [0:K], receive(red) {
a[k] = a[k] + x
await send(a[k], blue)
}
}
// Even i (middle): receive blue, accumulate, forward red
compute i16 i, i16 j in [2:NX-1:2, 0:NY] {
await foreach i16 k, f32 x in [0:K], receive(blue) {
a[k] = a[k] + x
await send(a[k], red)
}
}
// West corners (i=0): accumulate row sum; always receive blue because the only
// sender is the PE at i=1, which is odd
compute i16 i, i16 j in [0, 0:NY] {
await foreach i16 k, f32 x in [0:K], receive(blue) {
a[k] = a[k] + x
}
}
}

// Reduce Phase Y: column 0 reduces northward into root (0,0).
// Mirrors chain_reduce_2D Phase Y.
phase {
// Same red/blue alternation as Phase X, but along j (offset (0, -1)) and only
// on column i=0. Channels 2 and 3 are distinct from the Phase X channels.
dataflow i16 i, i16 j in [0, 0:NY] {
stream<f32> red = relative_stream(0, -1) {
hops = [(0, -1)],
channel = 2
}
stream<f32> blue = relative_stream(0, -1) {
hops = [(0, -1)],
channel = 3
}
}

// South corner (j=NY-1): only send; channel chosen from the parity of NY-1
compute i16 i, i16 j in [0, NY-1] {
await send(a, red if (NY-1) % 2 == 0 else blue)
}
// Odd j (middle): receive red, accumulate, forward blue
compute i16 i, i16 j in [0, 1:NY-1:2] {
await foreach i16 k, f32 x in [0:K], receive(red) {
a[k] = a[k] + x
await send(a[k], blue)
}
}
// Even j (middle): receive blue, accumulate, forward red
compute i16 i, i16 j in [0, 2:NY-1:2] {
await foreach i16 k, f32 x in [0:K], receive(blue) {
a[k] = a[k] + x
await send(a[k], red)
}
}
// Root (j=0): accumulate final column sum; always receives blue
compute i16 i, i16 j in [0, 0] {
await foreach i16 k, f32 x in [0:K], receive(blue) {
a[k] = a[k] + x
}
}
}

// Broadcast Phase X: root (0,0) multicasts eastward to all PEs in row j=0.
phase {
// Multicast stream targeting relative x-offsets 1 .. NX-1 with compiler-chosen
// routing (hops = auto).
// NOTE(review): declared on every row-0 PE although only the root sends; for
// i > 0 some targets fall outside the grid — confirm intended.
dataflow i16 i, i16 j in [0:NX, 0:1] {
stream<f32> sx = relative_stream([1:NX], 0) {
hops = auto,
channel = 4
}
}

// Root: multicast to all other row-0 PEs.
// The range [0:1, 0:1] selects the single PE (0,0), written [0, 0] in earlier phases.
compute i16 i, i16 j in [0:1, 0:1] {
await send(a, sx)
}

// Row-0 receivers: collect from root, overwriting their partial sum in `a`
compute i16 i, i16 j in [1:NX, 0:1] {
await receive(a, sx)
}
}

// Broadcast Phase Y: every (i,0) PE multicasts southward through its column.
// All NX columns execute in parallel.
phase {
// Multicast stream targeting relative y-offsets 1 .. NY-1 with compiler-chosen
// routing (hops = auto).
// NOTE(review): declared on every PE although only row j=0 sends — confirm intended.
dataflow i16 i, i16 j in [0:NX, 0:NY] {
stream<f32> sy = relative_stream(0, [1:NY]) {
hops = auto,
channel = 5
}
}

// Column roots (j=0): multicast southward. After Broadcast Phase X each of
// them already holds the fully reduced value.
compute i16 i, i16 j in [0:NX, 0:1] {
await send(a, sy)
}

// Column receivers (j>0): collect from their column root
compute i16 i, i16 j in [0:NX, 1:NY] {
await receive(a, sy)
}
}

// Phase 6: All PEs write output
phase {
compute i16 i, i16 j in [0:NX, 0:NY] {
await send(a, out[i, j])
}
}
}
143 changes: 143 additions & 0 deletions samples/spatial/collectives/twophase_allreduce_1D.sptl
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/**
* 1D Allreduce over G*S PEs with K elements per PE.
* Reduce = two-phase chain (within-group then cross-group); broadcast = direct multicast.
* Root is at (0,0).
*
* Communication Pattern (S=4, G=3, P=12), arrows labelled with the channel used.
* Phase numbers match the phase comments in the kernel body (Phase 1 = load,
* Phase 5 = write output):
*
* Phase 2 — within-group chain reduces (channels 0,1):
* [0,0] <--1-- [1,0] <--0-- [2,0] <--1-- [3,0] [4,0] <--... [8,0] <--...
*
* Phase 3 — cross-group chain reduce over {0,4,8} (channels 2,3):
* [0,0] <--2---- [4,0] <--3---- [8,0]
*
* Phase 4 — multicast from root to all PEs (channel 4):
* [0,0] ----multicast----> {[1,0], [2,0], ..., [G*S-1,0]}
*
* Parameters:
* G    - number of groups.
* S    - number of PEs per group; group g covers PEs g*S .. (g+1)*S-1.
* K    - number of f32 elements held by each PE.
* a_in - read-only input streams, one per PE; each delivers that PE's K-element operand.
* out  - write-only output streams, one per PE; each is sent the K-element reduced result.
*
* (!) Assumes S is even (!)
* NOTE(review): G >= 2 also appears to be required — with G == 1 the Phase 3 root
* and east-corner compute blocks both select PE (0,0). Confirm.
**/
kernel @twophase_allreduce_1d<G, S, K>(
stream<f32, K>[G*S, 1] readonly a_in,
stream<f32, K>[G*S, 1] writeonly out
) {

// Per-PE storage: every PE owns a K-element buffer `a` that first holds its
// input, then its partial sum, and finally the broadcast result.
place i16 i, i16 j in [0:G*S, 0] {
f32[K] a
}

// Phase 1: All PEs load input
phase {
compute i16 i, i16 j in [0:G*S, 0] {
await receive(a, a_in[i, j])
}
}

// Phase 2: Within-group reduce — each group [g*S .. (g+1)*S-1] reduces westward.
// S even → east corner of every group is always at an odd local index → sends blue.
// S even also means local and global parity agree, so one red/blue pair serves all groups.
phase {
// Two single-hop streams to the west neighbour (offset (-1, 0)), differing only
// in channel. Even-indexed PEs send on red, odd-indexed PEs send on blue.
// NOTE(review): group west corners (i = g*S) also get these streams but never
// send on them, which is what keeps the groups independent in this phase.
dataflow i16 i, i16 j in [0:S*G, 0] {
stream<f32> red = relative_stream(-1, 0) {
hops = [(-1, 0)],
channel = 0
}
stream<f32> blue = relative_stream(-1, 0) {
hops = [(-1, 0)],
channel = 1
}
}

// One set of compute blocks per group; `group` is the group index 0 .. G-1.
for i16 group in [0:G] {
// West corner of group: accumulate, no send. Receives blue because its only
// sender is the PE at local index 1, which is odd.
compute i16 i, i16 j in [group * S, 0] {
await foreach i16 k, f32 x in [0:K], receive(blue) {
a[k] = a[k] + x
}
}
// Middle odd: receive red, accumulate per element, forward on blue
compute i16 i, i16 j in [group * S + 1 : (group+1) * S - 1 : 2, 0] {
await foreach i16 k, f32 x in [0:K], receive(red) {
a[k] = a[k] + x
await send(a[k], blue)
}
}
// Middle even: receive blue, accumulate per element, forward on red
compute i16 i, i16 j in [group * S + 2 : (group+1) * S - 1 : 2, 0] {
await foreach i16 k, f32 x in [0:K], receive(blue) {
a[k] = a[k] + x
await send(a[k], red)
}
}
// East corner: S even → local index S-1 is odd → send blue
compute i16 i, i16 j in [(group+1) * S - 1, 0] {
await send(a, blue)
}
}
}

// Phase 3: Cross-group reduce — G representatives at {0, S, 2S, ...} reduce westward.
// Multi-hop streams span S hops each. Channels 2,3 avoid conflict with Phase 2.
// Odd-numbered groups send on green, even-numbered groups send on yellow.
phase {
// Streams from each representative to the representative S PEs to its west,
// with compiler-chosen routing (hops = auto).
// NOTE(review): the range includes i = 0, whose (-S, 0) target lies outside the
// grid; the root never sends on these streams — confirm that this is harmless.
dataflow i16 i, i16 j in [0:S*G:S, 0] {
stream<f32> green = relative_stream(-S, 0) {
hops = auto,
channel = 2
}
stream<f32> yellow = relative_stream(-S, 0) {
hops = auto,
channel = 3
}
}

// West corner (root): accumulate final sum. Receives green because its only
// sender is the representative of group 1, which is odd.
compute i16 i, i16 j in [0, 0] {
await foreach i16 k, f32 x in [0:K], receive(green) {
a[k] = a[k] + x
}
}
// Middle odd group representatives: receive yellow, accumulate, forward green
compute i16 i, i16 j in [S:(G-1)*S:2*S, 0] {
await foreach i16 k, f32 x in [0:K], receive(yellow) {
a[k] = a[k] + x
await send(a[k], green)
}
}
// Middle even group representatives: receive green, accumulate, forward yellow
compute i16 i, i16 j in [2*S:(G-1)*S:2*S, 0] {
await foreach i16 k, f32 x in [0:K], receive(green) {
a[k] = a[k] + x
await send(a[k], yellow)
}
}
// East corner: group-level index G-1; even → yellow, odd → green
// (the same channel a middle representative of that parity sends on)
compute i16 i, i16 j in [(G-1)*S, 0] {
await send(a, yellow if (G-1) % 2 == 0 else green)
}
}

// Phase 4: Multicast from root (0,0) to all other PEs simultaneously
phase {
// Multicast stream targeting relative x-offsets 1 .. G*S-1 with compiler-chosen
// routing (hops = auto). Channel 4 is distinct from the reduce channels 0-3.
// NOTE(review): declared on every PE although only the root sends; for i > 0
// some targets fall outside the grid — confirm intended.
dataflow i16 i, i16 j in [0:G*S, 0] {
stream<f32> s = relative_stream([1:G*S], 0) {
hops = auto,
channel = 4
}
}

// Root: multicast the reduced value
compute i16 i, i16 j in [0, 0] {
await send(a, s)
}

// All other PEs: receive the broadcast value, overwriting their partial sum in `a`
compute i16 i, i16 j in [1:G*S, 0] {
await receive(a, s)
}
}

// Phase 5: All PEs write output
phase {
compute i16 i, i16 j in [0:G*S, 0] {
await send(a, out[i, j])
}
}
}
Loading
Loading