From 160b6ada0c36e8e8df35b96047342bf61e0bde7c Mon Sep 17 00:00:00 2001 From: Songyangyang18 Date: Wed, 18 Mar 2026 20:24:44 +0800 Subject: [PATCH] discard mix --- .../aicpu/aicpu_executor.cpp | 345 +++++++++--------- .../runtime/pto_orchestrator.cpp | 3 +- .../runtime/pto_scheduler.cpp | 9 +- .../runtime/pto_scheduler.h | 100 ++--- .../runtime/pto_submit_types.h | 27 +- 5 files changed, 234 insertions(+), 250 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index ae52455a..83c6ed4c 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -102,6 +102,19 @@ struct CoreTypeTracker { } return -1; } + + int32_t take_last_idle() { + if (idle_count == 0) return -1; + int32_t core_id = idle[idle_count - 1]; + idle_count--; + running[running_count++] = core_id; + return core_id; + } + + int32_t peek_last_idle() const { + if (idle_count == 0) return -1; + return idle[idle_count - 1]; + } }; struct Cluster { @@ -120,32 +133,6 @@ struct CoreStateTracker { template CoreTypeTracker& get() { return by_type[static_cast(CT)]; } - - int32_t find_cluster_for_shape(PTO2ResourceShape shape) { - for (int32_t i = 0; i < cluster_count; i++) { - Cluster& c = clusters[i]; - switch (shape) { - case PTO2ResourceShape::AIC_ONLY: - if (core_idle[c.aic_core_id]) return i; - break; - case PTO2ResourceShape::AIV_X1: - if (core_idle[c.aiv_core_ids[0]] || core_idle[c.aiv_core_ids[1]]) return i; - break; - case PTO2ResourceShape::AIV_X2: - if (core_idle[c.aiv_core_ids[0]] && core_idle[c.aiv_core_ids[1]]) return i; - break; - case PTO2ResourceShape::AIC_AIV_X1: - if (core_idle[c.aic_core_id] && - (core_idle[c.aiv_core_ids[0]] || core_idle[c.aiv_core_ids[1]])) return i; - break; - case PTO2ResourceShape::AIC_AIV_X2: - if (core_idle[c.aic_core_id] && - core_idle[c.aiv_core_ids[0]] && core_idle[c.aiv_core_ids[1]]) return i; - break; - } - } - return -1; - } }; struct AicpuExecutor { @@ -411,15 +398,8 @@ struct AicpuExecutor { } } - static const char* shape_name(PTO2ResourceShape shape) { - switch (shape) { - case PTO2ResourceShape::AIC_ONLY: return "AIC_ONLY"; - case PTO2ResourceShape::AIV_X1: return "AIV_X1"; - case PTO2ResourceShape::AIV_X2: return "AIV_X2"; - case PTO2ResourceShape::AIC_AIV_X1: return "AIC_AIV_X1"; - case PTO2ResourceShape::AIC_AIV_X2: return "AIC_AIV_X2"; - } - return "UNKNOWN"; + static const char* shape_name(PTO2ResourceShape) { + return "UNIFIED"; } struct ResourceCount { @@ -427,44 +407,15 @@ struct AicpuExecutor { int32_t aiv; }; - static constexpr ResourceCount shape_resource_count(PTO2ResourceShape shape) { - constexpr ResourceCount kTable[PTO2_NUM_RESOURCE_SHAPES] = { - {1, 0}, // AIC_ONLY = 0 - {0, 1}, // AIV_X1 = 1 - {0, 2}, // AIV_X2 = 2 - {1, 1}, // AIC_AIV_X1 = 3 - {1, 2}, // AIC_AIV_X2 = 4 - }; - return kTable[static_cast(shape)]; - } - - /** - * Returns the dispatch probe order for a given scheduler thread. - * Widest shapes first to avoid consuming cluster resources with narrow tasks. - * Even/odd threads use different fallback orders (AIC-first vs AIV-first) - * to reduce contention on the same ready queue across adjacent threads. - */ - static const PTO2ResourceShape* get_dispatch_order(int32_t thread_idx) { - // Even threads: AIC-first fallback after widest - static constexpr PTO2ResourceShape kEvenOrder[PTO2_NUM_RESOURCE_SHAPES] = { - PTO2ResourceShape::AIC_AIV_X2, - PTO2ResourceShape::AIC_AIV_X1, - PTO2ResourceShape::AIC_ONLY, - PTO2ResourceShape::AIV_X2, - PTO2ResourceShape::AIV_X1, - }; - // Odd threads: AIV-first fallback after widest - static constexpr PTO2ResourceShape kOddOrder[PTO2_NUM_RESOURCE_SHAPES] = { - PTO2ResourceShape::AIC_AIV_X2, - PTO2ResourceShape::AIV_X2, - PTO2ResourceShape::AIC_AIV_X1, - PTO2ResourceShape::AIV_X1, - PTO2ResourceShape::AIC_ONLY, - }; - return (thread_idx % 2 == 0) ? kEvenOrder : kOddOrder; + static ResourceCount get_task_resource_count(uint8_t active_mask) { + ResourceCount rc; + rc.aic = (active_mask & PTO2_SUBTASK_MASK_AIC) ? 1 : 0; + rc.aiv = ((active_mask & PTO2_SUBTASK_MASK_AIV0) ? 1 : 0) + + ((active_mask & PTO2_SUBTASK_MASK_AIV1) ? 1 : 0); + return rc; } - PTO2TaskSlotState* pop_ready_task(PTO2ResourceShape shape, int32_t thread_idx + PTO2TaskSlotState* pop_ready_task(int32_t thread_idx #if PTO2_SCHED_PROFILING , uint64_t& pop_hit, uint64_t& pop_miss , uint64_t& sched_dispatch_pop_cycle @@ -474,11 +425,11 @@ struct AicpuExecutor { #if PTO2_SCHED_PROFILING extern uint64_t g_sched_pop_atomic_count[], g_sched_pop_wait_cycle[]; uint64_t t_pop_start = get_sys_cnt_aicpu(); - PTO2TaskSlotState* slot_state = rt->scheduler.get_ready_task(shape, + PTO2TaskSlotState* slot_state = rt->scheduler.get_ready_task( g_sched_pop_atomic_count[thread_idx], g_sched_pop_wait_cycle[thread_idx]); sched_dispatch_pop_cycle += (get_sys_cnt_aicpu() - t_pop_start); #else - PTO2TaskSlotState* slot_state = rt->scheduler.get_ready_task(shape); + PTO2TaskSlotState* slot_state = rt->scheduler.get_ready_task(); #endif if (slot_state) { #if PTO2_SCHED_PROFILING @@ -539,6 +490,47 @@ struct AicpuExecutor { tracker.core_idle[core_id] = false; executing_reg_task_ids[core_id] = reg_task_id; } + + void dispatch_from_idle_pool( + Runtime* runtime, CoreStateTracker& tracker, int32_t* executing_reg_task_ids, + CoreType core_type, PTO2TaskSlotState& slot_state, + PTO2SubtaskSlot subslot +#if PTO2_PROFILING + , bool profiling_enabled, int32_t thread_idx +#endif + ) { + CoreTypeTracker& ct = tracker.by_type[static_cast(core_type)]; + int32_t core_id = ct.take_last_idle(); + if (core_id < 0) return; + + PTO2DispatchPayload& payload = s_pto2_payload_per_core[core_id]; + PTO2TaskDescriptor& task = *slot_state.task; + int32_t slot_idx = static_cast(subslot); + build_pto2_payload(payload, task.kernel_id[slot_idx], *slot_state.payload); + executing_subslot_by_core_[core_id] = subslot; + executing_slot_state_by_core_[core_id] = &slot_state; +#if PTO2_PROFILING + if (profiling_enabled) { + dispatch_timestamps_[core_id] = get_sys_cnt_aicpu(); + if (core_dispatch_counts_[core_id] >= PLATFORM_PROF_BUFFER_SIZE) { + perf_aicpu_switch_buffer(runtime, core_id, thread_idx); + core_dispatch_counts_[core_id] = 0; + } + core_dispatch_counts_[core_id]++; + } +#endif + dispatch_seq_by_core_[core_id]++; + uint32_t reg_task_id = dispatch_seq_by_core_[core_id] & TASK_ID_MASK; + while (reg_task_id == AICORE_IDLE_TASK_ID || + (reg_task_id + 1) == AICORE_EXIT_SIGNAL) { + dispatch_seq_by_core_[core_id]++; + reg_task_id = dispatch_seq_by_core_[core_id] & TASK_ID_MASK; + } + write_reg(core_id_to_reg_addr_[core_id], RegId::DATA_MAIN_BASE, static_cast(reg_task_id)); + + tracker.core_idle[core_id] = false; + executing_reg_task_ids[core_id] = reg_task_id; + } }; static AicpuExecutor g_aicpu_executor; @@ -1064,7 +1056,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa // Check AIC running cores bool try_completed = false; - always_assert(local_bufs[0].count == 0 && local_bufs[1].count == 0); // Invariant: previous iteration fully consumed + always_assert(local_bufs[0].count == 0); // Invariant: previous iteration fully consumed if (tracker.aic().running_count > 0) { try_completed = true; check_running_cores_for_completion( @@ -1133,143 +1125,143 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa #endif // Phase 2: Local dispatch — drain local_bufs, match to idle clusters (zero MPMC operations) - // Phase 3: Global queue — push overflow to readyQ + fill remaining idle cores from readyQ + // Phase 2 & 3: Unified dispatch — drain local_bufs first, then global readyQ + // Break cluster constraint: use global AIC/AIV resource pools for maximum utilization bool try_pushed = false; - // Local dispatch: drain both per-CoreType local_bufs, match to idle clusters by shape + // Local dispatch: drain unified local_buf, allocate from global resource pools PTO2TaskSlotState* overflow_ptrs[LOCAL_READY_CAP_PER_TYPE * PTO2_LOCAL_DISPATCH_TYPE_NUM]; int overflow_count = 0; - for (int bi = 0; bi < PTO2_LOCAL_DISPATCH_TYPE_NUM; bi++) { - while (local_bufs[bi].count > 0) { - PTO2TaskSlotState* slot_state = local_bufs[bi].pop(); - PTO2ResourceShape shape = pto2_active_mask_to_shape(slot_state->active_mask); - int32_t ci = tracker.find_cluster_for_shape(shape); + while (local_bufs[0].count > 0) { + PTO2TaskSlotState* slot_state = local_bufs[0].pop(); + ResourceCount rc = get_task_resource_count(slot_state->active_mask); - if (ci >= 0) { - try_pushed = true; - Cluster& c = tracker.clusters[ci]; + // Check global resource availability (no cluster constraint) + bool can_dispatch = true; + if (rc.aic > 0 && tracker.aic().idle_count < rc.aic) can_dispatch = false; + if (rc.aiv > 0 && tracker.aiv().idle_count < rc.aiv) can_dispatch = false; + + if (can_dispatch) { + try_pushed = true; #if PTO2_SCHED_PROFILING - uint64_t t_setup_start = get_sys_cnt_aicpu(); + uint64_t t_setup_start = get_sys_cnt_aicpu(); #endif - ResourceCount rc = shape_resource_count(shape); - - if (rc.aic) { - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, - c.aic_core_id, CoreType::AIC, *slot_state, PTO2SubtaskSlot::AIC + // Allocate AIC from global idle pool (O(1) fast path) + if (rc.aic > 0) { + dispatch_from_idle_pool(runtime, tracker, executing_reg_task_ids, + CoreType::AIC, *slot_state, PTO2SubtaskSlot::AIC #if PTO2_PROFILING - , profiling_enabled, thread_idx + , profiling_enabled, thread_idx #endif - ); - } - if (rc.aiv >= 1) { - int32_t aiv0 = tracker.core_idle[c.aiv_core_ids[0]] ? c.aiv_core_ids[0] : c.aiv_core_ids[1]; - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, - aiv0, CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV0 + ); + } + // Allocate AIVs from global idle pool (O(1) fast path) + if (rc.aiv >= 1) { + dispatch_from_idle_pool(runtime, tracker, executing_reg_task_ids, + CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV0 #if PTO2_PROFILING - , profiling_enabled, thread_idx + , profiling_enabled, thread_idx #endif - ); - } - if (rc.aiv >= 2) { - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, - c.aiv_core_ids[1], CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV1 + ); + } + if (rc.aiv >= 2) { + dispatch_from_idle_pool(runtime, tracker, executing_reg_task_ids, + CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV1 #if PTO2_PROFILING - , profiling_enabled, thread_idx + , profiling_enabled, thread_idx #endif - ); - } + ); + } #if PTO2_PROFILING - phase_dispatch_count++; + phase_dispatch_count++; #endif #if PTO2_SCHED_PROFILING - pop_hit++; - local_dispatch_count++; - sched_dispatch_setup_cycle += (get_sys_cnt_aicpu() - t_setup_start); + pop_hit++; + local_dispatch_count++; + sched_dispatch_setup_cycle += (get_sys_cnt_aicpu() - t_setup_start); #endif - made_progress = true; - DEV_DEBUG("Thread %d: Dispatching %s task %lld to cluster %d (local)", - thread_idx, - shape_name(shape), - (long long)pto2_task_id_raw(slot_state->task->mixed_task_id), - ci); - } else { - overflow_ptrs[overflow_count++] = slot_state; + made_progress = true; + DEV_DEBUG("Thread %d: Dispatching task %lld (local, aic=%d aiv=%d) from global pools", + thread_idx, + (long long)pto2_task_id_raw(slot_state->task->mixed_task_id), + rc.aic, rc.aiv); + } else { + overflow_ptrs[overflow_count++] = slot_state; #if PTO2_SCHED_PROFILING - local_overflow_count++; + local_overflow_count++; #endif - } } } - // Push overflow to global readyQ (shape-based) + // Push overflow to global readyQ for (int i = 0; i < overflow_count; i++) { rt->scheduler.requeue_ready_task(*overflow_ptrs[i]); } - // Phase 3: Global dispatch — fill remaining idle cores from global readyQ (cluster-based) - const PTO2ResourceShape* dispatch_order = get_dispatch_order(thread_idx); + // Phase 3: Global dispatch — batch get tasks from unified readyQ and dispatch from global pools + if (tracker.aic().idle_count > 0 || tracker.aiv().idle_count > 0) { + // Batch get ready tasks (estimate max tasks we can dispatch based on resource availability) + int32_t max_batch = tracker.aic().idle_count + (tracker.aiv().idle_count / 2) + 1; + PTO2TaskSlotState* batch_tasks[256]; + int32_t batch_count = rt->scheduler.get_ready_tasks_batch(batch_tasks, + (max_batch > 256) ? 256 : max_batch); - for (int32_t si = 0; si < PTO2_NUM_RESOURCE_SHAPES; si++) { - PTO2ResourceShape shape = dispatch_order[si]; - if (rt->scheduler.ready_queues[static_cast(shape)].size() == 0) continue; + for (int32_t bi = 0; bi < batch_count; bi++) { + PTO2TaskSlotState* slot_state = batch_tasks[bi]; + ResourceCount rc = get_task_resource_count(slot_state->active_mask); - while (true) { - int32_t ci = tracker.find_cluster_for_shape(shape); - if (ci < 0) break; + // Check global resource availability (no cluster constraint) + bool can_dispatch = true; + if (rc.aic > 0 && tracker.aic().idle_count < rc.aic) can_dispatch = false; + if (rc.aiv > 0 && tracker.aiv().idle_count < rc.aiv) can_dispatch = false; - PTO2TaskSlotState* slot_state = pop_ready_task(shape, thread_idx + if (can_dispatch) { + try_pushed = true; #if PTO2_SCHED_PROFILING - , pop_hit, pop_miss - , sched_dispatch_pop_cycle + uint64_t t_setup_start = get_sys_cnt_aicpu(); #endif - ); - if (!slot_state) break; - - try_pushed = true; + // Allocate AIC from global idle pool (O(1) fast path) + if (rc.aic > 0) { + dispatch_from_idle_pool(runtime, tracker, executing_reg_task_ids, + CoreType::AIC, *slot_state, PTO2SubtaskSlot::AIC #if PTO2_PROFILING - phase_dispatch_count++; -#endif -#if PTO2_SCHED_PROFILING - uint64_t t_setup_start = get_sys_cnt_aicpu(); + , profiling_enabled, thread_idx #endif - Cluster& c = tracker.clusters[ci]; - ResourceCount rc = shape_resource_count(shape); - - if (rc.aic) { - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, - c.aic_core_id, CoreType::AIC, *slot_state, PTO2SubtaskSlot::AIC + ); + } + // Allocate AIVs from global idle pool (O(1) fast path) + if (rc.aiv >= 1) { + dispatch_from_idle_pool(runtime, tracker, executing_reg_task_ids, + CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV0 #if PTO2_PROFILING - , profiling_enabled, thread_idx + , profiling_enabled, thread_idx #endif - ); - } - if (rc.aiv >= 1) { - int32_t aiv_id = tracker.core_idle[c.aiv_core_ids[0]] - ? c.aiv_core_ids[0] : c.aiv_core_ids[1]; - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, - aiv_id, CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV0 + ); + } + if (rc.aiv >= 2) { + dispatch_from_idle_pool(runtime, tracker, executing_reg_task_ids, + CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV1 #if PTO2_PROFILING - , profiling_enabled, thread_idx + , profiling_enabled, thread_idx #endif - ); - } - if (rc.aiv >= 2) { - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, - c.aiv_core_ids[1], CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV1 + ); + } + made_progress = true; #if PTO2_PROFILING - , profiling_enabled, thread_idx + phase_dispatch_count++; #endif - ); - } - made_progress = true; #if PTO2_SCHED_PROFILING - sched_dispatch_setup_cycle += (get_sys_cnt_aicpu() - t_setup_start); + pop_hit++; + sched_dispatch_setup_cycle += (get_sys_cnt_aicpu() - t_setup_start); #endif - DEV_DEBUG("Thread %d: Dispatching %s task %lld to cluster %d", - thread_idx, - shape_name(shape), - (long long)pto2_task_id_raw(slot_state->task->mixed_task_id), - ci); + DEV_DEBUG("Thread %d: Dispatching task %lld (global, aic=%d aiv=%d) from global pools", + thread_idx, + (long long)pto2_task_id_raw(slot_state->task->mixed_task_id), + rc.aic, rc.aiv); + } else { + // Requeue task that couldn't be dispatched + rt->scheduler.requeue_ready_task(*slot_state); + } } } @@ -2079,16 +2071,11 @@ void AicpuExecutor::diagnose_stuck_state(Runtime* runtime, int32_t thread_idx, DEV_ALWAYS("Progress: %d/%d tasks (%.1f%%)", completed, total, total > 0 ? completed * 100.0 / total : 0.0); - uint64_t aic_ready = 0, aiv_ready = 0, aiv_x2_ready = 0, mixed_x1_ready = 0, mixed_x2_ready = 0; + uint64_t unified_ready = 0; if (rt) { - aic_ready = sched->ready_queues[static_cast(PTO2ResourceShape::AIC_ONLY)].size(); - aiv_ready = sched->ready_queues[static_cast(PTO2ResourceShape::AIV_X1)].size(); - aiv_x2_ready = sched->ready_queues[static_cast(PTO2ResourceShape::AIV_X2)].size(); - mixed_x1_ready = sched->ready_queues[static_cast(PTO2ResourceShape::AIC_AIV_X1)].size(); - mixed_x2_ready = sched->ready_queues[static_cast(PTO2ResourceShape::AIC_AIV_X2)].size(); + unified_ready = sched->ready_queues[0].size(); } - DEV_ALWAYS("Ready Queues: AIC=%lu, AIV=%lu, AIV_X2=%lu, AIC_AIV_X1=%lu, AIC_AIV_X2=%lu", - aic_ready, aiv_ready, aiv_x2_ready, mixed_x1_ready, mixed_x2_ready); + DEV_ALWAYS("Ready Queue: UNIFIED=%lu", unified_ready); int32_t busy_cores = 0; int32_t idle_cores = 0; @@ -2130,7 +2117,7 @@ void AicpuExecutor::diagnose_stuck_state(Runtime* runtime, int32_t thread_idx, DEV_ALWAYS("Summary: %d busy, %d idle", busy_cores, idle_cores); // Diagnose deadlock vs livelock - if (busy_cores == 0 && aic_ready == 0 && aiv_ready == 0 && completed < total) { + if (busy_cores == 0 && unified_ready == 0 && completed < total) { DEV_ALWAYS("*** DEADLOCK DETECTED ***"); DEV_ALWAYS("All cores idle, no ready tasks, but %d tasks incomplete", total - completed); DEV_ALWAYS("Check PTO2 shared memory for task dependency state"); diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index e21bd291..02d20806 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -639,8 +639,7 @@ void pto2_submit_mixed_task( int32_t new_rc = cur_slot_state.fanin_refcount.fetch_add(initial_refcount, std::memory_order_acq_rel) + initial_refcount; if (new_rc >= fanin_count + 1) { - PTO2ResourceShape shape = pto2_active_mask_to_shape(active_mask); - sched->ready_queues[static_cast(shape)].push(&cur_slot_state); + sched->ready_queues[0].push(&cur_slot_state); } // Record dep pool watermark in local slot state (used by tail reclamation) cur_slot_state.dep_pool_mark = orch->rings[ring_id].dep_pool.top; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp index 7e2abca3..d32b475f 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.cpp @@ -215,13 +215,6 @@ void pto2_scheduler_print_stats(PTO2SchedulerState* sched) { void pto2_scheduler_print_queues(PTO2SchedulerState* sched) { LOG_INFO("=== Ready Queues ==="); - - const char* shape_names[] = {"AIC_ONLY", "AIV_X1", "AIV_X2", "AIC_AIV_X1", "AIC_AIV_X2"}; - - for (int i = 0; i < PTO2_NUM_RESOURCE_SHAPES; i++) { - LOG_INFO(" %s: count=%" PRIu64, shape_names[i], - sched->ready_queues[i].size()); - } - + LOG_INFO(" UNIFIED: count=%" PRIu64, sched->ready_queues[0].size()); LOG_INFO("===================="); } diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h index b3b16ef0..73580b27 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_scheduler.h @@ -48,16 +48,15 @@ struct PTO2ReadyQueueSlot { /** * Thread-local ready buffer for local-first dispatch optimization. * - * Two buffers per scheduling thread, one per CoreType (AIC=0, AIV=1). + * Single unified buffer per scheduling thread (no longer split by CoreType). * Initialized once before the scheduling loop; must be empty at * the start of each iteration (verified by always_assert). * - * Phase 1 fills per-CoreType buffers via on_task_complete(). - * dispatch_ready_tasks_to_idle_cores drains them: local-first via + * Phase 1 fills the unified buffer via on_task_complete(). + * dispatch_ready_tasks_to_idle_cores drains it: local-first via * get_ready_task, then remaining tasks pushed to global readyQ. */ -// Number of CoreType values eligible for local dispatch (AIC=0, AIV=1) -static constexpr int PTO2_LOCAL_DISPATCH_TYPE_NUM = 2; +static constexpr int PTO2_LOCAL_DISPATCH_TYPE_NUM = 1; struct PTO2LocalReadyBuffer { PTO2TaskSlotState** slot_states = nullptr; @@ -425,24 +424,21 @@ struct PTO2SchedulerState { bool release_fanin_and_check_ready(PTO2TaskSlotState& slot_state, PTO2LocalReadyBuffer* local_bufs = nullptr) { - // Atomically increment fanin_refcount and check if all producers are done - // ACQ_REL on fanin_refcount already synchronizes with the orchestrator's - // init release, making fanin_count visible — plain load suffices. int32_t new_refcount = slot_state.fanin_refcount.fetch_add(1, std::memory_order_acq_rel) + 1; if (new_refcount == slot_state.fanin_count) { - // Local-first: try per-CoreType thread-local buffer before global queue - // Route by active_mask: AIC-containing tasks → buf[0], AIV-only → buf[1] - PTO2ResourceShape shape = pto2_active_mask_to_shape(slot_state.active_mask); - bool pushed_local = false; - if (local_bufs) { - int32_t buf_idx = (slot_state.active_mask & 0x01) ? 0 : 1; - pushed_local = local_bufs[buf_idx].try_push(&slot_state); - } - if (!pushed_local) { - ready_queues[static_cast(shape)].push(&slot_state); + PTO2TaskState expected = PTO2_TASK_PENDING; + if (slot_state.task_state.compare_exchange_strong( + expected, PTO2_TASK_READY, std::memory_order_acq_rel, std::memory_order_acquire)) { + bool pushed_local = false; + if (local_bufs) { + pushed_local = local_bufs[0].try_push(&slot_state); + } + if (!pushed_local) { + ready_queues[0].push(&slot_state); + } + return true; } - return true; } return false; } @@ -452,22 +448,19 @@ struct PTO2SchedulerState { uint64_t& atomic_count, uint64_t& push_wait, PTO2LocalReadyBuffer* local_bufs = nullptr) { int32_t new_refcount = slot_state.fanin_refcount.fetch_add(1, std::memory_order_acq_rel) + 1; - atomic_count += 1; // fanin_refcount.fetch_add + atomic_count += 1; if (new_refcount == slot_state.fanin_count) { PTO2TaskState expected = PTO2_TASK_PENDING; if (slot_state.task_state.compare_exchange_strong( expected, PTO2_TASK_READY, std::memory_order_acq_rel, std::memory_order_acquire)) { - atomic_count += 1; // CAS(task_state PENDING→READY) - // Local-first: try per-CoreType thread-local buffer before global queue - PTO2ResourceShape shape = pto2_active_mask_to_shape(slot_state.active_mask); + atomic_count += 1; bool pushed_local = false; if (local_bufs) { - int32_t buf_idx = (slot_state.active_mask & 0x01) ? 0 : 1; - pushed_local = local_bufs[buf_idx].try_push(&slot_state); + pushed_local = local_bufs[0].try_push(&slot_state); } if (!pushed_local) { - ready_queues[static_cast(shape)].push(&slot_state, atomic_count, push_wait); + ready_queues[0].push(&slot_state, atomic_count, push_wait); } return true; } @@ -476,42 +469,65 @@ struct PTO2SchedulerState { } #endif - PTO2TaskSlotState* get_ready_task(PTO2ResourceShape shape) { - return ready_queues[static_cast(shape)].pop(); + PTO2TaskSlotState* get_ready_task() { + return ready_queues[0].pop(); + } + + PTO2TaskSlotState* get_ready_task(PTO2ResourceShape) { + return ready_queues[0].pop(); } template PTO2TaskSlotState* get_ready_task(PTO2LocalReadyBuffer* local_bufs) { - constexpr int ct = static_cast(CT); - if (local_bufs && local_bufs[ct].count > 0) { - return local_bufs[ct].pop(); + if (local_bufs && local_bufs[0].count > 0) { + return local_bufs[0].pop(); } - return ready_queues[ct].pop(); + return ready_queues[0].pop(); } #if PTO2_SCHED_PROFILING - PTO2TaskSlotState* get_ready_task(PTO2ResourceShape shape, uint64_t& atomic_count, uint64_t& wait_cycle) { - return ready_queues[static_cast(shape)].pop(atomic_count, wait_cycle); + PTO2TaskSlotState* get_ready_task(PTO2ResourceShape, uint64_t& atomic_count, uint64_t& wait_cycle) { + return ready_queues[0].pop(atomic_count, wait_cycle); } template PTO2TaskSlotState* get_ready_task(PTO2LocalReadyBuffer* local_bufs, uint64_t& atomic_count, uint64_t& wait_cycle) { - constexpr int ct = static_cast(CT); - if (local_bufs && local_bufs[ct].count > 0) { - return local_bufs[ct].pop(); + if (local_bufs && local_bufs[0].count > 0) { + return local_bufs[0].pop(); } - return ready_queues[ct].pop(atomic_count, wait_cycle); + return ready_queues[0].pop(atomic_count, wait_cycle); } #endif /** - * Requeue a ready task that could not be dispatched (no suitable cluster). - * Pushes the task back into its shape-based queue. + * Batch get ready tasks from the unified queue. + * Returns the number of tasks retrieved. + */ + int32_t get_ready_tasks_batch(PTO2TaskSlotState** out_tasks, int32_t max_count) { + int32_t count = 0; + while (count < max_count) { + PTO2TaskSlotState* task = ready_queues[0].pop(); + if (!task) break; + out_tasks[count++] = task; + } + return count; + } + + /** + * Requeue a ready task that could not be dispatched. */ void requeue_ready_task(PTO2TaskSlotState& slot_state) { - PTO2ResourceShape shape = pto2_active_mask_to_shape(slot_state.active_mask); - ready_queues[static_cast(shape)].push(&slot_state); + ready_queues[0].push(&slot_state); + } + + /** + * Requeue multiple ready tasks. + */ + void requeue_ready_tasks(PTO2TaskSlotState** tasks, int32_t count) { + for (int32_t i = 0; i < count; i++) { + ready_queues[0].push(tasks[i]); + } } void on_scope_end(PTO2TaskSlotState** task_slot_states, int32_t count) { diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_submit_types.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_submit_types.h index 177781a3..e293a0ad 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_submit_types.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_submit_types.h @@ -53,34 +53,23 @@ struct MixedKernels { }; /** - * Resource shape — classifies a MixedKernels into one of 5 queue buckets. + * Resource shape — unified single shape for simplified scheduling. + * All tasks now use a single global ready queue, eliminating shape-based classification. + * The active_mask still determines which cores (AIC/AIV) a task requires. */ enum class PTO2ResourceShape : uint8_t { - AIC_ONLY = 0, // AIC only - AIV_X1 = 1, // One AIV slot - AIV_X2 = 2, // Both AIV slots - AIC_AIV_X1 = 3, // AIC + one AIV - AIC_AIV_X2 = 4, // AIC + both AIV + UNIFIED = 0, // Single unified shape for all tasks }; -inline constexpr int32_t PTO2_NUM_RESOURCE_SHAPES = 5; +inline constexpr int32_t PTO2_NUM_RESOURCE_SHAPES = 1; /** * Derive resource shape from active_mask. - * Caller must ensure active_mask is valid (at least one bit set). + * Always returns UNIFIED shape since we no longer classify by resource type. */ static inline PTO2ResourceShape pto2_active_mask_to_shape(uint8_t active_mask) { - bool has_aic = (active_mask & PTO2_SUBTASK_MASK_AIC) != 0; - int aiv_count = ((active_mask & PTO2_SUBTASK_MASK_AIV0) != 0) - + ((active_mask & PTO2_SUBTASK_MASK_AIV1) != 0); - - if (has_aic) { - if (aiv_count == 0) return PTO2ResourceShape::AIC_ONLY; - if (aiv_count == 1) return PTO2ResourceShape::AIC_AIV_X1; - return PTO2ResourceShape::AIC_AIV_X2; - } - if (aiv_count == 1) return PTO2ResourceShape::AIV_X1; - return PTO2ResourceShape::AIV_X2; + (void)active_mask; + return PTO2ResourceShape::UNIFIED; } /**