From c46278211f6525f5abb5321cda6afa9461917572 Mon Sep 17 00:00:00 2001 From: Chao Wang <26245345+ChaoWao@users.noreply.github.com> Date: Wed, 18 Mar 2026 12:42:07 +0800 Subject: [PATCH] feat(runtime): double-buffered payload dispatch for AICore-AICPU pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two payload slots per core enable AICPU to pre-stage the next task while AICore is still executing, eliminating idle gaps between dispatches. Key changes: AICPU side (aicpu_executor.cpp): - Double-buffered payload array: s_pto2_payload_per_core[core][2] with XOR-flip slot selection - 4-case completion state machine (Case A/B/C/D) handling all combinations of pending + running task FIN/ACK signals - Two-level cluster search: first pass finds fully-idle clusters, second pass finds pend-ready clusters (pending slot empty, core running) - ACK-wait guard in dispatch_subtask_to_core: spin-waits until AICore ACKs the current running task before overwriting hank->task and DATA_MAIN_BASE, preventing the race where AICore skips a task - Pending subslot tracking (pending_subslot_by_core_) for correct subtask_done_mask bit when promoting pending to running - Extracted complete_subtask() helper to deduplicate completion logic across the four state machine cases - Correct profiling: saved running dispatch timestamp before pipeline overwrite; both running and pending records filled in Case A/B AICore side (aicore_executor.cpp): - FIN-skip protocol: after task execution, read DATA_MAIN_BASE to check if AICPU already dispatched a pending task. 
If pending exists, skip FIN write — the next ACK implicitly signals completion - Per-dispatch hank->task read: invalidate full data cache and re-read payload address each dispatch (AICPU updates it per slot) --- .../aicore/aicore_executor.cpp | 23 +- .../aicpu/aicpu_executor.cpp | 665 +++++++++++++++--- 2 files changed, 570 insertions(+), 118 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/aicore/aicore_executor.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/aicore/aicore_executor.cpp index 860c91c2..bd303445 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/aicore/aicore_executor.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/aicore/aicore_executor.cpp @@ -42,6 +42,8 @@ __aicore__ __attribute__((always_inline)) static void execute_task( * * Task dispatch reads PTO2DispatchPayload address from Handshake.task. * Task ID is derived from the register value (task_id + 1 encoding). + * With double-buffering, AICPU updates hank->task to point to the + * appropriate payload slot each dispatch. 
 * * @param runtime Pointer to Runtime in global memory * @param block_idx Block index (core ID) @@ -72,10 +74,6 @@ __aicore__ __attribute__((weak)) void aicore_execute(__gm__ Runtime* runtime, in dcci(my_hank, SINGLE_CACHE_LINE, CACHELINE_OUT); - // Cache payload address (set once by AICPU during initialization, never changes) - __gm__ PTO2DispatchPayload* payload = - reinterpret_cast<__gm__ PTO2DispatchPayload*>(my_hank->task); - bool profiling_enabled = runtime->enable_profiling; uint64_t kernel_ready_time = get_sys_cnt_aicore(); @@ -101,8 +99,12 @@ { uint32_t task_id = reg_val; // Decode: register holds task_id directly - // Invalidate payload buffer (AICPU updates its content each dispatch) - dcci(payload, ENTIRE_DATA_CACHE); + // Invalidate entire data cache to read fresh payload and hank->task + dcci(my_hank, ENTIRE_DATA_CACHE); + + // Read per-task dispatch payload address (updated by AICPU each dispatch) + __gm__ PTO2DispatchPayload* payload = + reinterpret_cast<__gm__ PTO2DispatchPayload*>(my_hank->task); write_reg(RegId::COND, MAKE_ACK_VALUE(task_id)); @@ -122,7 +124,14 @@ __aicore__ __attribute__((weak)) void aicore_execute(__gm__ Runtime* runtime, in } last_reg_val = reg_val; - write_reg(RegId::COND, MAKE_FIN_VALUE(task_id)); + + // Check if AICPU already dispatched a pending task + uint32_t next = static_cast<uint32_t>(read_reg(RegId::DATA_MAIN_BASE)); + if (next == last_reg_val || next == AICPU_IDLE_TASK_ID) { + // No pending task — write FIN so AICPU knows this core is idle + write_reg(RegId::COND, MAKE_FIN_VALUE(task_id)); + } + // Pending task exists — skip FIN; the next ACK implicitly signals completion } } diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index 478e6640..8ed5b80e 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ 
b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -69,8 +69,8 @@ constexpr int32_t PROGRESS_LOG_INTERVAL = 250; // log every N completions a static PTO2Runtime *rt{nullptr}; -// Per-core dispatch payload storage (one per physical core) -static PTO2DispatchPayload s_pto2_payload_per_core[RUNTIME_MAX_WORKER]; +// Per-core dispatch payload storage (two slots per core for double-buffering) +static PTO2DispatchPayload s_pto2_payload_per_core[RUNTIME_MAX_WORKER][2]; // Core information for discovery (with register address for fast dispatch) struct CoreInfo { @@ -121,7 +121,12 @@ struct CoreStateTracker { template CoreTypeTracker& get() { return by_type[static_cast(CT)]; } - int32_t find_cluster_for_shape(PTO2ResourceShape shape) { + // Two-level cluster search: run-ready (fully idle) first, pend-ready fallback. + // When pending_ids is provided, a second pass finds clusters where all needed + // cores have their pending slot empty (but may still be running). + int32_t find_cluster_for_shape(PTO2ResourceShape shape, + const int32_t* pending_ids = nullptr) { + // First pass: fully idle cores (original behavior) for (int32_t i = 0; i < cluster_count; i++) { Cluster& c = clusters[i]; switch (shape) { @@ -144,8 +149,49 @@ struct CoreStateTracker { break; } } + // Second pass: pend-ready (pending slot empty, core may be running) + if (pending_ids) { + for (int32_t i = 0; i < cluster_count; i++) { + Cluster& c = clusters[i]; + switch (shape) { + case PTO2ResourceShape::AIC_ONLY: + if (pending_ids[c.aic_core_id] == AICPU_TASK_INVALID) return i; + break; + case PTO2ResourceShape::AIV_X1: + if (pending_ids[c.aiv_core_ids[0]] == AICPU_TASK_INVALID || + pending_ids[c.aiv_core_ids[1]] == AICPU_TASK_INVALID) return i; + break; + case PTO2ResourceShape::AIV_X2: + if (pending_ids[c.aiv_core_ids[0]] == AICPU_TASK_INVALID && + pending_ids[c.aiv_core_ids[1]] == AICPU_TASK_INVALID) return i; + break; + case PTO2ResourceShape::AIC_AIV_X1: + if 
(pending_ids[c.aic_core_id] == AICPU_TASK_INVALID && + (pending_ids[c.aiv_core_ids[0]] == AICPU_TASK_INVALID || + pending_ids[c.aiv_core_ids[1]] == AICPU_TASK_INVALID)) return i; + break; + case PTO2ResourceShape::AIC_AIV_X2: + if (pending_ids[c.aic_core_id] == AICPU_TASK_INVALID && + pending_ids[c.aiv_core_ids[0]] == AICPU_TASK_INVALID && + pending_ids[c.aiv_core_ids[1]] == AICPU_TASK_INVALID) return i; + break; + } + } + } return -1; } + + // Select an AIV core: prefer idle, fall back to pend-ready + int32_t select_aiv_core(int32_t aiv0, int32_t aiv1, + const int32_t* pending_ids = nullptr) { + if (core_idle[aiv0]) return aiv0; + if (core_idle[aiv1]) return aiv1; + if (pending_ids) { + if (pending_ids[aiv0] == AICPU_TASK_INVALID) return aiv0; + if (pending_ids[aiv1] == AICPU_TASK_INVALID) return aiv1; + } + return aiv0; + } }; struct AicpuExecutor { @@ -194,6 +240,10 @@ struct AicpuExecutor { // NOTE: this is NOT the mixed_task_id; it is the per-core dispatch id used by the // register protocol (derived from dispatch_seq_by_core_ and masked by TASK_ID_MASK). 
int32_t executing_reg_task_ids_[MAX_AICPU_THREADS][MAX_CORES_PER_THREAD]; + int32_t pending_reg_task_ids_[MAX_AICPU_THREADS][MAX_CORES_PER_THREAD]; + PTO2TaskSlotState* pending_slot_state_by_core_[RUNTIME_MAX_WORKER]{}; + PTO2SubtaskSlot pending_subslot_by_core_[RUNTIME_MAX_WORKER]{}; + int32_t payload_slot_[MAX_AICPU_THREADS][MAX_CORES_PER_THREAD]; CoreStateTracker trackers_[MAX_AICPU_THREADS]; // ===== Task queue state (managed by scheduler ready queues) ===== @@ -226,6 +276,7 @@ struct AicpuExecutor { // ===== Performance profiling state ===== uint64_t dispatch_timestamps_[RUNTIME_MAX_WORKER]; // Per-core AICPU dispatch timestamp + uint64_t running_dispatch_timestamps_[RUNTIME_MAX_WORKER]; // Saved running task dispatch timestamp (before pipeline overwrite) uint32_t core_dispatch_counts_[RUNTIME_MAX_WORKER]; // Per-core total dispatched task counter (for buffer management) uint64_t* func_id_to_addr_; @@ -264,13 +315,71 @@ struct AicpuExecutor { } } - // Template methods for Phase 1 and Phase 2 + // Helper: complete a subtask and handle mixed-task completion + deferred release + void complete_subtask(PTO2TaskSlotState& slot_state, PTO2SubtaskSlot subslot, + int32_t thread_idx, + int32_t& completed_this_turn, + int32_t& cur_thread_completed, + PTO2TaskSlotState* deferred_release_slot_states[], + int32_t& deferred_release_count, + PTO2LocalReadyBuffer* local_bufs +#if PTO2_PROFILING + , uint32_t& phase_complete_count +#endif +#if PTO2_SCHED_PROFILING + , + uint64_t& notify_edges_total, + int32_t& notify_max_degree, + uint64_t& notify_tasks_enqueued, + uint64_t& fanin_edges_total, + int32_t& fanin_max_degree +#endif + ) { + bool mixed_complete = rt->scheduler.on_subtask_complete(slot_state, subslot); + if (mixed_complete) { +#if PTO2_SCHED_PROFILING + PTO2CompletionStats cstats = rt->scheduler.on_mixed_task_complete(slot_state, thread_idx, local_bufs); + notify_edges_total += cstats.fanout_edges; + if (cstats.fanout_edges > notify_max_degree) notify_max_degree = 
cstats.fanout_edges; + notify_tasks_enqueued += cstats.tasks_enqueued; + phase_complete_count++; +#else + rt->scheduler.on_mixed_task_complete(slot_state, local_bufs); +#if PTO2_PROFILING + phase_complete_count++; +#endif +#endif + if (deferred_release_count >= 256) { + DEV_ALWAYS("Thread %d: release", thread_idx); + while (deferred_release_count > 0) { +#if PTO2_SCHED_PROFILING + int32_t fe = rt->scheduler.on_task_release( + *deferred_release_slot_states[--deferred_release_count], thread_idx); +#else + int32_t fe = + rt->scheduler.on_task_release(*deferred_release_slot_states[--deferred_release_count]); +#endif + (void)fe; +#if PTO2_SCHED_PROFILING + fanin_edges_total += fe; + if (fe > fanin_max_degree) fanin_max_degree = fe; +#endif + } + } + deferred_release_slot_states[deferred_release_count++] = &slot_state; + completed_this_turn++; + } + cur_thread_completed++; + } + + // Template methods for completion check with 4-case double-buffer state machine template void check_running_cores_for_completion(int32_t thread_idx, CoreTypeTracker& ct, - bool* core_idle, Handshake* hank, int32_t* executing_reg_task_ids, + int32_t* pending_reg_task_ids, + bool* core_idle, int32_t& completed_this_turn, int32_t& cur_thread_completed, bool& made_progress, @@ -298,66 +407,181 @@ struct AicpuExecutor { int32_t core_id = ct.running[i]; uint64_t reg_addr = core_id_to_reg_addr_[core_id]; - int32_t expected_reg_task_id = executing_reg_task_ids[core_id]; + int32_t running_id = executing_reg_task_ids[core_id]; + int32_t pending_id = pending_reg_task_ids[core_id]; uint64_t reg_val = read_reg(reg_addr, RegId::COND); int32_t reg_task_id = EXTRACT_TASK_ID(reg_val); int32_t reg_state = EXTRACT_TASK_STATE(reg_val); - bool done = reg_task_id == expected_reg_task_id && reg_state == TASK_FIN_STATE; + #if PTO2_SCHED_PROFILING if (profiling_enabled) { complete_probe_count++; - if (done) { - complete_hit_count++; - } } #endif - if (done) { - executing_reg_task_ids[core_id] = AICPU_TASK_INVALID; - 
PTO2SubtaskSlot subslot = executing_subslot_by_core_[core_id]; - PTO2TaskSlotState& slot_state = *executing_slot_state_by_core_[core_id]; + // Case A: Pending task FIN'd directly — both tasks are done + if (pending_id != AICPU_TASK_INVALID && + reg_task_id == pending_id && reg_state == TASK_FIN_STATE) { - // Two-stage completion: mark subtask done, then handle mixed-task completion - bool mixed_complete = rt->scheduler.on_subtask_complete(slot_state, subslot); - if (mixed_complete) { + // Running task implicitly completed + if (running_id != AICPU_TASK_INVALID) { + PTO2TaskSlotState& running_slot = *executing_slot_state_by_core_[core_id]; + complete_subtask(running_slot, executing_subslot_by_core_[core_id], thread_idx, + completed_this_turn, cur_thread_completed, + deferred_release_slot_states, deferred_release_count, local_bufs +#if PTO2_PROFILING + , phase_complete_count +#endif #if PTO2_SCHED_PROFILING - PTO2CompletionStats cstats = rt->scheduler.on_mixed_task_complete(slot_state, thread_idx, local_bufs); - notify_edges_total += cstats.fanout_edges; - if (cstats.fanout_edges > notify_max_degree) notify_max_degree = cstats.fanout_edges; - notify_tasks_enqueued += cstats.tasks_enqueued; - phase_complete_count++; -#else - rt->scheduler.on_mixed_task_complete(slot_state, local_bufs); + , notify_edges_total, notify_max_degree, notify_tasks_enqueued, + fanin_edges_total, fanin_max_degree +#endif + ); + } + // Pending task explicitly completed + PTO2TaskSlotState& pending_slot = *pending_slot_state_by_core_[core_id]; + PTO2SubtaskSlot pending_subslot = pending_subslot_by_core_[core_id]; + complete_subtask(pending_slot, pending_subslot, thread_idx, + completed_this_turn, cur_thread_completed, + deferred_release_slot_states, deferred_release_count, local_bufs #if PTO2_PROFILING - phase_complete_count++; + , phase_complete_count #endif +#if PTO2_SCHED_PROFILING + , notify_edges_total, notify_max_degree, notify_tasks_enqueued, + fanin_edges_total, fanin_max_degree #endif - 
if (deferred_release_count < 256) { - deferred_release_slot_states[deferred_release_count++] = &slot_state; - } else { - DEV_ALWAYS("Thread %d: release", thread_idx); - while (deferred_release_count > 0) { + ); + + executing_reg_task_ids[core_id] = AICPU_TASK_INVALID; + pending_reg_task_ids[core_id] = AICPU_TASK_INVALID; + ct.move_running_to_idle(i); + core_idle[core_id] = true; + made_progress = true; +#if PTO2_PROFILING + if (profiling_enabled) { #if PTO2_SCHED_PROFILING - int32_t fe = rt->scheduler.on_task_release( - *deferred_release_slot_states[--deferred_release_count], thread_idx); -#else - int32_t fe = - rt->scheduler.on_task_release(*deferred_release_slot_states[--deferred_release_count]); + complete_hit_count++; + uint64_t t_perf_start = get_sys_cnt_aicpu(); #endif - (void)fe; + Handshake* h = &hank[core_id]; + uint64_t finish_ts = get_sys_cnt_aicpu(); + PerfBuffer* perf_buf = (PerfBuffer*)h->perf_records_addr; + rmb(); + uint32_t count = perf_buf->count; + // Running task: use saved dispatch timestamp + if (count >= 2 && running_id != AICPU_TASK_INVALID) { + PerfRecord* record = &perf_buf->records[count - 2]; + if (record->task_id == static_cast(running_id)) { + PTO2TaskSlotState& rs = *executing_slot_state_by_core_[core_id]; + int32_t perf_slot_idx = static_cast(executing_subslot_by_core_[core_id]); + record->func_id = rs.task->kernel_id[perf_slot_idx]; + record->core_type = CT; + perf_aicpu_record_dispatch_and_finish_time( + record, running_dispatch_timestamps_[core_id], finish_ts); + } + } + // Pending task: use current dispatch timestamp + if (count > 0) { + PerfRecord* record = &perf_buf->records[count - 1]; + if (record->task_id == static_cast(pending_id)) { + int32_t perf_slot_idx_p = static_cast(pending_subslot); + record->func_id = pending_slot.task->kernel_id[perf_slot_idx_p]; + record->core_type = CT; + perf_aicpu_record_dispatch_and_finish_time( + record, dispatch_timestamps_[core_id], finish_ts); + } + } #if PTO2_SCHED_PROFILING - 
fanin_edges_total += fe; - if (fe > fanin_max_degree) fanin_max_degree = fe; + sched_complete_perf_cycle += (get_sys_cnt_aicpu() - t_perf_start); #endif + } +#endif + DEV_DEBUG("Thread %d: %s core %d Case A — pending task %d FIN'd (running %d implicit)", + thread_idx, CT == CoreType::AIC ? "AIC" : "AIV", core_id, pending_id, running_id); + } + // Case B: Pending task ACK'd — running task implicitly done, pending becomes running + else if (pending_id != AICPU_TASK_INVALID && + reg_task_id == pending_id && reg_state == TASK_ACK_STATE) { + + if (running_id != AICPU_TASK_INVALID) { + PTO2TaskSlotState& running_slot = *executing_slot_state_by_core_[core_id]; + complete_subtask(running_slot, executing_subslot_by_core_[core_id], thread_idx, + completed_this_turn, cur_thread_completed, + deferred_release_slot_states, deferred_release_count, local_bufs +#if PTO2_PROFILING + , phase_complete_count +#endif +#if PTO2_SCHED_PROFILING + , notify_edges_total, notify_max_degree, notify_tasks_enqueued, + fanin_edges_total, fanin_max_degree +#endif + ); + } + + PTO2SubtaskSlot pending_subslot_b = pending_subslot_by_core_[core_id]; + executing_reg_task_ids[core_id] = pending_id; + executing_slot_state_by_core_[core_id] = pending_slot_state_by_core_[core_id]; + pending_reg_task_ids[core_id] = AICPU_TASK_INVALID; + pending_slot_state_by_core_[core_id] = nullptr; + made_progress = true; +#if PTO2_PROFILING + if (profiling_enabled) { +#if PTO2_SCHED_PROFILING + complete_hit_count++; + uint64_t t_perf_start = get_sys_cnt_aicpu(); +#endif + Handshake* h = &hank[core_id]; + uint64_t finish_ts = get_sys_cnt_aicpu(); + PerfBuffer* perf_buf = (PerfBuffer*)h->perf_records_addr; + rmb(); + uint32_t count = perf_buf->count; + if (count > 0 && running_id != AICPU_TASK_INVALID) { + PerfRecord* record = &perf_buf->records[count - 1]; + if (record->task_id == static_cast(running_id)) { + int32_t perf_slot_idx = static_cast(executing_subslot_by_core_[core_id]); + PTO2TaskSlotState& rs = 
*executing_slot_state_by_core_[core_id]; + record->func_id = rs.task->kernel_id[perf_slot_idx]; + record->core_type = CT; + perf_aicpu_record_dispatch_and_finish_time( + record, running_dispatch_timestamps_[core_id], finish_ts); } - deferred_release_slot_states[deferred_release_count++] = &slot_state; } +#if PTO2_SCHED_PROFILING + sched_complete_perf_cycle += (get_sys_cnt_aicpu() - t_perf_start); +#endif } +#endif + executing_subslot_by_core_[core_id] = pending_subslot_b; + DEV_DEBUG("Thread %d: %s core %d Case B — pending task %d ACK'd (running %d done)", + thread_idx, CT == CoreType::AIC ? "AIC" : "AIV", core_id, pending_id, running_id); + } + // Case C: Running task FIN'd (no pending) + else if (pending_id == AICPU_TASK_INVALID && + running_id != AICPU_TASK_INVALID && + reg_task_id == running_id && reg_state == TASK_FIN_STATE) { + + PTO2TaskSlotState& slot_state = *executing_slot_state_by_core_[core_id]; + complete_subtask(slot_state, executing_subslot_by_core_[core_id], thread_idx, + completed_this_turn, cur_thread_completed, + deferred_release_slot_states, deferred_release_count, local_bufs +#if PTO2_PROFILING + , phase_complete_count +#endif +#if PTO2_SCHED_PROFILING + , notify_edges_total, notify_max_degree, notify_tasks_enqueued, + fanin_edges_total, fanin_max_degree +#endif + ); + + executing_reg_task_ids[core_id] = AICPU_TASK_INVALID; ct.move_running_to_idle(i); core_idle[core_id] = true; + made_progress = true; #if PTO2_PROFILING if (profiling_enabled) { #if PTO2_SCHED_PROFILING + complete_hit_count++; uint64_t t_perf_start = get_sys_cnt_aicpu(); #endif Handshake* h = &hank[core_id]; @@ -367,27 +591,12 @@ struct AicpuExecutor { uint32_t count = perf_buf->count; if (count > 0) { PerfRecord* record = &perf_buf->records[count - 1]; - if (record->task_id == static_cast(expected_reg_task_id)) { - // Fill metadata that AICore doesn't know + if (record->task_id == static_cast(running_id)) { int32_t perf_slot_idx = 
static_cast(executing_subslot_by_core_[core_id]); record->func_id = slot_state.task->kernel_id[perf_slot_idx]; record->core_type = CT; perf_aicpu_record_dispatch_and_finish_time( record, dispatch_timestamps_[core_id], finish_ts); - - // Fill ring_id from slot state - record->ring_id = slot_state.ring_id; - - // Fill fanout from slot_state's dependency linked list. - // No lock: head-insert guarantees existing nodes' next pointers - // are stable, so this snapshot is consistent (best-effort). - record->fanout_count = 0; - PTO2DepListEntry* cur = slot_state.fanout_head; - while (cur != nullptr && record->fanout_count < RUNTIME_MAX_FANOUT) { - record->fanout[record->fanout_count++] = static_cast( - pto2_task_id_local(cur->slot_state->task->mixed_task_id)); - cur = cur->next; - } } } #if PTO2_SCHED_PROFILING @@ -395,18 +604,62 @@ struct AicpuExecutor { #endif } #endif + DEV_DEBUG("Thread %d: %s core %d Case C — running task %d FIN'd", + thread_idx, CT == CoreType::AIC ? "AIC" : "AIV", core_id, running_id); + } + // Case D: Running task FIN'd while pending exists — running done, pending not yet started + else if (pending_id != AICPU_TASK_INVALID && + running_id != AICPU_TASK_INVALID && + reg_task_id == running_id && reg_state == TASK_FIN_STATE) { - DEV_DEBUG("Thread %d: %s core %d completed PTO2 task %d (mixed_complete=%d)", - thread_idx, - CT == CoreType::AIC ? "AIC" : "AIV", - core_id, - expected_reg_task_id, - mixed_complete ? 
1 : 0); - cur_thread_completed++; - if (mixed_complete) { - completed_this_turn++; - } + PTO2TaskSlotState& slot_state = *executing_slot_state_by_core_[core_id]; + complete_subtask(slot_state, executing_subslot_by_core_[core_id], thread_idx, + completed_this_turn, cur_thread_completed, + deferred_release_slot_states, deferred_release_count, local_bufs +#if PTO2_PROFILING + , phase_complete_count +#endif +#if PTO2_SCHED_PROFILING + , notify_edges_total, notify_max_degree, notify_tasks_enqueued, + fanin_edges_total, fanin_max_degree +#endif + ); + + PTO2SubtaskSlot pending_subslot_d = pending_subslot_by_core_[core_id]; + executing_reg_task_ids[core_id] = pending_id; + executing_slot_state_by_core_[core_id] = pending_slot_state_by_core_[core_id]; + pending_reg_task_ids[core_id] = AICPU_TASK_INVALID; + pending_slot_state_by_core_[core_id] = nullptr; made_progress = true; +#if PTO2_PROFILING + if (profiling_enabled) { +#if PTO2_SCHED_PROFILING + complete_hit_count++; + uint64_t t_perf_start = get_sys_cnt_aicpu(); +#endif + Handshake* h = &hank[core_id]; + uint64_t finish_ts = get_sys_cnt_aicpu(); + PerfBuffer* perf_buf = (PerfBuffer*)h->perf_records_addr; + rmb(); + uint32_t count = perf_buf->count; + if (count > 0) { + PerfRecord* record = &perf_buf->records[count - 1]; + if (record->task_id == static_cast(running_id)) { + int32_t perf_slot_idx = static_cast(executing_subslot_by_core_[core_id]); + record->func_id = slot_state.task->kernel_id[perf_slot_idx]; + record->core_type = CT; + perf_aicpu_record_dispatch_and_finish_time( + record, running_dispatch_timestamps_[core_id], finish_ts); + } + } +#if PTO2_SCHED_PROFILING + sched_complete_perf_cycle += (get_sys_cnt_aicpu() - t_perf_start); +#endif + } +#endif + executing_subslot_by_core_[core_id] = pending_subslot_d; + DEV_DEBUG("Thread %d: %s core %d Case D — running task %d FIN'd (pending %d not yet started)", + thread_idx, CT == CoreType::AIC ? 
"AIC" : "AIV", core_id, running_id, pending_id); } } } @@ -493,21 +746,27 @@ struct AicpuExecutor { } void dispatch_subtask_to_core( - Runtime* runtime, CoreStateTracker& tracker, int32_t* executing_reg_task_ids, + Runtime* runtime, CoreStateTracker& tracker, + int32_t* executing_reg_task_ids, int32_t* pending_reg_task_ids, int32_t core_id, CoreType core_type, PTO2TaskSlotState& slot_state, - PTO2SubtaskSlot subslot + PTO2SubtaskSlot subslot, + int32_t thread_idx, + Handshake* hank #if PTO2_PROFILING - , bool profiling_enabled, int32_t thread_idx + , bool profiling_enabled #endif ) { - PTO2DispatchPayload& payload = s_pto2_payload_per_core[core_id]; + int32_t slot = payload_slot_[thread_idx][core_id]; + PTO2DispatchPayload& payload = s_pto2_payload_per_core[core_id][slot]; PTO2TaskDescriptor& task = *slot_state.task; int32_t slot_idx = static_cast(subslot); build_pto2_payload(payload, task.kernel_id[slot_idx], *slot_state.payload); - executing_subslot_by_core_[core_id] = subslot; - executing_slot_state_by_core_[core_id] = &slot_state; + payload_slot_[thread_idx][core_id] ^= 1; #if PTO2_PROFILING if (profiling_enabled) { + if (!tracker.core_idle[core_id]) { + running_dispatch_timestamps_[core_id] = dispatch_timestamps_[core_id]; + } dispatch_timestamps_[core_id] = get_sys_cnt_aicpu(); if (core_dispatch_counts_[core_id] >= PLATFORM_PROF_BUFFER_SIZE) { perf_aicpu_switch_buffer(runtime, core_id, thread_idx); @@ -517,12 +776,6 @@ struct AicpuExecutor { } #endif // Per-core monotonic counter for register protocol uniqueness. - // mixed_task_id encodes (ring_id << 32 | local_id); truncation to - // uint32 loses ring_id, so tasks from different rings with the same - // local_id would write identical DATA_MAIN_BASE values. The AICore - // uses last_reg_val to detect new dispatches and would skip the - // duplicate, while the stale COND register from the previous task - // (same local_id) would cause a false-positive completion. 
dispatch_seq_by_core_[core_id]++; uint32_t reg_task_id = dispatch_seq_by_core_[core_id] & TASK_ID_MASK; // Skip reserved sentinel values @@ -531,13 +784,131 @@ struct AicpuExecutor { dispatch_seq_by_core_[core_id]++; reg_task_id = dispatch_seq_by_core_[core_id] & TASK_ID_MASK; } + // When dispatching to a running core (pending slot), ensure AICore has + // acknowledged the current running task before overwriting hank->task + // and DATA_MAIN_BASE. Without this, AICore might read the pending + // payload address while still processing the running task. + if (!tracker.core_idle[core_id]) { + int32_t running_id = executing_reg_task_ids[core_id]; + while (true) { + uint64_t cond_val = read_reg(core_id_to_reg_addr_[core_id], RegId::COND); + if (EXTRACT_TASK_ID(cond_val) == running_id) break; + } + } + hank[core_id].task = reinterpret_cast(&payload); write_reg(core_id_to_reg_addr_[core_id], RegId::DATA_MAIN_BASE, static_cast(reg_task_id)); - CoreTypeTracker& ct = tracker.by_type[static_cast(core_type)]; - int32_t idle_idx = ct.find_idle_index(core_id); - ct.move_idle_to_running(idle_idx); - tracker.core_idle[core_id] = false; - executing_reg_task_ids[core_id] = reg_task_id; + if (tracker.core_idle[core_id]) { + CoreTypeTracker& ct = tracker.by_type[static_cast(core_type)]; + int32_t idle_idx = ct.find_idle_index(core_id); + ct.move_idle_to_running(idle_idx); + tracker.core_idle[core_id] = false; + executing_reg_task_ids[core_id] = reg_task_id; + executing_subslot_by_core_[core_id] = subslot; + executing_slot_state_by_core_[core_id] = &slot_state; + } else { + pending_reg_task_ids[core_id] = reg_task_id; + pending_slot_state_by_core_[core_id] = &slot_state; + pending_subslot_by_core_[core_id] = subslot; + } + } + + // Pipeline dispatch: dispatch single-core tasks to running cores with empty pending slots + template + void dispatch_ready_tasks_to_running_cores(Runtime* runtime, + int32_t thread_idx, + CoreTypeTracker& ct, + int32_t* executing_reg_task_ids, + int32_t* 
pending_reg_task_ids, + bool& made_progress, + PTO2LocalReadyBuffer* local_bufs, + Handshake* hank +#if PTO2_PROFILING + , + bool profiling_enabled +#endif +#if PTO2_SCHED_PROFILING + , + uint64_t& pop_hit, + uint64_t& pop_miss, + uint32_t& phase_dispatch_count, + uint64_t& sched_dispatch_pop_cycle, + uint64_t& sched_dispatch_setup_cycle +#endif + ) { + (void)local_bufs; + constexpr PTO2ResourceShape shape = (CT == CoreType::AIC) ? + PTO2ResourceShape::AIC_ONLY : PTO2ResourceShape::AIV_X1; + if (ct.running_count > 0 && rt->scheduler.ready_queues[static_cast(shape)].size() > 0) { + for (int32_t i = ct.running_count - 1; i >= 0; i--) { + int32_t core_id = ct.running[i]; + if (pending_reg_task_ids[core_id] != AICPU_TASK_INVALID) continue; + + // Only pipeline-dispatch if AICore has ACKed the current running task. + // Without this check, a freshly-dispatched core (idle→running in this + // iteration) might not have read DATA_MAIN_BASE yet. Overwriting it + // with the pending task's id would cause AICore to skip the running task. + int32_t running_id = executing_reg_task_ids[core_id]; + uint64_t cond_val = read_reg(core_id_to_reg_addr_[core_id], RegId::COND); + if (EXTRACT_TASK_ID(cond_val) != running_id || + EXTRACT_TASK_STATE(cond_val) != TASK_ACK_STATE) { + continue; + } + + PTO2TaskSlotState* slot_state = pop_ready_task(shape, thread_idx +#if PTO2_SCHED_PROFILING + , pop_hit, pop_miss + , sched_dispatch_pop_cycle +#endif + ); + if (slot_state) { +#if PTO2_SCHED_PROFILING + phase_dispatch_count++; + uint64_t t_setup_start = get_sys_cnt_aicpu(); +#endif + PTO2TaskDescriptor& task = *slot_state->task; + int32_t slot = payload_slot_[thread_idx][core_id]; + PTO2DispatchPayload& payload = s_pto2_payload_per_core[core_id][slot]; + constexpr PTO2SubtaskSlot subslot = (CT == CoreType::AIC) ? 
+ PTO2SubtaskSlot::AIC : PTO2SubtaskSlot::AIV0; + int32_t slot_idx = static_cast(subslot); + build_pto2_payload(payload, task.kernel_id[slot_idx], *slot_state->payload); + payload_slot_[thread_idx][core_id] ^= 1; + hank[core_id].task = reinterpret_cast(&payload); +#if PTO2_PROFILING + if (profiling_enabled) { + running_dispatch_timestamps_[core_id] = dispatch_timestamps_[core_id]; + dispatch_timestamps_[core_id] = get_sys_cnt_aicpu(); + if (core_dispatch_counts_[core_id] >= PLATFORM_PROF_BUFFER_SIZE) { + perf_aicpu_switch_buffer(runtime, core_id, thread_idx); + core_dispatch_counts_[core_id] = 0; + } + core_dispatch_counts_[core_id]++; + } +#endif + dispatch_seq_by_core_[core_id]++; + uint32_t reg_task_id = dispatch_seq_by_core_[core_id] & TASK_ID_MASK; + while (reg_task_id == AICORE_IDLE_TASK_ID || + (reg_task_id + 1) == AICORE_EXIT_SIGNAL) { + dispatch_seq_by_core_[core_id]++; + reg_task_id = dispatch_seq_by_core_[core_id] & TASK_ID_MASK; + } + write_reg(core_id_to_reg_addr_[core_id], RegId::DATA_MAIN_BASE, static_cast(reg_task_id)); + pending_reg_task_ids[core_id] = reg_task_id; + pending_slot_state_by_core_[core_id] = slot_state; + pending_subslot_by_core_[core_id] = subslot; + made_progress = true; +#if PTO2_SCHED_PROFILING + sched_dispatch_setup_cycle += (get_sys_cnt_aicpu() - t_setup_start); +#endif + DEV_DEBUG("Thread %d: Pipeline dispatch PTO2 task %lld to %s core %d (pending)", + thread_idx, (long long)pto2_task_id_raw(task.mixed_task_id), + CT == CoreType::AIC ? 
"AIC" : "AIV", core_id); + } else { + break; + } + } + } } }; @@ -567,7 +938,7 @@ int32_t AicpuExecutor::handshake_all_cores(Runtime* runtime) { // Step 1: Write per-core payload addresses and send handshake signal // task must be written BEFORE aicpu_ready so AICore sees it after waking up for (int32_t i = 0; i < cores_total_num_; i++) { - all_handshakes[i].task = reinterpret_cast(&s_pto2_payload_per_core[i]); + all_handshakes[i].task = reinterpret_cast(&s_pto2_payload_per_core[i][0]); all_handshakes[i].aicpu_ready = 1; } @@ -649,6 +1020,8 @@ void AicpuExecutor::assign_cores_to_threads() { for (int32_t i = 0; i < thread_num_; i++) { for (int32_t j = 0; j < MAX_CORES_PER_THREAD; j++) { executing_reg_task_ids_[i][j] = AICPU_TASK_INVALID; + pending_reg_task_ids_[i][j] = AICPU_TASK_INVALID; + payload_slot_[i][j] = 0; } trackers_[i].aic().running_count = 0; trackers_[i].aiv().running_count = 0; @@ -711,29 +1084,41 @@ void AicpuExecutor::reassign_cores_for_all_threads() { // Collect running/idle state from all threads before reassignment int32_t running_cores[128]; int32_t running_task_ids[128]; + int32_t running_pending_ids[128]; + int32_t running_payload_slots[128]; int32_t running_count = 0; bool was_idle[MAX_CORES_PER_THREAD]; + int32_t idle_payload_slots[MAX_CORES_PER_THREAD]; memset(was_idle, 0, sizeof(was_idle)); + memset(idle_payload_slots, 0, sizeof(idle_payload_slots)); for (int32_t i = 0; i < thread_num_; i++) { for (int32_t j = 0; j < trackers_[i].aic().running_count; j++) { int32_t core_id = trackers_[i].aic().running[j]; running_cores[running_count] = core_id; running_task_ids[running_count] = executing_reg_task_ids_[i][core_id]; + running_pending_ids[running_count] = pending_reg_task_ids_[i][core_id]; + running_payload_slots[running_count] = payload_slot_[i][core_id]; running_count++; } for (int32_t j = 0; j < trackers_[i].aic().idle_count; j++) { - was_idle[trackers_[i].aic().idle[j]] = true; + int32_t core_id = trackers_[i].aic().idle[j]; + 
was_idle[core_id] = true; + idle_payload_slots[core_id] = payload_slot_[i][core_id]; } for (int32_t j = 0; j < trackers_[i].aiv().running_count; j++) { int32_t core_id = trackers_[i].aiv().running[j]; running_cores[running_count] = core_id; running_task_ids[running_count] = executing_reg_task_ids_[i][core_id]; + running_pending_ids[running_count] = pending_reg_task_ids_[i][core_id]; + running_payload_slots[running_count] = payload_slot_[i][core_id]; running_count++; } for (int32_t j = 0; j < trackers_[i].aiv().idle_count; j++) { - was_idle[trackers_[i].aiv().idle[j]] = true; + int32_t core_id = trackers_[i].aiv().idle[j]; + was_idle[core_id] = true; + idle_payload_slots[core_id] = payload_slot_[i][core_id]; } } @@ -748,6 +1133,8 @@ void AicpuExecutor::reassign_cores_for_all_threads() { memset(trackers_[i].core_idle, 0, sizeof(trackers_[i].core_idle)); for (int32_t j = 0; j < MAX_CORES_PER_THREAD; j++) { executing_reg_task_ids_[i][j] = AICPU_TASK_INVALID; + pending_reg_task_ids_[i][j] = AICPU_TASK_INVALID; + payload_slot_[i][j] = 0; } } @@ -759,12 +1146,15 @@ void AicpuExecutor::reassign_cores_for_all_threads() { if (running_cores[j] == worker_id) { type_tracker.running[type_tracker.running_count++] = worker_id; executing_reg_task_ids_[thread_idx][worker_id] = running_task_ids[j]; + pending_reg_task_ids_[thread_idx][worker_id] = running_pending_ids[j]; + payload_slot_[thread_idx][worker_id] = running_payload_slots[j]; return; } } if (was_idle[worker_id]) { type_tracker.idle[type_tracker.idle_count++] = worker_id; tracker.core_idle[worker_id] = true; + payload_slot_[thread_idx][worker_id] = idle_payload_slots[worker_id]; } }; @@ -863,6 +1253,7 @@ int32_t AicpuExecutor::init(Runtime* runtime) { // Reset per-core dispatch timestamps and task counters for (int32_t i = 0; i < RUNTIME_MAX_WORKER; i++) { dispatch_timestamps_[i] = 0; + running_dispatch_timestamps_[i] = 0; core_dispatch_counts_[i] = 0; } @@ -871,6 +1262,7 @@ int32_t AicpuExecutor::init(Runtime* runtime) { 
memset(dispatch_seq_by_core_, 0, sizeof(dispatch_seq_by_core_)); memset(executing_subslot_by_core_, 0, sizeof(executing_subslot_by_core_)); memset(executing_slot_state_by_core_, 0, sizeof(executing_slot_state_by_core_)); + memset(pending_slot_state_by_core_, 0, sizeof(pending_slot_state_by_core_)); DEV_INFO("Init: PTO2 mode, task count from shared memory"); @@ -906,6 +1298,7 @@ int32_t AicpuExecutor::shutdown_aicore(Runtime* runtime, int32_t thread_idx, con int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t thread_idx) { int32_t &core_num = core_count_per_thread_[thread_idx]; int32_t* executing_reg_task_ids = executing_reg_task_ids_[thread_idx]; + int32_t* pending_reg_task_ids = pending_reg_task_ids_[thread_idx]; CoreStateTracker& tracker = trackers_[thread_idx]; DEV_INFO("Thread %d: resolve_and_dispatch_pto2 entry", thread_idx); @@ -1064,7 +1457,8 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa if (tracker.aic().running_count > 0) { try_completed = true; check_running_cores_for_completion( - thread_idx, tracker.aic(), tracker.core_idle, hank, executing_reg_task_ids, + thread_idx, tracker.aic(), hank, executing_reg_task_ids, pending_reg_task_ids, + tracker.core_idle, completed_this_turn, cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count, local_bufs @@ -1083,7 +1477,8 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa if (tracker.aiv().running_count > 0) { try_completed = true; check_running_cores_for_completion( - thread_idx, tracker.aiv(), tracker.core_idle, hank, executing_reg_task_ids, + thread_idx, tracker.aiv(), hank, executing_reg_task_ids, pending_reg_task_ids, + tracker.core_idle, completed_this_turn, cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count, local_bufs @@ -1139,7 +1534,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa while 
(local_bufs[bi].count > 0) { PTO2TaskSlotState* slot_state = local_bufs[bi].pop(); PTO2ResourceShape shape = pto2_active_mask_to_shape(slot_state->active_mask); - int32_t ci = tracker.find_cluster_for_shape(shape); + int32_t ci = tracker.find_cluster_for_shape(shape, pending_reg_task_ids); if (ci >= 0) { try_pushed = true; @@ -1150,27 +1545,34 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa ResourceCount rc = shape_resource_count(shape); if (rc.aic) { - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, - c.aic_core_id, CoreType::AIC, *slot_state, PTO2SubtaskSlot::AIC + dispatch_subtask_to_core(runtime, tracker, + executing_reg_task_ids, pending_reg_task_ids, + c.aic_core_id, CoreType::AIC, *slot_state, PTO2SubtaskSlot::AIC, + thread_idx, hank #if PTO2_PROFILING - , profiling_enabled, thread_idx + , profiling_enabled #endif ); } if (rc.aiv >= 1) { - int32_t aiv0 = tracker.core_idle[c.aiv_core_ids[0]] ? c.aiv_core_ids[0] : c.aiv_core_ids[1]; - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, - aiv0, CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV0 + int32_t aiv0 = tracker.select_aiv_core( + c.aiv_core_ids[0], c.aiv_core_ids[1], pending_reg_task_ids); + dispatch_subtask_to_core(runtime, tracker, + executing_reg_task_ids, pending_reg_task_ids, + aiv0, CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV0, + thread_idx, hank #if PTO2_PROFILING - , profiling_enabled, thread_idx + , profiling_enabled #endif ); } if (rc.aiv >= 2) { - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, - c.aiv_core_ids[1], CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV1 + dispatch_subtask_to_core(runtime, tracker, + executing_reg_task_ids, pending_reg_task_ids, + c.aiv_core_ids[1], CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV1, + thread_idx, hank #if PTO2_PROFILING - , profiling_enabled, thread_idx + , profiling_enabled #endif ); } @@ -1202,7 +1604,7 @@ int32_t 
AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa rt->scheduler.requeue_ready_task(*overflow_ptrs[i]); } - // Phase 3: Global dispatch — fill remaining idle cores from global readyQ (cluster-based) + // Global dispatch — fill idle and pend-ready cores from global readyQ const PTO2ResourceShape* dispatch_order = get_dispatch_order(thread_idx); for (int32_t si = 0; si < PTO2_NUM_RESOURCE_SHAPES; si++) { @@ -1210,7 +1612,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa if (rt->scheduler.ready_queues[static_cast(shape)].size() == 0) continue; while (true) { - int32_t ci = tracker.find_cluster_for_shape(shape); + int32_t ci = tracker.find_cluster_for_shape(shape, pending_reg_task_ids); if (ci < 0) break; PTO2TaskSlotState* slot_state = pop_ready_task(shape, thread_idx @@ -1232,28 +1634,34 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa ResourceCount rc = shape_resource_count(shape); if (rc.aic) { - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, - c.aic_core_id, CoreType::AIC, *slot_state, PTO2SubtaskSlot::AIC + dispatch_subtask_to_core(runtime, tracker, + executing_reg_task_ids, pending_reg_task_ids, + c.aic_core_id, CoreType::AIC, *slot_state, PTO2SubtaskSlot::AIC, + thread_idx, hank #if PTO2_PROFILING - , profiling_enabled, thread_idx + , profiling_enabled #endif ); } if (rc.aiv >= 1) { - int32_t aiv_id = tracker.core_idle[c.aiv_core_ids[0]] - ? 
c.aiv_core_ids[0] : c.aiv_core_ids[1]; - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, - aiv_id, CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV0 + int32_t aiv_id = tracker.select_aiv_core( + c.aiv_core_ids[0], c.aiv_core_ids[1], pending_reg_task_ids); + dispatch_subtask_to_core(runtime, tracker, + executing_reg_task_ids, pending_reg_task_ids, + aiv_id, CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV0, + thread_idx, hank #if PTO2_PROFILING - , profiling_enabled, thread_idx + , profiling_enabled #endif ); } if (rc.aiv >= 2) { - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, - c.aiv_core_ids[1], CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV1 + dispatch_subtask_to_core(runtime, tracker, + executing_reg_task_ids, pending_reg_task_ids, + c.aiv_core_ids[1], CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV1, + thread_idx, hank #if PTO2_PROFILING - , profiling_enabled, thread_idx + , profiling_enabled #endif ); } @@ -1269,6 +1677,37 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa } } + // Pipeline dispatch — fill running cores with pending tasks from readyQ + if (tracker.aic().running_count > 0 && rt->scheduler.ready_queues[static_cast(PTO2ResourceShape::AIC_ONLY)].size() > 0) { + try_pushed = true; + dispatch_ready_tasks_to_running_cores( + runtime, thread_idx, tracker.aic(), executing_reg_task_ids, pending_reg_task_ids, made_progress, + local_bufs, hank +#if PTO2_PROFILING + , profiling_enabled +#endif +#if PTO2_SCHED_PROFILING + , pop_hit, pop_miss, phase_dispatch_count, + sched_dispatch_pop_cycle, sched_dispatch_setup_cycle +#endif + ); + } + + if (tracker.aiv().running_count > 0 && rt->scheduler.ready_queues[static_cast(PTO2ResourceShape::AIV_X1)].size() > 0) { + try_pushed = true; + dispatch_ready_tasks_to_running_cores( + runtime, thread_idx, tracker.aiv(), executing_reg_task_ids, pending_reg_task_ids, made_progress, + local_bufs, hank +#if PTO2_PROFILING + , profiling_enabled +#endif 
+#if PTO2_SCHED_PROFILING + , pop_hit, pop_miss, phase_dispatch_count, + sched_dispatch_pop_cycle, sched_dispatch_setup_cycle +#endif + ); + } + #if PTO2_PROFILING if (!try_pushed) { CYCLE_COUNT_LAP(sched_idle_cycle); @@ -1996,6 +2435,7 @@ void AicpuExecutor::deinit(Runtime* runtime) { // Reset per-core dispatch timestamps and task counters for (int32_t i = 0; i < RUNTIME_MAX_WORKER; i++) { dispatch_timestamps_[i] = 0; + running_dispatch_timestamps_[i] = 0; core_dispatch_counts_[i] = 0; } @@ -2004,6 +2444,7 @@ void AicpuExecutor::deinit(Runtime* runtime) { memset(dispatch_seq_by_core_, 0, sizeof(dispatch_seq_by_core_)); memset(executing_subslot_by_core_, 0, sizeof(executing_subslot_by_core_)); memset(executing_slot_state_by_core_, 0, sizeof(executing_slot_state_by_core_)); + memset(pending_slot_state_by_core_, 0, sizeof(pending_slot_state_by_core_)); completed_tasks_.store(0, std::memory_order_release); total_tasks_ = 0; @@ -2031,6 +2472,8 @@ void AicpuExecutor::deinit(Runtime* runtime) { for (int32_t i = 0; i < thread_num_; i++) { for (int32_t j = 0; j < MAX_CORES_PER_THREAD; j++) { executing_reg_task_ids_[i][j] = AICPU_TASK_INVALID; + pending_reg_task_ids_[i][j] = AICPU_TASK_INVALID; + payload_slot_[i][j] = 0; } } regs_ = 0;