From 72fb6e81d0c1404c09f0ba169ee2753225f9eaec Mon Sep 17 00:00:00 2001 From: wangzihao122 Date: Wed, 18 Mar 2026 17:49:05 +0800 Subject: [PATCH] Update: promote task_id to mixed_task_id (u64) across profiling pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the 32-bit task_id field in PerfRecord and AicpuPhaseRecord with the full 64-bit mixed_task_id (ring_id<<32 | local_id) for cross-view correlation in multi-ring profiling scenarios. Platform: - PerfRecord: task_id (u32) → mixed_task_id (u64); fanout[] int32_t → uint64_t; add fanout_filled flag to skip duplicate host-side fill - AicpuPhaseRecord: replace tasks_processed (u32) + padding with a union of mixed_task_id / tasks_processed (both u64) - Update perf_aicpu_record_phase/orch_phase signatures to u64 tensormap_and_ringbuffer: - Replace pto2_slot_states_ptr_ (single) with per-ring array pto2_ring_slot_states_ptrs_[PTO2_MAX_RING_DEPTH] - complete_perf_records: skip fanout_filled records, decode ring_id from mixed_task_id, look up per-ring slot states, store u64 fanout entries - aicpu_executor: overwrite mixed_task_id with full pto2_task_id_raw, set fanout_filled=1, register per-ring slot states host_build_graph: - Match PerfRecord by mixed_task_id; store fanout as u64; set fanout_filled swimlane_converter: - Add format_task_display() to render mixed_task_id as r{ring}t{local} - Replace orch_finalize→orch_fanin flow arrows (use fanin end time) --- .../aicore/performance_collector_aicore.h | 4 +- .../aicpu/performance_collector_aicpu.h | 8 +- .../platform/include/common/perf_profiling.h | 15 +- .../src/aicpu/performance_collector_aicpu.cpp | 9 +- .../src/host/performance_collector.cpp | 10 +- .../host_build_graph/aicpu/aicpu_executor.cpp | 4 +- .../host_build_graph/runtime/runtime.cpp | 10 +- .../aicpu/aicpu_executor.cpp | 16 ++- .../runtime/pto_orchestrator.cpp | 16 +-- .../runtime/pto_runtime2_types.h | 1 + .../runtime/runtime.cpp | 62 ++++---- .../runtime/runtime.h | 5 +- tools/swimlane_converter.py | 134 ++++++++++-------- 13 files changed, 162 insertions(+), 132 deletions(-) diff --git a/src/a2a3/platform/include/aicore/performance_collector_aicore.h b/src/a2a3/platform/include/aicore/performance_collector_aicore.h index 2c25797f..95664439 100644 --- a/src/a2a3/platform/include/aicore/performance_collector_aicore.h +++ b/src/a2a3/platform/include/aicore/performance_collector_aicore.h @@ -58,7 +58,9 @@ static inline void perf_aicore_record_task( record->start_time = start_time; record->end_time = end_time; record->kernel_ready_time = kernel_ready_time; - record->task_id = task_id; + record->mixed_task_id = task_id; + record->fanout_count = 0; + record->fanout_filled = 0; perf_buf->count = idx + 1; diff --git a/src/a2a3/platform/include/aicpu/performance_collector_aicpu.h b/src/a2a3/platform/include/aicpu/performance_collector_aicpu.h index ace0809b..d10610df 100644 --- a/src/a2a3/platform/include/aicpu/performance_collector_aicpu.h +++ b/src/a2a3/platform/include/aicpu/performance_collector_aicpu.h @@ -101,12 +101,12 @@ void perf_aicpu_init_phase_profiling(Runtime* runtime, int num_sched_threads, in * @param start_time Phase start timestamp * @param end_time Phase end timestamp * @param loop_iter Current loop iteration number - * @param tasks_processed Number of tasks processed in this phase + * @param tasks_processed Number of tasks processed (scheduler) or mixed_task_id raw (orchestrator) */ void perf_aicpu_record_phase(int thread_idx, AicpuPhaseId phase_id, uint64_t start_time, uint64_t end_time, - uint32_t loop_iter, uint32_t tasks_processed); + uint32_t loop_iter, uint64_t tasks_processed); /** * Write orchestrator cumulative summary @@ -138,11 +138,11 @@ void perf_aicpu_set_orch_thread_idx(int thread_idx); * @param start_time Phase start timestamp * @param end_time Phase end timestamp * @param submit_idx Task submission index (acts as loop_iter) - * @param task_id Task ID (stored in tasks_processed field for task tracking) + * @param mixed_task_id Mixed task id raw value (pto2_task_id_raw) for cross-view correlation */ void perf_aicpu_record_orch_phase(AicpuPhaseId phase_id, uint64_t start_time, uint64_t end_time, - uint32_t submit_idx, uint32_t task_id); + uint32_t submit_idx, uint64_t mixed_task_id); /** * Write core-to-thread assignment mapping to shared memory diff --git a/src/a2a3/platform/include/common/perf_profiling.h b/src/a2a3/platform/include/common/perf_profiling.h index 462cdc74..b29e6c7e 100644 --- a/src/a2a3/platform/include/common/perf_profiling.h +++ b/src/a2a3/platform/include/common/perf_profiling.h @@ -75,15 +75,16 @@ struct PerfRecord { uint64_t finish_time; // AICPU timestamp: when AICPU observed task completion (task_status back to 0) // Task identification - uint32_t task_id; // Register dispatch id (per-core monotonic counter, NOT mixed_task_id). - // May collide across cores; use (ring_id, task_id, core_id) as unique key. + uint64_t mixed_task_id; // pto2_task_id_raw (ring_id<<32 | local_id) for cross-view correlation. + // Written by AICore as dispatch counter; overwritten by AICPU executor. uint32_t func_id; // Kernel function identifier CoreType core_type; // Core type (AIC/AIV) uint8_t ring_id; // Ring layer (0 for single-ring / legacy) // Dependency relationship (fanout only) - int32_t fanout[RUNTIME_MAX_FANOUT]; // Successor task ID array - int32_t fanout_count; // Number of successor tasks + uint64_t fanout[RUNTIME_MAX_FANOUT]; // Successor task mixed_task_id array + int32_t fanout_count; // Number of successor tasks + uint8_t fanout_filled; // 1: fanout has been populated by AICPU or fallback } __attribute__((aligned(64))); static_assert(sizeof(PerfRecord) % 64 == 0, @@ -262,8 +263,10 @@ struct AicpuPhaseRecord { uint64_t end_time; // Phase end timestamp uint32_t loop_iter; // Loop iteration number AicpuPhaseId phase_id; // Phase type - uint32_t tasks_processed; // Tasks processed in this phase - uint32_t padding; // Alignment padding + union { + uint64_t mixed_task_id; // Orchestrator phases: pto2_task_id_raw for cross-view correlation + uint64_t tasks_processed; // Scheduler phases: number of tasks processed in this batch + }; }; /** diff --git a/src/a2a3/platform/src/aicpu/performance_collector_aicpu.cpp b/src/a2a3/platform/src/aicpu/performance_collector_aicpu.cpp index e461bfc0..2e5fe5a9 100644 --- a/src/a2a3/platform/src/aicpu/performance_collector_aicpu.cpp +++ b/src/a2a3/platform/src/aicpu/performance_collector_aicpu.cpp @@ -389,7 +389,7 @@ static void switch_phase_buffer(int thread_idx) { void perf_aicpu_record_phase(int thread_idx, AicpuPhaseId phase_id, uint64_t start_time, uint64_t end_time, - uint32_t loop_iter, uint32_t tasks_processed) { + uint32_t loop_iter, uint64_t tasks_processed) { if (s_phase_header == nullptr) { return; } @@ -440,8 +440,7 @@ void perf_aicpu_record_phase(int thread_idx, record->end_time = end_time; record->loop_iter = loop_iter; record->phase_id = phase_id; - record->tasks_processed = tasks_processed; - record->padding = 0; + record->mixed_task_id = tasks_processed; buf->count = idx + 1; } @@ -470,9 +469,9 @@ void perf_aicpu_set_orch_thread_idx(int thread_idx) { void perf_aicpu_record_orch_phase(AicpuPhaseId phase_id, uint64_t start_time, uint64_t end_time, - uint32_t submit_idx, uint32_t task_id) { + uint32_t submit_idx, uint64_t mixed_task_id) { if (s_orch_thread_idx < 0 || s_phase_header == nullptr) return; - perf_aicpu_record_phase(s_orch_thread_idx, phase_id, start_time, end_time, submit_idx, task_id); + perf_aicpu_record_phase(s_orch_thread_idx, phase_id, start_time, end_time, submit_idx, mixed_task_id); } void perf_aicpu_flush_phase_buffers(int thread_idx) { diff --git a/src/a2a3/platform/src/host/performance_collector.cpp b/src/a2a3/platform/src/host/performance_collector.cpp index 5f2ad3b1..134cb8c1 100644 --- a/src/a2a3/platform/src/host/performance_collector.cpp +++ b/src/a2a3/platform/src/host/performance_collector.cpp @@ -919,7 +919,7 @@ int PerformanceCollector::export_swimlane_json(const std::string& output_path) { // Sort by task_id std::sort(tagged_records.begin(), tagged_records.end(), [](const TaggedRecord& a, const TaggedRecord& b) { - return a.record->task_id < b.record->task_id; + return a.record->mixed_task_id < b.record->mixed_task_id; }); // Step 4: Calculate base time (minimum kernel_ready_time, including phase timestamps) @@ -930,8 +930,8 @@ int PerformanceCollector::export_swimlane_json(const std::string& output_path) { } if (tagged.record->dispatch_time < base_time_cycles && tagged.record->dispatch_time > 0) { base_time_cycles = tagged.record->dispatch_time; - LOG_WARN("Timestamp violation: dispatch_time (%lu) < base_time (%lu) for task %u, using dispatch_time as new base_time", - tagged.record->dispatch_time, base_time_cycles, tagged.record->task_id); + LOG_WARN("Timestamp violation: dispatch_time (%lu) < base_time (%lu) for task %llu, using dispatch_time as new base_time", + tagged.record->dispatch_time, base_time_cycles, (unsigned long long)tagged.record->mixed_task_id); } } @@ -987,7 +987,7 @@ int PerformanceCollector::export_swimlane_json(const std::string& output_path) { const char* core_type_str = (record.core_type == CoreType::AIC) ? "aic" : "aiv"; outfile << " {\n"; - outfile << " \"task_id\": " << record.task_id << ",\n"; + outfile << " \"task_id\": " << record.mixed_task_id << ",\n"; outfile << " \"func_id\": " << record.func_id << ",\n"; outfile << " \"core_id\": " << tagged.core_id << ",\n"; outfile << " \"core_type\": \"" << core_type_str << "\",\n"; @@ -1113,7 +1113,7 @@ int PerformanceCollector::export_swimlane_json(const std::string& output_path) { << ", \"start_time_us\": " << std::fixed << std::setprecision(3) << start_us << ", \"end_time_us\": " << std::fixed << std::setprecision(3) << end_us << ", \"submit_idx\": " << pr.loop_iter - << ", \"task_id\": " << static_cast(pr.tasks_processed) + << ", \"task_id\": " << static_cast(pr.mixed_task_id) << "}"; first = false; } diff --git a/src/a2a3/runtime/host_build_graph/aicpu/aicpu_executor.cpp b/src/a2a3/runtime/host_build_graph/aicpu/aicpu_executor.cpp index d2923711..0ca620ba 100644 --- a/src/a2a3/runtime/host_build_graph/aicpu/aicpu_executor.cpp +++ b/src/a2a3/runtime/host_build_graph/aicpu/aicpu_executor.cpp @@ -632,7 +632,7 @@ int AicpuExecutor::resolve_and_dispatch(Runtime& runtime, int thread_idx, const uint32_t count = perf_buf->count; if (count > 0) { PerfRecord* record = &perf_buf->records[count - 1]; - if (record->task_id == static_cast(completed_task_id)) { + if (record->mixed_task_id == static_cast(completed_task_id)) { record->func_id = runtime.tasks[completed_task_id].func_id; record->core_type = h->core_type; perf_aicpu_record_dispatch_and_finish_time( @@ -769,7 +769,7 @@ int AicpuExecutor::resolve_and_dispatch(Runtime& runtime, int thread_idx, const uint32_t count = perf_buf->count; if (count > 0) { PerfRecord* record = &perf_buf->records[count - 1]; - if (record->task_id == static_cast(completed_task_id)) { + if (record->mixed_task_id == static_cast(completed_task_id)) { record->func_id = runtime.tasks[completed_task_id].func_id; record->core_type = h->core_type; perf_aicpu_record_dispatch_and_finish_time( diff --git a/src/a2a3/runtime/host_build_graph/runtime/runtime.cpp b/src/a2a3/runtime/host_build_graph/runtime/runtime.cpp index 96fa4854..3744ff36 100644 --- a/src/a2a3/runtime/host_build_graph/runtime/runtime.cpp +++ b/src/a2a3/runtime/host_build_graph/runtime/runtime.cpp @@ -228,18 +228,20 @@ void Runtime::complete_perf_records(PerfBuffer* perf_buf) { for (uint32_t i = 0; i < count; i++) { PerfRecord* record = &perf_buf->records[i]; - uint32_t task_id = record->task_id; + // In host_build_graph, AICore writes a plain uint32_t dispatch counter into + // mixed_task_id (upper 32 bits are always 0), so truncating to uint32_t is safe. + uint32_t task_id = static_cast(record->mixed_task_id); // Query Task by task_id (O(1) array indexing) Task* task = get_task(task_id); + record->fanout_count = 0; if (task != nullptr) { record->fanout_count = task->fanout_count; for (int32_t j = 0; j < task->fanout_count; j++) { - record->fanout[j] = task->fanout[j]; + record->fanout[j] = static_cast(task->fanout[j]); } - } else { - record->fanout_count = 0; } + record->fanout_filled = 1; } } diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index ad2b098d..0bc00f29 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -367,9 +367,11 @@ struct AicpuExecutor { uint32_t count = perf_buf->count; if (count > 0) { PerfRecord* record = &perf_buf->records[count - 1]; - if (record->task_id == static_cast(expected_reg_task_id)) { + if (record->mixed_task_id == static_cast(expected_reg_task_id)) { // Fill metadata that AICore doesn't know int32_t perf_slot_idx = static_cast(executing_subslot_by_core_[core_id]); + // Overwrite with full mixed_task_id for cross-view correlation. + record->mixed_task_id = pto2_task_id_raw(slot_state.task->mixed_task_id); record->func_id = slot_state.task->kernel_id[perf_slot_idx]; record->core_type = CT; perf_aicpu_record_dispatch_and_finish_time( @@ -384,10 +386,11 @@ struct AicpuExecutor { record->fanout_count = 0; PTO2DepListEntry* cur = slot_state.fanout_head; while (cur != nullptr && record->fanout_count < RUNTIME_MAX_FANOUT) { - record->fanout[record->fanout_count++] = static_cast( - pto2_task_id_local(cur->slot_state->task->mixed_task_id)); + record->fanout[record->fanout_count++] = + pto2_task_id_raw(cur->slot_state->task->mixed_task_id); cur = cur->next; } + record->fanout_filled = 1; } } #if PTO2_SCHED_PROFILING @@ -1716,9 +1719,10 @@ int32_t AicpuExecutor::run(Runtime* runtime) { } #endif - // With multi-ring, slot_states are per-ring inside the scheduler. - // Fanout fill-in in complete_perf_records is disabled (slot_states_ptr = nullptr). - runtime->set_pto2_slot_states_ptr(nullptr); + // Register per-ring slot states for complete_perf_records fallback. + for (int r = 0; r < PTO2_MAX_RING_DEPTH; r++) { + runtime->set_pto2_ring_slot_states_ptr(r, rt->scheduler.ring_sched_states[r].slot_states); + } // Store shared state for other orchestrator threads orch_func_ = orch_func; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index d62e0f9a..56192ca3 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -43,7 +43,7 @@ __attribute__((weak, visibility("hidden"))) uint64_t get_sys_cnt_aicpu() { retur // The strong symbol from the AICPU build wins when profiling is available. // Also hidden to prevent HOST .so from polluting the global symbol table. __attribute__((weak, visibility("hidden"))) void perf_aicpu_record_orch_phase( - AicpuPhaseId, uint64_t, uint64_t, uint32_t, uint32_t) {} + AicpuPhaseId, uint64_t, uint64_t, uint32_t, uint64_t) {} // Accumulated cycles per sub-step (only needed for ORCH_PROFILING export) static uint64_t g_orch_sync_cycle = 0; // tensormap sync static uint64_t g_orch_alloc_cycle = 0; // task ring alloc @@ -78,7 +78,7 @@ uint64_t g_orch_scope_end_atomic_count = 0; #include "aicpu/performance_collector_aicpu.h" __attribute__((weak, visibility("hidden"))) uint64_t get_sys_cnt_aicpu() { return 0; } __attribute__((weak, visibility("hidden"))) void perf_aicpu_record_orch_phase( - AicpuPhaseId, uint64_t, uint64_t, uint32_t, uint32_t) {} + AicpuPhaseId, uint64_t, uint64_t, uint32_t, uint64_t) {} // submit_idx needed for swimlane task_id tagging (no cycle accumulation at this level) static uint32_t g_orch_submit_idx = 0; #define CYCLE_COUNT_START() \ @@ -387,7 +387,7 @@ void pto2_submit_mixed_task( PTO2TaskSlotState* fanin_states[PTO2_MAX_INPUTS]; int32_t fanin_count = 0; - CYCLE_COUNT_LAP_RECORD(g_orch_alloc_cycle, AicpuPhaseId::ORCH_ALLOC, local_id); + CYCLE_COUNT_LAP_RECORD(g_orch_alloc_cycle, AicpuPhaseId::ORCH_ALLOC, static_cast(mixed_task_id)); // === STEP 2: Calculate output size + heap alloc (read from params only, no GM access) === int32_t total_output_size = 0; @@ -405,7 +405,7 @@ void pto2_submit_mixed_task( if (!local_packed_base) { orch->fatal = true; return; } local_packed_end = (char*)local_packed_base + total_output_size; } - CYCLE_COUNT_LAP_RECORD(g_orch_heap_cycle, AicpuPhaseId::ORCH_HEAP, local_id); + CYCLE_COUNT_LAP_RECORD(g_orch_heap_cycle, AicpuPhaseId::ORCH_HEAP, static_cast(mixed_task_id)); #if PTO2_ORCH_PROFILING if (total_output_size > 0) { g_orch_heap_atomic_count += 1; // heap_top.store in pto2_alloc_packed_buffer @@ -484,7 +484,7 @@ void pto2_submit_mixed_task( } } - CYCLE_COUNT_LAP_RECORD(g_orch_lookup_cycle, AicpuPhaseId::ORCH_LOOKUP, local_id); + CYCLE_COUNT_LAP_RECORD(g_orch_lookup_cycle, AicpuPhaseId::ORCH_LOOKUP, static_cast(mixed_task_id)); // === STEP 5: Register outputs/inouts in TensorMap (must be separate from lookup) === for (int i = 0; i < params.tensor_count; i++) { @@ -496,7 +496,7 @@ void pto2_submit_mixed_task( } } - CYCLE_COUNT_LAP_RECORD(g_orch_insert_cycle, AicpuPhaseId::ORCH_INSERT, local_id); + CYCLE_COUNT_LAP_RECORD(g_orch_insert_cycle, AicpuPhaseId::ORCH_INSERT, static_cast(mixed_task_id)); // === STEP 6: Batch-write to GM (single cache line burst) === // Deferred from allocation phase to avoid scattered GM writes that get @@ -521,7 +521,7 @@ void pto2_submit_mixed_task( payload->init(params); - CYCLE_COUNT_LAP_RECORD(g_orch_params_cycle, AicpuPhaseId::ORCH_PARAMS, local_id); + CYCLE_COUNT_LAP_RECORD(g_orch_params_cycle, AicpuPhaseId::ORCH_PARAMS, static_cast(mixed_task_id)); #if PTO2_ORCH_PROFILING g_orch_params_atomic_count += 2; // fanout_lock.store + fanout_count.store #endif @@ -586,7 +586,7 @@ void pto2_submit_mixed_task( #endif } - CYCLE_COUNT_LAP_RECORD(g_orch_fanin_cycle, AicpuPhaseId::ORCH_FANIN, local_id); + CYCLE_COUNT_LAP_RECORD(g_orch_fanin_cycle, AicpuPhaseId::ORCH_FANIN, static_cast(mixed_task_id)); #if PTO2_PROFILING orch->tasks_submitted++; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h index 141be544..742b31b4 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h @@ -125,6 +125,7 @@ struct PTO2TaskId { constexpr uint8_t ring() const { return static_cast(raw >> 32); } constexpr uint32_t local() const { return static_cast(raw & 0xFFFFFFFFu); } + constexpr explicit operator uint64_t() const { return raw; } constexpr bool operator==(const PTO2TaskId& other) const { return raw == other.raw; } constexpr bool operator!=(const PTO2TaskId& other) const { return raw != other.raw; } diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp index f4ccf0d5..88abad7c 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.cpp @@ -35,7 +35,9 @@ Runtime::Runtime() { orch_built_on_host_ = true; pto2_gm_sm_ptr_ = nullptr; pto2_gm_heap_ptr_ = nullptr; - pto2_slot_states_ptr_ = nullptr; + for (int r = 0; r < PTO2_MAX_RING_DEPTH; r++) { + pto2_ring_slot_states_ptrs_[r] = nullptr; + } orch_args_ = nullptr; orch_arg_count_ = 0; @@ -95,7 +97,11 @@ int Runtime::get_orch_arg_count() const { return orch_arg_count_; } void Runtime::set_orch_built_on_host(bool v) { orch_built_on_host_ = v; } void Runtime::set_pto2_gm_sm_ptr(void* p) { pto2_gm_sm_ptr_ = p; } void Runtime::set_pto2_gm_heap(void* p) { pto2_gm_heap_ptr_ = p; } -void Runtime::set_pto2_slot_states_ptr(void* p) { pto2_slot_states_ptr_ = p; } +void Runtime::set_pto2_ring_slot_states_ptr(int ring_id, void* p) { + if (ring_id >= 0 && ring_id < PTO2_MAX_RING_DEPTH) { + pto2_ring_slot_states_ptrs_[ring_id] = p; + } +} void Runtime::set_orch_args(uint64_t* args, int count) { orch_arg_count_ = count <= RUNTIME_MAX_ARGS ? count : RUNTIME_MAX_ARGS; if (args && orch_arg_count_ > 0) { @@ -164,49 +170,43 @@ void Runtime::complete_perf_records(PerfBuffer* perf_buf) { // Get PTO2 shared memory context void* sm_base = get_pto2_gm_sm_ptr(); if (sm_base == nullptr) { - // No PTO2 context, cannot complete records - return; - } - - // Get slot states for fanout traversal - // With multi-ring, slot_states are per-ring inside the scheduler and - // pto2_slot_states_ptr_ is nullptr. Fanout and ring_id are filled on the - // AICPU side (aicpu_executor.cpp) where slot_state is directly available. - PTO2TaskSlotState* slot_states = static_cast(pto2_slot_states_ptr_); - if (slot_states == nullptr) { return; } - // Get window mask from shared memory header (ring 0 for legacy single-ring path) PTO2SharedMemoryHeader* header = static_cast(sm_base); - int32_t window_mask = static_cast(header->rings[0].task_window_size) - 1; - uint32_t count = perf_buf->count; for (uint32_t i = 0; i < count; i++) { PerfRecord* record = &perf_buf->records[i]; - int32_t task_id = record->task_id; - // Get slot state for fanout traversal - int32_t slot = task_id & window_mask; + // Already filled by AICPU side at task completion time. + if (record->fanout_filled != 0) { + continue; + } + + PTO2TaskId task_id{record->mixed_task_id}; + uint8_t ring_id = task_id.ring(); + if (ring_id >= PTO2_MAX_RING_DEPTH) { + continue; + } + + PTO2TaskSlotState* slot_states = static_cast(pto2_ring_slot_states_ptrs_[ring_id]); + if (slot_states == nullptr) { + continue; + } + + int32_t window_mask = static_cast(header->rings[ring_id].task_window_size) - 1; + int32_t slot = static_cast(task_id.local()) & window_mask; PTO2TaskSlotState& ss = slot_states[slot]; - // Fill fanout information by traversing the linked list - record->fanout_count = 0; + // Fill fanout by traversing the linked list (best-effort: no lock, see aicpu_executor.cpp) PTO2DepListEntry* cur = ss.fanout_head; - + record->fanout_count = 0; while (cur != nullptr && record->fanout_count < RUNTIME_MAX_FANOUT) { - // PerfRecord.fanout stores 32-bit legacy task IDs. Our multi-ring task ID - // encodes ring_id in the upper 32 bits, so only the legacy single-ring - // case (ring_id==0) is representable here. - uint64_t mixed = pto2_task_id_raw(cur->slot_state->task->mixed_task_id); - if ((mixed >> 32) != 0) { - // Skip: cannot represent (ring_id, local_id) in a 32-bit fanout slot. - cur = cur->next; - continue; - } - record->fanout[record->fanout_count++] = static_cast(mixed & 0xFFFFFFFFu); + record->fanout[record->fanout_count++] = + static_cast(cur->slot_state->task->mixed_task_id); cur = cur->next; } + record->fanout_filled = 1; } } diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h index 62508b8b..5fae7294 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/runtime.h @@ -25,6 +25,7 @@ #include "common/perf_profiling.h" #include "common/platform_config.h" #include "pto2_dispatch_payload.h" +#include "pto_runtime2_types.h" // ============================================================================= // Configuration Macros @@ -173,7 +174,7 @@ class Runtime { bool orch_built_on_host_; void* pto2_gm_sm_ptr_; // GM pointer to PTO2 shared memory (device) void* pto2_gm_heap_ptr_; // GM heap for orchestrator output buffers (device) - void* pto2_slot_states_ptr_; // Pointer to PTO2TaskSlotState array (scheduler-private, for profiling) + void* pto2_ring_slot_states_ptrs_[PTO2_MAX_RING_DEPTH]; // Per-ring PTO2TaskSlotState arrays (for profiling fallback) uint64_t* orch_args_; // Arguments for device orchestration int orch_arg_count_; uint64_t orch_args_storage_[RUNTIME_MAX_ARGS]; // Copy of args for device @@ -239,7 +240,7 @@ class Runtime { void set_orch_built_on_host(bool v); void set_pto2_gm_sm_ptr(void* p); void set_pto2_gm_heap(void* p); - void set_pto2_slot_states_ptr(void* p); + void set_pto2_ring_slot_states_ptr(int ring_id, void* p); void set_orch_args(uint64_t* args, int count); // Device orchestration SO binary (for dlopen on AICPU thread 3) diff --git a/tools/swimlane_converter.py b/tools/swimlane_converter.py index 77490a67..1d521fc4 100644 --- a/tools/swimlane_converter.py +++ b/tools/swimlane_converter.py @@ -150,6 +150,23 @@ def parse_sched_cpu_from_device_log(log_path, task_count): } +def format_task_display(task_id): + """Format task id for labels. + + task_id in perf JSON may be mixed_task_id (ring_id<<32 | local_id). + Display as: + - ring_id > 0: r{ring_id}t{local_id} + - ring_id == 0: t{local_id} + """ + if not isinstance(task_id, int) or task_id < 0: + return str(task_id) + ring_id = (task_id >> 32) & 0xFFFFFFFF + local_id = task_id & 0xFFFFFFFF + if ring_id > 0: + return f"r{ring_id}t{local_id}" + return f"t{local_id}" + + def print_task_statistics(tasks, func_id_to_name=None, sched_info=None): """Print task statistics grouped by func_id. @@ -425,11 +442,11 @@ def generate_chrome_trace_json(tasks, output_path, func_id_to_name=None, verbose func_id = task['func_id'] if func_id_to_name and str(func_id) in func_id_to_name: func_name = func_id_to_name[str(func_id)] - # New format: FuncName(task_id) - task_name = f"{func_name}({task['task_id']})" + # New format: FuncName(t{task_id}) + task_name = f"{func_name}({format_task_display(task['task_id'])})" else: - # Fallback format: Func_{func_id}(task_id) - task_name = f"Func_{func_id}({task['task_id']})" + # Fallback format: Func_{func_id}(t{task_id}) + task_name = f"Func_{func_id}({format_task_display(task['task_id'])})" events.append({ "args": { @@ -468,9 +485,9 @@ def generate_chrome_trace_json(tasks, output_path, func_id_to_name=None, verbose func_id = task['func_id'] if func_id_to_name and str(func_id) in func_id_to_name: func_name = func_id_to_name[str(func_id)] - task_name = f"{func_name}({task['task_id']})" + task_name = f"{func_name}({format_task_display(task['task_id'])})" else: - task_name = f"Func_{func_id}({task['task_id']})" + task_name = f"Func_{func_id}({format_task_display(task['task_id'])})" events.append({ "args": { @@ -680,7 +697,7 @@ def generate_chrome_trace_json(tasks, output_path, func_id_to_name=None, verbose # Show task_id in name for task-specific phases if task_id >= 0: - label = f"{display_name}(t{task_id})" + label = f"{display_name}({format_task_display(task_id)})" else: label = f"{display_name}({submit_idx})" @@ -867,62 +884,63 @@ def generate_chrome_trace_json(tasks, output_path, func_id_to_name=None, verbose }) flow_id += 1 - # Orchestrator FINALIZE → Scheduler DISPATCH arrows (per task_id) - if orchestrator_phases and scheduler_phases: - # Flatten per-thread orch phases and track source thread - orch_finalize_by_task = {} + # Orchestrator FANIN(end) → Scheduler DISPATCH(start) arrows (per mixed_task_id) + if orchestrator_phases and scheduler_phases and has_aicpu_data: + # Build mixed_task_id -> (fanin_record, orch_thread_idx); keep latest end_time_us on collision. + orch_fanin_by_task = {} for orch_idx, thread_records in enumerate(orch_threads): for record in thread_records: - if record.get("phase") == "orch_finalize": - task_id = record.get("task_id", -1) - if task_id >= 0: - orch_finalize_by_task[task_id] = (record, orch_idx) - - # Use core_to_sched_thread mapping (built above) to find the correct - # scheduler thread for each task's core. - if orch_finalize_by_task and has_aicpu_data: - for task in tasks: - tid = task.get('task_id') - if tid is None or tid not in orch_finalize_by_task: + if record.get("phase") != "orch_fanin": continue - - dispatch_us = task.get('dispatch_time_us', 0) - if dispatch_us <= 0: + task_id = record.get("task_id") + if not isinstance(task_id, int) or task_id < 0: continue + prev = orch_fanin_by_task.get(task_id) + if prev is None or record.get("end_time_us", 0) > prev[0].get("end_time_us", 0): + orch_fanin_by_task[task_id] = (record, orch_idx) + + for task in tasks: + task_id = task.get('task_id') + if task_id is None or task_id not in orch_fanin_by_task: + continue - finalize_rec, orch_idx = orch_finalize_by_task[tid] - # Use finalize start_time: init_task() runs at the beginning of FINALIZE, - # making the task dispatchable before FINALIZE ends. Using start avoids - # reverse arrows when the scheduler dispatches during FINALIZE. - finalize_start_us = finalize_rec["start_time_us"] - - matched_thread = core_to_sched_thread.get(task['core_id']) - - if matched_thread is not None: - sched_tid = 3000 + matched_thread - orch_tid = 4000 + orch_idx - - # Flow: Orchestrator finalize start → Scheduler DISPATCH - events.append({ - "cat": "flow", - "id": flow_id, - "name": "orch→dispatch", - "ph": "s", - "pid": 4, - "tid": orch_tid, - "ts": finalize_start_us - }) - events.append({ - "cat": "flow", - "id": flow_id, - "name": "orch→dispatch", - "ph": "f", - "pid": 3, - "tid": sched_tid, - "ts": dispatch_us, - "bp": "e" - }) - flow_id += 1 + dispatch_us = task.get('dispatch_time_us', 0) + if dispatch_us <= 0: + continue + + fanin_rec, orch_idx = orch_fanin_by_task[task_id] + fanin_end_us = fanin_rec.get("end_time_us", 0) + if fanin_end_us <= 0: + continue + + matched_thread = core_to_sched_thread.get(task['core_id']) + if matched_thread is None: + continue + + sched_tid = 3000 + matched_thread + orch_tid = 4000 + orch_idx + + # Flow: Orchestrator FANIN end → Scheduler DISPATCH start + events.append({ + "cat": "flow", + "id": flow_id, + "name": "orch→dispatch", + "ph": "s", + "pid": 4, + "tid": orch_tid, + "ts": fanin_end_us + }) + events.append({ + "cat": "flow", + "id": flow_id, + "name": "orch→dispatch", + "ph": "f", + "pid": 3, + "tid": sched_tid, + "ts": dispatch_us, + "bp": "e" + }) + flow_id += 1 if verbose: print(f" Total events: {len(events)}")