diff --git a/src/a2a3/platform/onboard/host/host_regs.cpp b/src/a2a3/platform/onboard/host/host_regs.cpp index 43cbbc51..bd2f9e26 100644 --- a/src/a2a3/platform/onboard/host/host_regs.cpp +++ b/src/a2a3/platform/onboard/host/host_regs.cpp @@ -116,7 +116,7 @@ static void get_aicore_regs(std::vector& regs, uint64_t device_id) { if (rt != 0) { LOG_ERROR("get_aicore_reg_info failed, using placeholder addresses"); // Fallback: generate placeholder addresses - for (int i = 0; i < DAV_2201::PLATFORM_MAX_PHYSICAL_CORES; i++) { + for (uint32_t i = 0; i < DAV_2201::PLATFORM_MAX_PHYSICAL_CORES; i++) { aic.push_back(0xDEADBEEF00000000ULL + (i * 0x800000)); // 8M stride aiv.push_back(0xDEADBEEF00000000ULL + (i * 0x800000) + 0x100000); aiv.push_back(0xDEADBEEF00000000ULL + (i * 0x800000) + 0x200000); diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index 478e6640..2c141d37 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -1777,34 +1777,53 @@ int32_t AicpuExecutor::run(Runtime* runtime) { // Print orchestrator profiling data #if PTO2_ORCH_PROFILING PTO2OrchProfilingData p = pto2_orchestrator_get_profiling(); - uint64_t total = p.sync_cycle + p.alloc_cycle + p.params_cycle + - p.lookup_cycle + p.heap_cycle + p.insert_cycle + - p.fanin_cycle; + uint64_t total = p.sync_cycle + p.alloc_cycle + p.params_cycle + p.lookup_cycle + p.heap_cycle + + p.insert_cycle + p.fanin_cycle; if (total == 0) total = 1; // avoid div-by-zero - DEV_ALWAYS("Thread %d: === Orchestrator Profiling: %lld tasks, total=%.3fus ===", thread_idx, - (long long)p.submit_count, cycles_to_us(total)); - DEV_ALWAYS("Thread %d: sync_tensormap : %.3fus (%.1f%%)", thread_idx, cycles_to_us(p.sync_cycle), p.sync_cycle * 100.0 / total); - DEV_ALWAYS("Thread %d: task_ring_alloc: %.3fus (%.1f%%) work=%.3fus wait=%.3fus atomics=%llu", thread_idx, - cycles_to_us(p.alloc_cycle), p.alloc_cycle * 100.0 / total, - cycles_to_us(p.alloc_cycle - p.alloc_wait_cycle), cycles_to_us(p.alloc_wait_cycle), + DEV_ALWAYS("Thread %d: === Orchestrator Profiling: %lld tasks, total=%.3fus ===", + thread_idx, + (long long)p.submit_count, + cycles_to_us(total)); + DEV_ALWAYS("Thread %d: task_ring_alloc: %.3fus (%.1f%%) work=%.3fus wait=%.3fus atomics=%llu", + thread_idx, + cycles_to_us(p.alloc_cycle), + p.alloc_cycle * 100.0 / total, + cycles_to_us(p.alloc_cycle - p.alloc_wait_cycle), + cycles_to_us(p.alloc_wait_cycle), (unsigned long long)p.alloc_atomic_count); - DEV_ALWAYS("Thread %d: param_copy : %.3fus (%.1f%%) atomics=%llu", thread_idx, - cycles_to_us(p.params_cycle), p.params_cycle * 100.0 / total, - (unsigned long long)p.params_atomic_count); - DEV_ALWAYS("Thread %d: lookup+dep : %.3fus (%.1f%%)", thread_idx, cycles_to_us(p.lookup_cycle), p.lookup_cycle * 100.0 / total); - DEV_ALWAYS("Thread %d: heap_alloc : %.3fus (%.1f%%) work=%.3fus wait=%.3fus atomics=%llu", thread_idx, - cycles_to_us(p.heap_cycle), p.heap_cycle * 100.0 / total, - cycles_to_us(p.heap_cycle - p.heap_wait_cycle), cycles_to_us(p.heap_wait_cycle), + DEV_ALWAYS("Thread %d: heap_alloc : %.3fus (%.1f%%) work=%.3fus wait=%.3fus atomics=%llu", + thread_idx, + cycles_to_us(p.heap_cycle), + p.heap_cycle * 100.0 / total, + cycles_to_us(p.heap_cycle - p.heap_wait_cycle), + cycles_to_us(p.heap_wait_cycle), (unsigned long long)p.heap_atomic_count); - DEV_ALWAYS("Thread %d: tensormap_ins : %.3fus (%.1f%%)", thread_idx, cycles_to_us(p.insert_cycle), p.insert_cycle * 100.0 / total); - DEV_ALWAYS("Thread %d: fanin+ready : %.3fus (%.1f%%) work=%.3fus wait=%.3fus atomics=%llu", thread_idx, - cycles_to_us(p.fanin_cycle), p.fanin_cycle * 100.0 / total, - cycles_to_us(p.fanin_cycle - p.fanin_wait_cycle), cycles_to_us(p.fanin_wait_cycle), + DEV_ALWAYS("Thread %d: sync_tensormap : %.3fus (%.1f%%)", + thread_idx, + cycles_to_us(p.sync_cycle), + p.sync_cycle * 100.0 / total); + DEV_ALWAYS("Thread %d: lookup+dep : %.3fus (%.1f%%)", + thread_idx, + cycles_to_us(p.lookup_cycle), + p.lookup_cycle * 100.0 / total); + DEV_ALWAYS("Thread %d: tensormap_ins : %.3fus (%.1f%%)", + thread_idx, + cycles_to_us(p.insert_cycle), + p.insert_cycle * 100.0 / total); + DEV_ALWAYS("Thread %d: param_copy : %.3fus (%.1f%%) atomics=%llu", + thread_idx, + cycles_to_us(p.params_cycle), + p.params_cycle * 100.0 / total, + (unsigned long long)p.params_atomic_count); + DEV_ALWAYS("Thread %d: fanin+ready : %.3fus (%.1f%%) work=%.3fus wait=%.3fus atomics=%llu", + thread_idx, + cycles_to_us(p.fanin_cycle), + p.fanin_cycle * 100.0 / total, + cycles_to_us(p.fanin_cycle - p.fanin_wait_cycle), + cycles_to_us(p.fanin_wait_cycle), (unsigned long long)p.fanin_atomic_count); - DEV_ALWAYS("Thread %d: scope_end : %.3fus atomics=%llu", thread_idx, - cycles_to_us(p.scope_end_cycle), - (unsigned long long)p.scope_end_atomic_count); - DEV_ALWAYS("Thread %d: avg/task : %.3fus", thread_idx, + DEV_ALWAYS("Thread %d: avg/task : %.3fus", + thread_idx, p.submit_count > 0 ? cycles_to_us(total) / p.submit_count : 0.0); #if PTO2_TENSORMAP_PROFILING diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md b/src/a2a3/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md index 318055c4..22de1070 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/docs/MULTI_RING.md @@ -67,7 +67,6 @@ PTO2DepListPool dep_pool; // After: per-ring array PTO2RingSet rings[PTO2_MAX_RING_DEPTH]; -PTO2DepListEntry* dep_pool_cur_entries[PTO2_MAX_RING_DEPTH]; int32_t dep_pool_last_reclaimed[PTO2_MAX_RING_DEPTH]; ``` diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index e21bd291..d62e0f9a 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -16,6 +16,7 @@ #include "common/unified_log.h" #include "pto_runtime2_types.h" +#include "pto_shared_memory.h" #include "pto_tensormap.h" #include "pto_types.h" #include "tensor.h" @@ -116,19 +117,18 @@ bool pto2_orchestrator_init( for (int r = 0; r < PTO2_MAX_RING_DEPTH; r++) { // Each ring gets its own heap region void* ring_heap_base = (char*)gm_heap + r * heap_size; + auto &fc = sm_handle->header->rings[r].fc; // Initialize heap ring buffer - pto2_heap_ring_init(&orch->rings[r].heap_ring, ring_heap_base, heap_size, - &sm_handle->header->rings[r].fc.heap_tail, - &sm_handle->header->rings[r].fc.heap_top); + pto2_heap_ring_init(&orch->rings[r].heap_ring, ring_heap_base, heap_size, &fc.heap_tail, &fc.heap_top); orch->rings[r].heap_ring.error_code_ptr = &sm_handle->header->orch_error_code; // Initialize task ring buffer pto2_task_ring_init(&orch->rings[r].task_ring, sm_handle->task_descriptors[r], sm_handle->header->rings[r].task_window_size, - &sm_handle->header->rings[r].fc.last_task_alive, - &sm_handle->header->rings[r].fc.current_task_index); + &fc.last_task_alive, + &fc.current_task_index); orch->rings[r].task_ring.error_code_ptr = &sm_handle->header->orch_error_code; // Allocate and initialize dependency list pool (per-ring) @@ -140,9 +140,7 @@ bool pto2_orchestrator_init( } return false; } - pto2_dep_pool_init(&orch->rings[r].dep_pool, dep_entries, dep_pool_capacity); - orch->rings[r].dep_pool.error_code_ptr = &sm_handle->header->orch_error_code; - orch->dep_pool_cur_entries[r] = nullptr; + orch->rings[r].dep_pool.init(dep_entries, dep_pool_capacity, &sm_handle->header->orch_error_code); } // Initialize TensorMap with per-ring task window sizes @@ -198,62 +196,6 @@ void pto2_orchestrator_set_scheduler(PTO2OrchestratorState* orch, PTO2SchedulerS orch->scheduler = scheduler; } - -/** - * Ensure dep pool for a specific ring has at least `needed` entries available. - * Spin-waits for reclamation if under pressure. Detects deadlock if no progress. - */ -static void pto2_dep_pool_ensure_space(PTO2OrchestratorState* orch, uint8_t ring_id, int32_t needed) { - if (pto2_dep_pool_available(&orch->rings[ring_id].dep_pool) >= needed) return; - - int spin_count = 0; - int32_t prev_last_alive = - orch->sm_handle->header->rings[ring_id].fc.last_task_alive.load(std::memory_order_acquire); - while (pto2_dep_pool_available(&orch->rings[ring_id].dep_pool) < needed) { - orch->rings[ring_id].dep_pool.reclaim(orch->scheduler, ring_id, prev_last_alive); - if (pto2_dep_pool_available(&orch->rings[ring_id].dep_pool) >= needed) return; - - spin_count++; - - // Progress detection: reset spin counter if last_task_alive advances - int32_t cur_last_alive = - orch->sm_handle->header->rings[ring_id].fc.last_task_alive.load(std::memory_order_acquire); - if (cur_last_alive > prev_last_alive) { - spin_count = 0; - prev_last_alive = cur_last_alive; - } - - if (spin_count >= PTO2_DEP_POOL_SPIN_LIMIT) { - auto& pool = orch->rings[ring_id].dep_pool; - int32_t used = pool.top - pool.tail; - int32_t current = orch->rings[ring_id].task_ring.current_index_ptr->load(std::memory_order_acquire); - LOG_ERROR("========================================"); - LOG_ERROR("FATAL: Dependency Pool Deadlock Detected! (ring %d)", ring_id); - LOG_ERROR("========================================"); - LOG_ERROR("DepListPool cannot reclaim space after %d spins (no progress).", spin_count); - LOG_ERROR(" - Pool used: %d / %d (%.1f%%)", used, pool.capacity, - (pool.capacity > 0) ? (100.0 * used / pool.capacity) : 0.0); - LOG_ERROR(" - Pool top: %d (linear)", pool.top); - LOG_ERROR(" - Pool tail: %d (linear)", pool.tail); - LOG_ERROR(" - High water: %d", pool.high_water); - LOG_ERROR(" - Needed: %d entries", needed); - LOG_ERROR(" - last_task_alive: %d (stuck here)", cur_last_alive); - LOG_ERROR(" - current_task: %d", current); - LOG_ERROR(" - In-flight tasks: %d", current - cur_last_alive); - LOG_ERROR("Diagnosis:"); - LOG_ERROR(" last_task_alive is not advancing, so dep pool tail"); - LOG_ERROR(" cannot reclaim. Check TaskRing diagnostics for root cause."); - LOG_ERROR("Solution:"); - LOG_ERROR(" Increase dep pool capacity (current: %d, recommended: %d)", pool.capacity, pool.high_water * 2); - LOG_ERROR(" Compile-time: PTO2_DEP_LIST_POOL_SIZE in pto_runtime2_types.h"); - LOG_ERROR(" Runtime env: PTO2_RING_DEP_POOL=%d", pool.high_water * 2); - LOG_ERROR("========================================"); - exit(1); - } - SPIN_WAIT_HINT(); - } -} - // ============================================================================= // Scope Management // ============================================================================= @@ -307,12 +249,12 @@ void pto2_scope_end(PTO2OrchestratorState* orch) { // ============================================================================= void pto2_submit_mixed_task( PTO2OrchestratorState* orch, const MixedKernels& mixed_kernels, const PTOParam& params) { + CYCLE_COUNT_START(); + // Fast path after fatal error — all subsequent submits are no-ops if (orch->fatal) { return; } - - PTO2SchedulerState* sched = orch->scheduler; // Validate PTOParam construction (errors recorded by add_input/add_output/etc.) if (params.has_error) { @@ -329,7 +271,12 @@ void pto2_submit_mixed_task( return; } - CYCLE_COUNT_START(); + + // Determine which ring this task belongs to + uint8_t ring_id = orch->current_ring_id(); + auto& task_ring = orch->rings[ring_id].task_ring; + PTO2SchedulerState* sched = orch->scheduler; + PTO2RingFlowControl &fc = orch->sm_handle->header->rings[ring_id].fc; // === Validate submit inputs === uint8_t active_mask = pto2_mixed_kernels_to_active_mask(mixed_kernels); @@ -347,24 +294,6 @@ void pto2_submit_mixed_task( active_mask = pto2_mixed_kernels_to_active_mask(normalized); } - // === STEP 0: Sync TensorMap validity and optional cleanup === - - // Determine which ring this task belongs to - uint8_t ring_id = orch->current_ring_id(); - auto& task_ring = orch->rings[ring_id].task_ring; - - // Read current last_task_alive from shared memory for this ring - int32_t sm_last_task_alive = - orch->sm_handle->header->rings[ring_id].fc.last_task_alive.load(std::memory_order_acquire); - - orch->tensor_map.sync_tensormap(ring_id, sm_last_task_alive); - - if (sched) { - orch->rings[ring_id].dep_pool.reclaim(sched, ring_id, sm_last_task_alive); - } - - CYCLE_COUNT_LAP_RECORD(g_orch_sync_cycle, AicpuPhaseId::ORCH_SYNC, -1); - // Submission without an open scope is illegal always_assert(orch->scope_stack_top >= 0 && "Cannot submit task outside a scope"); @@ -414,7 +343,7 @@ void pto2_submit_mixed_task( PTO2TaskId mixed_task_id = pto2_make_task_id(ring_id, static_cast(local_id)); PTO2TaskDescriptor& task = task_ring.get_task_by_slot(slot); - PTO2TaskPayload* payload = &orch->sm_handle->task_payloads[ring_id][slot]; + PTO2TaskPayload* payload = &orch->sm_handle->task_payloads[ring_id][slot]; // Early write-prefetch payload GM cache lines to issue RFO in background. // ~130 lines of computation (output_size, lookup, insert) follow before @@ -424,8 +353,8 @@ void pto2_submit_mixed_task( __builtin_prefetch(&payload->tensors[i], 1, 3); __builtin_prefetch(reinterpret_cast(&payload->tensors[i]) + 64, 1, 3); } - for (int32_t j = 0; j < params.scalar_count; j += 8) { - __builtin_prefetch(&payload->scalars[j], 1, 3); + for (int32_t i = 0; i < params.scalar_count; i += 8) { + __builtin_prefetch(&payload->scalars[i], 1, 3); } __builtin_prefetch(payload, 1, 3); __builtin_prefetch(reinterpret_cast(payload) + 64, 1, 3); @@ -460,7 +389,7 @@ void pto2_submit_mixed_task( CYCLE_COUNT_LAP_RECORD(g_orch_alloc_cycle, AicpuPhaseId::ORCH_ALLOC, local_id); - // === Calculate output size + heap alloc (read from params only, no GM access) === + // === STEP 2: Calculate output size + heap alloc (read from params only, no GM access) === int32_t total_output_size = 0; for (int i = 0; i < params.tensor_count; i++) { if (params.tensor_types[i] == PTOParamType::OUTPUT @@ -483,7 +412,19 @@ void pto2_submit_mixed_task( } #endif - // === Lookup inputs + assign output addrs (all from params, no GM) === + // === STEP 3: Sync TensorMap validity and optional cleanup === + // Read current last_task_alive from shared memory for this ring + int32_t sm_last_task_alive = fc.last_task_alive.load(std::memory_order_acquire); + + orch->tensor_map.sync_tensormap(ring_id, sm_last_task_alive); + + if (sched) { + orch->rings[ring_id].dep_pool.reclaim(*sched, ring_id, sm_last_task_alive); + } + + CYCLE_COUNT_LAP_RECORD(g_orch_sync_cycle, AicpuPhaseId::ORCH_SYNC, local_id); + + // === STEP 4: Lookup inputs + assign output addrs (all from params, no GM) === int32_t offset = 0; for (int i = 0; i < params.tensor_count; i++) { PTOParamType ptype = params.tensor_types[i]; @@ -500,8 +441,8 @@ void pto2_submit_mixed_task( PTO2TensorMapEntry& entry = *lookup_result.entries[r].entry; auto overlap_status = lookup_result.entries[r].overlap_status; // Check if this producer is already in fanin list (avoid duplicates) - int32_t prod_ring = static_cast(pto2_task_id_ring(entry.producer_task_id)); - int32_t prod_local = static_cast(pto2_task_id_local(entry.producer_task_id)); + auto prod_ring = entry.producer_task_id.ring(); + auto prod_local = entry.producer_task_id.local(); PTO2TaskSlotState* prod_state = &sched->ring_sched_states[prod_ring].get_slot_state_by_task_id(prod_local); bool already_added = false; @@ -545,7 +486,7 @@ void pto2_submit_mixed_task( CYCLE_COUNT_LAP_RECORD(g_orch_lookup_cycle, AicpuPhaseId::ORCH_LOOKUP, local_id); - // === Register outputs/inouts in TensorMap (must be separate from lookup) === + // === STEP 5: Register outputs/inouts in TensorMap (must be separate from lookup) === for (int i = 0; i < params.tensor_count; i++) { PTOParamType ptype = params.tensor_types[i]; if (ptype == PTOParamType::OUTPUT || ptype == PTOParamType::INOUT) { @@ -557,7 +498,7 @@ void pto2_submit_mixed_task( CYCLE_COUNT_LAP_RECORD(g_orch_insert_cycle, AicpuPhaseId::ORCH_INSERT, local_id); - // === Batch-write task descriptor to GM (single cache line burst) === + // === STEP 6: Batch-write to GM (single cache line burst) === // Deferred from allocation phase to avoid scattered GM writes that get // evicted by TensorMap lookup/insert cache pressure. __builtin_prefetch(&task, 1, 1); @@ -585,7 +526,7 @@ void pto2_submit_mixed_task( g_orch_params_atomic_count += 2; // fanout_lock.store + fanout_count.store #endif - // === STEP 5: Finalize fanin list === + // === STEP 7: Finalize fanin list === // First build the fanin list if (sched) { auto& rs = sched->ring_sched_states[ring_id]; @@ -595,13 +536,9 @@ void pto2_submit_mixed_task( cur_slot_state.task_state.store(PTO2_TASK_PENDING, std::memory_order_relaxed); cur_slot_state.fanout_refcount.store(0, std::memory_order_relaxed); - // Ensure dep pool has space: fanin_count entries + 1 pre-alloc - pto2_dep_pool_ensure_space(orch, ring_id, fanin_count + 1); - auto& dep_pool = orch->rings[ring_id].dep_pool; - if (orch->dep_pool_cur_entries[ring_id] == nullptr) { - orch->dep_pool_cur_entries[ring_id] = dep_pool.alloc(); - } + // Ensure dep pool has space: fanin_count entries + 1 pre-alloc + dep_pool.ensure_space(*sched, fc, ring_id, fanin_count + 1); int32_t early_finished = 0; cur_slot_state.fanin_count = fanin_count + 1; // +1 redundance for not being ready too early @@ -611,8 +548,6 @@ void pto2_submit_mixed_task( } for (int i = 0; i < fanin_count; i++) { PTO2TaskSlotState& producer_slot_state = *fanin_states[i]; - orch->dep_pool_cur_entries[ring_id]->slot_state = &cur_slot_state; - orch->dep_pool_cur_entries[ring_id]->next = producer_slot_state.fanout_head; #if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING pto2_fanout_lock(producer_slot_state, g_orch_fanin_atomic_count, g_orch_fanin_wait_cycle); #else @@ -626,12 +561,9 @@ void pto2_submit_mixed_task( // decrement fanin_count early_finished++; } else { - producer_slot_state.fanout_head = orch->dep_pool_cur_entries[ring_id]; + producer_slot_state.fanout_head = dep_pool.prepend(producer_slot_state.fanout_head, &cur_slot_state); } pto2_fanout_unlock(producer_slot_state); - if (producer_slot_state.fanout_head == orch->dep_pool_cur_entries[ring_id]) { - orch->dep_pool_cur_entries[ring_id] = dep_pool.alloc(); - } } // Combined release: merge early_finished batch with the +1 init release // into a single atomic fetch_add (saves one acq_rel cache-line bounce per task). @@ -707,7 +639,7 @@ void pto2_orchestrator_print_stats(PTO2OrchestratorState* orch) { orch->rings[r].heap_ring.top_ptr->load(std::memory_order_relaxed), orch->rings[r].heap_ring.size); LOG_INFO("Ring %d dep pool: %d / %d", r, - pto2_dep_pool_used(&orch->rings[r].dep_pool), + orch->rings[r].dep_pool.used(), orch->rings[r].dep_pool.capacity); } } diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h index 0aca69a8..a2d4898d 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.h @@ -41,7 +41,6 @@ struct PTO2OrchestratorState { // === PER-RING RESOURCES === PTO2RingSet rings[PTO2_MAX_RING_DEPTH]; - PTO2DepListEntry* dep_pool_cur_entries[PTO2_MAX_RING_DEPTH]; // === TENSOR MAP (Private) === PTO2TensorMap tensor_map; // Producer lookup diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.cpp index 05d96c11..daac7846 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.cpp @@ -43,23 +43,9 @@ void pto2_task_ring_init(PTO2TaskRing* ring, PTO2TaskDescriptor* descriptors, // ============================================================================= // Dependency List Pool Implementation // ============================================================================= - -void pto2_dep_pool_init(PTO2DepListPool* pool, PTO2DepListEntry* base, int32_t capacity) { - pool->base = base; - pool->capacity = capacity; - pool->top = 1; // Start from 1, 0 means NULL/empty - pool->tail = 1; // Match initial top (no reclaimable entries yet) - pool->high_water = 0; - pool->last_reclaimed = 0; - - // Initialize entry 0 as NULL marker - pool->base[0].slot_state = nullptr; - pool->base[0].next = nullptr; -} - -void PTO2DepListPool::reclaim(PTO2SchedulerState* sched, uint8_t ring_id, int32_t sm_last_task_alive) { +void PTO2DepListPool::reclaim(PTO2SchedulerState& sched, uint8_t ring_id, int32_t sm_last_task_alive) { if (sm_last_task_alive >= last_reclaimed + PTO2_DEP_POOL_CLEANUP_INTERVAL && sm_last_task_alive > 0) { - int32_t mark = sched->ring_sched_states[ring_id].get_slot_state_by_task_id(sm_last_task_alive - 1).dep_pool_mark; + int32_t mark = sched.ring_sched_states[ring_id].get_slot_state_by_task_id(sm_last_task_alive - 1).dep_pool_mark; if (mark > 0) { advance_tail(mark); } @@ -67,10 +53,52 @@ void PTO2DepListPool::reclaim(PTO2SchedulerState* sched, uint8_t ring_id, int32_ } } -int32_t pto2_dep_pool_used(PTO2DepListPool* pool) { - return pool->top - pool->tail; -} +void PTO2DepListPool::ensure_space( + PTO2SchedulerState& sched, PTO2RingFlowControl& fc, uint8_t ring_id, int32_t needed) { + if (available() >= needed) return; -int32_t pto2_dep_pool_available(PTO2DepListPool* pool) { - return pool->capacity - (pool->top - pool->tail); -} + int spin_count = 0; + int32_t prev_last_alive = fc.last_task_alive.load(std::memory_order_acquire); + while (available() < needed) { + reclaim(sched, ring_id, prev_last_alive); + if (available() >= needed) return; + + spin_count++; + + // Progress detection: reset spin counter if last_task_alive advances + int32_t cur_last_alive = fc.last_task_alive.load(std::memory_order_acquire); + if (cur_last_alive > prev_last_alive) { + spin_count = 0; + prev_last_alive = cur_last_alive; + } + + if (spin_count >= PTO2_DEP_POOL_SPIN_LIMIT) { + int32_t current = fc.current_task_index.load(std::memory_order_acquire); + LOG_ERROR("========================================"); + LOG_ERROR("FATAL: Dependency Pool Deadlock Detected! (ring %d)", ring_id); + LOG_ERROR("========================================"); + LOG_ERROR("DepListPool cannot reclaim space after %d spins (no progress).", spin_count); + LOG_ERROR(" - Pool used: %d / %d (%.1f%%)", + used(), + capacity, + (capacity > 0) ? (100.0 * used() / capacity) : 0.0); + LOG_ERROR(" - Pool top: %d (linear)", top); + LOG_ERROR(" - Pool tail: %d (linear)", tail); + LOG_ERROR(" - High water: %d", high_water); + LOG_ERROR(" - Needed: %d entries", needed); + LOG_ERROR(" - last_task_alive: %d (stuck here)", cur_last_alive); + LOG_ERROR(" - current_task: %d", current); + LOG_ERROR(" - In-flight tasks: %d", current - cur_last_alive); + LOG_ERROR("Diagnosis:"); + LOG_ERROR(" last_task_alive is not advancing, so dep pool tail"); + LOG_ERROR(" cannot reclaim. Check TaskRing diagnostics for root cause."); + LOG_ERROR("Solution:"); + LOG_ERROR(" Increase dep pool capacity (current: %d, recommended: %d)", capacity, high_water * 2); + LOG_ERROR(" Compile-time: PTO2_DEP_LIST_POOL_SIZE in pto_runtime2_types.h"); + LOG_ERROR(" Runtime env: PTO2_RING_DEP_POOL=%d", high_water * 2); + LOG_ERROR("========================================"); + exit(1); + } + SPIN_WAIT_HINT(); + } +} \ No newline at end of file diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h index 2318984a..dc60228a 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_ring_buffer.h @@ -475,6 +475,27 @@ struct PTO2DepListPool { // Error code pointer for fatal error reporting (→ sm_header->orch_error_code) std::atomic* error_code_ptr = nullptr; + /** + * Initialize dependency list pool + * + * @param base Pool base address from shared memory + * @param capacity Total number of entries + */ + void init(PTO2DepListEntry* in_base, int32_t in_capacity, std::atomic* in_error_code_ptr) { + base = in_base; + capacity = in_capacity; + top = 1; // Start from 1, 0 means NULL/empty + tail = 1; // Match initial top (no reclaimable entries yet) + high_water = 0; + last_reclaimed = 0; + + // Initialize entry 0 as NULL marker + base[0].slot_state = nullptr; + base[0].next = nullptr; + + error_code_ptr = in_error_code_ptr; + } + /** * Reclaim dead entries based on scheduler's slot state dep_pool_mark. * Safe to call multiple times — only advances tail forward. @@ -483,7 +504,13 @@ struct PTO2DepListPool { * @param ring_id Ring layer index * @param sm_last_task_alive Current last_task_alive from shared memory */ - void reclaim(PTO2SchedulerState* sched, uint8_t ring_id, int32_t sm_last_task_alive); + void reclaim(PTO2SchedulerState& sched, uint8_t ring_id, int32_t sm_last_task_alive); + + /** + * Ensure dep pool for a specific ring has at least `needed` entries available. + * Spin-waits for reclamation if under pressure. Detects deadlock if no progress. + */ + void ensure_space(PTO2SchedulerState& sched, PTO2RingFlowControl &fc, uint8_t ring_id, int32_t needed); /** * Allocate a single entry from the pool (single-thread per pool instance) @@ -536,7 +563,7 @@ struct PTO2DepListPool { * @param task_slot Task slot to prepend * @return New head offset */ - PTO2DepListEntry* pto2_dep_list_prepend(PTO2DepListEntry* cur, PTO2TaskSlotState* slot_state) { + PTO2DepListEntry* prepend(PTO2DepListEntry* cur, PTO2TaskSlotState* slot_state) { PTO2DepListEntry* new_entry = alloc(); if (!new_entry) return nullptr; new_entry->slot_state = slot_state; @@ -551,22 +578,15 @@ struct PTO2DepListPool { if (offset <= 0) return NULL; return &base[offset]; } -}; -/** - * Initialize dependency list pool - * - * @param pool Pool to initialize - * @param base Pool base address from shared memory - * @param capacity Total number of entries - */ -void pto2_dep_pool_init(PTO2DepListPool* pool, PTO2DepListEntry* base, int32_t capacity); + int32_t used() const { + return top - tail; + } -/** - * Get pool usage statistics - */ -int32_t pto2_dep_pool_used(PTO2DepListPool* pool); -int32_t pto2_dep_pool_available(PTO2DepListPool* pool); + int32_t available() const { + return capacity - used(); + } +}; // ============================================================================= // Ring Set (per-depth aggregate) diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/paged_attention_unroll/kernels/orchestration/paged_attention_orch.cpp b/tests/device_tests/a2a3/tensormap_and_ringbuffer/paged_attention_unroll/kernels/orchestration/paged_attention_orch.cpp index bd06e1c9..3d4238ea 100644 --- a/tests/device_tests/a2a3/tensormap_and_ringbuffer/paged_attention_unroll/kernels/orchestration/paged_attention_orch.cpp +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/paged_attention_unroll/kernels/orchestration/paged_attention_orch.cpp @@ -39,8 +39,18 @@ inline uint64_t get_sys_cnt_aicpu() { return ticks; } +#ifdef ENABLE_PROFILING #define CYCLE_COUNT_START() uint64_t _t0 = get_sys_cnt_aicpu(), _t1 -#define CYCLE_COUNT_LAP(acc) do { _t1 = get_sys_cnt_aicpu(); acc += (_t1 - _t0); _t0 = _t1; } while(0) +#define CYCLE_COUNT_LAP(acc) \ + do { \ + _t1 = get_sys_cnt_aicpu(); \ + acc += (_t1 - _t0); \ + _t0 = _t1; \ + } while (0) +#else +#define CYCLE_COUNT_START() (void)0 +#define CYCLE_COUNT_LAP(acc) (void)0 +#endif // Helper to encode float as uint64_t for scalar params static uint64_t float_to_u64(float f) { @@ -70,15 +80,18 @@ __attribute__((visibility("default"))) PTO2OrchestrationConfig aicpu_orchestrati __attribute__((visibility("default"))) void aicpu_orchestration_entry(PTO2Runtime* rt, uint64_t* args, int arg_count, int orch_thread_num, int orch_thread_index) { (void)orch_thread_num; (void)orch_thread_index; +#ifdef ENABLE_PROFILING uint64_t prof_param_extract = 0; uint64_t prof_ext_tensor = 0; uint64_t prof_make_tensor = 0; uint64_t prof_tensor_view = 0; uint64_t prof_param_setup = 0; uint64_t prof_submit_task = 0; + uint64_t prof_scope_and_loop = 0; int prof_submit_count = 0; int prof_make_count = 0; int prof_view_count = 0; +#endif CYCLE_COUNT_START(); @@ -115,8 +128,6 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry(PTO2Runtim DataType data_type = DataType::BFLOAT16; CYCLE_COUNT_LAP(prof_param_extract); - LOG_ALWAYS(rt, ">>>>>> batch = %lu", (unsigned long)batch); - uint32_t query_shapes[2] = {(uint32_t)(batch * num_heads), (uint32_t)head_dim}; uint32_t key_cache_shapes[2] = {(uint32_t)(batch * block_num * block_size), (uint32_t)head_dim}; uint32_t value_cache_shapes[2] = {(uint32_t)(batch * block_num * block_size), (uint32_t)head_dim}; @@ -125,7 +136,10 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry(PTO2Runtim Tensor key_cache = make_tensor_external(host_key_cache, key_cache_shapes, 2, data_type, false); Tensor value_cache = make_tensor_external(host_value_cache, value_cache_shapes, 2, data_type, false); Tensor out = make_tensor_external(host_out, out_shapes, 2, DataType::FLOAT32); + +#ifdef ENABLE_PROFILING CYCLE_COUNT_LAP(prof_ext_tensor); +#endif // Prefetch first batch's block table data into cache (4 cache lines = 256 bytes) for (int cl = 0; cl < N_UNROLL * (int)sizeof(int); cl += 64) { @@ -148,9 +162,9 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry(PTO2Runtim __builtin_prefetch(&host_context_lens[b_idx + 1], 0, 3); } for (uint64_t q_idx = 0; q_idx < q_loop; q_idx++) { + CYCLE_COUNT_LAP(prof_scope_and_loop); PTO2_SCOPE(rt) { uint64_t cur_offset = b_idx * q_head_num + q_idx * q_tile; - CYCLE_COUNT_START(); uint32_t oi_shapes[2] = {(uint32_t)q_tile, (uint32_t)head_dim}; uint32_t li_shapes[1] = {(uint32_t)q_tile}; @@ -158,8 +172,10 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry(PTO2Runtim Tensor oi = make_tensor(oi_shapes, 2, DataType::FLOAT32); Tensor li_update = make_tensor(li_shapes, 1, DataType::FLOAT32, false); Tensor mi_update = make_tensor(mi_shapes, 1, DataType::FLOAT32, false); +#ifdef ENABLE_PROFILING prof_make_count += 3; CYCLE_COUNT_LAP(prof_make_tensor); +#endif uint32_t qi_shapes[2] = {(uint32_t)q_tile, (uint32_t)head_dim}; uint32_t qi_offsets[2] = {(uint32_t)cur_offset, 0}; @@ -167,17 +183,20 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry(PTO2Runtim uint32_t out_view_shapes[2] = {(uint32_t)q_tile, (uint32_t)head_dim}; uint32_t out_view_offsets[2] = {(uint32_t)cur_offset, 0}; Tensor out_view = out.view(out_view_shapes, out_view_offsets); +#ifdef ENABLE_PROFILING prof_view_count += 2; CYCLE_COUNT_LAP(prof_tensor_view); - +#endif PTOParam params_inplace; params_inplace.add_output(oi); params_inplace.add_output(li_update); params_inplace.add_output(mi_update); CYCLE_COUNT_LAP(prof_param_setup); pto2_rt_submit_aiv_task(rt, FUNC_AIV_HUB, params_inplace); +#ifdef ENABLE_PROFILING prof_submit_count++; CYCLE_COUNT_LAP(prof_submit_task); +#endif // Reusable PTOParam objects — reset() before each use avoids // repeated stack-frame construction in the inner loop. @@ -195,8 +214,10 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry(PTO2Runtim // === Task 1: Batched QK matmul === uint32_t sij_buf_shapes[2] = {(uint32_t)q_tile, (uint32_t)(n_blocks * block_size)}; Tensor sij_buf = make_tensor(sij_buf_shapes, 2, DataType::FLOAT32); +#ifdef ENABLE_PROFILING prof_make_count += 1; CYCLE_COUNT_LAP(prof_make_tensor); +#endif params_qk.reset(); params_qk.add_input(qi); @@ -206,16 +227,20 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry(PTO2Runtim params_qk.add_scalars_i32(bt_base + bn, N_UNROLL); CYCLE_COUNT_LAP(prof_param_setup); pto2_rt_submit_aic_task(rt, FUNC_QK_MATMUL, params_qk); +#ifdef ENABLE_PROFILING prof_submit_count++; CYCLE_COUNT_LAP(prof_submit_task); +#endif // === Task 2: Two-pass softmax over all blocks in group === uint32_t pij_buf_shapes[2] = {(uint32_t)q_tile, (uint32_t)(n_blocks * block_size)}; Tensor pij_buf = make_tensor(pij_buf_shapes, 2, data_type); Tensor mi = make_tensor(mi_shapes, 1, DataType::FLOAT32); Tensor li = make_tensor(li_shapes, 1, DataType::FLOAT32); +#ifdef ENABLE_PROFILING prof_make_count += 3; CYCLE_COUNT_LAP(prof_make_tensor); +#endif params_sf.reset(); params_sf.add_input(sij_buf); @@ -227,14 +252,18 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry(PTO2Runtim params_sf.add_scalar(valid_len_last); CYCLE_COUNT_LAP(prof_param_setup); pto2_rt_submit_aiv_task(rt, FUNC_SOFTMAX_PREPARE, params_sf); +#ifdef ENABLE_PROFILING prof_submit_count++; CYCLE_COUNT_LAP(prof_submit_task); +#endif // === Task 3: SplitK PV matmul (accumulated P @ V) === uint32_t oi_new_shapes[2] = {(uint32_t)q_tile, (uint32_t)head_dim}; Tensor oi_new = make_tensor(oi_new_shapes, 2, DataType::FLOAT32); +#ifdef ENABLE_PROFILING prof_make_count += 1; CYCLE_COUNT_LAP(prof_make_tensor); +#endif params_pv.reset(); params_pv.add_input(pij_buf); @@ -244,13 +273,14 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry(PTO2Runtim params_pv.copy_scalars_from(params_qk, 1, N_UNROLL); CYCLE_COUNT_LAP(prof_param_setup); pto2_rt_submit_aic_task(rt, FUNC_PV_MATMUL, params_pv); +#ifdef ENABLE_PROFILING prof_submit_count++; CYCLE_COUNT_LAP(prof_submit_task); +#endif // === Task 4: Online update (per-group) === uint64_t is_first = (bn == 0) ? 1 : 0; uint64_t is_last = (bn + n_blocks >= bn_this_batch) ? 1 : 0; - CYCLE_COUNT_LAP(prof_param_extract); params_up.reset(); params_up.add_input(mi); @@ -264,15 +294,21 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry(PTO2Runtim params_up.add_scalar(is_last); CYCLE_COUNT_LAP(prof_param_setup); pto2_rt_submit_aiv_task(rt, FUNC_ONLINE_UPDATE, params_up); +#ifdef ENABLE_PROFILING prof_submit_count++; CYCLE_COUNT_LAP(prof_submit_task); +#endif } } + CYCLE_COUNT_LAP(prof_scope_and_loop); } } + CYCLE_COUNT_LAP(prof_scope_and_loop); +#ifdef ENABLE_PROFILING uint64_t total = prof_param_extract + prof_ext_tensor + prof_make_tensor + - prof_tensor_view + prof_param_setup + prof_submit_task; + prof_tensor_view + prof_param_setup + prof_submit_task + + prof_scope_and_loop; LOG_ALWAYS(rt, "=== PagedAttn Orch Profiling: %d submits, %d makes, %d views, total=%.3fus ===", prof_submit_count, prof_make_count, prof_view_count, cycles_to_us(total)); if (total > 0) { @@ -291,7 +327,10 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry(PTO2Runtim LOG_ALWAYS(rt, " submit_task(x%d) : %7.3fus (%5.1f%%) avg=%.3fus", prof_submit_count, cycles_to_us(prof_submit_task), prof_submit_task * 100.0 / total, prof_submit_count > 0 ? cycles_to_us(prof_submit_task) / prof_submit_count : 0.0); + LOG_ALWAYS(rt, " scope_and_loop : %7.3fus (%5.1f%%)", + cycles_to_us(prof_scope_and_loop), prof_scope_and_loop * 100.0 / total); } +#endif #undef CYCLE_COUNT_START #undef CYCLE_COUNT_LAP