From b96df653919f39270ff1e2b0fdcec8c89bdc671c Mon Sep 17 00:00:00 2001 From: "jonathan.schaefer@hlrs.de" Date: Wed, 3 Jun 2026 15:34:51 +0100 Subject: [PATCH 1/5] Fixed test non-hw-executed cases to support wse3 This commit introduces several changes: - Renamed the task_recycling_*.sptl files to a wse2 specific version - Created new corresponding task_recycling_*_wse3.sptl files that represent correct test cases for wse3 - Added conditional test case execution depending on environment variable WSE_ARCH --- ...ge.sptl => task_recycling_merge_wse2.sptl} | 0 .../samples/task_recycling_merge_wse3.sptl | 114 ++++++++++++++ ...l => task_recycling_three_stage_wse2.sptl} | 0 .../task_recycling_three_stage_wse3.sptl | 93 +++++++++++ ...ptl => task_recycling_two_stage_wse2.sptl} | 0 .../task_recycling_two_stage_wse3.sptl | 93 +++++++++++ tests/spatial_ir/test_task_recycling.py | 145 ++++++++++++------ .../spatial_ir/test_task_recycling_codegen.py | 64 +++++--- 8 files changed, 441 insertions(+), 68 deletions(-) rename tests/csl_runtime/samples/{task_recycling_merge.sptl => task_recycling_merge_wse2.sptl} (100%) create mode 100644 tests/csl_runtime/samples/task_recycling_merge_wse3.sptl rename tests/csl_runtime/samples/{task_recycling_three_stage.sptl => task_recycling_three_stage_wse2.sptl} (100%) create mode 100644 tests/csl_runtime/samples/task_recycling_three_stage_wse3.sptl rename tests/csl_runtime/samples/{task_recycling_two_stage.sptl => task_recycling_two_stage_wse2.sptl} (100%) create mode 100644 tests/csl_runtime/samples/task_recycling_two_stage_wse3.sptl diff --git a/tests/csl_runtime/samples/task_recycling_merge.sptl b/tests/csl_runtime/samples/task_recycling_merge_wse2.sptl similarity index 100% rename from tests/csl_runtime/samples/task_recycling_merge.sptl rename to tests/csl_runtime/samples/task_recycling_merge_wse2.sptl diff --git a/tests/csl_runtime/samples/task_recycling_merge_wse3.sptl b/tests/csl_runtime/samples/task_recycling_merge_wse3.sptl new file 
mode 100644 index 00000000..71bc6531 --- /dev/null +++ b/tests/csl_runtime/samples/task_recycling_merge_wse3.sptl @@ -0,0 +1,114 @@ +// Two awaitall groups +// +// This test was altered because the old test with one awaitall fails on WSE-3 +kernel @task_recycling_merge<>(stream[1, 1] readonly input, stream[1, 1] writeonly output) { + place u16 i, u16 j in [0:1, 0:1] { + f32[22] buf + f32 v0 + f32 v1 + f32 v2 + f32 v3 + f32 v4 + f32 v5 + f32 v6 + f32 v7 + f32 v8 + f32 v9 + f32 v10 + f32 v11 + f32 v12 + f32 v13 + f32 v14 + f32 v15 + f32 v16 + f32 v17 + f32 v18 + f32 left_val + f32 right_val + f32 output_val + f32[1] out_buf + } + + dataflow u16 i, u16 j in [0:1, 0:1] { + } + + compute u16 i, u16 j in [0:1, 0:1] { + await receive(buf, input[i, j]) + + completion s0 = async { + v0 = buf[0] + } + completion s1 = async { + v1 = v0 * 0.0 + buf[1] + } + completion s2 = async { + v2 = v1 * 0.0 + buf[2] + } + completion s3 = async { + v3 = v2 * 0.0 + buf[3] + } + completion s4 = async { + v4 = v3 * 0.0 + buf[4] + } + completion s5 = async { + v5 = v4 * 0.0 + buf[5] + } + completion s6 = async { + v6 = v5 * 0.0 + buf[6] + } + completion s7 = async { + v7 = v6 * 0.0 + buf[7] + } + completion s8 = async { + v8 = v7 * 0.0 + buf[8] + } + completion s9 = async { + v9 = v8 * 0.0 + buf[9] + } + completion s10 = async { + v10 = v9 * 0.0 + buf[10] + } + completion s11 = async { + v11 = v10 * 0.0 + buf[11] + } + + completion s12 = async { + v12 = v11 * 0.0 + buf[12] + } + completion s13 = async { + v13 = v12 * 0.0 + buf[13] + } + completion s14 = async { + v14 = v13 * 0.0 + buf[14] + } + completion s15 = async { + v15 = v14 * 0.0 + buf[15] + } + completion s16 = async { + v16 = v15 * 0.0 + buf[16] + } + completion s17 = async { + v17 = v16 * 0.0 + buf[17] + } + completion s18 = async { + v18 = v17 * 0.0 + buf[18] + } + completion left = async { + left_val = v19 * 0.0 + buf[20] + } + completion right = async { + right_val = v19 * 0.0 + buf[21] + } + + awaitall + + output_val = v0 + v1 + 
v2 + v3 + v4 + v5 + v6 + output_val = output_val + v7 + v8 + v9 + v10 + v11 + output_val = output_val + v12 + v13 + v14 + v15 + v16 + output_val = output_val + v17 + v18 + v19 + output_val = output_val + left_val + right_val + out_buf[0] = output_val + + await send(out_buf, output[i, j]) + } +} diff --git a/tests/csl_runtime/samples/task_recycling_three_stage.sptl b/tests/csl_runtime/samples/task_recycling_three_stage_wse2.sptl similarity index 100% rename from tests/csl_runtime/samples/task_recycling_three_stage.sptl rename to tests/csl_runtime/samples/task_recycling_three_stage_wse2.sptl diff --git a/tests/csl_runtime/samples/task_recycling_three_stage_wse3.sptl b/tests/csl_runtime/samples/task_recycling_three_stage_wse3.sptl new file mode 100644 index 00000000..1bd825ac --- /dev/null +++ b/tests/csl_runtime/samples/task_recycling_three_stage_wse3.sptl @@ -0,0 +1,93 @@ +// Three sequential awaitall groups of 8 completions each. +// +// Key coloring property: +// Each awaitall group with 8 completions produces a 7-clique of blocked join +// tasks in the conflict graph. Tasks from different stages do not conflict +// (each stage is fully downstream of the previous one), so all three 7-cliques +// share the same 7 hardware slots. +// +// Total local tasks: ~24 (7 + 7 + 7 blocked tasks, T0, and join/final tasks). +// Chromatic number: 7. Average slot depth: ~3.4 tasks/slot. This exercises +// deeper slot reuse than the two-stage variant and demonstrates that the +// greedy coloring correctly reuses colors across non-conflicting stages. +// +// Mathematical result: sum(buf[0:24]). 
+kernel @task_recycling_three_stage<>(stream[1, 1] readonly input, + stream[1, 1] writeonly output) { + place u16 i, u16 j in [0:1, 0:1] { + f32[24] buf + f32 a0 + f32 a1 + f32 a2 + f32 a3 + f32 a4 + f32 a5 + f32 a6 + f32 a7 + f32 b0 + f32 b1 + f32 b2 + f32 b3 + f32 b4 + f32 b5 + f32 b6 + f32 b7 + f32 c0 + f32 c1 + f32 c2 + f32 c3 + f32 c4 + f32 c5 + f32 c6 + f32 c7 + f32 output_val + f32[1] out_buf + } + + dataflow u16 i, u16 j in [0:1, 0:1] { + } + + compute u16 i, u16 j in [0:1, 0:1] { + await receive(buf, input[i, j]) + + // Stage 1: 8 completions, produces a 7-clique + completion s1_0 = async { a0 = buf[0] } + completion s1_1 = async { a1 = a0 * 0.0 + buf[1] } + completion s1_2 = async { a2 = a1 * 0.0 + buf[2] } + completion s1_3 = async { a3 = a2 * 0.0 + buf[3] } + completion s1_4 = async { a4 = a3 * 0.0 + buf[4] } + completion s1_5 = async { a5 = a4 * 0.0 + buf[5] } + completion s1_6 = async { a6 = a5 * 0.0 + buf[6] } + completion s1_7 = async { a7 = a6 * 0.0 + buf[7] } + awaitall + + // Stage 2: 8 completions, another 7-clique sharing stage-1 slots + completion s2_0 = async { b0 = buf[8] } + completion s2_1 = async { b1 = b0 * 0.0 + buf[9] } + completion s2_2 = async { b2 = b1 * 0.0 + buf[10] } + completion s2_3 = async { b3 = b2 * 0.0 + buf[11] } + completion s2_4 = async { b4 = b3 * 0.0 + buf[12] } + completion s2_5 = async { b5 = b4 * 0.0 + buf[13] } + completion s2_6 = async { b6 = b5 * 0.0 + buf[14] } + completion s2_7 = async { b7 = b6 * 0.0 + buf[15] } + awaitall + + // Stage 3: 8 completions, third 7-clique sharing the same 7 slots + completion s3_0 = async { c0 = buf[16] } + completion s3_1 = async { c1 = c0 * 0.0 + buf[17] } + completion s3_2 = async { c2 = c1 * 0.0 + buf[18] } + completion s3_3 = async { c3 = c2 * 0.0 + buf[19] } + completion s3_4 = async { c4 = c3 * 0.0 + buf[20] } + completion s3_5 = async { c5 = c4 * 0.0 + buf[21] } + completion s3_6 = async { c6 = c5 * 0.0 + buf[22] } + completion s3_7 = async { c7 = c6 * 0.0 + buf[23] } + 
awaitall + + output_val = a0 + a1 + a2 + a3 + a4 + a5 + a6 + a7 + output_val = output_val + b0 + b1 + b2 + b3 + b4 + b5 + b6 + b7 + output_val = output_val + c0 + c1 + c2 + c3 + c4 + c5 + c6 + c7 + out_buf[0] = output_val + + await send(out_buf, output[i, j]) + } +} diff --git a/tests/csl_runtime/samples/task_recycling_two_stage.sptl b/tests/csl_runtime/samples/task_recycling_two_stage_wse2.sptl similarity index 100% rename from tests/csl_runtime/samples/task_recycling_two_stage.sptl rename to tests/csl_runtime/samples/task_recycling_two_stage_wse2.sptl diff --git a/tests/csl_runtime/samples/task_recycling_two_stage_wse3.sptl b/tests/csl_runtime/samples/task_recycling_two_stage_wse3.sptl new file mode 100644 index 00000000..95c6b45c --- /dev/null +++ b/tests/csl_runtime/samples/task_recycling_two_stage_wse3.sptl @@ -0,0 +1,93 @@ +// Two sequential awaitall groups of 12 completions each. +// +// Key coloring property: +// Each awaitall with N completions creates N-1 blocked join tasks that form +// a clique in the conflict graph (they all have the main task as a common +// predecessor). With 12 completions per group that is an 11-clique per group. +// However, the two groups are fully sequential — every task in group 2 has +// every task in group 1 as an ancestor — so the two 11-cliques share no edges +// and can reuse the same 11 hardware slots. +// +// Total local tasks: ~24 (11 blocked in group 1, 11 blocked in group 2, T0, +// T_final, ...). Chromatic number: 11. Average slot depth: ~2 tasks/slot. +// +// The multiplication-by-zero trick (v * 0.0 + buf[k]) introduces a code-order +// dependency on v while keeping the mathematical value equal to buf[k], so the +// final result is simply sum(buf[0:24]). 
+kernel @task_recycling_two_stage<>(stream[1, 1] readonly input, + stream[1, 1] writeonly output) { + place u16 i, u16 j in [0:1, 0:1] { + f32[24] buf + f32 a0 + f32 a1 + f32 a2 + f32 a3 + f32 a4 + f32 a5 + f32 a6 + f32 a7 + f32 a8 + f32 a9 + f32 a10 + f32 a11 + f32 b0 + f32 b1 + f32 b2 + f32 b3 + f32 b4 + f32 b5 + f32 b6 + f32 b7 + f32 b8 + f32 b9 + f32 b10 + f32 b11 + f32 output_val + f32[1] out_buf + } + + dataflow u16 i, u16 j in [0:1, 0:1] { + } + + compute u16 i, u16 j in [0:1, 0:1] { + await receive(buf, input[i, j]) + + // Stage 1: 12 completions + completion s1_0 = async { a0 = buf[0] } + completion s1_1 = async { a1 = a0 * 0.0 + buf[1] } + completion s1_2 = async { a2 = a1 * 0.0 + buf[2] } + completion s1_3 = async { a3 = a2 * 0.0 + buf[3] } + completion s1_4 = async { a4 = a3 * 0.0 + buf[4] } + completion s1_5 = async { a5 = a4 * 0.0 + buf[5] } + completion s1_6 = async { a6 = a5 * 0.0 + buf[6] } + completion s1_7 = async { a7 = a6 * 0.0 + buf[7] } + completion s1_8 = async { a8 = a7 * 0.0 + buf[8] } + completion s1_9 = async { a9 = a8 * 0.0 + buf[9] } + completion s1_10 = async { a10 = a9 * 0.0 + buf[10] } + completion s1_11 = async { a11 = a10 * 0.0 + buf[11] } + awaitall + + // Stage 2: 12 completions (independent of stage 1's values) + completion s2_0 = async { b0 = buf[12] } + completion s2_1 = async { b1 = b0 * 0.0 + buf[13] } + completion s2_2 = async { b2 = b1 * 0.0 + buf[14] } + completion s2_3 = async { b3 = b2 * 0.0 + buf[15] } + completion s2_4 = async { b4 = b3 * 0.0 + buf[16] } + completion s2_5 = async { b5 = b4 * 0.0 + buf[17] } + completion s2_6 = async { b6 = b5 * 0.0 + buf[18] } + completion s2_7 = async { b7 = b6 * 0.0 + buf[19] } + completion s2_8 = async { b8 = b7 * 0.0 + buf[20] } + completion s2_9 = async { b9 = b8 * 0.0 + buf[21] } + completion s2_10 = async { b10 = b9 * 0.0 + buf[22] } + completion s2_11 = async { b11 = b10 * 0.0 + buf[23] } + awaitall + + output_val = a0 + a1 + a2 + a3 + a4 + a5 + a6 + a7 + output_val = 
output_val + a8 + a9 + a10 + a11 + output_val = output_val + b0 + b1 + b2 + b3 + b4 + b5 + b6 + b7 + output_val = output_val + b8 + b9 + b10 + b11 + out_buf[0] = output_val + + await send(out_buf, output[i, j]) + } +} diff --git a/tests/spatial_ir/test_task_recycling.py b/tests/spatial_ir/test_task_recycling.py index 9b026d8b..4eb77518 100644 --- a/tests/spatial_ir/test_task_recycling.py +++ b/tests/spatial_ir/test_task_recycling.py @@ -8,7 +8,10 @@ def _load_sample_kernel(): - sample = os.path.join(os.path.dirname(__file__), '..', 'csl_runtime', 'samples', 'task_recycling_merge.sptl') + kernel = f'task_recycling_merge_{os.getenv("WSE_ARCH", "wse2")}.sptl' + sample = os.path.join( + os.path.dirname(__file__), "..", "csl_runtime", "samples", kernel + ) kernel = parser.parse_file(sample) return passes.constexpr_propagation(kernel) @@ -37,11 +40,12 @@ def _create_linear_local_tasks(length: int): tasks.append( tdag.CSLTask( task_id=task_index, - task_type='local', + task_type="local", statements=[task_index], outgoing=outgoing, blocked=False, - )) + ) + ) return tasks @@ -49,20 +53,26 @@ def test_task_recycling_overflow_load_balanced(): local_task_count = len(constants.LOCAL_TASK_IDS) + 5 tasks = _create_linear_local_tasks(local_task_count) - plan = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) + plan = task_recycling.plan_task_bindings( + tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN + ) # All hardware slots are used. assert len(plan.local_slots) == len(constants.LOCAL_TASK_IDS) # With N+5 conflict-free tasks spread across N slots, each slot holds at most 2 tasks. assert all(len(slot.task_indices) <= 2 for slot in plan.local_slots) # Every task is assigned to exactly one slot. 
- assert {t for slot in plan.local_slots for t in slot.task_indices} == set(range(local_task_count)) + assert {t for slot in plan.local_slots for t in slot.task_indices} == set( + range(local_task_count) + ) def test_task_recycling_all_tasks_assigned(): tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS) + 5) - plan = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) + plan = task_recycling.plan_task_bindings( + tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN + ) # Every logical task appears in exactly one slot. assigned = [t for slot in plan.local_slots for t in slot.task_indices] @@ -71,50 +81,74 @@ def test_task_recycling_all_tasks_assigned(): def test_task_recycling_plan_reuses_local_slots(): tasks = _create_unfused_tasks() - local_task_count = sum(1 for task in tasks if task.task_type == 'local') + local_task_count = sum(1 for task in tasks if task.task_type == "local") assert local_task_count > len(constants.LOCAL_TASK_IDS) - plan = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) + plan = task_recycling.plan_task_bindings( + tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN + ) assert len(plan.local_slots) <= len(constants.LOCAL_TASK_IDS) assert any(slot.recycled for slot in plan.local_slots) blocked_recycled = [ - task_index for task_index, task in enumerate(tasks) - if task.task_type == 'local' and task.blocked and plan.is_recycled_local_task(task_index) + task_index + for task_index, task in enumerate(tasks) + if task.task_type == "local" + and task.blocked + and plan.is_recycled_local_task(task_index) ] assert blocked_recycled merge_task = blocked_recycled[0] shared_slot = plan.local_slot(merge_task) - assert any(other < merge_task for other in shared_slot.task_indices if other != merge_task) + assert any( + other < merge_task for other in shared_slot.task_indices if other != merge_task + ) # 
--------------------------------------------------------------------------- # Conflict graph helpers # --------------------------------------------------------------------------- + def _create_fork_tasks(): """Task 0 activates tasks 1 and 2 independently; 1 and 2 are concurrent.""" return [ - tdag.CSLTask(0, 'local', [0], - [(1, tdag.InterTaskEdge.ACTIVATE), (2, tdag.InterTaskEdge.ACTIVATE)], - blocked=False), - tdag.CSLTask(1, 'local', [1], [(-1, tdag.InterTaskEdge.SEQUENCE)], blocked=False), - tdag.CSLTask(2, 'local', [2], [(-1, tdag.InterTaskEdge.SEQUENCE)], blocked=False), + tdag.CSLTask( + 0, + "local", + [0], + [(1, tdag.InterTaskEdge.ACTIVATE), (2, tdag.InterTaskEdge.ACTIVATE)], + blocked=False, + ), + tdag.CSLTask( + 1, "local", [1], [(-1, tdag.InterTaskEdge.SEQUENCE)], blocked=False + ), + tdag.CSLTask( + 2, "local", [2], [(-1, tdag.InterTaskEdge.SEQUENCE)], blocked=False + ), ] def _create_diamond_tasks(): """0 forks into 1 and 2, which join at the blocked task 3.""" return [ - tdag.CSLTask(0, 'local', [0], - [(1, tdag.InterTaskEdge.ACTIVATE), (2, tdag.InterTaskEdge.ACTIVATE)], - blocked=False), - tdag.CSLTask(1, 'local', [1], [(3, tdag.InterTaskEdge.ACTIVATE)], blocked=False), - tdag.CSLTask(2, 'local', [2], [(3, tdag.InterTaskEdge.UNBLOCK)], blocked=False), - tdag.CSLTask(3, 'local', [3], [(-1, tdag.InterTaskEdge.SEQUENCE)], blocked=True), + tdag.CSLTask( + 0, + "local", + [0], + [(1, tdag.InterTaskEdge.ACTIVATE), (2, tdag.InterTaskEdge.ACTIVATE)], + blocked=False, + ), + tdag.CSLTask( + 1, "local", [1], [(3, tdag.InterTaskEdge.ACTIVATE)], blocked=False + ), + tdag.CSLTask(2, "local", [2], [(3, tdag.InterTaskEdge.UNBLOCK)], blocked=False), + tdag.CSLTask( + 3, "local", [3], [(-1, tdag.InterTaskEdge.SEQUENCE)], blocked=True + ), ] @@ -123,7 +157,7 @@ def test_fork_concurrent_arms_conflict(): tasks = _create_fork_tasks() conflict_graph = task_recycling._build_conflict_graph(tasks, [0, 1, 2]) - assert 2 in conflict_graph[1], 'Tasks 1 and 2 are concurrent 
and must conflict' + assert 2 in conflict_graph[1], "Tasks 1 and 2 are concurrent and must conflict" assert 1 in conflict_graph[2] # Task 0 strictly precedes both 1 and 2, so it does not conflict with either. assert 1 not in conflict_graph[0] @@ -145,23 +179,26 @@ def test_diamond_source_and_sink_do_not_conflict(): def test_no_conflicting_tasks_share_slot(): """Fundamental safety invariant: tasks that conflict must be in different slots.""" tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS) + 5) - local_indices = [i for i, t in enumerate(tasks) if t.task_type == 'local'] + local_indices = [i for i, t in enumerate(tasks) if t.task_type == "local"] conflict_graph = task_recycling._build_conflict_graph(tasks, local_indices) - plan = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) + plan = task_recycling.plan_task_bindings( + tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN + ) for slot in plan.local_slots: for i, a in enumerate(slot.task_indices): - for b in slot.task_indices[i + 1:]: - assert b not in conflict_graph[a], ( - f'Tasks {a} and {b} conflict but were placed in the same slot' - ) + for b in slot.task_indices[i + 1 :]: + assert ( + b not in conflict_graph[a] + ), f"Tasks {a} and {b} conflict but were placed in the same slot" # --------------------------------------------------------------------------- # greedy_coloring helper # --------------------------------------------------------------------------- + def test_greedy_coloring_max_colors_infeasible(): """A triangle (K3) needs 3 colors; requesting 2 must return None.""" k3 = {0: {1, 2}, 1: {0, 2}, 2: {0, 1}} @@ -176,14 +213,18 @@ def test_greedy_coloring_max_colors_feasible(): assert len(set(result.values())) == 3 for v, neighbors in k3.items(): for u in neighbors: - assert result[v] != result[u], 'Adjacent vertices must have different colors' + assert ( + result[v] != result[u] + ), "Adjacent vertices must have different colors" def 
test_greedy_coloring_fixed_colors_preserved(): """Fixed-prefix colors must be retained in the returned coloring.""" no_conflicts = {v: set() for v in range(4)} fixed = {0: 7, 1: 3} - result = task_recycling.greedy_coloring(no_conflicts, [0, 1, 2, 3], fixed_coloring=fixed) + result = task_recycling.greedy_coloring( + no_conflicts, [0, 1, 2, 3], fixed_coloring=fixed + ) assert result is not None assert result[0] == 7 assert result[1] == 3 @@ -195,7 +236,9 @@ def test_greedy_coloring_fixed_conflict_honored(): """A free vertex adjacent to a fixed-color vertex must not receive that color.""" conflict_graph = {0: {1}, 1: {0}, 2: set()} fixed = {0: 5} - result = task_recycling.greedy_coloring(conflict_graph, [0, 1, 2], fixed_coloring=fixed, max_colors=10) + result = task_recycling.greedy_coloring( + conflict_graph, [0, 1, 2], fixed_coloring=fixed, max_colors=10 + ) assert result is not None assert result[0] == 5 assert result[1] != 5 @@ -205,15 +248,20 @@ def test_greedy_coloring_fixed_conflict_honored(): # plan_task_bindings mode behavior # --------------------------------------------------------------------------- + def test_fail_on_overrun_raises(): tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS) + 1) - with pytest.raises(ValueError, match='Too many local tasks'): - task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.FAIL_ON_OVERRUN) + with pytest.raises(ValueError, match="Too many local tasks"): + task_recycling.plan_task_bindings( + tasks, tdag.TaskCreationBehavior.FAIL_ON_OVERRUN + ) def test_fail_on_overrun_exact_fit(): tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS)) - plan = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.FAIL_ON_OVERRUN) + plan = task_recycling.plan_task_bindings( + tasks, tdag.TaskCreationBehavior.FAIL_ON_OVERRUN + ) assert not plan.uses_recycling assert len(plan.local_slots) == len(constants.LOCAL_TASK_IDS) @@ -221,13 +269,17 @@ def test_fail_on_overrun_exact_fit(): def 
test_state_machine_no_recycling_when_tasks_fit(): """When local task count ≤ hardware slots, every task gets its own unique slot.""" tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS)) - plan = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) + plan = task_recycling.plan_task_bindings( + tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN + ) assert not plan.uses_recycling assert all(len(slot.task_indices) == 1 for slot in plan.local_slots) def test_empty_task_list_returns_empty_plan(): - plan = task_recycling.plan_task_bindings([], tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) + plan = task_recycling.plan_task_bindings( + [], tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN + ) assert plan.local_slots == () assert plan.task_to_local_slot == {} @@ -236,22 +288,29 @@ def test_empty_task_list_returns_empty_plan(): # Transition preamble # --------------------------------------------------------------------------- + def test_transition_preamble_empty_for_non_recycled(): tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS)) - plan = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) + plan = task_recycling.plan_task_bindings( + tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN + ) for i in range(len(tasks)): - assert plan.emit_local_transition_preamble(i, blocked=False) == '' - assert plan.emit_local_transition_preamble(i, blocked=True) == '' - + assert plan.emit_local_transition_preamble(i, blocked=False) == "" + assert plan.emit_local_transition_preamble(i, blocked=True) == "" # --------------------------------------------------------------------------- # Determinism # --------------------------------------------------------------------------- + def test_plan_is_deterministic(): tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS) + 5) - plan1 = task_recycling.plan_task_bindings(tasks, 
tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) - plan2 = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) + plan1 = task_recycling.plan_task_bindings( + tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN + ) + plan2 = task_recycling.plan_task_bindings( + tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN + ) assert plan1.task_to_local_slot == plan2.task_to_local_slot assert plan1.task_to_local_state == plan2.task_to_local_state diff --git a/tests/spatial_ir/test_task_recycling_codegen.py b/tests/spatial_ir/test_task_recycling_codegen.py index dc023cf0..6b8e6bb8 100644 --- a/tests/spatial_ir/test_task_recycling_codegen.py +++ b/tests/spatial_ir/test_task_recycling_codegen.py @@ -7,39 +7,47 @@ from spada.syntax.spatial_ir import parser, passes _CSL_RUNTIME_TASK_RECYCLING_SAMPLES = os.path.join( - os.path.dirname(__file__), '..', 'csl_runtime', 'samples') + os.path.dirname(__file__), "..", "csl_runtime", "samples" +) def test_task_recycling_codegen_uses_else_if_dispatch_for_recycled_slots(): sample = os.path.join( - os.path.dirname(__file__), '..', '..', 'samples', 'spatial', 'collectives', 'tree_reduce_2D.sptl') + os.path.dirname(__file__), + "..", + "..", + "samples", + "spatial", + "collectives", + "tree_reduce_2D.sptl", + ) kernel = parser.parse_file(sample) kernel = passes.concretize_parameters(kernel, LX=8, LY=8, K=16) kernel = passes.constexpr_propagation(kernel) csl_files = lower_spatial_ir_to_csl(kernel, task_fusion=False) - code = next(file.code for file in csl_files if file.filename == 'code_0_0.csl') + code = next(file.code for file in csl_files if file.filename == "code_0_0.csl") task_id_occurrences: dict[str, int] = {} - for hardware_id in re.findall(r'@get_local_task_id\((\d+)\)', code): + for hardware_id in re.findall(r"@get_local_task_id\((\d+)\)", code): task_id_occurrences[hardware_id] = task_id_occurrences.get(hardware_id, 0) + 1 assert any(count > 1 for count in 
task_id_occurrences.values()) - assert '__task_slot_' in code - assert 'else if (__task_slot_' in code + assert "__task_slot_" in code + assert "else if (__task_slot_" in code assert re.search( - r'if \(__task_slot_\d+_state == \d+\) \{.*?\n\s+\}\n\s+else if \(__task_slot_\d+_state == \d+\) \{', + r"if \(__task_slot_\d+_state == \d+\) \{.*?\n\s+\}\n\s+else if \(__task_slot_\d+_state == \d+\) \{", code, re.S, ) @pytest.mark.parametrize( - 'filename', + "filename", ( - 'task_recycling_merge.sptl', - 'task_recycling_two_stage.sptl', - 'task_recycling_three_stage.sptl', + f'task_recycling_merge_{os.getenv("WSE_ARCH", "wse2")}.sptl', + f'task_recycling_two_stage_{os.getenv("WSE_ARCH", "wse2")}.sptl', + f'task_recycling_three_stage_{os.getenv("WSE_ARCH", "wse2")}.sptl', ), ) def test_csl_runtime_task_recycling_sample_lowers(filename: str): @@ -48,27 +56,33 @@ def test_csl_runtime_task_recycling_sample_lowers(filename: str): kernel = parser.parse_file(path) kernel = passes.constexpr_propagation(kernel) csl_files = lower_spatial_ir_to_csl( - kernel, task_fusion=False, copy_elision=True, prune_memory=True) - assert csl_files, 'expected at least one generated CSL file' - combined = '\n'.join(f.code for f in csl_files) - assert combined.strip(), 'expected non-empty CSL' - assert '__task_slot_' in combined, 'expected task-ID recycling in generated CSL' + kernel, task_fusion=False, copy_elision=True, prune_memory=True + ) + assert csl_files, "expected at least one generated CSL file" + combined = "\n".join(f.code for f in csl_files) + assert combined.strip(), "expected non-empty CSL" + assert "__task_slot_" in combined, "expected task-ID recycling in generated CSL" def test_codegen_avoids_local_task_id_color_overlap(): - path = os.path.join(_CSL_RUNTIME_TASK_RECYCLING_SAMPLES, 'task_color_overlap_many_channels.sptl') + path = os.path.join( + _CSL_RUNTIME_TASK_RECYCLING_SAMPLES, "task_color_overlap_many_channels.sptl" + ) kernel = parser.parse_file(path) kernel = 
passes.constexpr_propagation(kernel) csl_files = lower_spatial_ir_to_csl( - kernel, task_fusion=False, copy_elision=True, prune_memory=True) - combined = '\n'.join(f.code for f in csl_files) + kernel, task_fusion=False, copy_elision=True, prune_memory=True + ) + combined = "\n".join(f.code for f in csl_files) - local_task_ids = {int(v) for v in re.findall(r'@get_local_task_id\((\d+)\)', combined)} - colors = {int(v) for v in re.findall(r'@get_color\((\d+)\)', combined)} + local_task_ids = { + int(v) for v in re.findall(r"@get_local_task_id\((\d+)\)", combined) + } + colors = {int(v) for v in re.findall(r"@get_color\((\d+)\)", combined)} - assert 8 in colors, 'sample should force color 8 to be allocated' + assert 8 in colors, "sample should force color 8 to be allocated" assert local_task_ids - assert local_task_ids.isdisjoint(colors), ( - f'local task IDs overlap communication colors: ids={sorted(local_task_ids)}, colors={sorted(colors)}' - ) + assert local_task_ids.isdisjoint( + colors + ), f"local task IDs overlap communication colors: ids={sorted(local_task_ids)}, colors={sorted(colors)}" From b90ac07585323eafdaa2a7d61e89e238bf632f25 Mon Sep 17 00:00:00 2001 From: "jonathan.schaefer@hlrs.de" Date: Wed, 3 Jun 2026 15:34:51 +0100 Subject: [PATCH 2/5] Fixed test non-hw-executed cases to support wse3 This commit introduces several changes: - Renamed the task_recycling_*.sptl files to a wse2 specific version - Created new corresponding task_recycling_*_wse3.sptl files that represent correct test cases for wse3 - Added conditional test case execution depending on environment variable WSE_ARCH --- tests/csl_runtime/samples/task_recycling_merge_wse3.sptl | 3 --- 1 file changed, 3 deletions(-) diff --git a/tests/csl_runtime/samples/task_recycling_merge_wse3.sptl b/tests/csl_runtime/samples/task_recycling_merge_wse3.sptl index 71bc6531..0431b945 100644 --- a/tests/csl_runtime/samples/task_recycling_merge_wse3.sptl +++ 
b/tests/csl_runtime/samples/task_recycling_merge_wse3.sptl @@ -1,6 +1,3 @@ -// Two awaitall groups -// -// This test was altered because the old test with one awaitall fails on WSE-3 kernel @task_recycling_merge<>(stream[1, 1] readonly input, stream[1, 1] writeonly output) { place u16 i, u16 j in [0:1, 0:1] { f32[22] buf From a57ec9a577b84a877d0879ffc7e12ae73240d1dd Mon Sep 17 00:00:00 2001 From: "jonathan.schaefer@hlrs.de" Date: Fri, 5 Jun 2026 09:20:06 +0100 Subject: [PATCH 3/5] reverted bad style formatting --- tests/spatial_ir/test_task_recycling.py | 145 ++++++------------ .../spatial_ir/test_task_recycling_codegen.py | 60 +++----- 2 files changed, 64 insertions(+), 141 deletions(-) diff --git a/tests/spatial_ir/test_task_recycling.py b/tests/spatial_ir/test_task_recycling.py index 4eb77518..9b026d8b 100644 --- a/tests/spatial_ir/test_task_recycling.py +++ b/tests/spatial_ir/test_task_recycling.py @@ -8,10 +8,7 @@ def _load_sample_kernel(): - kernel = f'task_recycling_merge_{os.getenv("WSE_ARCH", "wse2")}.sptl' - sample = os.path.join( - os.path.dirname(__file__), "..", "csl_runtime", "samples", kernel - ) + sample = os.path.join(os.path.dirname(__file__), '..', 'csl_runtime', 'samples', 'task_recycling_merge.sptl') kernel = parser.parse_file(sample) return passes.constexpr_propagation(kernel) @@ -40,12 +37,11 @@ def _create_linear_local_tasks(length: int): tasks.append( tdag.CSLTask( task_id=task_index, - task_type="local", + task_type='local', statements=[task_index], outgoing=outgoing, blocked=False, - ) - ) + )) return tasks @@ -53,26 +49,20 @@ def test_task_recycling_overflow_load_balanced(): local_task_count = len(constants.LOCAL_TASK_IDS) + 5 tasks = _create_linear_local_tasks(local_task_count) - plan = task_recycling.plan_task_bindings( - tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN - ) + plan = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) # All hardware slots are used. 
assert len(plan.local_slots) == len(constants.LOCAL_TASK_IDS) # With N+5 conflict-free tasks spread across N slots, each slot holds at most 2 tasks. assert all(len(slot.task_indices) <= 2 for slot in plan.local_slots) # Every task is assigned to exactly one slot. - assert {t for slot in plan.local_slots for t in slot.task_indices} == set( - range(local_task_count) - ) + assert {t for slot in plan.local_slots for t in slot.task_indices} == set(range(local_task_count)) def test_task_recycling_all_tasks_assigned(): tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS) + 5) - plan = task_recycling.plan_task_bindings( - tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN - ) + plan = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) # Every logical task appears in exactly one slot. assigned = [t for slot in plan.local_slots for t in slot.task_indices] @@ -81,74 +71,50 @@ def test_task_recycling_all_tasks_assigned(): def test_task_recycling_plan_reuses_local_slots(): tasks = _create_unfused_tasks() - local_task_count = sum(1 for task in tasks if task.task_type == "local") + local_task_count = sum(1 for task in tasks if task.task_type == 'local') assert local_task_count > len(constants.LOCAL_TASK_IDS) - plan = task_recycling.plan_task_bindings( - tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN - ) + plan = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) assert len(plan.local_slots) <= len(constants.LOCAL_TASK_IDS) assert any(slot.recycled for slot in plan.local_slots) blocked_recycled = [ - task_index - for task_index, task in enumerate(tasks) - if task.task_type == "local" - and task.blocked - and plan.is_recycled_local_task(task_index) + task_index for task_index, task in enumerate(tasks) + if task.task_type == 'local' and task.blocked and plan.is_recycled_local_task(task_index) ] assert blocked_recycled merge_task = blocked_recycled[0] shared_slot = 
plan.local_slot(merge_task) - assert any( - other < merge_task for other in shared_slot.task_indices if other != merge_task - ) + assert any(other < merge_task for other in shared_slot.task_indices if other != merge_task) # --------------------------------------------------------------------------- # Conflict graph helpers # --------------------------------------------------------------------------- - def _create_fork_tasks(): """Task 0 activates tasks 1 and 2 independently; 1 and 2 are concurrent.""" return [ - tdag.CSLTask( - 0, - "local", - [0], - [(1, tdag.InterTaskEdge.ACTIVATE), (2, tdag.InterTaskEdge.ACTIVATE)], - blocked=False, - ), - tdag.CSLTask( - 1, "local", [1], [(-1, tdag.InterTaskEdge.SEQUENCE)], blocked=False - ), - tdag.CSLTask( - 2, "local", [2], [(-1, tdag.InterTaskEdge.SEQUENCE)], blocked=False - ), + tdag.CSLTask(0, 'local', [0], + [(1, tdag.InterTaskEdge.ACTIVATE), (2, tdag.InterTaskEdge.ACTIVATE)], + blocked=False), + tdag.CSLTask(1, 'local', [1], [(-1, tdag.InterTaskEdge.SEQUENCE)], blocked=False), + tdag.CSLTask(2, 'local', [2], [(-1, tdag.InterTaskEdge.SEQUENCE)], blocked=False), ] def _create_diamond_tasks(): """0 forks into 1 and 2, which join at the blocked task 3.""" return [ - tdag.CSLTask( - 0, - "local", - [0], - [(1, tdag.InterTaskEdge.ACTIVATE), (2, tdag.InterTaskEdge.ACTIVATE)], - blocked=False, - ), - tdag.CSLTask( - 1, "local", [1], [(3, tdag.InterTaskEdge.ACTIVATE)], blocked=False - ), - tdag.CSLTask(2, "local", [2], [(3, tdag.InterTaskEdge.UNBLOCK)], blocked=False), - tdag.CSLTask( - 3, "local", [3], [(-1, tdag.InterTaskEdge.SEQUENCE)], blocked=True - ), + tdag.CSLTask(0, 'local', [0], + [(1, tdag.InterTaskEdge.ACTIVATE), (2, tdag.InterTaskEdge.ACTIVATE)], + blocked=False), + tdag.CSLTask(1, 'local', [1], [(3, tdag.InterTaskEdge.ACTIVATE)], blocked=False), + tdag.CSLTask(2, 'local', [2], [(3, tdag.InterTaskEdge.UNBLOCK)], blocked=False), + tdag.CSLTask(3, 'local', [3], [(-1, tdag.InterTaskEdge.SEQUENCE)], blocked=True), ] @@ 
-157,7 +123,7 @@ def test_fork_concurrent_arms_conflict(): tasks = _create_fork_tasks() conflict_graph = task_recycling._build_conflict_graph(tasks, [0, 1, 2]) - assert 2 in conflict_graph[1], "Tasks 1 and 2 are concurrent and must conflict" + assert 2 in conflict_graph[1], 'Tasks 1 and 2 are concurrent and must conflict' assert 1 in conflict_graph[2] # Task 0 strictly precedes both 1 and 2, so it does not conflict with either. assert 1 not in conflict_graph[0] @@ -179,26 +145,23 @@ def test_diamond_source_and_sink_do_not_conflict(): def test_no_conflicting_tasks_share_slot(): """Fundamental safety invariant: tasks that conflict must be in different slots.""" tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS) + 5) - local_indices = [i for i, t in enumerate(tasks) if t.task_type == "local"] + local_indices = [i for i, t in enumerate(tasks) if t.task_type == 'local'] conflict_graph = task_recycling._build_conflict_graph(tasks, local_indices) - plan = task_recycling.plan_task_bindings( - tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN - ) + plan = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) for slot in plan.local_slots: for i, a in enumerate(slot.task_indices): - for b in slot.task_indices[i + 1 :]: - assert ( - b not in conflict_graph[a] - ), f"Tasks {a} and {b} conflict but were placed in the same slot" + for b in slot.task_indices[i + 1:]: + assert b not in conflict_graph[a], ( + f'Tasks {a} and {b} conflict but were placed in the same slot' + ) # --------------------------------------------------------------------------- # greedy_coloring helper # --------------------------------------------------------------------------- - def test_greedy_coloring_max_colors_infeasible(): """A triangle (K3) needs 3 colors; requesting 2 must return None.""" k3 = {0: {1, 2}, 1: {0, 2}, 2: {0, 1}} @@ -213,18 +176,14 @@ def test_greedy_coloring_max_colors_feasible(): assert len(set(result.values())) == 3 for v, 
neighbors in k3.items(): for u in neighbors: - assert ( - result[v] != result[u] - ), "Adjacent vertices must have different colors" + assert result[v] != result[u], 'Adjacent vertices must have different colors' def test_greedy_coloring_fixed_colors_preserved(): """Fixed-prefix colors must be retained in the returned coloring.""" no_conflicts = {v: set() for v in range(4)} fixed = {0: 7, 1: 3} - result = task_recycling.greedy_coloring( - no_conflicts, [0, 1, 2, 3], fixed_coloring=fixed - ) + result = task_recycling.greedy_coloring(no_conflicts, [0, 1, 2, 3], fixed_coloring=fixed) assert result is not None assert result[0] == 7 assert result[1] == 3 @@ -236,9 +195,7 @@ def test_greedy_coloring_fixed_conflict_honored(): """A free vertex adjacent to a fixed-color vertex must not receive that color.""" conflict_graph = {0: {1}, 1: {0}, 2: set()} fixed = {0: 5} - result = task_recycling.greedy_coloring( - conflict_graph, [0, 1, 2], fixed_coloring=fixed, max_colors=10 - ) + result = task_recycling.greedy_coloring(conflict_graph, [0, 1, 2], fixed_coloring=fixed, max_colors=10) assert result is not None assert result[0] == 5 assert result[1] != 5 @@ -248,20 +205,15 @@ def test_greedy_coloring_fixed_conflict_honored(): # plan_task_bindings mode behavior # --------------------------------------------------------------------------- - def test_fail_on_overrun_raises(): tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS) + 1) - with pytest.raises(ValueError, match="Too many local tasks"): - task_recycling.plan_task_bindings( - tasks, tdag.TaskCreationBehavior.FAIL_ON_OVERRUN - ) + with pytest.raises(ValueError, match='Too many local tasks'): + task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.FAIL_ON_OVERRUN) def test_fail_on_overrun_exact_fit(): tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS)) - plan = task_recycling.plan_task_bindings( - tasks, tdag.TaskCreationBehavior.FAIL_ON_OVERRUN - ) + plan = 
task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.FAIL_ON_OVERRUN) assert not plan.uses_recycling assert len(plan.local_slots) == len(constants.LOCAL_TASK_IDS) @@ -269,17 +221,13 @@ def test_fail_on_overrun_exact_fit(): def test_state_machine_no_recycling_when_tasks_fit(): """When local task count ≤ hardware slots, every task gets its own unique slot.""" tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS)) - plan = task_recycling.plan_task_bindings( - tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN - ) + plan = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) assert not plan.uses_recycling assert all(len(slot.task_indices) == 1 for slot in plan.local_slots) def test_empty_task_list_returns_empty_plan(): - plan = task_recycling.plan_task_bindings( - [], tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN - ) + plan = task_recycling.plan_task_bindings([], tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) assert plan.local_slots == () assert plan.task_to_local_slot == {} @@ -288,29 +236,22 @@ def test_empty_task_list_returns_empty_plan(): # Transition preamble # --------------------------------------------------------------------------- - def test_transition_preamble_empty_for_non_recycled(): tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS)) - plan = task_recycling.plan_task_bindings( - tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN - ) + plan = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) for i in range(len(tasks)): - assert plan.emit_local_transition_preamble(i, blocked=False) == "" - assert plan.emit_local_transition_preamble(i, blocked=True) == "" + assert plan.emit_local_transition_preamble(i, blocked=False) == '' + assert plan.emit_local_transition_preamble(i, blocked=True) == '' + # --------------------------------------------------------------------------- # Determinism # 
--------------------------------------------------------------------------- - def test_plan_is_deterministic(): tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS) + 5) - plan1 = task_recycling.plan_task_bindings( - tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN - ) - plan2 = task_recycling.plan_task_bindings( - tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN - ) + plan1 = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) + plan2 = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) assert plan1.task_to_local_slot == plan2.task_to_local_slot assert plan1.task_to_local_state == plan2.task_to_local_state diff --git a/tests/spatial_ir/test_task_recycling_codegen.py b/tests/spatial_ir/test_task_recycling_codegen.py index 6b8e6bb8..0de9b268 100644 --- a/tests/spatial_ir/test_task_recycling_codegen.py +++ b/tests/spatial_ir/test_task_recycling_codegen.py @@ -6,44 +6,35 @@ from spada.lowering.spatial_ir_to_csl import lower_spatial_ir_to_csl from spada.syntax.spatial_ir import parser, passes -_CSL_RUNTIME_TASK_RECYCLING_SAMPLES = os.path.join( - os.path.dirname(__file__), "..", "csl_runtime", "samples" -) +_CSL_RUNTIME_TASK_RECYCLING_SAMPLES = os.path.join(os.path.dirname(__file__), '..', 'csl_runtime', 'samples') def test_task_recycling_codegen_uses_else_if_dispatch_for_recycled_slots(): sample = os.path.join( - os.path.dirname(__file__), - "..", - "..", - "samples", - "spatial", - "collectives", - "tree_reduce_2D.sptl", - ) + os.path.dirname(__file__), '..', '..', 'samples', 'spatial', 'collectives', 'tree_reduce_2D.sptl') kernel = parser.parse_file(sample) kernel = passes.concretize_parameters(kernel, LX=8, LY=8, K=16) kernel = passes.constexpr_propagation(kernel) csl_files = lower_spatial_ir_to_csl(kernel, task_fusion=False) - code = next(file.code for file in csl_files if file.filename == "code_0_0.csl") + code = next(file.code for file in csl_files if 
file.filename == 'code_0_0.csl') task_id_occurrences: dict[str, int] = {} - for hardware_id in re.findall(r"@get_local_task_id\((\d+)\)", code): + for hardware_id in re.findall(r'@get_local_task_id\((\d+)\)', code): task_id_occurrences[hardware_id] = task_id_occurrences.get(hardware_id, 0) + 1 assert any(count > 1 for count in task_id_occurrences.values()) - assert "__task_slot_" in code - assert "else if (__task_slot_" in code + assert '__task_slot_' in code + assert 'else if (__task_slot_' in code assert re.search( - r"if \(__task_slot_\d+_state == \d+\) \{.*?\n\s+\}\n\s+else if \(__task_slot_\d+_state == \d+\) \{", + r'if \(__task_slot_\d+_state == \d+\) \{.*?\n\s+\}\n\s+else if \(__task_slot_\d+_state == \d+\) \{', code, re.S, ) @pytest.mark.parametrize( - "filename", + 'filename', ( f'task_recycling_merge_{os.getenv("WSE_ARCH", "wse2")}.sptl', f'task_recycling_two_stage_{os.getenv("WSE_ARCH", "wse2")}.sptl', @@ -55,34 +46,25 @@ def test_csl_runtime_task_recycling_sample_lowers(filename: str): path = os.path.join(_CSL_RUNTIME_TASK_RECYCLING_SAMPLES, filename) kernel = parser.parse_file(path) kernel = passes.constexpr_propagation(kernel) - csl_files = lower_spatial_ir_to_csl( - kernel, task_fusion=False, copy_elision=True, prune_memory=True - ) - assert csl_files, "expected at least one generated CSL file" - combined = "\n".join(f.code for f in csl_files) - assert combined.strip(), "expected non-empty CSL" - assert "__task_slot_" in combined, "expected task-ID recycling in generated CSL" + csl_files = lower_spatial_ir_to_csl(kernel, task_fusion=False, copy_elision=True, prune_memory=True) + assert csl_files, 'expected at least one generated CSL file' + combined = '\n'.join(f.code for f in csl_files) + assert combined.strip(), 'expected non-empty CSL' + assert '__task_slot_' in combined, 'expected task-ID recycling in generated CSL' def test_codegen_avoids_local_task_id_color_overlap(): - path = os.path.join( - _CSL_RUNTIME_TASK_RECYCLING_SAMPLES, 
"task_color_overlap_many_channels.sptl" - ) + path = os.path.join(_CSL_RUNTIME_TASK_RECYCLING_SAMPLES, 'task_color_overlap_many_channels.sptl') kernel = parser.parse_file(path) kernel = passes.constexpr_propagation(kernel) - csl_files = lower_spatial_ir_to_csl( - kernel, task_fusion=False, copy_elision=True, prune_memory=True - ) - combined = "\n".join(f.code for f in csl_files) + csl_files = lower_spatial_ir_to_csl(kernel, task_fusion=False, copy_elision=True, prune_memory=True) + combined = '\n'.join(f.code for f in csl_files) - local_task_ids = { - int(v) for v in re.findall(r"@get_local_task_id\((\d+)\)", combined) - } - colors = {int(v) for v in re.findall(r"@get_color\((\d+)\)", combined)} + local_task_ids = {int(v) for v in re.findall(r'@get_local_task_id\((\d+)\)', combined)} + colors = {int(v) for v in re.findall(r'@get_color\((\d+)\)', combined)} - assert 8 in colors, "sample should force color 8 to be allocated" + assert 8 in colors, 'sample should force color 8 to be allocated' assert local_task_ids - assert local_task_ids.isdisjoint( - colors - ), f"local task IDs overlap communication colors: ids={sorted(local_task_ids)}, colors={sorted(colors)}" + assert local_task_ids.isdisjoint(colors), ( + f'local task IDs overlap communication colors: ids={sorted(local_task_ids)}, colors={sorted(colors)}') From f7c6c59911b67aa973b5ae294291b2aec7e1e95e Mon Sep 17 00:00:00 2001 From: "jonathan.schaefer@hlrs.de" Date: Fri, 5 Jun 2026 09:50:49 +0100 Subject: [PATCH 4/5] Moved from env variable to constants.ARCH --- tests/spatial_ir/test_task_recycling.py | 2 +- tests/spatial_ir/test_task_recycling_codegen.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/spatial_ir/test_task_recycling.py b/tests/spatial_ir/test_task_recycling.py index 9b026d8b..1ca29f40 100644 --- a/tests/spatial_ir/test_task_recycling.py +++ b/tests/spatial_ir/test_task_recycling.py @@ -8,7 +8,7 @@ def _load_sample_kernel(): - sample = 
os.path.join(os.path.dirname(__file__), '..', 'csl_runtime', 'samples', 'task_recycling_merge.sptl') + sample = os.path.join(os.path.dirname(__file__), '..', 'csl_runtime', 'samples', f'task_recycling_merge_{constants.ARCH}.sptl') kernel = parser.parse_file(sample) return passes.constexpr_propagation(kernel) diff --git a/tests/spatial_ir/test_task_recycling_codegen.py b/tests/spatial_ir/test_task_recycling_codegen.py index 0de9b268..13e91635 100644 --- a/tests/spatial_ir/test_task_recycling_codegen.py +++ b/tests/spatial_ir/test_task_recycling_codegen.py @@ -5,6 +5,7 @@ from spada.lowering.spatial_ir_to_csl import lower_spatial_ir_to_csl from spada.syntax.spatial_ir import parser, passes +from spada.syntax.csl import constants _CSL_RUNTIME_TASK_RECYCLING_SAMPLES = os.path.join(os.path.dirname(__file__), '..', 'csl_runtime', 'samples') @@ -36,9 +37,9 @@ def test_task_recycling_codegen_uses_else_if_dispatch_for_recycled_slots(): @pytest.mark.parametrize( 'filename', ( - f'task_recycling_merge_{os.getenv("WSE_ARCH", "wse2")}.sptl', - f'task_recycling_two_stage_{os.getenv("WSE_ARCH", "wse2")}.sptl', - f'task_recycling_three_stage_{os.getenv("WSE_ARCH", "wse2")}.sptl', + f'task_recycling_merge_{constants.ARCH}.sptl', + f'task_recycling_two_stage_{constants.ARCH}.sptl', + f'task_recycling_three_stage_{constants.ARCH}.sptl', ), ) def test_csl_runtime_task_recycling_sample_lowers(filename: str): From c379533c649b1a2871b6c40a2f0639ce73849f7e Mon Sep 17 00:00:00 2001 From: "jonathan.schaefer@hlrs.de" Date: Fri, 5 Jun 2026 09:55:27 +0100 Subject: [PATCH 5/5] Applied style formatting --- tests/spatial_ir/test_task_recycling.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/tests/spatial_ir/test_task_recycling.py b/tests/spatial_ir/test_task_recycling.py index 1ca29f40..723102ad 100644 --- a/tests/spatial_ir/test_task_recycling.py +++ b/tests/spatial_ir/test_task_recycling.py @@ -8,7 +8,8 @@ def _load_sample_kernel(): - sample = 
os.path.join(os.path.dirname(__file__), '..', 'csl_runtime', 'samples', f'task_recycling_merge_{constants.ARCH}.sptl') + sample = os.path.join( + os.path.dirname(__file__), '..', 'csl_runtime', 'samples', f'task_recycling_merge_{constants.ARCH}.sptl') kernel = parser.parse_file(sample) return passes.constexpr_propagation(kernel) @@ -95,12 +96,12 @@ def test_task_recycling_plan_reuses_local_slots(): # Conflict graph helpers # --------------------------------------------------------------------------- + def _create_fork_tasks(): """Task 0 activates tasks 1 and 2 independently; 1 and 2 are concurrent.""" return [ - tdag.CSLTask(0, 'local', [0], - [(1, tdag.InterTaskEdge.ACTIVATE), (2, tdag.InterTaskEdge.ACTIVATE)], - blocked=False), + tdag.CSLTask( + 0, 'local', [0], [(1, tdag.InterTaskEdge.ACTIVATE), (2, tdag.InterTaskEdge.ACTIVATE)], blocked=False), tdag.CSLTask(1, 'local', [1], [(-1, tdag.InterTaskEdge.SEQUENCE)], blocked=False), tdag.CSLTask(2, 'local', [2], [(-1, tdag.InterTaskEdge.SEQUENCE)], blocked=False), ] @@ -109,9 +110,8 @@ def _create_fork_tasks(): def _create_diamond_tasks(): """0 forks into 1 and 2, which join at the blocked task 3.""" return [ - tdag.CSLTask(0, 'local', [0], - [(1, tdag.InterTaskEdge.ACTIVATE), (2, tdag.InterTaskEdge.ACTIVATE)], - blocked=False), + tdag.CSLTask( + 0, 'local', [0], [(1, tdag.InterTaskEdge.ACTIVATE), (2, tdag.InterTaskEdge.ACTIVATE)], blocked=False), tdag.CSLTask(1, 'local', [1], [(3, tdag.InterTaskEdge.ACTIVATE)], blocked=False), tdag.CSLTask(2, 'local', [2], [(3, tdag.InterTaskEdge.UNBLOCK)], blocked=False), tdag.CSLTask(3, 'local', [3], [(-1, tdag.InterTaskEdge.SEQUENCE)], blocked=True), @@ -153,15 +153,14 @@ def test_no_conflicting_tasks_share_slot(): for slot in plan.local_slots: for i, a in enumerate(slot.task_indices): for b in slot.task_indices[i + 1:]: - assert b not in conflict_graph[a], ( - f'Tasks {a} and {b} conflict but were placed in the same slot' - ) + assert b not in conflict_graph[a], (f'Tasks {a} and 
{b} conflict but were placed in the same slot') # --------------------------------------------------------------------------- # greedy_coloring helper # --------------------------------------------------------------------------- + def test_greedy_coloring_max_colors_infeasible(): """A triangle (K3) needs 3 colors; requesting 2 must return None.""" k3 = {0: {1, 2}, 1: {0, 2}, 2: {0, 1}} @@ -205,6 +204,7 @@ def test_greedy_coloring_fixed_conflict_honored(): # plan_task_bindings mode behavior # --------------------------------------------------------------------------- + def test_fail_on_overrun_raises(): tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS) + 1) with pytest.raises(ValueError, match='Too many local tasks'): @@ -236,6 +236,7 @@ def test_empty_task_list_returns_empty_plan(): # Transition preamble # --------------------------------------------------------------------------- + def test_transition_preamble_empty_for_non_recycled(): tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS)) plan = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN) @@ -244,11 +245,11 @@ def test_transition_preamble_empty_for_non_recycled(): assert plan.emit_local_transition_preamble(i, blocked=True) == '' - # --------------------------------------------------------------------------- # Determinism # --------------------------------------------------------------------------- + def test_plan_is_deterministic(): tasks = _create_linear_local_tasks(len(constants.LOCAL_TASK_IDS) + 5) plan1 = task_recycling.plan_task_bindings(tasks, tdag.TaskCreationBehavior.STATE_MACHINE_ON_OVERRUN)