From 13da7a3f23bc4271a50408af4e3951266eab069b Mon Sep 17 00:00:00 2001
From: zhusy54
Date: Fri, 13 Mar 2026 15:59:26 +0800
Subject: [PATCH] Refactor: split unified task ring into Main Ring + Consumed Ring

Separate per-task data by lifecycle to reduce ring buffer back-pressure:
- Main Ring (freed at RELEASED): descriptor + payload (~3800B/task)
- Consumed Ring (freed at CONSUMED): task_state + fanout tracking (~64B/task)

Add RELEASED(4) and CONSUMED(5) states to extend the task state machine.
Introduce dual waterline advancement (last_task_released / last_task_consumed)
with independent try-locks for each reclamation path. TensorMap validity now
uses last_task_consumed; dep pool reclamation uses last_task_released.

Consumed ring defaults to 4x main ring capacity (configurable via
PTO2_RING_CONSUMED_WINDOW env var).
---
 .../aicpu/aicpu_executor.cpp      |  17 +-
 .../host/runtime_maker.cpp        |   8 +-
 .../runtime/pto_orchestrator.cpp  | 104 +++++-----
 .../runtime/pto_orchestrator.h    |   2 +-
 .../runtime/pto_ring_buffer.cpp   |   4 +-
 .../runtime/pto_ring_buffer.h     |  43 ++--
 .../runtime/pto_runtime2.cpp      |  15 +-
 .../runtime/pto_runtime2_types.h  |  82 +++++---
 .../runtime/pto_scheduler.cpp     |  74 ++++---
 .../runtime/pto_scheduler.h       | 183 ++++++++++++------
 .../runtime/pto_shared_memory.cpp |  13 +-
 .../runtime/pto_shared_memory.h   |   8 +-
 .../runtime/pto_tensormap.cpp     |  18 +-
 .../runtime/pto_tensormap.h       |  20 +-
 .../runtime/runtime.cpp           |  23 +--
 .../runtime/runtime.h             |   5 +-
 16 files changed, 381 insertions(+), 238 deletions(-)

diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp
index 7182d5c1..c99ea27e 100644
--- a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp
+++ b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp
@@ -1271,9 +1271,9 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa
     int32_t cnt_ready = 0, cnt_waiting = 0, cnt_inflight = 0;
     for (int32_t si = 0; si < task_count; si++) {
         int32_t slot = si & window_mask;
-        PTO2TaskState st = sched->slot_states[slot].task_state.load(std::memory_order_relaxed);
-        int32_t rc = sched->slot_states[slot].fanin_refcount.load(std::memory_order_relaxed);
-        int32_t fi = sched->slot_states[slot].fanin_count;
+        PTO2TaskState st = sched->get_consumed_entry(si).task_state.load(std::memory_order_relaxed);
+        int32_t rc = sched->get_main_slot(si).fanin_refcount.load(std::memory_order_relaxed);
+        int32_t fi = sched->get_main_slot(si).fanin_count;
         int32_t kid = task_descriptors[slot].kernel_id[0];
         if (st >= PTO2_TASK_COMPLETED) continue;  // Already done
         if (st == PTO2_TASK_READY || st == PTO2_TASK_RUNNING) { cnt_inflight++; continue; }
@@ -1614,6 +1614,10 @@ int32_t AicpuExecutor::run(Runtime* runtime) {
     if (runtime->pto2_heap_size > 0) {
         heap_size = runtime->pto2_heap_size;
     }
+    uint64_t consumed_window_size = task_window_size * 4;  // default
+    if (runtime->pto2_consumed_window_size > 0) {
+        consumed_window_size = runtime->pto2_consumed_window_size;
+    }

     DEV_INFO("Thread %d: Ring sizes: task_window=%lu, heap=%lu", thread_idx,
              (unsigned long)task_window_size, (unsigned long)heap_size);
@@ -1631,6 +1635,9 @@ int32_t AicpuExecutor::run(Runtime* runtime) {
         return -1;
     }

+    // Set consumed_window_size before runtime creation
+    sm_handle->header->consumed_window_size = consumed_window_size;
+
     rt = pto2_runtime_create_from_sm(PTO2_MODE_EXECUTE, sm_handle, gm_heap,
                                      heap_size, orch_thread_num_);
     if (!rt) {
@@ -1641,8 +1648,8 @@ int32_t 
AicpuExecutor::run(Runtime* runtime) { return -1; } - // Wire up slot_states pointer for profiling (complete_perf_records) - runtime->set_pto2_slot_states_ptr(rt->scheduler.slot_states); + // Wire up consumed_ring pointer for profiling (complete_perf_records) + runtime->set_pto2_consumed_ring_ptr(rt->scheduler.consumed_ring); // Store shared state for other orchestrator threads orch_func_ = orch_func; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/host/runtime_maker.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/host/runtime_maker.cpp index c9238d89..137b074e 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/host/runtime_maker.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/host/runtime_maker.cpp @@ -265,10 +265,12 @@ extern "C" int init_runtime_impl(Runtime *runtime, { runtime->pto2_task_window_size = parse_env_uint64("PTO2_RING_TASK_WINDOW", 4, true); runtime->pto2_heap_size = parse_env_uint64("PTO2_RING_HEAP", 1024, true); - if (runtime->pto2_task_window_size || runtime->pto2_heap_size) { - LOG_INFO("Ring buffer overrides: task_window=%lu heap=%lu", + runtime->pto2_consumed_window_size = parse_env_uint64("PTO2_RING_CONSUMED_WINDOW", 4, true); + if (runtime->pto2_task_window_size || runtime->pto2_heap_size || runtime->pto2_consumed_window_size) { + LOG_INFO("Ring buffer overrides: task_window=%lu heap=%lu consumed_window=%lu", (unsigned long)(runtime->pto2_task_window_size ? runtime->pto2_task_window_size : PTO2_TASK_WINDOW_SIZE), - (unsigned long)(runtime->pto2_heap_size ? runtime->pto2_heap_size : PTO2_HEAP_SIZE)); + (unsigned long)(runtime->pto2_heap_size ? runtime->pto2_heap_size : PTO2_HEAP_SIZE), + (unsigned long)(runtime->pto2_consumed_window_size ? runtime->pto2_consumed_window_size : 0)); } } diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index fa240381..acee8df0 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -109,7 +109,7 @@ bool pto2_orchestrator_init( pto2_task_ring_init(&orch->task_ring, sm_handle->task_descriptors, sm_handle->header->task_window_size, - &sm_handle->header->last_task_alive, + &sm_handle->header->last_task_released, &sm_handle->header->current_task_index); // Allocate and initialize dependency list pool (per-orchestrator, no shared memory) @@ -122,7 +122,7 @@ bool pto2_orchestrator_init( orch->dep_pool_last_reclaimed = 0; // Initialize TensorMap - if (!orch->tensor_map.init_default(sm_handle->header->task_window_size)) { + if (!orch->tensor_map.init_default(sm_handle->header->consumed_window_size)) { free(dep_entries); return false; } @@ -245,17 +245,17 @@ void pto2_submit_mixed_task( // === STEP 0: Sync TensorMap validity and optional cleanup === orch->tensor_map.sync_tensormap(); - // Reclaim dead dep pool entries based on scheduler's last_task_alive + // Reclaim dead dep pool entries based on scheduler's last_task_released { - int32_t last_alive = orch->sm_handle->header->last_task_alive.load(std::memory_order_acquire); - if (last_alive > orch->dep_pool_last_reclaimed && last_alive > 0) { - int32_t newest_consumed = last_alive - 1; - int32_t slot_rc = orch->task_ring.get_task_slot(newest_consumed); - int32_t mark = orch->sm_handle->task_payloads[slot_rc].dep_pool_mark; + int32_t last_released = orch->sm_handle->header->last_task_released.load(std::memory_order_acquire); + if (last_released > 
orch->dep_pool_last_reclaimed && last_released > 0) { + int32_t newest_released = last_released - 1; + PTO2ConsumedRingEntry& cr_entry = orch->scheduler->get_consumed_entry(newest_released); + int32_t mark = cr_entry.dep_pool_mark; if (mark > 0) { orch->dep_pool.advance_tail(mark); } - orch->dep_pool_last_reclaimed = last_alive; + orch->dep_pool_last_reclaimed = last_released; } } @@ -265,32 +265,35 @@ void pto2_submit_mixed_task( always_assert(orch->scope_stack_top >= 0 && "Cannot submit task outside a scope"); // === Scope deadlock pre-check === - // Tasks within a scope hold a fanout_count reference released only at scope_end. - // If scope task count >= window_size, no slots can ever be reclaimed → deadlock. + // Tasks within a scope hold a fanout_count reference (in consumed ring) + // released only at scope_end. The consumed ring cannot reclaim these entries + // until CONSUMED, so scope_task_count >= consumed_window_size → deadlock. + // (Main ring slots CAN be reclaimed at RELEASED within a scope.) { int32_t scope_task_count = orch->scope_tasks_size - orch->scope_begins[orch->scope_stack_top]; - if (scope_task_count >= orch->task_ring.window_size - 1) { + int64_t consumed_window = static_cast(orch->sm_handle->header->consumed_window_size); + if (scope_task_count >= consumed_window - 1) { int32_t total_submitted = orch->task_ring.current_index_ptr->load(std::memory_order_acquire); - int32_t last_alive = orch->task_ring.last_alive_ptr->load(std::memory_order_acquire); - int32_t active_count = total_submitted - last_alive; + int32_t last_released = orch->task_ring.last_released_ptr->load(std::memory_order_acquire); + int32_t active_count = total_submitted - last_released; LOG_ERROR("========================================"); LOG_ERROR("FATAL: Scope Deadlock Detected!"); LOG_ERROR("========================================"); - LOG_ERROR("Tasks in current scope (%d) >= task_window_size (%d).", - scope_task_count, orch->task_ring.window_size); + LOG_ERROR("Tasks in current scope (%d) >= consumed_window_size (%ld).", + scope_task_count, (long)consumed_window); LOG_ERROR(" scope_depth: %d", orch->scope_stack_top + 1); LOG_ERROR(" scope_task_count: %d", scope_task_count); LOG_ERROR(" total_submitted: %d", total_submitted); - LOG_ERROR(" last_task_alive: %d", last_alive); + LOG_ERROR(" last_task_released: %d", last_released); LOG_ERROR(" active_tasks: %d / %d", active_count, orch->task_ring.window_size); LOG_ERROR("Root Cause:"); - LOG_ERROR(" Tasks within a scope hold a fanout_count reference that is only"); - LOG_ERROR(" released at scope_end. When scope task count >= window_size,"); - LOG_ERROR(" no slots can be reclaimed -> deadlock."); + LOG_ERROR(" Consumed ring entries hold fanout_count references released only at"); + LOG_ERROR(" scope_end. When scope task count >= consumed_window_size,"); + LOG_ERROR(" no consumed ring entries can be reclaimed -> deadlock."); LOG_ERROR("Solution:"); LOG_ERROR(" 1. Reduce tasks per scope (use batching/unroll)"); - LOG_ERROR(" 2. Increase PTO2_TASK_WINDOW_SIZE (current: %d)", orch->task_ring.window_size); + LOG_ERROR(" 2. Increase consumed_window_size (current: %ld)", (long)consumed_window); LOG_ERROR(" 3. 
Split work across multiple scopes"); LOG_ERROR("========================================"); exit(1); @@ -313,19 +316,22 @@ void pto2_submit_mixed_task( task.active_mask = active_mask; task.subtask_done_mask.store(0, std::memory_order_relaxed); task.packed_buffer_base = NULL; - task.packed_buffer_end = NULL; - // Initialize slot state (scheduler-private) + // Initialize scheduler-private state (consumed ring + main slot) PTO2SchedulerState* sched = orch->scheduler; if (sched) { - PTO2TaskSlotState& slot_state = sched->slot_states[slot]; - slot_state.fanin_count = 0; - slot_state.fanout_head = nullptr; - slot_state.fanout_lock.store(0, std::memory_order_relaxed); + PTO2ConsumedRingEntry& cr_entry = sched->get_consumed_entry(task_id); + cr_entry.fanout_head = nullptr; + cr_entry.fanout_lock.store(0, std::memory_order_relaxed); // Initial fanout_count = 1 (the owning scope holds one reference) - slot_state.fanout_count = 1; - slot_state.fanout_refcount.store(0, std::memory_order_release); - slot_state.fanin_refcount.store(0, std::memory_order_release); + cr_entry.fanout_count = 1; + cr_entry.fanout_refcount.store(0, std::memory_order_release); + cr_entry.packed_buffer_end = nullptr; + cr_entry.dep_pool_mark = 0; + + PTO2MainSlotState& main_slot = sched->get_main_slot(task_id); + main_slot.fanin_count = 0; + main_slot.fanin_refcount.store(0, std::memory_order_release); } // Register this task in its owning scope @@ -366,7 +372,8 @@ void pto2_submit_mixed_task( if (total_output_size > 0) { task.packed_buffer_base = orch->pto2_alloc_packed_buffer(total_output_size); - task.packed_buffer_end = (char*)task.packed_buffer_base + total_output_size; + PTO2ConsumedRingEntry& cr_entry = sched->get_consumed_entry(task_id); + cr_entry.packed_buffer_end = (char*)task.packed_buffer_base + total_output_size; } CYCLE_COUNT_LAP_RECORD(g_orch_heap_cycle, AicpuPhaseId::ORCH_HEAP, task_id); #if PTO2_ORCH_PROFILING @@ -452,11 +459,12 @@ void pto2_submit_mixed_task( // === STEP 5: Finalize fanin list === // First build the fanin list if (sched) { - PTO2TaskSlotState& cur_slot_state = sched->slot_states[slot]; + PTO2ConsumedRingEntry& cr_entry = sched->get_consumed_entry(task_id); + PTO2MainSlotState& cur_main_slot = sched->get_main_slot(task_id); // Initialize scheduler state BEFORE adding to producer fanout lists, // so concurrent on_mixed_task_complete can safely access task_state/fanout_refcount. 
- cur_slot_state.task_state.store(PTO2_TASK_PENDING, std::memory_order_relaxed); - cur_slot_state.fanout_refcount.store(0, std::memory_order_relaxed); + cr_entry.task_state.store(PTO2_TASK_PENDING, std::memory_order_relaxed); + cr_entry.fanout_refcount.store(0, std::memory_order_relaxed); auto& dep_pool = orch->dep_pool; if (orch->dep_pool_cur_entry == nullptr) { @@ -464,7 +472,7 @@ void pto2_submit_mixed_task( } int32_t early_finished = 0; - cur_slot_state.fanin_count = fanin_count + 1; // +1 redundance for not being ready too early + cur_main_slot.fanin_count = fanin_count + 1; // +1 redundance for not being ready too early payload->fanin_actual_count = fanin_count; for (int i = 0; i < fanin_count; i++) { payload->fanin_tasks[i] = fanin_temp[i]; @@ -472,34 +480,33 @@ void pto2_submit_mixed_task( for (int i = 0; i < fanin_count; i++) { int32_t producer_task_id = fanin_temp[i]; // Add this task to producer's fanout list (with spinlock) - int32_t prod_slot = task_ring.get_task_slot(producer_task_id); - PTO2TaskSlotState& producer_slot_state = sched->slot_states[prod_slot]; + PTO2ConsumedRingEntry& producer_cr = sched->get_consumed_entry(producer_task_id); orch->dep_pool_cur_entry->task_id = task_id; - orch->dep_pool_cur_entry->next = producer_slot_state.fanout_head; + orch->dep_pool_cur_entry->next = producer_cr.fanout_head; #if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING - pto2_fanout_lock(producer_slot_state, g_orch_fanin_atomic_count, g_orch_fanin_wait_cycle); + pto2_fanout_lock(producer_cr, g_orch_fanin_atomic_count, g_orch_fanin_wait_cycle); #else - pto2_fanout_lock(producer_slot_state); + pto2_fanout_lock(producer_cr); #endif // Normal path: prepend consumer to producer's fanout list - producer_slot_state.fanout_count += 1; - int32_t prod_state = producer_slot_state.task_state.load(std::memory_order_acquire); + producer_cr.fanout_count += 1; + int32_t prod_state = producer_cr.task_state.load(std::memory_order_acquire); if (prod_state >= PTO2_TASK_COMPLETED) { // Early return optimization: if producer already completed, we can skip adding dependency and directly // decrement fanin_count early_finished++; } else { - producer_slot_state.fanout_head = orch->dep_pool_cur_entry; + producer_cr.fanout_head = orch->dep_pool_cur_entry; } - pto2_fanout_unlock(producer_slot_state); - if (producer_slot_state.fanout_head == orch->dep_pool_cur_entry) { + pto2_fanout_unlock(producer_cr); + if (producer_cr.fanout_head == orch->dep_pool_cur_entry) { orch->dep_pool_cur_entry = &dep_pool.alloc(); } } // Combined release: merge early_finished batch + init_task's +1 release // into a single atomic fetch_add (saves one acq_rel cache-line bounce per task). 
int32_t initial_refcount = early_finished + 1; // +1 for the init release - int32_t new_rc = cur_slot_state.fanin_refcount.fetch_add(initial_refcount, std::memory_order_acq_rel) + int32_t new_rc = cur_main_slot.fanin_refcount.fetch_add(initial_refcount, std::memory_order_acq_rel) + initial_refcount; if (new_rc >= fanin_count + 1) { PTO2ResourceShape shape = pto2_active_mask_to_shape(active_mask); @@ -516,7 +523,10 @@ void pto2_submit_mixed_task( } // Record dep pool watermark for this task (used by tail reclamation) - payload->dep_pool_mark = orch->dep_pool.top; + if (sched) { + PTO2ConsumedRingEntry& cr_entry = sched->get_consumed_entry(task_id); + cr_entry.dep_pool_mark = orch->dep_pool.top; + } CYCLE_COUNT_LAP_RECORD(g_orch_fanin_cycle, AicpuPhaseId::ORCH_FANIN, task_id); diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h index e1373423..3714e233 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h @@ -44,7 +44,7 @@ struct PTO2OrchestratorState { PTO2TaskRing task_ring; // Task slot allocation PTO2DepListPool dep_pool; // Dependency list storage (per-orchestrator, no atomics needed) PTO2DepListEntry* dep_pool_cur_entry; - int32_t dep_pool_last_reclaimed; // last_task_alive value at last reclamation + int32_t dep_pool_last_reclaimed; // last_task_released value at last reclamation // === TENSOR MAP (Private) === PTO2TensorMap tensor_map; // Producer lookup diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.cpp index 65607e5f..ebedae23 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.cpp @@ -31,12 +31,12 @@ void pto2_heap_ring_init(PTO2HeapRing* ring, void* base, uint64_t size, // ============================================================================= void pto2_task_ring_init(PTO2TaskRing* ring, PTO2TaskDescriptor* descriptors, - int32_t window_size, std::atomic* last_alive_ptr, + int32_t window_size, std::atomic* last_released_ptr, std::atomic* current_index_ptr) { ring->descriptors = descriptors; ring->window_size = window_size; ring->current_index_ptr = current_index_ptr; - ring->last_alive_ptr = last_alive_ptr; + ring->last_released_ptr = last_released_ptr; } // ============================================================================= diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h index 3cd6b6c2..4a70303d 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h @@ -12,7 +12,7 @@ * 2. TaskRing - Task slot allocation * - Fixed window size (TASK_WINDOW_SIZE) * - Wrap-around modulo window size - * - Implicit reclamation via last_task_alive advancement + * - Implicit reclamation via last_task_released advancement * - Back-pressure: stalls when window is full * * 3. 
DepListPool - Dependency list entry allocation @@ -241,8 +241,8 @@ struct PTO2TaskRing { int32_t window_size; // Window size (power of 2) std::atomic* current_index_ptr; // Shared atomic in SM header - // Reference to shared memory last_task_alive (for back-pressure) - std::atomic* last_alive_ptr; // Points to header->last_task_alive + // Reference to shared memory last_task_released (for back-pressure) + std::atomic* last_released_ptr; // Points to header->last_task_released /** * Allocate a task slot from task ring @@ -293,12 +293,12 @@ struct PTO2TaskRing { #if PTO2_SPIN_VERBOSE_LOGGING // Periodic block notification if (spin_count % PTO2_BLOCK_NOTIFY_INTERVAL == 0 && spin_count < PTO2_FLOW_CONTROL_SPIN_LIMIT) { - int32_t last_alive = last_alive_ptr->load(std::memory_order_acquire); + int32_t last_released = last_released_ptr->load(std::memory_order_acquire); int32_t current = current_index_ptr->load(std::memory_order_acquire); - int32_t active_count = current - last_alive; - LOG_WARN("[TaskRing] BLOCKED (Flow Control): current=%d, last_alive=%d, " + int32_t active_count = current - last_released; + LOG_WARN("[TaskRing] BLOCKED (Flow Control): current=%d, last_released=%d, " "active=%d/%d (%.1f%%), spins=%d", - current, last_alive, active_count, window_size, + current, last_released, active_count, window_size, 100.0 * active_count / window_size, spin_count); notified = true; } @@ -306,9 +306,9 @@ struct PTO2TaskRing { // Check for potential deadlock if (spin_count >= PTO2_FLOW_CONTROL_SPIN_LIMIT) { - int32_t last_alive = last_alive_ptr->load(std::memory_order_acquire); + int32_t last_released = last_released_ptr->load(std::memory_order_acquire); int32_t current = current_index_ptr->load(std::memory_order_acquire); - int32_t active_count = current - last_alive; + int32_t active_count = current - last_released; LOG_ERROR("========================================"); LOG_ERROR("FATAL: Flow Control Deadlock Detected!"); @@ -316,15 +316,16 @@ struct PTO2TaskRing { LOG_ERROR("Task Ring is FULL and no progress after %d spins.", spin_count); LOG_ERROR("Flow Control Status:"); LOG_ERROR(" - Current task index: %d", current); - LOG_ERROR(" - Last task alive: %d", last_alive); + LOG_ERROR(" - Last task released: %d", last_released); LOG_ERROR(" - Active tasks: %d", active_count); LOG_ERROR(" - Window size: %d", window_size); LOG_ERROR(" - Window utilization: %.1f%%", 100.0 * active_count / window_size); LOG_ERROR("Root Cause:"); - LOG_ERROR(" Tasks cannot transition to CONSUMED state because:"); - LOG_ERROR(" - fanout_count includes 1 for the owning scope"); - LOG_ERROR(" - scope_end() requires orchestrator to continue"); - LOG_ERROR(" - But orchestrator is blocked waiting for task ring space"); + LOG_ERROR(" Main ring is full: no tasks are reaching RELEASED state."); + LOG_ERROR(" Possible reasons:"); + LOG_ERROR(" - Scheduler threads are not making progress"); + LOG_ERROR(" - Tasks are blocked waiting for dependencies"); + LOG_ERROR(" - Deferred release backlog is not being flushed"); LOG_ERROR(" This creates a circular dependency (deadlock)."); LOG_ERROR("Solution:"); LOG_ERROR(" Current task_window_size: %d", window_size); @@ -351,8 +352,8 @@ struct PTO2TaskRing { int32_t pto2_task_ring_try_alloc() { // Optimistically allocate a task ID int32_t task_id = current_index_ptr->fetch_add(1, std::memory_order_acq_rel); - int32_t last_alive = last_alive_ptr->load(std::memory_order_acquire); - int32_t active_count = task_id - last_alive; + int32_t last_released = 
last_released_ptr->load(std::memory_order_acquire); + int32_t active_count = task_id - last_released; // Check if there's room (leave at least 1 slot empty) if (active_count < window_size - 1) { @@ -386,18 +387,18 @@ struct PTO2TaskRing { * @param ring Task ring to initialize * @param descriptors Task descriptor array from shared memory * @param window_size Window size (must be power of 2) - * @param last_alive_ptr Pointer to shared memory last_task_alive + * @param last_released_ptr Pointer to shared memory last_task_released */ void pto2_task_ring_init(PTO2TaskRing* ring, PTO2TaskDescriptor* descriptors, - int32_t window_size, std::atomic* last_alive_ptr, + int32_t window_size, std::atomic* last_released_ptr, std::atomic* current_index_ptr); /** * Get number of active tasks in window */ static inline int32_t pto2_task_ring_active_count(PTO2TaskRing* ring) { - int32_t last_alive = ring->last_alive_ptr->load(std::memory_order_acquire); - return ring->current_index_ptr->load(std::memory_order_acquire) - last_alive; + int32_t last_released = ring->last_released_ptr->load(std::memory_order_acquire); + return ring->current_index_ptr->load(std::memory_order_acquire) - last_released; } /** @@ -463,7 +464,7 @@ struct PTO2DepListPool { /** * Advance the tail pointer, reclaiming dead entries. - * Called by the orchestrator based on last_task_alive advancement. + * Called by the orchestrator based on last_task_released advancement. */ void advance_tail(int32_t new_tail) { if (new_tail > tail) { diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp index 8ebb0033..00d763ce 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp @@ -84,6 +84,10 @@ PTO2Runtime* pto2_runtime_create_custom(PTO2RuntimeMode mode, return NULL; } + // Set consumed_window_size (default = 4 × task_window_size) + uint64_t consumed_window_size = task_window_size * 4; + rt->sm_handle->header->consumed_window_size = consumed_window_size; + // Allocate GM heap for output buffers rt->gm_heap_size = heap_size; #if defined(_POSIX_C_SOURCE) && _POSIX_C_SOURCE >= 200112L @@ -112,7 +116,7 @@ PTO2Runtime* pto2_runtime_create_custom(PTO2RuntimeMode mode, } // Initialize scheduler - if (!pto2_scheduler_init(&rt->scheduler, rt->sm_handle, rt->gm_heap)) { + if (!pto2_scheduler_init(&rt->scheduler, rt->sm_handle, rt->gm_heap, consumed_window_size)) { pto2_orchestrator_destroy(&rt->orchestrators[0]); free(rt->gm_heap); pto2_sm_destroy(rt->sm_handle); @@ -158,8 +162,13 @@ PTO2Runtime* pto2_runtime_create_from_sm(PTO2RuntimeMode mode, } } - // Initialize scheduler - if (!pto2_scheduler_init(&rt->scheduler, rt->sm_handle, rt->gm_heap)) { + // Initialize scheduler (consumed_window_size must be set in header by host) + uint64_t cws = sm_handle->header->consumed_window_size; + if (cws == 0) { + cws = sm_handle->header->task_window_size * 4; + sm_handle->header->consumed_window_size = cws; + } + if (!pto2_scheduler_init(&rt->scheduler, rt->sm_handle, rt->gm_heap, cws)) { for (int i = 0; i < orch_count; i++) { pto2_orchestrator_destroy(&rt->orchestrators[i]); } diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h index 5cfa71f5..00ce1189 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h +++ 
b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h @@ -115,18 +115,23 @@ typedef enum { * Task state enumeration * * State transitions: - * PENDING -> READY -> RUNNING -> COMPLETED -> CONSUMED + * PENDING -> READY -> RUNNING -> COMPLETED -> RELEASED -> CONSUMED * * Conditions: - * PENDING->READY: fanin_refcount == fanin_count - * COMPLETED->CONSUMED: fanout_refcount == fanout_count && state == COMPLETED + * PENDING->READY: fanin_refcount == fanin_count + * COMPLETED->RELEASED: on_task_release() finishes reading payload + * RELEASED->CONSUMED: fanout_refcount == fanout_count && state == RELEASED + * + * RELEASED unlocks main ring slot reclamation (descriptor + payload). + * CONSUMED unlocks consumed ring slot + heap reclamation. */ typedef enum { PTO2_TASK_PENDING = 0, // Waiting for dependencies (fanin_refcount < fanin_count) PTO2_TASK_READY = 1, // All dependencies satisfied, waiting in ready queue PTO2_TASK_RUNNING = 2, // Currently executing on a worker PTO2_TASK_COMPLETED = 3, // Execution finished, output may still be in use - PTO2_TASK_CONSUMED = 4 // Output fully consumed, buffers can be released + PTO2_TASK_RELEASED = 4, // Payload read complete, main ring slot can be reclaimed + PTO2_TASK_CONSUMED = 5 // Output fully consumed, consumed ring + heap can be released } PTO2TaskState; // ============================================================================= @@ -295,41 +300,54 @@ struct PTO2TaskDescriptor { // Packed output buffer (all outputs packed into single contiguous buffer) void* packed_buffer_base; // Start of packed buffer in GM Heap - void* packed_buffer_end; // End of packed buffer (for heap reclamation) + // NOTE: packed_buffer_end moved to PTO2ConsumedRingEntry for lifecycle separation }; // ============================================================================= -// Per-Slot Scheduling State +// Consumed Ring Entry (long-lived scheduling state, AICPU-private) // ============================================================================= /** - * Per-task slot scheduling state (scheduler-private, NOT in shared memory) + * Per-task consumed ring entry (scheduler-private, NOT in shared memory) * - * Consolidates all hot-path scheduling fields into a single cache-friendly - * structure (32 bytes = half a cache line). Accessing any field of a task's - * slot state brings all related fields into the same cache line. + * Contains fields that must survive until CONSUMED state: + * - task_state, fanout tracking, packed_buffer_end, dep_pool_mark * - * Concurrency notes: - * - fanout_head, fanout_count protected by fanout_lock (per-task spinlock) - * - fanin_count set once at submission, read-only after (hot path for ready check) - * - task_state, fanin_refcount, fanout_refcount updated atomically + * Indexed by task_id & consumed_window_mask (4× main ring capacity). + * Cache-line aligned for atomic access efficiency. 
*/ -struct alignas(64) PTO2TaskSlotState { - // Fanout lock + list (accessed together under lock in on_task_complete) - std::atomic fanout_lock; // Per-task spinlock (0=unlocked, 1=locked) +struct alignas(64) PTO2ConsumedRingEntry { + // Task state (completion, consumed check) + std::atomic task_state; // PENDING→...→CONSUMED + + // Fanout tracking (accessed in check_and_handle_consumed / release_producer) + std::atomic fanout_refcount; // Dynamic: counts released references int32_t fanout_count; // 1 (owning scope) + number of consumers + // Fanout lock + list (accessed together under lock in on_task_complete) + std::atomic fanout_lock; // Per-task spinlock (0=unlocked, 1=locked) PTO2DepListEntry* fanout_head; // Pointer to first fanout entry (nullptr = empty) - // Task state (completion, consumed check, ready check) - std::atomic task_state; // PENDING/READY/RUNNING/COMPLETED/CONSUMED + // Heap reclamation pointer (drives heap_tail via consumed watermark) + void* packed_buffer_end; // End of packed buffer in GM Heap + + // Dep pool reclamation watermark + int32_t dep_pool_mark; // Dep pool top after this task's submission +}; - // Fanin (accessed together in release_fanin_and_check_ready) +// ============================================================================= +// Main Slot State (short-lived fanin tracking, AICPU-private) +// ============================================================================= + +/** + * Per-task main slot state (scheduler-private, NOT in shared memory) + * + * Contains only fanin tracking fields, freed at RELEASED state. + * Indexed by task_id & task_window_mask (same as main ring). + */ +struct alignas(64) PTO2MainSlotState { std::atomic fanin_refcount; // Dynamic: counts completed producers int32_t fanin_count; // Number of producer dependencies (set once) - - // Fanout refcount (accessed with fanout_count in check_and_handle_consumed) - std::atomic fanout_refcount; // Dynamic: counts released references }; /** @@ -346,7 +364,7 @@ struct PTO2TaskPayload { int param_count{0}; int32_t fanin_tasks[PTO2_MAX_INPUTS]; // Producer task IDs (cold path, used by on_task_release) int32_t fanin_actual_count{0}; // Actual fanin count (without the +1 redundance) - int32_t dep_pool_mark{0}; // Dep pool top after this task's submission (for reclamation) + // NOTE: dep_pool_mark moved to PTO2ConsumedRingEntry for lifecycle separation }; // ============================================================================= @@ -412,20 +430,20 @@ typedef void (*PTO2InCoreFunc)(void** args, int32_t num_args); #endif #if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING -static inline void pto2_fanout_lock(PTO2TaskSlotState& slot_state, +static inline void pto2_fanout_lock(PTO2ConsumedRingEntry& entry, uint64_t& atomic_count, uint64_t& wait_cycle) { uint64_t t0 = get_sys_cnt_aicpu(); bool contended = false; uint32_t atomic_ops = 0; for (;;) { - while (slot_state.fanout_lock.load(std::memory_order_acquire) != 0) { + while (entry.fanout_lock.load(std::memory_order_acquire) != 0) { contended = true; atomic_ops++; // each load = 1 atomic SPIN_WAIT_HINT(); } int32_t expected = 0; - if (slot_state.fanout_lock.compare_exchange_weak(expected, 1, + if (entry.fanout_lock.compare_exchange_weak(expected, 1, std::memory_order_acquire, std::memory_order_relaxed)) { atomic_ops++; // successful CAS = 1 atomic atomic_count += atomic_ops; @@ -440,21 +458,21 @@ static inline void pto2_fanout_lock(PTO2TaskSlotState& slot_state, } #endif -static inline void pto2_fanout_lock(PTO2TaskSlotState& 
slot_state) { +static inline void pto2_fanout_lock(PTO2ConsumedRingEntry& entry) { for (;;) { - while (slot_state.fanout_lock.load(std::memory_order_acquire) != 0) { + while (entry.fanout_lock.load(std::memory_order_acquire) != 0) { SPIN_WAIT_HINT(); } int32_t expected = 0; - if (slot_state.fanout_lock.compare_exchange_weak(expected, 1, + if (entry.fanout_lock.compare_exchange_weak(expected, 1, std::memory_order_acquire, std::memory_order_relaxed)) { return; } } } -static inline void pto2_fanout_unlock(PTO2TaskSlotState& slot_state) { - slot_state.fanout_lock.store(0, std::memory_order_release); +static inline void pto2_fanout_unlock(PTO2ConsumedRingEntry& entry) { + entry.fanout_lock.store(0, std::memory_order_release); } #endif // PTO_RUNTIME2_TYPES_H diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp index 8b6081eb..399bab41 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp @@ -63,6 +63,7 @@ const char* pto2_task_state_name(PTO2TaskState state) { case PTO2_TASK_READY: return "READY"; case PTO2_TASK_RUNNING: return "RUNNING"; case PTO2_TASK_COMPLETED: return "COMPLETED"; + case PTO2_TASK_RELEASED: return "RELEASED"; case PTO2_TASK_CONSUMED: return "CONSUMED"; default: return "UNKNOWN"; } @@ -104,45 +105,64 @@ void pto2_ready_queue_destroy(PTO2ReadyQueue* queue) { bool pto2_scheduler_init(PTO2SchedulerState* sched, PTO2SharedMemoryHandle* sm_handle, - void* heap_base) { + void* heap_base, + uint64_t consumed_window_size) { sched->sm_handle = sm_handle; sched->task_descriptors = sm_handle->task_descriptors; sched->heap_base = heap_base; - sched->slot_states = nullptr; + sched->consumed_ring = nullptr; + sched->main_slot_states = nullptr; #if PTO2_SCHED_PROFILING sched->tasks_completed.store(0, std::memory_order_relaxed); sched->tasks_consumed.store(0, std::memory_order_relaxed); #endif sched->ring_advance_lock.store(0, std::memory_order_relaxed); + sched->main_ring_advance_lock.store(0, std::memory_order_relaxed); // Get runtime task_window_size from shared memory header uint64_t window_size = sm_handle->header->task_window_size; sched->task_window_size = window_size; sched->task_window_mask = window_size - 1; // For fast modulo (window_size must be power of 2) + // Consumed ring configuration + sched->consumed_window_size = consumed_window_size; + sched->consumed_window_mask = consumed_window_size - 1; + // Initialize local copies of ring pointers - sched->last_task_alive = 0; + sched->last_task_consumed = 0; + sched->last_task_released = 0; sched->last_heap_consumed = 0; sched->heap_tail = 0; - // Allocate per-task slot state array (dynamically sized based on runtime window_size) - sched->slot_states = new (std::nothrow) PTO2TaskSlotState[window_size]; - if (!sched->slot_states) { + // Allocate consumed ring (long-lived state: task_state, fanout, heap ptr) + sched->consumed_ring = new (std::nothrow) PTO2ConsumedRingEntry[consumed_window_size]; + if (!sched->consumed_ring) { + return false; + } + + // Allocate main slot states (short-lived fanin tracking) + sched->main_slot_states = new (std::nothrow) PTO2MainSlotState[window_size]; + if (!sched->main_slot_states) { + delete[] sched->consumed_ring; + sched->consumed_ring = nullptr; return false; } - // Zero-initialize all per-task slot state fields. - // new[] default-initializes std::atomic which leaves values indeterminate. 
- // Scheduler logic (e.g. fanin_refcount fetch_add in release_fanin_and_check_ready) - // assumes slots start at zero before init_task writes them. + // Zero-initialize consumed ring entries + for (uint64_t i = 0; i < consumed_window_size; i++) { + sched->consumed_ring[i].task_state.store(static_cast(0), std::memory_order_relaxed); + sched->consumed_ring[i].fanout_refcount.store(0, std::memory_order_relaxed); + sched->consumed_ring[i].fanout_count = 0; + sched->consumed_ring[i].fanout_lock.store(0, std::memory_order_relaxed); + sched->consumed_ring[i].fanout_head = nullptr; + sched->consumed_ring[i].packed_buffer_end = nullptr; + sched->consumed_ring[i].dep_pool_mark = 0; + } + + // Zero-initialize main slot states for (uint64_t i = 0; i < window_size; i++) { - sched->slot_states[i].fanout_lock.store(0, std::memory_order_relaxed); - sched->slot_states[i].fanout_count = 0; - sched->slot_states[i].fanout_head = nullptr; - sched->slot_states[i].task_state.store(static_cast(0), std::memory_order_relaxed); - sched->slot_states[i].fanin_refcount.store(0, std::memory_order_relaxed); - sched->slot_states[i].fanin_count = 0; - sched->slot_states[i].fanout_refcount.store(0, std::memory_order_relaxed); + sched->main_slot_states[i].fanin_refcount.store(0, std::memory_order_relaxed); + sched->main_slot_states[i].fanin_count = 0; } // Initialize ready queues (one per resource shape) @@ -152,8 +172,10 @@ bool pto2_scheduler_init(PTO2SchedulerState* sched, for (int j = 0; j < i; j++) { pto2_ready_queue_destroy(&sched->ready_queues[j]); } - delete[] sched->slot_states; - sched->slot_states = nullptr; + delete[] sched->main_slot_states; + sched->main_slot_states = nullptr; + delete[] sched->consumed_ring; + sched->consumed_ring = nullptr; return false; } } @@ -162,9 +184,14 @@ bool pto2_scheduler_init(PTO2SchedulerState* sched, } void pto2_scheduler_destroy(PTO2SchedulerState* sched) { - if (sched->slot_states) { - delete[] sched->slot_states; - sched->slot_states = nullptr; + if (sched->consumed_ring) { + delete[] sched->consumed_ring; + sched->consumed_ring = nullptr; + } + + if (sched->main_slot_states) { + delete[] sched->main_slot_states; + sched->main_slot_states = nullptr; } for (int i = 0; i < PTO2_NUM_RESOURCE_SHAPES; i++) { @@ -178,7 +205,8 @@ void pto2_scheduler_destroy(PTO2SchedulerState* sched) { void pto2_scheduler_print_stats(PTO2SchedulerState* sched) { LOG_INFO("=== Scheduler Statistics ==="); - LOG_INFO("last_task_alive: %d", sched->last_task_alive); + LOG_INFO("last_task_consumed: %d", sched->last_task_consumed); + LOG_INFO("last_task_released: %d", sched->last_task_released); LOG_INFO("heap_tail: %" PRIu64, sched->heap_tail); #if PTO2_SCHED_PROFILING LOG_INFO("tasks_completed: %lld", (long long)sched->tasks_completed.load(std::memory_order_relaxed)); diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h index 61a5c6a3..d5c1a460 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h @@ -3,9 +3,9 @@ * * The Scheduler is responsible for: * 1. Maintaining per-resource-shape ready queues - * 2. Tracking task state (PENDING -> READY -> RUNNING -> COMPLETED -> CONSUMED) + * 2. Tracking task state (PENDING -> READY -> RUNNING -> COMPLETED -> RELEASED -> CONSUMED) * 3. Managing fanin/fanout refcounts for dependency resolution - * 4. Advancing last_task_alive for heap reclamation + * 4. 
Dual waterline advancement (last_task_released for main ring, last_task_consumed for heap) * 5. Two-stage mixed-task completion (subtask done bits → mixed-task complete) * * The Scheduler runs on Device AI_CPU and processes: @@ -274,7 +274,8 @@ struct PTO2CompletionStats { * Scheduler state structure * * Contains dynamic state updated during task execution. - * Separated from shared memory for cache efficiency. + * Uses dual ring buffers: main ring (short-lived fanin state) and + * consumed ring (long-lived fanout/task_state). * Hot-path methods are defined inline (implicitly inline as member functions). */ struct PTO2SchedulerState { @@ -283,7 +284,8 @@ struct PTO2SchedulerState { PTO2TaskDescriptor* task_descriptors; // Local copies of ring pointers (written to shared memory after update) - int32_t last_task_alive; // Task ring tail (advances on COMPLETED for slot reuse) + int32_t last_task_consumed; // Consumed ring tail (drives heap reclamation) + int32_t last_task_released; // Main ring tail (drives slot reclamation) int32_t last_heap_consumed; // Heap watermark (advances on CONSUMED for buffer reuse) uint64_t heap_tail; // Heap ring tail (offset from heap_base) @@ -291,15 +293,20 @@ struct PTO2SchedulerState { void* heap_base; // === DYNAMIC CONFIGURATION === - uint64_t task_window_size; // Task window size (power of 2) + uint64_t task_window_size; // Main ring capacity (power of 2) uint64_t task_window_mask; // task_window_size - 1 (for fast modulo) + uint64_t consumed_window_size; // Consumed ring capacity (default = 4 × task_window_size) + uint64_t consumed_window_mask; // consumed_window_size - 1 // === PRIVATE DATA (not in shared memory) === - // Per-task slot state (dynamically allocated, indexed by task_id & task_window_mask) - // Consolidates task_state, fanin/fanout refcounts, and dependency metadata - // into a single cache-friendly structure (32 bytes per slot). 
- PTO2TaskSlotState* slot_states; + // Consumed ring: long-lived per-task state (task_state, fanout tracking, heap ptr) + // Indexed by task_id & consumed_window_mask + PTO2ConsumedRingEntry* consumed_ring; + + // Main slot states: short-lived fanin tracking only + // Indexed by task_id & task_window_mask + PTO2MainSlotState* main_slot_states; // Ready queues (one per resource shape) PTO2ReadyQueue ready_queues[PTO2_NUM_RESOURCE_SHAPES]; @@ -309,7 +316,8 @@ struct PTO2SchedulerState { std::atomic tasks_completed; std::atomic tasks_consumed; #endif - std::atomic ring_advance_lock{0}; // Try-lock for advance_ring_pointers + std::atomic ring_advance_lock{0}; // Try-lock for advance_consumed_ring_pointers + std::atomic main_ring_advance_lock{0}; // Try-lock for advance_main_ring_pointers // ========================================================================= // Inline hot-path methods @@ -319,44 +327,74 @@ struct PTO2SchedulerState { return task_id & task_window_mask; } - PTO2TaskSlotState& get_slot_state_by_slot(int32_t slot) { return slot_states[slot]; } - PTO2TaskSlotState& get_slot_state_by_task_id(int32_t task_id) { return slot_states[task_id & task_window_mask]; } + PTO2ConsumedRingEntry& get_consumed_entry(int32_t task_id) { + return consumed_ring[task_id & consumed_window_mask]; + } + + PTO2MainSlotState& get_main_slot(int32_t task_id) { + return main_slot_states[task_id & task_window_mask]; + } void sync_to_sm() { PTO2SharedMemoryHeader* header = sm_handle->header; - header->last_task_alive.store(last_task_alive, std::memory_order_release); + header->last_task_consumed.store(last_task_consumed, std::memory_order_release); + header->last_task_released.store(last_task_released, std::memory_order_release); header->heap_tail.store(heap_tail, std::memory_order_release); - header->heap_tail_gen.store(last_task_alive, std::memory_order_release); + header->heap_tail_gen.store(last_task_consumed, std::memory_order_release); } - void advance_ring_pointers() { + /** + * Advance consumed ring watermark: scan CONSUMED tasks, update heap_tail. + * Drives heap reclamation. 
+ */ + void advance_consumed_ring_pointers() { PTO2SharedMemoryHeader* header = sm_handle->header; int32_t current_task_index = header->current_task_index.load(std::memory_order_acquire); - while (last_task_alive < current_task_index) { - PTO2TaskSlotState& slot_state = get_slot_state_by_task_id(last_task_alive); - if (slot_state.task_state.load(std::memory_order_acquire) != PTO2_TASK_CONSUMED) { + while (last_task_consumed < current_task_index) { + PTO2ConsumedRingEntry& entry = get_consumed_entry(last_task_consumed); + if (entry.task_state.load(std::memory_order_acquire) != PTO2_TASK_CONSUMED) { break; } - last_task_alive++; + last_task_consumed++; } - if (last_task_alive > 0) { - int32_t last_consumed_id = last_task_alive - 1; - PTO2TaskDescriptor* last_consumed = pto2_sm_get_task(sm_handle, last_consumed_id); - if (last_consumed->packed_buffer_end != NULL) { - heap_tail = (uint64_t)((char*)last_consumed->packed_buffer_end - (char*)heap_base); + if (last_task_consumed > 0) { + PTO2ConsumedRingEntry& last_entry = get_consumed_entry(last_task_consumed - 1); + if (last_entry.packed_buffer_end != nullptr) { + heap_tail = (uint64_t)((char*)last_entry.packed_buffer_end - (char*)heap_base); } } sync_to_sm(); } - void check_and_handle_consumed(PTO2TaskSlotState& slot_state) { - if (slot_state.fanout_refcount.load(std::memory_order_acquire) != slot_state.fanout_count) return; + /** + * Advance main ring watermark: scan RELEASED tasks. + * Drives main ring slot (descriptor + payload) reclamation. + */ + void advance_main_ring_pointers() { + PTO2SharedMemoryHeader* header = sm_handle->header; + int32_t current_task_index = header->current_task_index.load(std::memory_order_acquire); - PTO2TaskState expected = PTO2_TASK_COMPLETED; - if (!slot_state.task_state.compare_exchange_strong(expected, PTO2_TASK_CONSUMED, + while (last_task_released < current_task_index) { + PTO2ConsumedRingEntry& entry = get_consumed_entry(last_task_released); + if (entry.task_state.load(std::memory_order_acquire) < PTO2_TASK_RELEASED) { + break; + } + last_task_released++; + } + + header->last_task_released.store(last_task_released, std::memory_order_release); + } + + void check_and_handle_consumed(PTO2ConsumedRingEntry& entry) { + if (entry.fanout_refcount.load(std::memory_order_acquire) != entry.fanout_count) return; + + // CAS from RELEASED→CONSUMED (not COMPLETED→CONSUMED) + // If task is still COMPLETED (not yet RELEASED), CAS fails safely. 
+ PTO2TaskState expected = PTO2_TASK_RELEASED; + if (!entry.task_state.compare_exchange_strong(expected, PTO2_TASK_CONSUMED, std::memory_order_acq_rel, std::memory_order_acquire)) { return; } @@ -369,22 +407,22 @@ struct PTO2SchedulerState { int32_t expected_lock = 0; if (ring_advance_lock.compare_exchange_strong(expected_lock, 1, std::memory_order_acquire, std::memory_order_relaxed)) { - advance_ring_pointers(); + advance_consumed_ring_pointers(); ring_advance_lock.store(0, std::memory_order_release); } } #if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING - void check_and_handle_consumed(PTO2TaskSlotState& slot_state, uint64_t& atomic_count) { - int32_t fc = slot_state.fanout_count; - int32_t rc = slot_state.fanout_refcount.load(std::memory_order_acquire); + void check_and_handle_consumed(PTO2ConsumedRingEntry& entry, uint64_t& atomic_count) { + int32_t fc = entry.fanout_count; + int32_t rc = entry.fanout_refcount.load(std::memory_order_acquire); atomic_count += 2; // fanout_count.load + fanout_refcount.load if (rc != fc) return; - PTO2TaskState expected = PTO2_TASK_COMPLETED; - if (!slot_state.task_state.compare_exchange_strong(expected, PTO2_TASK_CONSUMED, + PTO2TaskState expected = PTO2_TASK_RELEASED; + if (!entry.task_state.compare_exchange_strong(expected, PTO2_TASK_CONSUMED, std::memory_order_acq_rel, std::memory_order_acquire)) { atomic_count += 1; // failed CAS return; @@ -400,7 +438,7 @@ struct PTO2SchedulerState { int32_t expected_lock = 0; if (ring_advance_lock.compare_exchange_strong(expected_lock, 1, std::memory_order_acquire, std::memory_order_relaxed)) { - advance_ring_pointers(); + advance_consumed_ring_pointers(); ring_advance_lock.store(0, std::memory_order_release); atomic_count += 2; // try-lock CAS + unlock store } else { @@ -410,31 +448,31 @@ struct PTO2SchedulerState { #endif void release_producer(int32_t producer_id) { - PTO2TaskSlotState& slot_state = get_slot_state_by_task_id(producer_id); - slot_state.fanout_refcount.fetch_add(1, std::memory_order_acq_rel); - check_and_handle_consumed(slot_state); + PTO2ConsumedRingEntry& entry = get_consumed_entry(producer_id); + entry.fanout_refcount.fetch_add(1, std::memory_order_acq_rel); + check_and_handle_consumed(entry); } #if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING void release_producer(int32_t producer_id, uint64_t& atomic_count) { - PTO2TaskSlotState& slot_state = get_slot_state_by_task_id(producer_id); - slot_state.fanout_refcount.fetch_add(1, std::memory_order_acq_rel); + PTO2ConsumedRingEntry& entry = get_consumed_entry(producer_id); + entry.fanout_refcount.fetch_add(1, std::memory_order_acq_rel); atomic_count += 1; // fanout_refcount.fetch_add - check_and_handle_consumed(slot_state, atomic_count); + check_and_handle_consumed(entry, atomic_count); } #endif bool release_fanin_and_check_ready(int32_t task_id, PTO2TaskDescriptor* task, PTO2LocalReadyBuffer* local_bufs = nullptr) { - PTO2TaskSlotState& slot_state = get_slot_state_by_task_id(task_id); + PTO2MainSlotState& main_slot = get_main_slot(task_id); // Atomically increment fanin_refcount and check if all producers are done // ACQ_REL on fanin_refcount already synchronizes with the orchestrator's // release in init_task, making fanin_count visible — plain load suffices. 
- int32_t new_refcount = slot_state.fanin_refcount.fetch_add(1, std::memory_order_acq_rel) + 1; + int32_t new_refcount = main_slot.fanin_refcount.fetch_add(1, std::memory_order_acq_rel) + 1; - if (new_refcount == slot_state.fanin_count) { + if (new_refcount == main_slot.fanin_count) { // Local-first: try per-CoreType thread-local buffer before global queue // Route by active_mask: AIC-containing tasks → buf[0], AIV-only → buf[1] PTO2ResourceShape shape = pto2_active_mask_to_shape(task->active_mask); @@ -455,14 +493,15 @@ struct PTO2SchedulerState { bool release_fanin_and_check_ready(int32_t task_id, PTO2TaskDescriptor* task, uint64_t& atomic_count, uint64_t& push_wait, PTO2LocalReadyBuffer* local_bufs = nullptr) { - PTO2TaskSlotState& slot_state = get_slot_state_by_task_id(task_id); + PTO2MainSlotState& main_slot = get_main_slot(task_id); - int32_t new_refcount = slot_state.fanin_refcount.fetch_add(1, std::memory_order_acq_rel) + 1; + int32_t new_refcount = main_slot.fanin_refcount.fetch_add(1, std::memory_order_acq_rel) + 1; atomic_count += 1; // fanin_refcount.fetch_add - if (new_refcount == slot_state.fanin_count) { + if (new_refcount == main_slot.fanin_count) { + PTO2ConsumedRingEntry& entry = get_consumed_entry(task_id); PTO2TaskState expected = PTO2_TASK_PENDING; - if (slot_state.task_state.compare_exchange_strong( + if (entry.task_state.compare_exchange_strong( expected, PTO2_TASK_READY, std::memory_order_acq_rel, std::memory_order_acquire)) { atomic_count += 1; // CAS(task_state PENDING→READY) // Local-first: try per-CoreType thread-local buffer before global queue @@ -483,14 +522,14 @@ struct PTO2SchedulerState { #endif void init_task(int32_t task_id, PTO2TaskDescriptor* task) { - PTO2TaskSlotState& slot_state = get_slot_state_by_task_id(task_id); + PTO2ConsumedRingEntry& entry = get_consumed_entry(task_id); - slot_state.task_state.store(PTO2_TASK_PENDING, std::memory_order_relaxed); // Orchestrator is the unique owner + entry.task_state.store(PTO2_TASK_PENDING, std::memory_order_relaxed); // Reset fanout_refcount for new task lifecycle. // Do NOT reset fanin_refcount — it may have been incremented by // concurrent on_task_complete between Step 5 and Step 6. 
- slot_state.fanout_refcount.store(0, std::memory_order_relaxed); + entry.fanout_refcount.store(0, std::memory_order_relaxed); #if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING extern uint64_t g_orch_finalize_atomic_count; @@ -591,7 +630,7 @@ struct PTO2SchedulerState { void on_mixed_task_complete(int32_t mixed_task_id, PTO2LocalReadyBuffer* local_bufs = nullptr) { #endif - PTO2TaskSlotState& slot_state = get_slot_state_by_task_id(mixed_task_id); + PTO2ConsumedRingEntry& entry = get_consumed_entry(mixed_task_id); #if PTO2_SCHED_PROFILING extern uint64_t g_sched_lock_cycle[], g_sched_fanout_cycle[]; @@ -602,13 +641,13 @@ struct PTO2SchedulerState { #endif #if PTO2_SCHED_PROFILING - pto2_fanout_lock(slot_state, lock_atomics, lock_wait); + pto2_fanout_lock(entry, lock_atomics, lock_wait); #else - pto2_fanout_lock(slot_state); + pto2_fanout_lock(entry); #endif - slot_state.task_state.store(PTO2_TASK_COMPLETED, std::memory_order_release); - PTO2DepListEntry* current = slot_state.fanout_head; // Protected by fanout_lock - pto2_fanout_unlock(slot_state); + entry.task_state.store(PTO2_TASK_COMPLETED, std::memory_order_release); + PTO2DepListEntry* current = entry.fanout_head; // Protected by fanout_lock + pto2_fanout_unlock(entry); #if PTO2_SCHED_PROFILING lock_atomics += 2; // state.store + unlock.store @@ -656,7 +695,12 @@ struct PTO2SchedulerState { } /** - * Cold path: release producers (fanin traversal) + check self for CONSUMED. + * Cold path: release producers (fanin traversal) + transition to RELEASED. + * + * SAFETY: payload->fanin_tasks must be fully read BEFORE setting RELEASED, + * because advance_main_ring_pointers() may reclaim the main ring slot + * once RELEASED is visible. + * * Returns fanin edge count for profiling. */ @@ -674,6 +718,8 @@ struct PTO2SchedulerState { int32_t slot = get_task_slot(task_id); PTO2TaskPayload* payload = &sm_handle->task_payloads[slot]; int32_t fanin_edges = payload->fanin_actual_count; + + // Read all fanin producer IDs from payload BEFORE marking RELEASED for (int32_t i = 0; i < fanin_edges; i++) { #if PTO2_SCHED_PROFILING release_producer(payload->fanin_tasks[i], fanin_atomics); @@ -686,15 +732,27 @@ struct PTO2SchedulerState { PTO2_SCHED_CYCLE_LAP(g_sched_fanin_cycle[thread_idx]); #endif - // Self consumed check + // Transition to RELEASED — payload is no longer needed + PTO2ConsumedRingEntry& entry = get_consumed_entry(task_id); + entry.task_state.store(PTO2_TASK_RELEASED, std::memory_order_release); + + // Try to advance main ring watermark + int32_t expected_lock = 0; + if (main_ring_advance_lock.compare_exchange_strong(expected_lock, 1, + std::memory_order_acquire, std::memory_order_relaxed)) { + advance_main_ring_pointers(); + main_ring_advance_lock.store(0, std::memory_order_release); + } + + // Self consumed check — fanout_refcount may already equal fanout_count #if PTO2_SCHED_PROFILING uint64_t self_atomics = 0; - check_and_handle_consumed(get_slot_state_by_slot(slot), self_atomics); + check_and_handle_consumed(entry, self_atomics); g_sched_self_atomic_count[thread_idx] += self_atomics; PTO2_SCHED_CYCLE_LAP(g_sched_self_consumed_cycle[thread_idx]); g_sched_complete_count[thread_idx]++; #else - check_and_handle_consumed(get_slot_state_by_slot(slot)); + check_and_handle_consumed(entry); #endif return fanin_edges; } @@ -706,7 +764,8 @@ struct PTO2SchedulerState { bool pto2_scheduler_init(PTO2SchedulerState* sched, PTO2SharedMemoryHandle* sm_handle, - void* heap_base); + void* heap_base, + uint64_t consumed_window_size); void 
pto2_scheduler_destroy(PTO2SchedulerState* sched); // ============================================================================= diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_shared_memory.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_shared_memory.cpp index f9f0f65f..06b56b0a 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_shared_memory.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_shared_memory.cpp @@ -144,12 +144,14 @@ void pto2_sm_init_header(PTO2SharedMemoryHandle* handle, header->current_task_index.store(0, std::memory_order_relaxed); header->heap_top.store(0, std::memory_order_relaxed); header->orchestrator_done.store(0, std::memory_order_relaxed); - header->last_task_alive.store(0, std::memory_order_relaxed); + header->last_task_consumed.store(0, std::memory_order_relaxed); + header->last_task_released.store(0, std::memory_order_relaxed); header->heap_tail.store(0, std::memory_order_relaxed); header->heap_tail_gen.store(0, std::memory_order_relaxed); // Layout info header->task_window_size = task_window_size; + header->consumed_window_size = 0; // Set by runtime creation (default = 4 × task_window_size) header->heap_size = heap_size; // Calculate offsets @@ -182,7 +184,8 @@ void pto2_sm_print_layout(PTO2SharedMemoryHandle* handle) { LOG_INFO(" heap_tail: %" PRIu64, h->heap_tail.load(std::memory_order_acquire)); LOG_INFO(" current_task_index: %d", h->current_task_index.load(std::memory_order_acquire)); LOG_INFO(" orchestrator_done: %d", h->orchestrator_done.load(std::memory_order_acquire)); - LOG_INFO(" last_task_alive: %d", h->last_task_alive.load(std::memory_order_acquire)); + LOG_INFO(" last_task_consumed: %d", h->last_task_consumed.load(std::memory_order_acquire)); + LOG_INFO(" last_task_released: %d", h->last_task_released.load(std::memory_order_acquire)); LOG_INFO("================================"); } @@ -201,11 +204,13 @@ bool pto2_sm_validate(PTO2SharedMemoryHandle* handle) { // Check flow control pointer sanity int32_t current_task_index = h->current_task_index.load(std::memory_order_acquire); - int32_t last_task_alive = h->last_task_alive.load(std::memory_order_acquire); + int32_t last_task_consumed = h->last_task_consumed.load(std::memory_order_acquire); + int32_t last_task_released = h->last_task_released.load(std::memory_order_acquire); uint64_t heap_top = h->heap_top.load(std::memory_order_acquire); uint64_t heap_tail = h->heap_tail.load(std::memory_order_acquire); if (current_task_index < 0) return false; - if (last_task_alive < 0) return false; + if (last_task_consumed < 0) return false; + if (last_task_released < 0) return false; if (heap_top > h->heap_size) return false; if (heap_tail > h->heap_size) return false; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_shared_memory.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_shared_memory.h index b34d9461..28ff96aa 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_shared_memory.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_shared_memory.h @@ -49,12 +49,14 @@ typedef struct { // Written by Scheduler, Read by Orchestrator (for back-pressure) std::atomic heap_tail; // Heap ring free pointer (on-device, matches pto2_heap_ring_init) - std::atomic last_task_alive; // Task ring tail (oldest active task) + std::atomic last_task_consumed; // Consumed ring tail (oldest unconsumed task, drives heap reclaim) + std::atomic last_task_released; // Main ring tail (oldest unreleased task, drives 
diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp
index b0f78124..43ac4b19 100644
--- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp
+++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp
@@ -108,7 +108,7 @@ bool PTO2TensorMap::init(int32_t new_num_buckets, int32_t new_pool_size, int32_t
 
     task_window_size = new_task_window_size;
-    last_task_alive = 0;
+    last_task_consumed = 0;
 
     return true;
 }
@@ -193,7 +193,7 @@ void PTO2TensorMap::print_stats() {
     LOG_INFO("Empty buckets: %d", empty_buckets);
     LOG_INFO("Max chain len: %d", max_chain);
     LOG_INFO("Avg chain len: %.2f", non_empty_buckets > 0 ? (float)total_chain / non_empty_buckets : 0);
-    LOG_INFO("Last task alive: %d", last_task_alive);
+    LOG_INFO("Last task consumed: %d", last_task_consumed);
     LOG_INFO("============================");
 }
 
@@ -213,13 +213,13 @@ void PTO2TensorMap::sync_tensormap() {
     constexpr int MIN_FREE_NUM = 1024;
     always_assert(orch != nullptr);
     while(true) {
-        // Read current last_task_alive from shared memory
-        int32_t new_last_task_alive =
-            orch->sm_handle->header->last_task_alive.load(std::memory_order_acquire);
-        sync_validity(new_last_task_alive);
-        if ((pool_size - next_entry_idx + free_num < MIN_FREE_NUM) || new_last_task_alive - orch->tensormap_last_cleanup >= PTO2_TENSORMAP_CLEANUP_INTERVAL) {
-            cleanup_retired(orch->tensormap_last_cleanup, new_last_task_alive);
-            orch->tensormap_last_cleanup = new_last_task_alive;
+        // Read current last_task_consumed from shared memory
+        int32_t new_last_task_consumed =
+            orch->sm_handle->header->last_task_consumed.load(std::memory_order_acquire);
+        sync_validity(new_last_task_consumed);
+        if ((pool_size - next_entry_idx + free_num < MIN_FREE_NUM) || new_last_task_consumed - orch->tensormap_last_cleanup >= PTO2_TENSORMAP_CLEANUP_INTERVAL) {
+            cleanup_retired(orch->tensormap_last_cleanup, new_last_task_consumed);
+            orch->tensormap_last_cleanup = new_last_task_consumed;
         } else {
             break;
         }
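The rewritten sync_tensormap() keys its cleanup cadence off last_task_consumed. Extracted as a pure predicate, the trigger is the disjunction below; PTO2_TENSORMAP_CLEANUP_INTERVAL's actual value is not shown in this patch, so the constant here is a stand-in:

#include <cstdint>

constexpr int32_t kMinFreeNum = 1024;       // MIN_FREE_NUM in the patch
constexpr int32_t kCleanupInterval = 256;   // stand-in for PTO2_TENSORMAP_CLEANUP_INTERVAL

// Cleanup runs when the entry pool is nearly exhausted, or when the consumed
// waterline has moved far enough that a whole batch of retired producers can
// be unlinked in one pass.
bool should_cleanup(int32_t pool_size, int32_t next_entry_idx, int32_t free_num,
                    int32_t last_cleanup, int32_t new_last_task_consumed) {
    bool pool_pressure = (pool_size - next_entry_idx + free_num) < kMinFreeNum;
    bool interval_hit = (new_last_task_consumed - last_cleanup) >= kCleanupInterval;
    return pool_pressure || interval_hit;
}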
diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h
index 2f3f3e5d..de784d8f 100644
--- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h
+++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h
@@ -63,7 +63,7 @@ extern uint64_t g_insert_count;
  * Maps tensor region -> producer task ID
  *
  * Stored in ring buffer pool with lazy invalidation:
- *   - Entry is valid only if producer_task_id >= last_task_alive
+ *   - Entry is valid only if producer_task_id >= last_task_consumed
  *   - Stale entries ignored during lookup
  *   - Pool wraps around, overwriting stale entries
  *
@@ -128,7 +128,7 @@ struct PTO2TensorMap {
     int32_t task_window_size;    // Runtime task window size (for slot masking)
 
     // Validity threshold (for lazy invalidation)
-    int32_t last_task_alive;     // Cached value from shared memory
+    int32_t last_task_consumed;  // Cached value from shared memory
 
     PTO2OrchestratorState* orch{nullptr};
 
@@ -200,9 +200,9 @@ struct PTO2TensorMap {
      * Update validity threshold from shared memory
      * Called periodically to refresh the lazy invalidation threshold.
      *
-     * @param last_task_alive Current value from shared memory
+     * @param last_task_consumed Current value from shared memory
      */
-    void sync_validity(int32_t last_task_alive) { this->last_task_alive = last_task_alive; }
+    void sync_validity(int32_t last_task_consumed) { this->last_task_consumed = last_task_consumed; }
 
     /**
      * Lookup producer for a tensor region
@@ -318,15 +318,15 @@ struct PTO2TensorMap {
     /**
      * Cleanup stale entries for retired tasks
      *
-     * Called periodically by Orchestrator when last_task_alive advances.
+     * Called periodically by Orchestrator when last_task_consumed advances.
      * Removes entries from bucket chains for tasks in [old, new) range.
      *
-     * @param old_last_task_alive Previous threshold
-     * @param new_last_task_alive New threshold
+     * @param old_last_task_consumed Previous threshold
+     * @param new_last_task_consumed New threshold
      */
-    void cleanup_retired(int32_t old_last_task_alive, int32_t new_last_task_alive) {
+    void cleanup_retired(int32_t old_last_task_consumed, int32_t new_last_task_consumed) {
         // Iterate through retired tasks and remove their entries from bucket chains
-        for (int32_t task_id = old_last_task_alive; task_id < new_last_task_alive; task_id++) {
+        for (int32_t task_id = old_last_task_consumed; task_id < new_last_task_consumed; task_id++) {
             int32_t task_slot = task_id & (task_window_size - 1);
 
             PTO2TensorMapEntry* cur_entry = task_entry_head[task_slot];
@@ -364,7 +364,7 @@ struct PTO2TensorMap {
      * Check if entry is valid (producer has not retired)
      */
     bool entry_valid(const PTO2TensorMapEntry& entry) const {
-        return entry.producer_task_id >= last_task_alive;
+        return entry.producer_task_id >= last_task_consumed;
     }
 
     void remove_entry(PTO2TensorMapEntry& entry) {
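The validity rule in entry_valid() is what keeps lookups cheap: stale entries are filtered by a single integer comparison instead of being eagerly unlinked, and cleanup_retired() unlinks them in batches later. A toy model of that lookup path follows; only the producer_task_id >= last_task_consumed comparison comes from the patch, the rest of the types are illustrative:

#include <cstdint>
#include <vector>

struct Entry {
    int32_t producer_task_id;  // task that last wrote this tensor region
    uint64_t key;              // hashed tensor-region identity
};

struct TinyTensorMap {
    int32_t last_task_consumed = 0;  // refreshed via sync_validity()
    std::vector<Entry> bucket;       // one bucket chain, for illustration

    // A hit counts only if the producer has not retired past the consumed
    // waterline; stale entries are skipped rather than erased.
    int32_t lookup(uint64_t key) const {
        for (const Entry& e : bucket) {
            if (e.key == key && e.producer_task_id >= last_task_consumed) {
                return e.producer_task_id;
            }
        }
        return -1;  // no valid producer recorded
    }
};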
diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp
index 7175e5f7..6badd598 100644
--- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp
+++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp
@@ -26,6 +26,7 @@ Runtime::Runtime() {
     ready_queue_shards = RUNTIME_DEFAULT_READY_QUEUE_SHARDS;
     pto2_task_window_size = 0;
     pto2_heap_size = 0;
+    pto2_consumed_window_size = 0;
 
     // Initialize tensor pairs
     tensor_pair_count = 0;
@@ -34,7 +35,7 @@ Runtime::Runtime() {
     orch_built_on_host_ = true;
     pto2_gm_sm_ptr_ = nullptr;
     pto2_gm_heap_ptr_ = nullptr;
-    pto2_slot_states_ptr_ = nullptr;
+    pto2_consumed_ring_ptr_ = nullptr;
 
     orch_args_ = nullptr;
     orch_arg_count_ = 0;
@@ -94,7 +95,7 @@ int Runtime::get_orch_arg_count() const { return orch_arg_count_; }
 void Runtime::set_orch_built_on_host(bool v) { orch_built_on_host_ = v; }
 void Runtime::set_pto2_gm_sm_ptr(void* p) { pto2_gm_sm_ptr_ = p; }
 void Runtime::set_pto2_gm_heap(void* p) { pto2_gm_heap_ptr_ = p; }
-void Runtime::set_pto2_slot_states_ptr(void* p) { pto2_slot_states_ptr_ = p; }
+void Runtime::set_pto2_consumed_ring_ptr(void* p) { pto2_consumed_ring_ptr_ = p; }
 void Runtime::set_orch_args(uint64_t* args, int count) {
     orch_arg_count_ = count <= RUNTIME_MAX_ARGS ? count : RUNTIME_MAX_ARGS;
     if (args && orch_arg_count_ > 0) {
@@ -167,15 +168,15 @@ void Runtime::complete_perf_records(PerfBuffer* perf_buf) {
         return;
     }
 
-    // Get slot states for fanout traversal
-    PTO2TaskSlotState* slot_states = static_cast<PTO2TaskSlotState*>(pto2_slot_states_ptr_);
-    if (slot_states == nullptr) {
+    // Get consumed ring entries for fanout traversal
+    PTO2ConsumedRingEntry* consumed_ring = static_cast<PTO2ConsumedRingEntry*>(pto2_consumed_ring_ptr_);
+    if (consumed_ring == nullptr) {
         return;
     }
 
-    // Get window mask from shared memory header
+    // Get consumed window mask from shared memory header
     PTO2SharedMemoryHeader* header = static_cast<PTO2SharedMemoryHeader*>(sm_base);
-    int32_t window_mask = header->task_window_size - 1;
+    int32_t consumed_window_mask = header->consumed_window_size - 1;
 
     uint32_t count = perf_buf->count;
 
@@ -183,13 +184,13 @@ void Runtime::complete_perf_records(PerfBuffer* perf_buf) {
         PerfRecord* record = &perf_buf->records[i];
         int32_t task_id = record->task_id;
 
-        // Get slot state for fanout traversal
-        int32_t slot = task_id & window_mask;
-        PTO2TaskSlotState& ss = slot_states[slot];
+        // Get consumed ring entry for fanout traversal
+        int32_t slot = task_id & consumed_window_mask;
+        PTO2ConsumedRingEntry& cr = consumed_ring[slot];
 
         // Fill fanout information by traversing the linked list
         record->fanout_count = 0;
-        PTO2DepListEntry* cur = ss.fanout_head;
+        PTO2DepListEntry* cur = cr.fanout_head;
         while (cur != nullptr && record->fanout_count < RUNTIME_MAX_FANOUT) {
             record->fanout[record->fanout_count++] = cur->task_id;
 
diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h
index 7690cd90..66e80887 100644
--- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h
+++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h
@@ -148,6 +148,7 @@ class Runtime {
     // Ring buffer size overrides (0 = use compile-time defaults)
     uint64_t pto2_task_window_size;
     uint64_t pto2_heap_size;
+    uint64_t pto2_consumed_window_size;  // Consumed ring capacity (0 = 4 × task_window_size)
 
     // PTO2 integration: kernel_id -> GM function_bin_addr mapping
     // NOTE: Made public for direct access from aicore code
@@ -170,7 +171,7 @@ class Runtime {
     bool orch_built_on_host_;
     void* pto2_gm_sm_ptr_;          // GM pointer to PTO2 shared memory (device)
    void* pto2_gm_heap_ptr_;        // GM heap for orchestrator output buffers (device)
-    void* pto2_slot_states_ptr_;    // Pointer to PTO2TaskSlotState array (scheduler-private, for profiling)
+    void* pto2_consumed_ring_ptr_;  // Pointer to PTO2ConsumedRingEntry array (scheduler-private, for profiling)
     uint64_t* orch_args_;           // Arguments for device orchestration
     int orch_arg_count_;
     uint64_t orch_args_storage_[RUNTIME_MAX_ARGS];  // Copy of args for device
@@ -236,7 +237,7 @@ class Runtime {
     void set_orch_built_on_host(bool v);
     void set_pto2_gm_sm_ptr(void* p);
     void set_pto2_gm_heap(void* p);
-    void set_pto2_slot_states_ptr(void* p);
+    void set_pto2_consumed_ring_ptr(void* p);
     void set_orch_args(uint64_t* args, int count);
 
     // Device orchestration SO binary (for dlopen on AICPU thread 3)
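A closing note on why complete_perf_records() above indexes the consumed ring rather than the main ring: fanout_head must remain readable until a task reaches CONSUMED, which can be long after its main-ring slot was recycled at RELEASED. A simplified sketch of the traversal follows; RUNTIME_MAX_FANOUT and the masking scheme follow the patch, while the struct layouts here are guesses for illustration:

#include <cstdint>

constexpr int32_t RUNTIME_MAX_FANOUT = 8;  // bound from the patch; value assumed

struct DepListEntry { int32_t task_id; DepListEntry* next; };
struct ConsumedRingEntry { DepListEntry* fanout_head; };
struct PerfRecord {
    int32_t task_id;
    int32_t fanout_count;
    int32_t fanout[RUNTIME_MAX_FANOUT];
};

// Walk the fanout chain anchored in the consumed ring; the chain stays valid
// even after the main-ring slot for task_id has been reused.
void fill_fanout(PerfRecord& rec, ConsumedRingEntry* ring, int32_t consumed_window_mask) {
    ConsumedRingEntry& cr = ring[rec.task_id & consumed_window_mask];
    rec.fanout_count = 0;
    for (DepListEntry* cur = cr.fanout_head;
         cur != nullptr && rec.fanout_count < RUNTIME_MAX_FANOUT;
         cur = cur->next) {
        rec.fanout[rec.fanout_count++] = cur->task_id;
    }
}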