diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index ad2b098d..85803c6f 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -81,10 +81,10 @@ struct CoreInfo { }; struct CoreTypeTracker { - int32_t idle[MAX_CORES_PER_THREAD]; - int32_t running[MAX_CORES_PER_THREAD]; int32_t idle_count; int32_t running_count; + int32_t idle[MAX_CORES_PER_THREAD]; + int32_t running[MAX_CORES_PER_THREAD]; void move_idle_to_running(int32_t idx) { running[running_count++] = idle[idx]; @@ -151,6 +151,7 @@ struct CoreStateTracker { struct AicpuExecutor { int32_t orch_thread_num_; int32_t sched_thread_num_; + bool orch_to_sched_{false}; // ===== Thread management state ===== std::atomic thread_idx_{0}; @@ -193,7 +194,7 @@ struct AicpuExecutor { // Track executing register task_id per core (AICPU_TASK_INVALID = idle). // NOTE: this is NOT the mixed_task_id; it is the per-core dispatch id used by the // register protocol (derived from dispatch_seq_by_core_ and masked by TASK_ID_MASK). - int32_t executing_reg_task_ids_[MAX_AICPU_THREADS][MAX_CORES_PER_THREAD]; + int32_t executing_reg_task_ids_[MAX_CORES_PER_THREAD]; CoreStateTracker trackers_[MAX_AICPU_THREADS]; // ===== Task queue state (managed by scheduler ready queues) ===== @@ -202,8 +203,9 @@ struct AicpuExecutor { std::atomic completed_tasks_{0}; int32_t total_tasks_{0}; std::atomic finished_count_{0}; - // Device orchestration: set by Thread 3 when graph is built; workers wait for it - bool orchestrator_done_{false}; + // Device orchestration: set by last orchestrator when graph is built; schedulers poll it. + // volatile prevents the compiler from hoisting the load out of spin loops. 
+ volatile bool orchestrator_done_{false}; std::atomic pto2_init_done_{false}; std::atomic runtime_init_ready_{false}; std::atomic pto2_init_complete_{false}; // init block finished; others wait for this @@ -270,7 +272,6 @@ struct AicpuExecutor { CoreTypeTracker& ct, bool* core_idle, Handshake* hank, - int32_t* executing_reg_task_ids, int32_t& completed_this_turn, int32_t& cur_thread_completed, bool& made_progress, @@ -298,7 +299,7 @@ struct AicpuExecutor { int32_t core_id = ct.running[i]; uint64_t reg_addr = core_id_to_reg_addr_[core_id]; - int32_t expected_reg_task_id = executing_reg_task_ids[core_id]; + int32_t expected_reg_task_id = executing_reg_task_ids_[core_id]; uint64_t reg_val = read_reg(reg_addr, RegId::COND); int32_t reg_task_id = EXTRACT_TASK_ID(reg_val); int32_t reg_state = EXTRACT_TASK_STATE(reg_val); @@ -313,7 +314,7 @@ struct AicpuExecutor { #endif if (done) { - executing_reg_task_ids[core_id] = AICPU_TASK_INVALID; + executing_reg_task_ids_[core_id] = AICPU_TASK_INVALID; PTO2SubtaskSlot subslot = executing_subslot_by_core_[core_id]; PTO2TaskSlotState& slot_state = *executing_slot_state_by_core_[core_id]; @@ -492,12 +493,16 @@ struct AicpuExecutor { return slot_state; } - void dispatch_subtask_to_core( - Runtime* runtime, CoreStateTracker& tracker, int32_t* executing_reg_task_ids, - int32_t core_id, CoreType core_type, PTO2TaskSlotState& slot_state, + void dispatch_subtask_to_core(Runtime* runtime, + CoreStateTracker& tracker, + int32_t core_id, + CoreType core_type, + PTO2TaskSlotState& slot_state, PTO2SubtaskSlot subslot #if PTO2_PROFILING - , bool profiling_enabled, int32_t thread_idx + , + bool profiling_enabled, + int32_t thread_idx #endif ) { PTO2DispatchPayload& payload = s_pto2_payload_per_core[core_id]; @@ -537,7 +542,7 @@ struct AicpuExecutor { int32_t idle_idx = ct.find_idle_index(core_id); ct.move_idle_to_running(idle_idx); tracker.core_idle[core_id] = false; - executing_reg_task_ids[core_id] = reg_task_id; + 
executing_reg_task_ids_[core_id] = reg_task_id; } }; @@ -646,10 +651,11 @@ void AicpuExecutor::assign_cores_to_threads() { DEV_INFO("Assigning cores (round-robin): %d clusters across %d sched threads (%d AIC, %d AIV)", cluster_count, divisor, aic_count_, aiv_count_); + for (int32_t i = 0; i < MAX_CORES_PER_THREAD; i++) { + executing_reg_task_ids_[i] = AICPU_TASK_INVALID; + } for (int32_t i = 0; i < thread_num_; i++) { - for (int32_t j = 0; j < MAX_CORES_PER_THREAD; j++) { - executing_reg_task_ids_[i][j] = AICPU_TASK_INVALID; - } + trackers_[i].aic().running_count = 0; trackers_[i].aiv().running_count = 0; trackers_[i].aic().idle_count = 0; @@ -713,31 +719,17 @@ void AicpuExecutor::reassign_cores_for_all_threads() { thread_num_, aic_count_, aiv_count_); // Collect running/idle state from all threads before reassignment - int32_t running_cores[128]; - int32_t running_task_ids[128]; - int32_t running_count = 0; - - bool was_idle[MAX_CORES_PER_THREAD]; - memset(was_idle, 0, sizeof(was_idle)); + bool running_cores[MAX_CORES_PER_THREAD]; + memset(running_cores, 0, sizeof(running_cores)); for (int32_t i = 0; i < thread_num_; i++) { for (int32_t j = 0; j < trackers_[i].aic().running_count; j++) { int32_t core_id = trackers_[i].aic().running[j]; - running_cores[running_count] = core_id; - running_task_ids[running_count] = executing_reg_task_ids_[i][core_id]; - running_count++; - } - for (int32_t j = 0; j < trackers_[i].aic().idle_count; j++) { - was_idle[trackers_[i].aic().idle[j]] = true; + running_cores[core_id] = true; } for (int32_t j = 0; j < trackers_[i].aiv().running_count; j++) { int32_t core_id = trackers_[i].aiv().running[j]; - running_cores[running_count] = core_id; - running_task_ids[running_count] = executing_reg_task_ids_[i][core_id]; - running_count++; - } - for (int32_t j = 0; j < trackers_[i].aiv().idle_count; j++) { - was_idle[trackers_[i].aiv().idle[j]] = true; + running_cores[core_id] = true; } } @@ -749,28 +741,24 @@ void 
AicpuExecutor::reassign_cores_for_all_threads() { trackers_[i].aiv().running_count = 0; trackers_[i].aiv().idle_count = 0; trackers_[i].cluster_count = 0; - memset(trackers_[i].core_idle, 0, sizeof(trackers_[i].core_idle)); - for (int32_t j = 0; j < MAX_CORES_PER_THREAD; j++) { - executing_reg_task_ids_[i][j] = AICPU_TASK_INVALID; - } + // // This initialization may be unnecessary because the per-core state has not changed. + // memset(trackers_[i].core_idle, 0, sizeof(trackers_[i].core_idle)); + // // This initialization may be unnecessary: cores that were not reassigned to another thread keep their state unchanged, and the entries for the reassigned cores have already been updated. + // for (int32_t j = 0; j < MAX_CORES_PER_THREAD; j++) { + // executing_reg_task_ids_[i][j] = AICPU_TASK_INVALID; + // } } // Restore a single core's running/idle state into its new thread's tracker - auto reassign_core = [&](int32_t worker_id, CoreTypeTracker& type_tracker, - CoreStateTracker& tracker, int32_t thread_idx) { - core_assignments_[thread_idx][core_count_per_thread_[thread_idx]++] = worker_id; - for (int32_t j = 0; j < running_count; j++) { - if (running_cores[j] == worker_id) { + auto reassign_core = + [&](int32_t worker_id, CoreTypeTracker& type_tracker, int32_t thread_idx) { + core_assignments_[thread_idx][core_count_per_thread_[thread_idx]++] = worker_id; + if (running_cores[worker_id]) { type_tracker.running[type_tracker.running_count++] = worker_id; - executing_reg_task_ids_[thread_idx][worker_id] = running_task_ids[j]; - return; + } else { + type_tracker.idle[type_tracker.idle_count++] = worker_id; } - } - if (was_idle[worker_id]) { - type_tracker.idle[type_tracker.idle_count++] = worker_id; - tracker.core_idle[worker_id] = true; - } - }; + }; // Assign whole clusters round-robin across all threads for (int32_t ci = 0; ci < aic_count_; ci++) { @@ -783,9 +771,9 @@ void AicpuExecutor::reassign_cores_for_all_threads() { tracker.clusters[tracker.cluster_count++] = {aic_wid, {aiv0_wid, aiv1_wid}}; - reassign_core(aic_wid, tracker.aic(), tracker, t); - reassign_core(aiv0_wid, tracker.aiv(), tracker, t); - reassign_core(aiv1_wid, tracker.aiv(), tracker, t); + 
reassign_core(aic_wid, tracker.aic(), t); + reassign_core(aiv0_wid, tracker.aiv(), t); + reassign_core(aiv1_wid, tracker.aiv(), t); } // Log final distribution for verification @@ -818,8 +806,17 @@ int32_t AicpuExecutor::init(Runtime* runtime) { thread_num_ = runtime->sche_cpu_num; orch_thread_num_ = runtime->orch_thread_num; sched_thread_num_ = thread_num_ - orch_thread_num_; + orch_to_sched_ = runtime->orch_to_sched; if (thread_num_ == 0) thread_num_ = 1; + if (!orch_to_sched_ && sched_thread_num_ == 0) { + DEV_ERROR( + "no scheduler and orch not trans to schedulers when finished, maybe you need set env PTO2_ORCH_TO_SCHED=1 " + "or scale down orch number."); + init_failed_.store(true, std::memory_order_release); + return -1; + } + if (thread_num_ < 1 || thread_num_ > MAX_AICPU_THREADS) { DEV_ERROR("Invalid thread_num: %d", thread_num_); init_failed_.store(true, std::memory_order_release); @@ -909,7 +906,6 @@ int32_t AicpuExecutor::shutdown_aicore(Runtime* runtime, int32_t thread_idx, con int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t thread_idx) { int32_t &core_num = core_count_per_thread_[thread_idx]; - int32_t* executing_reg_task_ids = executing_reg_task_ids_[thread_idx]; CoreStateTracker& tracker = trackers_[thread_idx]; DEV_INFO("Thread %d: resolve_and_dispatch_pto2 entry", thread_idx); @@ -1035,7 +1031,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa } // Check for core transition request (execute once per thread) - if (!cores_released && transition_requested_.load(std::memory_order_acquire)) { + if (!cores_released && orch_to_sched_ && transition_requested_.load(std::memory_order_acquire)) { if (!reassigned_.load(std::memory_order_acquire)) { wait_reassign_.fetch_add(1, std::memory_order_release); while (!reassigned_.load(std::memory_order_acquire)) { @@ -1068,7 +1064,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa if (tracker.aic().running_count > 0) { 
try_completed = true; check_running_cores_for_completion( - thread_idx, tracker.aic(), tracker.core_idle, hank, executing_reg_task_ids, + thread_idx, tracker.aic(), tracker.core_idle, hank, completed_this_turn, cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count, local_bufs @@ -1087,7 +1083,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa if (tracker.aiv().running_count > 0) { try_completed = true; check_running_cores_for_completion( - thread_idx, tracker.aiv(), tracker.core_idle, hank, executing_reg_task_ids, + thread_idx, tracker.aiv(), tracker.core_idle, hank, completed_this_turn, cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count, local_bufs @@ -1154,7 +1150,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa ResourceCount rc = shape_resource_count(shape); if (rc.aic) { - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, + dispatch_subtask_to_core(runtime, tracker, c.aic_core_id, CoreType::AIC, *slot_state, PTO2SubtaskSlot::AIC #if PTO2_PROFILING , profiling_enabled, thread_idx @@ -1163,7 +1159,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa } if (rc.aiv >= 1) { int32_t aiv0 = tracker.core_idle[c.aiv_core_ids[0]] ? 
c.aiv_core_ids[0] : c.aiv_core_ids[1]; - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, + dispatch_subtask_to_core(runtime, tracker, aiv0, CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV0 #if PTO2_PROFILING , profiling_enabled, thread_idx @@ -1171,7 +1167,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa ); } if (rc.aiv >= 2) { - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, + dispatch_subtask_to_core(runtime, tracker, c.aiv_core_ids[1], CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV1 #if PTO2_PROFILING , profiling_enabled, thread_idx @@ -1236,7 +1232,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa ResourceCount rc = shape_resource_count(shape); if (rc.aic) { - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, + dispatch_subtask_to_core(runtime, tracker, c.aic_core_id, CoreType::AIC, *slot_state, PTO2SubtaskSlot::AIC #if PTO2_PROFILING , profiling_enabled, thread_idx @@ -1246,7 +1242,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa if (rc.aiv >= 1) { int32_t aiv_id = tracker.core_idle[c.aiv_core_ids[0]] ? 
c.aiv_core_ids[0] : c.aiv_core_ids[1]; - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, + dispatch_subtask_to_core(runtime, tracker, aiv_id, CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV0 #if PTO2_PROFILING , profiling_enabled, thread_idx @@ -1254,7 +1250,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa ); } if (rc.aiv >= 2) { - dispatch_subtask_to_core(runtime, tracker, executing_reg_task_ids, + dispatch_subtask_to_core(runtime, tracker, c.aiv_core_ids[1], CoreType::AIV, *slot_state, PTO2SubtaskSlot::AIV1 #if PTO2_PROFILING , profiling_enabled, thread_idx @@ -1368,7 +1364,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa // Dump AIC running cores for (int32_t ci = 0; ci < tracker.aic().running_count && ci < STALL_DUMP_CORE_MAX; ci++) { int32_t cid = tracker.aic().running[ci]; - int32_t sw_tid = executing_reg_task_ids[cid]; + int32_t sw_tid = executing_reg_task_ids_[cid]; int32_t hw_kernel = -1; if (sw_tid >= 0 && executing_slot_state_by_core_[cid]) { int32_t diag_slot = static_cast(executing_subslot_by_core_[cid]); @@ -1383,7 +1379,7 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa // Dump AIV running cores for (int32_t ci = 0; ci < tracker.aiv().running_count && ci < STALL_DUMP_CORE_MAX; ci++) { int32_t cid = tracker.aiv().running[ci]; - int32_t sw_tid = executing_reg_task_ids[cid]; + int32_t sw_tid = executing_reg_task_ids_[cid]; int32_t hw_kernel = -1; if (sw_tid >= 0 && executing_slot_state_by_core_[cid]) { int32_t diag_slot = static_cast(executing_subslot_by_core_[cid]); @@ -1912,12 +1908,16 @@ int32_t AicpuExecutor::run(Runtime* runtime) { } } +#if PTO2_ORCH_PROFILING + uint64_t reassign_cycle_start = get_sys_cnt_aicpu(); +#endif + // Skip core transition on fatal error — cores already shut down above if (completed_.load(std::memory_order_acquire)) { // Signal transition to unblock scheduler threads waiting at core transition 
transition_requested_.store(true, std::memory_order_release); reassigned_.store(true, std::memory_order_release); - } else { + } else if (orch_to_sched_) { // Compute new core assignments for all threads and initialize donated slots DEV_INFO("Thread %d: Set orchestrator_done=true, requesting core transition", thread_idx); #if PTO2_PROFILING @@ -1941,24 +1941,35 @@ int32_t AicpuExecutor::run(Runtime* runtime) { reassigned_.store(true, std::memory_order_release); } } + +#if PTO2_ORCH_PROFILING + uint64_t reassign_cycle_end = get_sys_cnt_aicpu(); + DEV_ALWAYS("Thread %d: reassign, cost %.3fus (orch_idx=%d)", + thread_idx, + cycles_to_us(reassign_cycle_end - reassign_cycle_start), + orch_idx); +#endif } else { // Non-last orchestrator: wait for last orchestrator to finish setup - while (!transition_requested_.load(std::memory_order_acquire)) { - SPIN_WAIT_HINT(); - } - while (!reassigned_.load(std::memory_order_acquire)) { - if (completed_.load(std::memory_order_acquire)) { - break; + if (orch_to_sched_) { + while (!transition_requested_.load(std::memory_order_acquire)) { + SPIN_WAIT_HINT(); + } + while (!reassigned_.load(std::memory_order_acquire)) { + if (completed_.load(std::memory_order_acquire)) { + break; + } + SPIN_WAIT_HINT(); } - SPIN_WAIT_HINT(); } } } DEV_INFO("Thread %d: Orchestrator completed (orch_idx=%d)", thread_idx, orch_idx); } - // Scheduler thread - if (!completed_.load(std::memory_order_acquire)) { + // Scheduler thread (orchestrator threads skip dispatch when orch_to_sched_ is false) + if (!completed_.load(std::memory_order_acquire) && + (thread_idx < sched_thread_num_ || orch_to_sched_)) { DEV_ALWAYS("Thread %d: Starting PTO2 dispatch", thread_idx); // Device orchestration: wait for primary orchestrator to initialize SM header if (!runtime->get_orch_built_on_host()) { @@ -2050,11 +2061,7 @@ void AicpuExecutor::deinit(Runtime* runtime) { // Reset register-related state for (int32_t i = 0; i < MAX_CORES_PER_THREAD; i++) { core_id_to_reg_addr_[i] = 
0; - } - for (int32_t i = 0; i < thread_num_; i++) { - for (int32_t j = 0; j < MAX_CORES_PER_THREAD; j++) { - executing_reg_task_ids_[i][j] = AICPU_TASK_INVALID; - } + executing_reg_task_ids_[i] = AICPU_TASK_INVALID; } regs_ = 0; @@ -2122,7 +2129,7 @@ void AicpuExecutor::diagnose_stuck_state(Runtime* runtime, int32_t thread_idx, uint64_t reg_val = read_reg(reg_addr, RegId::COND); int32_t reg_task_id = EXTRACT_TASK_ID(reg_val); int32_t reg_state = EXTRACT_TASK_STATE(reg_val); - int32_t task_id = executing_reg_task_ids_[thread_idx][core_id]; + int32_t task_id = executing_reg_task_ids_[core_id]; if (reg_state != TASK_FIN_STATE || task_id >= 0) { busy_cores++; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/host/runtime_maker.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/host/runtime_maker.cpp index ae22d562..3b3c5e33 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/host/runtime_maker.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/host/runtime_maker.cpp @@ -261,6 +261,15 @@ extern "C" int init_runtime_impl(Runtime *runtime, LOG_INFO("Ready queue shards: %d", runtime->ready_queue_shards); } + // Read orchestrator-to-scheduler transition flag from environment + { + const char* env_val = std::getenv("PTO2_ORCH_TO_SCHED"); + if (env_val && (env_val[0] == '1' || env_val[0] == 't' || env_val[0] == 'T')) { + runtime->orch_to_sched = true; + } + LOG_INFO("Orchestrator-to-scheduler transition: %s", runtime->orch_to_sched ? 
"enabled" : "disabled"); + } + // Read ring buffer size overrides from environment { runtime->pto2_task_window_size = parse_env_uint64("PTO2_RING_TASK_WINDOW", 4, true); diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp index f4ccf0d5..85501c47 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp @@ -27,6 +27,7 @@ Runtime::Runtime() { pto2_task_window_size = 0; pto2_heap_size = 0; pto2_dep_pool_size = 0; + orch_to_sched = false; // Initialize tensor pairs tensor_pair_count = 0; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h index 62508b8b..4097f00f 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h @@ -158,6 +158,12 @@ class Runtime { // Profiling support bool enable_profiling; // Enable profiling flag + + // Orchestrator-to-scheduler transition control + // When true, orchestrator threads convert to scheduler threads after orchestration completes. + // When false (default), orchestrator threads exit after orchestration without dispatching tasks. + // Controlled via PTO2_ORCH_TO_SCHED environment variable. + bool orch_to_sched; uint64_t perf_data_base; // Performance data shared memory base address (device-side) private: