From b723f302c8d5fd3e1e924d05cd3754969d0933a4 Mon Sep 17 00:00:00 2001 From: qq_52905393 Date: Sat, 14 Mar 2026 11:16:02 +0800 Subject: [PATCH] Support for pending and running task states --- .../aicpu/aicpu_executor.cpp | 572 ++++++++++++------ 1 file changed, 373 insertions(+), 199 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index 495bafa1..8aca5529 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -65,6 +65,7 @@ constexpr int32_t STALL_DUMP_WAIT_MAX = 4; constexpr int32_t STALL_DUMP_CORE_MAX = 8; constexpr int32_t PROGRESS_VERBOSE_THRESHOLD = 10; // log every completion for the first N tasks constexpr int32_t PROGRESS_LOG_INTERVAL = 250; // log every N completions after threshold +constexpr int32_t AICORE_TYPE_NUM = 2; static PTO2Runtime *rt{nullptr}; @@ -116,33 +117,113 @@ struct CoreStateTracker { Cluster clusters[MAX_AIC_PER_THREAD]; int32_t cluster_count; bool core_idle[MAX_CORES_PER_THREAD]; + int32_t pending_id[MAX_CORES_PER_THREAD]; + int32_t running_id[MAX_CORES_PER_THREAD]; + int32_t core_pend_ready_cnt[AICORE_TYPE_NUM]; + int32_t core_run_ready_cnt[AICORE_TYPE_NUM]; + uint32_t run_ready_core_idx[AICORE_TYPE_NUM][MAX_AIV_PER_THREAD]; CoreTypeTracker& aic() { return by_type[0]; } CoreTypeTracker& aiv() { return by_type[1]; } template CoreTypeTracker& get() { return by_type[static_cast(CT)]; } + + bool is_core_pend_ready(int core_id) { + return pending_id[core_id] == AICPU_TASK_INVALID; + } + + bool is_core_run_ready(int core_id) { + return running_id[core_id] == AICPU_TASK_INVALID && pending_id[core_id] == AICPU_TASK_INVALID; + } int32_t find_cluster_for_shape(PTO2ResourceShape shape) { + // find run ready first for (int32_t i = 0; i < cluster_count; i++) { Cluster& c = clusters[i]; switch (shape) { case PTO2ResourceShape::AIC_ONLY: - if (core_idle[c.aic_core_id]) return i; + if (is_core_run_ready(c.aic_core_id)) { + core_run_ready_cnt[static_cast(CoreType::AIC)]--; + core_pend_ready_cnt[static_cast(CoreType::AIC)]--; + return i; + } break; case PTO2ResourceShape::AIV_X1: - if (core_idle[c.aiv_core_ids[0]] || core_idle[c.aiv_core_ids[1]]) return i; + if (is_core_run_ready(c.aiv_core_ids[0]) || is_core_run_ready(c.aiv_core_ids[1])) { + core_run_ready_cnt[static_cast(CoreType::AIV)]--; + core_pend_ready_cnt[static_cast(CoreType::AIV)]--; + return i; + } break; case PTO2ResourceShape::AIV_X2: - if (core_idle[c.aiv_core_ids[0]] && core_idle[c.aiv_core_ids[1]]) return i; + if (is_core_run_ready(c.aiv_core_ids[0]) && is_core_run_ready(c.aiv_core_ids[1])) { + core_run_ready_cnt[static_cast(CoreType::AIV)] -= 2; + core_pend_ready_cnt[static_cast(CoreType::AIV)] -= 2; + return i; + } break; case PTO2ResourceShape::AIC_AIV_X1: - if (core_idle[c.aic_core_id] && - (core_idle[c.aiv_core_ids[0]] || core_idle[c.aiv_core_ids[1]])) return i; + if (is_core_run_ready(c.aic_core_id) && + (is_core_run_ready(c.aiv_core_ids[0]) || is_core_run_ready(c.aiv_core_ids[1]))) { + core_run_ready_cnt[static_cast(CoreType::AIC)]--; + core_run_ready_cnt[static_cast(CoreType::AIV)]--; + core_pend_ready_cnt[static_cast(CoreType::AIC)]--; + core_pend_ready_cnt[static_cast(CoreType::AIV)]--; + return i; + } break; case PTO2ResourceShape::AIC_AIV_X2: - if (core_idle[c.aic_core_id] && - core_idle[c.aiv_core_ids[0]] && core_idle[c.aiv_core_ids[1]]) return i; + if (is_core_run_ready(c.aic_core_id) && + is_core_run_ready(c.aiv_core_ids[0]) && is_core_run_ready(c.aiv_core_ids[1])) { + core_run_ready_cnt[static_cast(CoreType::AIC)]--; + core_run_ready_cnt[static_cast(CoreType::AIV)] -= 2; + core_pend_ready_cnt[static_cast(CoreType::AIC)]--; + core_pend_ready_cnt[static_cast(CoreType::AIV)] -= 2; + return i; + } + break; + } + } + + // find pend ready if can not find run ready + for (int32_t i = 0; i < cluster_count; i++) { + Cluster& c = clusters[i]; + switch (shape) { + case PTO2ResourceShape::AIC_ONLY: + if (is_core_pend_ready(c.aic_core_id)) { + DEV_INFO("pending_id[core_id] : %d ", pending_id[c.aic_core_id]); + core_pend_ready_cnt[static_cast(CoreType::AIC)]--; + return i; + } + break; + case PTO2ResourceShape::AIV_X1: + if (is_core_pend_ready(c.aiv_core_ids[0]) || is_core_pend_ready(c.aiv_core_ids[1])) { + core_pend_ready_cnt[static_cast(CoreType::AIV)]--; + return i; + } + break; + case PTO2ResourceShape::AIV_X2: + if (is_core_pend_ready(c.aiv_core_ids[0]) && is_core_pend_ready(c.aiv_core_ids[1])) { + core_pend_ready_cnt[static_cast(CoreType::AIV)] -= 2; + return i; + } + break; + case PTO2ResourceShape::AIC_AIV_X1: + if (is_core_pend_ready(c.aic_core_id) && + (is_core_pend_ready(c.aiv_core_ids[0]) || is_core_pend_ready(c.aiv_core_ids[1]))) { + core_pend_ready_cnt[static_cast(CoreType::AIC)]--; + core_pend_ready_cnt[static_cast(CoreType::AIV)]--; + return i; + } + break; + case PTO2ResourceShape::AIC_AIV_X2: + if (is_core_pend_ready(c.aic_core_id) && + is_core_pend_ready(c.aiv_core_ids[0]) && is_core_pend_ready(c.aiv_core_ids[1])) { + core_pend_ready_cnt[static_cast(CoreType::AIC)]--; + core_pend_ready_cnt[static_cast(CoreType::AIV)] -= 2; + return i; + } break; } } @@ -251,24 +332,20 @@ struct AicpuExecutor { } } - // Template methods for Phase 1 and Phase 2 template - void check_running_cores_for_completion(int32_t thread_idx, - CoreTypeTracker& ct, - bool* core_idle, + void finalize_core_task_execution(int32_t thread_idx, Handshake* hank, - int32_t* executing_task_ids, int32_t& completed_this_turn, int32_t& cur_thread_completed, bool& made_progress, PTO2TaskSlotState* deferred_release_slot_states[], int32_t& deferred_release_count, - PTO2LocalReadyBuffer* local_bufs + PTO2LocalReadyBuffer* local_bufs, + int mix_task_id, + int core_id #if PTO2_PROFILING , bool profiling_enabled, - uint64_t& complete_probe_count, - uint64_t& complete_hit_count, uint32_t& phase_complete_count, uint64_t& notify_edges_total, int32_t& notify_max_degree, @@ -281,111 +358,201 @@ struct AicpuExecutor { uint64_t& sched_complete_perf_cycle #endif ) { - for (int32_t i = ct.running_count - 1; i >= 0; i--) { - int32_t core_id = ct.running[i]; - uint64_t reg_addr = core_id_to_reg_addr_[core_id]; - - int32_t task_id = executing_task_ids[core_id]; - uint64_t reg_val = read_reg(reg_addr, RegId::COND); - int32_t reg_task_id = EXTRACT_TASK_ID(reg_val); - int32_t reg_state = EXTRACT_TASK_STATE(reg_val); - bool done = reg_task_id == task_id && reg_state == TASK_FIN_STATE; -#if PTO2_PROFILING - if (profiling_enabled) { - complete_probe_count++; - if (done) { - complete_hit_count++; - } - } -#endif - - if (done) { - executing_task_ids[core_id] = AICPU_TASK_INVALID; - int32_t mixed_task_id = task_id; - PTO2SubtaskSlot subslot = s_executing_subslot[core_id]; - PTO2TaskSlotState& slot_state = rt->scheduler.get_slot_state_by_task_id(mixed_task_id); - - // Two-stage completion: mark subtask done, then handle mixed-task completion - bool mixed_complete = rt->scheduler.on_subtask_complete(slot_state, subslot); - if (mixed_complete) { + PTO2SubtaskSlot subslot = s_executing_subslot[core_id]; + PTO2TaskSlotState& slot_state = rt->scheduler.get_slot_state_by_task_id(mix_task_id); + bool mixed_complete = rt->scheduler.on_subtask_complete(slot_state, subslot); + + if (mixed_complete) { #if PTO2_SCHED_PROFILING - PTO2CompletionStats cstats = rt->scheduler.on_mixed_task_complete(slot_state, thread_idx, local_bufs); - notify_edges_total += cstats.fanout_edges; - if (cstats.fanout_edges > notify_max_degree) notify_max_degree = cstats.fanout_edges; - notify_tasks_enqueued += cstats.tasks_enqueued; - phase_complete_count++; + PTO2CompletionStats cstats = rt->scheduler.on_mixed_task_complete(slot_state, thread_idx, local_bufs); + notify_edges_total += cstats.fanout_edges; + if (cstats.fanout_edges > notify_max_degree) notify_max_degree = cstats.fanout_edges; + notify_tasks_enqueued += cstats.tasks_enqueued; + phase_complete_count++; #elif PTO2_PROFILING - PTO2CompletionStats cstats = rt->scheduler.on_mixed_task_complete(slot_state, local_bufs); - notify_edges_total += cstats.fanout_edges; - if (cstats.fanout_edges > notify_max_degree) notify_max_degree = cstats.fanout_edges; - notify_tasks_enqueued += cstats.tasks_enqueued; - phase_complete_count++; + PTO2CompletionStats cstats = rt->scheduler.on_mixed_task_complete(slot_state, local_bufs); + notify_edges_total += cstats.fanout_edges; + if (cstats.fanout_edges > notify_max_degree) notify_max_degree = cstats.fanout_edges; + notify_tasks_enqueued += cstats.tasks_enqueued; + phase_complete_count++; #else - rt->scheduler.on_mixed_task_complete(slot_state, local_bufs); + rt->scheduler.on_mixed_task_complete(slot_state, local_bufs); #endif - if (deferred_release_count < 256) { - deferred_release_slot_states[deferred_release_count++] = &slot_state; - } else { - DEV_ALWAYS("Thread %d: release", thread_idx); - while (deferred_release_count > 0) { + if (deferred_release_count < 256) { + deferred_release_slot_states[deferred_release_count++] = &slot_state; + } else { + DEV_ALWAYS("Thread %d: release", thread_idx); + while (deferred_release_count > 0) { #if PTO2_SCHED_PROFILING - int32_t fe = rt->scheduler.on_task_release( - *deferred_release_slot_states[--deferred_release_count], thread_idx); + int32_t fe = rt->scheduler.on_task_release( + *deferred_release_slot_states[--deferred_release_count], thread_idx); #else - int32_t fe = - rt->scheduler.on_task_release(*deferred_release_slot_states[--deferred_release_count]); + int32_t fe = + rt->scheduler.on_task_release(*deferred_release_slot_states[--deferred_release_count]); #endif - (void)fe; + (void)fe; #if PTO2_PROFILING - fanin_edges_total += fe; - if (fe > fanin_max_degree) fanin_max_degree = fe; + fanin_edges_total += fe; + if (fe > fanin_max_degree) fanin_max_degree = fe; #endif - } - deferred_release_slot_states[deferred_release_count++] = &slot_state; - } } - ct.move_running_to_idle(i); - core_idle[core_id] = true; + deferred_release_slot_states[deferred_release_count++] = &slot_state; + } + } #if PTO2_PROFILING - if (profiling_enabled) { + if (profiling_enabled) { #if PTO2_SCHED_PROFILING - uint64_t t_perf_start = get_sys_cnt_aicpu(); + uint64_t t_perf_start = get_sys_cnt_aicpu(); #endif - Handshake* h = &hank[core_id]; - uint64_t finish_ts = get_sys_cnt_aicpu(); - PerfBuffer* perf_buf = (PerfBuffer*)h->perf_records_addr; - rmb(); - uint32_t count = perf_buf->count; - if (count > 0) { - PerfRecord* record = &perf_buf->records[count - 1]; - if (record->task_id == static_cast(task_id)) { - // Fill metadata that AICore doesn't know - int32_t perf_slot_idx = static_cast(s_executing_subslot[core_id]); - record->func_id = slot_state.task->kernel_id[perf_slot_idx]; - record->core_type = CT; - perf_aicpu_record_dispatch_and_finish_time( - record, dispatch_timestamps_[core_id], finish_ts); - } - } + Handshake* h = &hank[core_id]; + uint64_t finish_ts = get_sys_cnt_aicpu(); + PerfBuffer* perf_buf = (PerfBuffer*)h->perf_records_addr; + rmb(); + uint32_t count = perf_buf->count; + if (count > 0) { + PerfRecord* record = &perf_buf->records[count - 1]; + if (record->task_id == static_cast(mix_task_id)) { + // Fill metadata that AICore doesn't know + int32_t perf_slot_idx = static_cast(s_executing_subslot[core_id]); + record->func_id = slot_state.task->kernel_id[perf_slot_idx]; + record->core_type = CT; + perf_aicpu_record_dispatch_and_finish_time( + record, dispatch_timestamps_[core_id], finish_ts); + } + } #if PTO2_SCHED_PROFILING - sched_complete_perf_cycle += (get_sys_cnt_aicpu() - t_perf_start); + sched_complete_perf_cycle += (get_sys_cnt_aicpu() - t_perf_start); #endif - } + } #endif + // DEV_DEBUG("Thread %d: %s core %d completed PTO2 task %d (mixed_complete=%d)", + // thread_idx, + // CT == CoreType::AIC ? "AIC" : "AIV", + // core_id, + // mix_task_id, + // mixed_complete ? 1 : 0); + cur_thread_completed++; + if (mixed_complete) { + completed_this_turn++; + } + DEV_INFO("=========="); + made_progress = true; + } - DEV_DEBUG("Thread %d: %s core %d completed PTO2 task %d (mixed_complete=%d)", - thread_idx, - CT == CoreType::AIC ? "AIC" : "AIV", - core_id, - task_id, - mixed_complete ? 1 : 0); - cur_thread_completed++; - if (mixed_complete) { - completed_this_turn++; + // Template methods for Phase 1 and Phase 2 + template + void check_running_cores_for_completion(int32_t thread_idx, + Handshake* hank, + int32_t& completed_this_turn, + int32_t& cur_thread_completed, + bool& made_progress, + PTO2TaskSlotState* deferred_release_slot_states[], + int32_t& deferred_release_count, + PTO2LocalReadyBuffer* local_bufs +#if PTO2_PROFILING + , + bool profiling_enabled, + uint64_t& complete_probe_count, + uint64_t& complete_hit_count, + uint32_t& phase_complete_count, + uint64_t& notify_edges_total, + int32_t& notify_max_degree, + uint64_t& notify_tasks_enqueued, + uint64_t& fanin_edges_total, + int32_t& fanin_max_degree, + uint64_t& resolve_total, + uint64_t& resolve_cnt +#endif +#if PTO2_SCHED_PROFILING + , + uint64_t& sched_complete_perf_cycle +#endif + ) { + resolve_cnt++; + auto resolve_t0 = get_sys_cnt_aicpu(); + CoreStateTracker& tracker = trackers_[thread_idx]; + int iteration_cnt = CT == CoreType::AIC ? 1 : 2; + for (int i = 0; i < tracker.cluster_count * iteration_cnt; i++) { + int core_id = tracker.clusters[i].aic_core_id; + if (CT == CoreType::AIV) { + core_id = i % 2 == 0 ? tracker.clusters[i / 2].aiv_core_ids[0] : tracker.clusters[i / 2].aiv_core_ids[1]; + } + if ((tracker.running_id[core_id] != AICPU_TASK_INVALID || tracker.pending_id[core_id] != AICPU_TASK_INVALID)) { + uint64_t reg_addr = core_id_to_reg_addr_[core_id]; + uint64_t reg_val = read_reg(reg_addr, RegId::COND); + int32_t reg_task_id = EXTRACT_TASK_ID(reg_val); + int32_t reg_state = EXTRACT_TASK_STATE(reg_val); + auto &pending_id_ref = tracker.pending_id[core_id]; + auto &running_id_ref = tracker.running_id[core_id]; + if (pending_id_ref == reg_task_id && reg_state == TASK_FIN_STATE) { + // pending task finish + // running task finish if it exists + int32_t running_id_val = running_id_ref; + int32_t pending_id_val = pending_id_ref; + pending_id_ref = AICPU_TASK_INVALID; + running_id_ref = AICPU_TASK_INVALID; + if (running_id_val != AICPU_TASK_INVALID) { + // running task fishish + // resolve_t0 = get_sys_cnt_aicpu(); + finalize_core_task_execution(thread_idx, hank, completed_this_turn, + cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count, + local_bufs, running_id_val, core_id, profiling_enabled, + phase_complete_count, notify_edges_total, notify_max_degree, notify_tasks_enqueued, + fanin_edges_total, fanin_max_degree); + // resolve_total += get_sys_cnt_aicpu() - resolve_t0; + // resolve_cnt++; + } + // pending task finish + // resolve_t0 = get_sys_cnt_aicpu(); + finalize_core_task_execution(thread_idx, hank, completed_this_turn, + cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count, + local_bufs, pending_id_val, core_id, profiling_enabled, + phase_complete_count, notify_edges_total, notify_max_degree, notify_tasks_enqueued, + fanin_edges_total, fanin_max_degree); + // resolve_total += get_sys_cnt_aicpu() - resolve_t0; + // resolve_cnt++; + LOG_INFO("Thread %d: Core %d completed task %d (runnung_id = %d)", + thread_idx, core_id, pending_id_val, running_id_val); + } else if (pending_id_ref == reg_task_id && reg_state == TASK_ACK_STATE) { + int32_t running_id_value_ack = running_id_ref; + running_id_ref = reg_task_id; + pending_id_ref = AICPU_TASK_INVALID; + if (running_id_value_ack != AICPU_TASK_INVALID) { + // running task finish + // resolve_t0 = get_sys_cnt_aicpu(); + finalize_core_task_execution(thread_idx, hank, completed_this_turn, + cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count, + local_bufs, running_id_value_ack, core_id, profiling_enabled, + phase_complete_count, notify_edges_total, notify_max_degree, notify_tasks_enqueued, + fanin_edges_total, fanin_max_degree); + // resolve_total += get_sys_cnt_aicpu() - resolve_t0; + // resolve_cnt++; + } + LOG_INFO("Thread %d: Core %d completed task %d (pending_id_ref == reg_task_id && reg_state == TASK_ACK_STATE)", + thread_idx, core_id, reg_task_id); + } else if (running_id_ref == reg_task_id && reg_state == TASK_FIN_STATE) { + // running task finish + int32_t running_id_val = running_id_ref; + running_id_ref = AICPU_TASK_INVALID; + // resolve_t0 = get_sys_cnt_aicpu(); + finalize_core_task_execution(thread_idx, hank, completed_this_turn, + cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count, + local_bufs, running_id_val, core_id, profiling_enabled, + phase_complete_count, notify_edges_total, notify_max_degree, notify_tasks_enqueued, + fanin_edges_total, fanin_max_degree); + if (tracker.pending_id[core_id] == AICPU_TASK_INVALID) { + tracker.core_run_ready_cnt[static_cast(CT)]++; + } + // resolve_total += get_sys_cnt_aicpu() - resolve_t0; + // resolve_cnt++; + LOG_INFO("Thread %d: Core %d completed task %d (running_id_ref == reg_task_id && reg_state == TASK_FIN_STATE)", + thread_idx, core_id, running_id_val); } - made_progress = true; } } + (void) complete_probe_count; + (void) complete_hit_count; + resolve_total += get_sys_cnt_aicpu() - resolve_t0; } static const char* shape_name(PTO2ResourceShape shape) { @@ -472,13 +639,14 @@ struct AicpuExecutor { } void dispatch_subtask_to_core( - Runtime* runtime, CoreStateTracker& tracker, int32_t* executing_task_ids, + Runtime* runtime, CoreStateTracker& tracker, int32_t core_id, CoreType core_type, PTO2TaskSlotState& slot_state, PTO2SubtaskSlot subslot #if PTO2_PROFILING , bool profiling_enabled, int32_t thread_idx #endif ) { + (void) core_type; PTO2DispatchPayload& payload = s_pto2_payload_per_core[core_id]; PTO2TaskDescriptor& task = *slot_state.task; int32_t slot_idx = static_cast(subslot); @@ -488,6 +656,7 @@ struct AicpuExecutor { if (profiling_enabled) { dispatch_timestamps_[core_id] = get_sys_cnt_aicpu(); if (core_dispatch_counts_[core_id] >= PLATFORM_PROF_BUFFER_SIZE) { + perf_aicpu_switch_buffer(runtime, core_id, thread_idx); core_dispatch_counts_[core_id] = 0; } @@ -495,12 +664,7 @@ struct AicpuExecutor { } #endif write_reg(core_id_to_reg_addr_[core_id], RegId::DATA_MAIN_BASE, static_cast(task.mixed_task_id + 1)); - - CoreTypeTracker& ct = tracker.by_type[static_cast(core_type)]; - int32_t idle_idx = ct.find_idle_index(core_id); - ct.move_idle_to_running(idle_idx); - tracker.core_idle[core_id] = false; - executing_task_ids[core_id] = task.mixed_task_id; + tracker.pending_id[core_id] = task.mixed_task_id; } }; @@ -607,14 +771,9 @@ void AicpuExecutor::assign_cores_to_threads() { for (int32_t i = 0; i < thread_num_; i++) { for (int32_t j = 0; j < MAX_CORES_PER_THREAD; j++) { - executing_task_ids_[i][j] = AICPU_TASK_INVALID; + trackers_[i].pending_id[j] = AICPU_TASK_INVALID; + trackers_[i].running_id[j] = AICPU_TASK_INVALID; } - trackers_[i].aic().running_count = 0; - trackers_[i].aiv().running_count = 0; - trackers_[i].aic().idle_count = 0; - trackers_[i].aiv().idle_count = 0; - trackers_[i].cluster_count = 0; - memset(trackers_[i].core_idle, 0, sizeof(trackers_[i].core_idle)); } for (int32_t t = 0; t < thread_num_; t++) { @@ -633,19 +792,18 @@ void AicpuExecutor::assign_cores_to_threads() { int32_t aic_wid = aic_cores_[ci].worker_id; int32_t aiv0_wid = aiv_cores_[2 * ci].worker_id; int32_t aiv1_wid = aiv_cores_[2 * ci + 1].worker_id; - + tracker.core_pend_ready_cnt[static_cast(CoreType::AIC)]++; + tracker.core_pend_ready_cnt[static_cast(CoreType::AIV)] += 2; + tracker.core_run_ready_cnt[static_cast(CoreType::AIC)]++; + tracker.core_run_ready_cnt[static_cast(CoreType::AIV)] += 2; + // tracker.run_ready_core_idx[static_cast(CoreType::AIC)][tracker.core_run_ready_cnt[static_cast(CoreType::AIC)]++] = aic_wid; + // tracker.run_ready_core_idx[static_cast(CoreType::AIV)][tracker.core_run_ready_cnt[static_cast(CoreType::AIV)]++] = aiv0_wid; + // tracker.run_ready_core_idx[static_cast(CoreType::AIV)][tracker.core_run_ready_cnt[static_cast(CoreType::AIV)]++] = aiv1_wid; tracker.clusters[tracker.cluster_count++] = {aic_wid, {aiv0_wid, aiv1_wid}}; core_assignments_[t][core_idx++] = aic_wid; - tracker.aic().idle[tracker.aic().idle_count++] = aic_wid; - tracker.core_idle[aic_wid] = true; - core_assignments_[t][core_idx++] = aiv0_wid; core_assignments_[t][core_idx++] = aiv1_wid; - tracker.aiv().idle[tracker.aiv().idle_count++] = aiv0_wid; - tracker.aiv().idle[tracker.aiv().idle_count++] = aiv1_wid; - tracker.core_idle[aiv0_wid] = true; - tracker.core_idle[aiv1_wid] = true; DEV_INFO("Thread %d: cluster %d (AIC=%d, AIV0=%d, AIV1=%d)", t, ci, aic_wid, aiv0_wid, aiv1_wid); @@ -666,64 +824,58 @@ void AicpuExecutor::assign_cores_to_threads() { void AicpuExecutor::reassign_cores_for_all_threads() { DEV_INFO("Reassigning cores (cluster-aligned) for %d threads: %d AIC, %d AIV", thread_num_, aic_count_, aiv_count_); - - // Collect running/idle state from all threads before reassignment - int32_t running_cores[128]; - int32_t running_task_ids[128]; + + int32_t pending_task_ids[MAX_CORES_PER_THREAD]; + int32_t running_task_ids[MAX_CORES_PER_THREAD]; int32_t running_count = 0; - - bool was_idle[MAX_CORES_PER_THREAD]; - memset(was_idle, 0, sizeof(was_idle)); + int32_t pending_count = 0; + memset(pending_task_ids, AICPU_TASK_INVALID, sizeof(pending_task_ids)); + memset(running_task_ids, AICPU_TASK_INVALID, sizeof(running_task_ids)); for (int32_t i = 0; i < thread_num_; i++) { - for (int32_t j = 0; j < trackers_[i].aic().running_count; j++) { - int32_t core_id = trackers_[i].aic().running[j]; - running_cores[running_count] = core_id; - running_task_ids[running_count] = executing_task_ids_[i][core_id]; - running_count++; - } - for (int32_t j = 0; j < trackers_[i].aic().idle_count; j++) { - was_idle[trackers_[i].aic().idle[j]] = true; - } - for (int32_t j = 0; j < trackers_[i].aiv().running_count; j++) { - int32_t core_id = trackers_[i].aiv().running[j]; - running_cores[running_count] = core_id; - running_task_ids[running_count] = executing_task_ids_[i][core_id]; - running_count++; - } - for (int32_t j = 0; j < trackers_[i].aiv().idle_count; j++) { - was_idle[trackers_[i].aiv().idle[j]] = true; + auto tracker = trackers_[i]; + for (int32_t j = 0; j < tracker.cluster_count; j++) { + int32_t core_ids[3]; + core_ids[0] = tracker.clusters[j].aic_core_id; + core_ids[1] = tracker.clusters[j].aiv_core_ids[0]; + core_ids[2] = tracker.clusters[j].aiv_core_ids[1]; + for (int core_idx = 0; core_idx < 3; core_idx++) { + int32_t core_id = core_ids[core_idx]; + if (tracker.pending_id[core_id] != AICPU_TASK_INVALID) { + pending_task_ids[core_id] = tracker.pending_id[core_id]; + pending_count++; + } + if (tracker.running_id[core_id] != AICPU_TASK_INVALID) { + running_task_ids[core_id] = tracker.running_id[core_id]; + running_count++; + } + } } } // Reset all trackers for (int32_t i = 0; i < thread_num_; i++) { core_count_per_thread_[i] = 0; - trackers_[i].aic().running_count = 0; - trackers_[i].aic().idle_count = 0; - trackers_[i].aiv().running_count = 0; - trackers_[i].aiv().idle_count = 0; trackers_[i].cluster_count = 0; - memset(trackers_[i].core_idle, 0, sizeof(trackers_[i].core_idle)); - for (int32_t j = 0; j < MAX_CORES_PER_THREAD; j++) { - executing_task_ids_[i][j] = AICPU_TASK_INVALID; - } + trackers_[i].core_pend_ready_cnt[static_cast(CoreType::AIC)] = 0; + trackers_[i].core_pend_ready_cnt[static_cast(CoreType::AIV)] = 0; + trackers_[i].core_run_ready_cnt[static_cast(CoreType::AIC)] = 0; + trackers_[i].core_run_ready_cnt[static_cast(CoreType::AIV)] = 0; + memset(trackers_[i].pending_id, AICPU_TASK_INVALID, sizeof(trackers_[i].pending_id)); + memset(trackers_[i].running_id, AICPU_TASK_INVALID, sizeof(trackers_[i].running_id)); } // Restore a single core's running/idle state into its new thread's tracker - auto reassign_core = [&](int32_t worker_id, CoreTypeTracker& type_tracker, + auto reassign_core = [&](int32_t worker_id, CoreType type, CoreStateTracker& tracker, int32_t thread_idx) { core_assignments_[thread_idx][core_count_per_thread_[thread_idx]++] = worker_id; - for (int32_t j = 0; j < running_count; j++) { - if (running_cores[j] == worker_id) { - type_tracker.running[type_tracker.running_count++] = worker_id; - executing_task_ids_[thread_idx][worker_id] = running_task_ids[j]; - return; - } + if (pending_task_ids[worker_id] != AICPU_TASK_INVALID) { + tracker.pending_id[worker_id] = pending_task_ids[worker_id]; + tracker.core_pend_ready_cnt[static_cast(type)]--; } - if (was_idle[worker_id]) { - type_tracker.idle[type_tracker.idle_count++] = worker_id; - tracker.core_idle[worker_id] = true; + if (running_task_ids[worker_id] != AICPU_TASK_INVALID) { + tracker.running_id[worker_id] = running_task_ids[worker_id]; + tracker.core_run_ready_cnt[static_cast(type)]--; } }; @@ -737,10 +889,14 @@ void AicpuExecutor::reassign_cores_for_all_threads() { int32_t aiv1_wid = aiv_cores_[2 * ci + 1].worker_id; tracker.clusters[tracker.cluster_count++] = {aic_wid, {aiv0_wid, aiv1_wid}}; - - reassign_core(aic_wid, tracker.aic(), tracker, t); - reassign_core(aiv0_wid, tracker.aiv(), tracker, t); - reassign_core(aiv1_wid, tracker.aiv(), tracker, t); + tracker.core_pend_ready_cnt[static_cast(CoreType::AIC)]++; + tracker.core_pend_ready_cnt[static_cast(CoreType::AIV)] += 2; + tracker.core_run_ready_cnt[static_cast(CoreType::AIC)]++; + tracker.core_run_ready_cnt[static_cast(CoreType::AIV)] += 2; + + reassign_core(aic_wid, CoreType::AIC, tracker, t); + reassign_core(aiv0_wid, CoreType::AIV, tracker, t); + reassign_core(aiv1_wid, CoreType::AIV, tracker, t); } // Log final distribution for verification @@ -930,6 +1086,11 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa uint32_t phase_dispatch_count = 0; uint64_t local_dispatch_count = 0; uint64_t local_overflow_count = 0; + uint64_t phase1_total = 0; + uint64_t phase2_total = 0; + uint64_t phase3_total = 0; + uint64_t resolve_total = 0; + uint64_t resovle_cnt = 0; #if PTO2_SCHED_PROFILING uint64_t sched_complete_perf_cycle = 0; uint64_t sched_dispatch_pop_cycle = 0; @@ -958,15 +1119,13 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa uint64_t _t0_phase = _t0; #endif int32_t task_count = 0; - if (tracker.aic().running_count == 0 && tracker.aiv().running_count == 0) { - bool orch_done = orchestrator_done_; - if (orch_done) { - task_count = total_tasks_; - if (task_count > 0 && completed_tasks_.load(std::memory_order_relaxed) >= task_count) { - completed_.store(true, std::memory_order_release); - DEV_INFO("Thread %d: PTO2 completed tasks %d/%d", thread_idx, completed_tasks_.load(std::memory_order_relaxed), task_count); - break; - } + bool orch_done = orchestrator_done_; + if (orch_done) { + task_count = total_tasks_; + if (task_count > 0 && completed_tasks_.load(std::memory_order_relaxed) >= task_count) { + completed_.store(true, std::memory_order_release); + DEV_INFO("Thread %d: PTO2 completed tasks %d/%d", thread_idx, completed_tasks_.load(std::memory_order_relaxed), task_count); + break; } } @@ -1001,17 +1160,17 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa // Check AIC running cores bool try_completed = false; always_assert(local_bufs[0].count == 0 && local_bufs[1].count == 0); // Invariant: previous iteration fully consumed - if (tracker.aic().running_count > 0) { + auto phase1_t0 = get_sys_cnt_aicpu(); + if (tracker.core_run_ready_cnt[static_cast(CoreType::AIC)] < tracker.cluster_count) { try_completed = true; check_running_cores_for_completion( - thread_idx, tracker.aic(), tracker.core_idle, hank, executing_task_ids, - completed_this_turn, cur_thread_completed, made_progress, + thread_idx, hank, completed_this_turn, cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count, local_bufs #if PTO2_PROFILING , profiling_enabled, complete_probe_count, complete_hit_count, phase_complete_count, notify_edges_total, notify_max_degree, notify_tasks_enqueued, - fanin_edges_total, fanin_max_degree + fanin_edges_total, fanin_max_degree, resolve_total, resovle_cnt #endif #if PTO2_SCHED_PROFILING , sched_complete_perf_cycle @@ -1020,23 +1179,23 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa } // Check AIV running cores - if (tracker.aiv().running_count > 0) { + if (tracker.core_run_ready_cnt[static_cast(CoreType::AIV)] < tracker.cluster_count * 2) { try_completed = true; check_running_cores_for_completion( - thread_idx, tracker.aiv(), tracker.core_idle, hank, executing_task_ids, - completed_this_turn, cur_thread_completed, made_progress, + thread_idx, hank, completed_this_turn, cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count, local_bufs #if PTO2_PROFILING , profiling_enabled, complete_probe_count, complete_hit_count, phase_complete_count, notify_edges_total, notify_max_degree, notify_tasks_enqueued, - fanin_edges_total, fanin_max_degree + fanin_edges_total, fanin_max_degree, resolve_total, resovle_cnt #endif #if PTO2_SCHED_PROFILING , sched_complete_perf_cycle #endif ); } + phase1_total += get_sys_cnt_aicpu() - phase1_t0; if (completed_this_turn > 0) { #if PTO2_SCHED_PROFILING rt->scheduler.tasks_completed.fetch_add(completed_this_turn, std::memory_order_relaxed); @@ -1053,7 +1212,6 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa } } } - #if PTO2_PROFILING if (!try_completed) { CYCLE_COUNT_LAP(sched_idle_cycle); @@ -1075,12 +1233,12 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa // Local dispatch: drain both per-CoreType local_bufs, match to idle clusters by shape PTO2TaskSlotState* overflow_ptrs[LOCAL_READY_CAP_PER_TYPE * PTO2_LOCAL_DISPATCH_TYPE_NUM]; int overflow_count = 0; + auto phase2_t0 = get_sys_cnt_aicpu(); for (int bi = 0; bi < PTO2_LOCAL_DISPATCH_TYPE_NUM; bi++) { while (local_bufs[bi].count > 0) { PTO2TaskSlotState* slot_state = local_bufs[bi].pop(); PTO2ResourceShape shape = pto2_active_mask_to_shape(slot_state->active_mask); int32_t ci = tracker.find_cluster_for_shape(shape); - if (ci >= 0) { try_pushed = true; Cluster& c = tracker.clusters[ci]; @@ -1090,7 +1248,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa ResourceCount rc = shape_resource_count(shape); if (rc.aic) { - dispatch_subtask_to_core(runtime, tracker, executing_task_ids, + dispatch_subtask_to_core(runtime, tracker, c.aic_core_id, CoreType::AIC, *slot_state, PTO2SubtaskSlot::AIC #if PTO2_PROFILING , profiling_enabled, thread_idx @@ -1098,8 +1256,13 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa ); } if (rc.aiv >= 1) { - int32_t aiv0 = tracker.core_idle[c.aiv_core_ids[0]] ? c.aiv_core_ids[0] : c.aiv_core_ids[1]; - dispatch_subtask_to_core(runtime, tracker, executing_task_ids, + int32_t aiv0 = c.aiv_core_ids[0]; + if (tracker.is_core_run_ready(c.aiv_core_ids[0]) || tracker.is_core_run_ready(c.aiv_core_ids[1])) { + aiv0 = tracker.is_core_run_ready(c.aiv_core_ids[0]) ? c.aiv_core_ids[0] : c.aiv_core_ids[1]; + }else if (tracker.is_core_pend_ready(c.aiv_core_ids[0]) || tracker.is_core_pend_ready(c.aiv_core_ids[1])) { + aiv0 = tracker.is_core_pend_ready(c.aiv_core_ids[0]) ? c.aiv_core_ids[0] : c.aiv_core_ids[1]; + } + dispatch_subtask_to_core(runtime, tracker, aiv0, CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV0 #if PTO2_PROFILING , profiling_enabled, thread_idx @@ -1107,7 +1270,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa ); } if (rc.aiv >= 2) { - dispatch_subtask_to_core(runtime, tracker, executing_task_ids, + dispatch_subtask_to_core(runtime, tracker, c.aiv_core_ids[1], CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV1 #if PTO2_PROFILING , profiling_enabled, thread_idx @@ -1137,6 +1300,8 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa } } + auto phase2_t1 = get_sys_cnt_aicpu(); + phase2_total += phase2_t1 - phase2_t0; // Push overflow to global readyQ (shape-based) for (int i = 0; i < overflow_count; i++) { rt->scheduler.requeue_ready_task(*overflow_ptrs[i]); @@ -1145,10 +1310,10 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa // Phase 3: Global dispatch — fill remaining idle cores from global readyQ (cluster-based) const PTO2ResourceShape* dispatch_order = get_dispatch_order(thread_idx); + auto phase3_t0 = get_sys_cnt_aicpu(); for (int32_t si = 0; si < PTO2_NUM_RESOURCE_SHAPES; si++) { PTO2ResourceShape shape = dispatch_order[si]; if (rt->scheduler.ready_queues[static_cast(shape)].size() == 0) continue; - while (true) { int32_t ci = tracker.find_cluster_for_shape(shape); if (ci < 0) break; @@ -1174,7 +1339,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa ResourceCount rc = shape_resource_count(shape); if (rc.aic) { - dispatch_subtask_to_core(runtime, tracker, executing_task_ids, + dispatch_subtask_to_core(runtime, tracker, c.aic_core_id, CoreType::AIC, *slot_state, PTO2SubtaskSlot::AIC #if PTO2_PROFILING , profiling_enabled, thread_idx @@ -1182,17 +1347,21 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa ); } if (rc.aiv >= 1) { - int32_t aiv_id = tracker.core_idle[c.aiv_core_ids[0]] - ? c.aiv_core_ids[0] : c.aiv_core_ids[1]; - dispatch_subtask_to_core(runtime, tracker, executing_task_ids, - aiv_id, CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV0 + int32_t aiv0 = c.aiv_core_ids[0]; + if (tracker.is_core_run_ready(c.aiv_core_ids[0]) || tracker.is_core_run_ready(c.aiv_core_ids[1])) { + aiv0 = tracker.is_core_run_ready(c.aiv_core_ids[0]) ? c.aiv_core_ids[0] : c.aiv_core_ids[1]; + }else if (tracker.is_core_pend_ready(c.aiv_core_ids[0]) || tracker.is_core_pend_ready(c.aiv_core_ids[1])) { + aiv0 = tracker.is_core_pend_ready(c.aiv_core_ids[0]) ? c.aiv_core_ids[0] : c.aiv_core_ids[1]; + } + dispatch_subtask_to_core(runtime, tracker, + aiv0, CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV0 #if PTO2_PROFILING , profiling_enabled, thread_idx #endif ); } if (rc.aiv >= 2) { - dispatch_subtask_to_core(runtime, tracker, executing_task_ids, + dispatch_subtask_to_core(runtime, tracker, c.aiv_core_ids[1], CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV1 #if PTO2_PROFILING , profiling_enabled, thread_idx @@ -1210,6 +1379,9 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa ci); } } + auto phase3_t1 = get_sys_cnt_aicpu(); + phase3_total += phase3_t1 - phase3_t0; + #if PTO2_PROFILING if (!try_pushed) { @@ -1462,6 +1634,8 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa cycles_to_us(sched_total), (unsigned long long)sched_loop_count, cur_thread_completed); + DEV_ALWAYS("RESOLVE CNT: %lu, RESOLVE COST:%.3fus, AVG RESOLVE COST: %.3fus, PHASE 1 COST: %.3fus, PHASE 2 COST: %.3fus, PAHSE 3 COST: %.3fus", + resovle_cnt, cycles_to_us(resolve_total), cycles_to_us(resolve_total / resovle_cnt), cycles_to_us(phase1_total),cycles_to_us(phase2_total), cycles_to_us(phase3_total)); #endif #if PTO2_PROFILING @@ -1471,7 +1645,6 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa perf_aicpu_flush_phase_buffers(thread_idx); } #endif - return cur_thread_completed; } @@ -1909,6 +2082,7 @@ void AicpuExecutor::deinit(Runtime* runtime) { executing_task_ids_[i][j] = AICPU_TASK_INVALID; } } + regs_ = 0; // Clear file-scope PTO2Runtime pointer (freed by orchestrator thread before deinit)