Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1271,9 +1271,9 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime* runtime, int32_t threa
int32_t cnt_ready = 0, cnt_waiting = 0, cnt_inflight = 0;
for (int32_t si = 0; si < task_count; si++) {
int32_t slot = si & window_mask;
PTO2TaskState st = sched->slot_states[slot].task_state.load(std::memory_order_relaxed);
int32_t rc = sched->slot_states[slot].fanin_refcount.load(std::memory_order_relaxed);
int32_t fi = sched->slot_states[slot].fanin_count;
PTO2TaskState st = sched->get_consumed_entry(si).task_state.load(std::memory_order_relaxed);
int32_t rc = sched->get_main_slot(si).fanin_refcount.load(std::memory_order_relaxed);
int32_t fi = sched->get_main_slot(si).fanin_count;
int32_t kid = task_descriptors[slot].kernel_id[0];
if (st >= PTO2_TASK_COMPLETED) continue; // Already done
if (st == PTO2_TASK_READY || st == PTO2_TASK_RUNNING) { cnt_inflight++; continue; }
Expand Down Expand Up @@ -1614,6 +1614,10 @@ int32_t AicpuExecutor::run(Runtime* runtime) {
if (runtime->pto2_heap_size > 0) {
heap_size = runtime->pto2_heap_size;
}
uint64_t consumed_window_size = task_window_size * 4; // default
if (runtime->pto2_consumed_window_size > 0) {
consumed_window_size = runtime->pto2_consumed_window_size;
}
DEV_INFO("Thread %d: Ring sizes: task_window=%lu, heap=%lu",
thread_idx, (unsigned long)task_window_size, (unsigned long)heap_size);

Expand All @@ -1631,6 +1635,9 @@ int32_t AicpuExecutor::run(Runtime* runtime) {
return -1;
}

// Set consumed_window_size before runtime creation
sm_handle->header->consumed_window_size = consumed_window_size;

rt = pto2_runtime_create_from_sm(PTO2_MODE_EXECUTE,
sm_handle, gm_heap, heap_size, orch_thread_num_);
if (!rt) {
Expand All @@ -1641,8 +1648,8 @@ int32_t AicpuExecutor::run(Runtime* runtime) {
return -1;
}

// Wire up slot_states pointer for profiling (complete_perf_records)
runtime->set_pto2_slot_states_ptr(rt->scheduler.slot_states);
// Wire up consumed_ring pointer for profiling (complete_perf_records)
runtime->set_pto2_consumed_ring_ptr(rt->scheduler.consumed_ring);

// Store shared state for other orchestrator threads
orch_func_ = orch_func;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,12 @@ extern "C" int init_runtime_impl(Runtime *runtime,
{
runtime->pto2_task_window_size = parse_env_uint64("PTO2_RING_TASK_WINDOW", 4, true);
runtime->pto2_heap_size = parse_env_uint64("PTO2_RING_HEAP", 1024, true);
if (runtime->pto2_task_window_size || runtime->pto2_heap_size) {
LOG_INFO("Ring buffer overrides: task_window=%lu heap=%lu",
runtime->pto2_consumed_window_size = parse_env_uint64("PTO2_RING_CONSUMED_WINDOW", 4, true);
if (runtime->pto2_task_window_size || runtime->pto2_heap_size || runtime->pto2_consumed_window_size) {
LOG_INFO("Ring buffer overrides: task_window=%lu heap=%lu consumed_window=%lu",
(unsigned long)(runtime->pto2_task_window_size ? runtime->pto2_task_window_size : PTO2_TASK_WINDOW_SIZE),
(unsigned long)(runtime->pto2_heap_size ? runtime->pto2_heap_size : PTO2_HEAP_SIZE));
(unsigned long)(runtime->pto2_heap_size ? runtime->pto2_heap_size : PTO2_HEAP_SIZE),
(unsigned long)(runtime->pto2_consumed_window_size ? runtime->pto2_consumed_window_size : 0));
}
Comment on lines +269 to 274

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The log message for consumed_window can be misleading. When the PTO2_RING_CONSUMED_WINDOW environment variable is not set, runtime->pto2_consumed_window_size is 0, and the log will show consumed_window=0. However, the actual default value used by the runtime is 4 times the effective task window size. This discrepancy can be confusing when debugging performance or memory usage. The log message should reflect the effective default value that will be used.

        if (runtime->pto2_task_window_size || runtime->pto2_heap_size || runtime->pto2_consumed_window_size) {
            uint64_t eff_task_window = runtime->pto2_task_window_size ? runtime->pto2_task_window_size : PTO2_TASK_WINDOW_SIZE;
            LOG_INFO("Ring buffer overrides: task_window=%lu heap=%lu consumed_window=%lu",
                     (unsigned long)eff_task_window,
                     (unsigned long)(runtime->pto2_heap_size ? runtime->pto2_heap_size : PTO2_HEAP_SIZE),
                     (unsigned long)(runtime->pto2_consumed_window_size ? runtime->pto2_consumed_window_size : eff_task_window * 4));
        }

}

Expand Down
104 changes: 57 additions & 47 deletions src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ bool pto2_orchestrator_init(
pto2_task_ring_init(&orch->task_ring,
sm_handle->task_descriptors,
sm_handle->header->task_window_size,
&sm_handle->header->last_task_alive,
&sm_handle->header->last_task_released,
&sm_handle->header->current_task_index);

// Allocate and initialize dependency list pool (per-orchestrator, no shared memory)
Expand All @@ -122,7 +122,7 @@ bool pto2_orchestrator_init(
orch->dep_pool_last_reclaimed = 0;

// Initialize TensorMap
if (!orch->tensor_map.init_default(sm_handle->header->task_window_size)) {
if (!orch->tensor_map.init_default(sm_handle->header->consumed_window_size)) {
free(dep_entries);
return false;
}
Expand Down Expand Up @@ -245,17 +245,17 @@ void pto2_submit_mixed_task(
// === STEP 0: Sync TensorMap validity and optional cleanup ===
orch->tensor_map.sync_tensormap();

// Reclaim dead dep pool entries based on scheduler's last_task_alive
// Reclaim dead dep pool entries based on scheduler's last_task_released
{
int32_t last_alive = orch->sm_handle->header->last_task_alive.load(std::memory_order_acquire);
if (last_alive > orch->dep_pool_last_reclaimed && last_alive > 0) {
int32_t newest_consumed = last_alive - 1;
int32_t slot_rc = orch->task_ring.get_task_slot(newest_consumed);
int32_t mark = orch->sm_handle->task_payloads[slot_rc].dep_pool_mark;
int32_t last_released = orch->sm_handle->header->last_task_released.load(std::memory_order_acquire);
if (last_released > orch->dep_pool_last_reclaimed && last_released > 0) {
int32_t newest_released = last_released - 1;
PTO2ConsumedRingEntry& cr_entry = orch->scheduler->get_consumed_entry(newest_released);
int32_t mark = cr_entry.dep_pool_mark;
if (mark > 0) {
orch->dep_pool.advance_tail(mark);
}
orch->dep_pool_last_reclaimed = last_alive;
orch->dep_pool_last_reclaimed = last_released;
}
}

Expand All @@ -265,32 +265,35 @@ void pto2_submit_mixed_task(
always_assert(orch->scope_stack_top >= 0 && "Cannot submit task outside a scope");

// === Scope deadlock pre-check ===
// Tasks within a scope hold a fanout_count reference released only at scope_end.
// If scope task count >= window_size, no slots can ever be reclaimed → deadlock.
// Tasks within a scope hold a fanout_count reference (in consumed ring)
// released only at scope_end. The consumed ring cannot reclaim these entries
// until CONSUMED, so scope_task_count >= consumed_window_size → deadlock.
// (Main ring slots CAN be reclaimed at RELEASED within a scope.)
{
int32_t scope_task_count = orch->scope_tasks_size - orch->scope_begins[orch->scope_stack_top];
if (scope_task_count >= orch->task_ring.window_size - 1) {
int64_t consumed_window = static_cast<int64_t>(orch->sm_handle->header->consumed_window_size);
if (scope_task_count >= consumed_window - 1) {
int32_t total_submitted = orch->task_ring.current_index_ptr->load(std::memory_order_acquire);
int32_t last_alive = orch->task_ring.last_alive_ptr->load(std::memory_order_acquire);
int32_t active_count = total_submitted - last_alive;
int32_t last_released = orch->task_ring.last_released_ptr->load(std::memory_order_acquire);
int32_t active_count = total_submitted - last_released;

LOG_ERROR("========================================");
LOG_ERROR("FATAL: Scope Deadlock Detected!");
LOG_ERROR("========================================");
LOG_ERROR("Tasks in current scope (%d) >= task_window_size (%d).",
scope_task_count, orch->task_ring.window_size);
LOG_ERROR("Tasks in current scope (%d) >= consumed_window_size (%ld).",
scope_task_count, (long)consumed_window);
LOG_ERROR(" scope_depth: %d", orch->scope_stack_top + 1);
LOG_ERROR(" scope_task_count: %d", scope_task_count);
LOG_ERROR(" total_submitted: %d", total_submitted);
LOG_ERROR(" last_task_alive: %d", last_alive);
LOG_ERROR(" last_task_released: %d", last_released);
LOG_ERROR(" active_tasks: %d / %d", active_count, orch->task_ring.window_size);
LOG_ERROR("Root Cause:");
LOG_ERROR(" Tasks within a scope hold a fanout_count reference that is only");
LOG_ERROR(" released at scope_end. When scope task count >= window_size,");
LOG_ERROR(" no slots can be reclaimed -> deadlock.");
LOG_ERROR(" Consumed ring entries hold fanout_count references released only at");
LOG_ERROR(" scope_end. When scope task count >= consumed_window_size,");
LOG_ERROR(" no consumed ring entries can be reclaimed -> deadlock.");
LOG_ERROR("Solution:");
LOG_ERROR(" 1. Reduce tasks per scope (use batching/unroll)");
LOG_ERROR(" 2. Increase PTO2_TASK_WINDOW_SIZE (current: %d)", orch->task_ring.window_size);
LOG_ERROR(" 2. Increase consumed_window_size (current: %ld)", (long)consumed_window);
LOG_ERROR(" 3. Split work across multiple scopes");
LOG_ERROR("========================================");
exit(1);
Expand All @@ -313,19 +316,22 @@ void pto2_submit_mixed_task(
task.active_mask = active_mask;
task.subtask_done_mask.store(0, std::memory_order_relaxed);
task.packed_buffer_base = NULL;
task.packed_buffer_end = NULL;

// Initialize slot state (scheduler-private)
// Initialize scheduler-private state (consumed ring + main slot)
PTO2SchedulerState* sched = orch->scheduler;
if (sched) {
PTO2TaskSlotState& slot_state = sched->slot_states[slot];
slot_state.fanin_count = 0;
slot_state.fanout_head = nullptr;
slot_state.fanout_lock.store(0, std::memory_order_relaxed);
PTO2ConsumedRingEntry& cr_entry = sched->get_consumed_entry(task_id);
cr_entry.fanout_head = nullptr;
cr_entry.fanout_lock.store(0, std::memory_order_relaxed);
// Initial fanout_count = 1 (the owning scope holds one reference)
slot_state.fanout_count = 1;
slot_state.fanout_refcount.store(0, std::memory_order_release);
slot_state.fanin_refcount.store(0, std::memory_order_release);
cr_entry.fanout_count = 1;
cr_entry.fanout_refcount.store(0, std::memory_order_release);
cr_entry.packed_buffer_end = nullptr;
cr_entry.dep_pool_mark = 0;

PTO2MainSlotState& main_slot = sched->get_main_slot(task_id);
main_slot.fanin_count = 0;
main_slot.fanin_refcount.store(0, std::memory_order_release);
}

// Register this task in its owning scope
Expand Down Expand Up @@ -366,7 +372,8 @@ void pto2_submit_mixed_task(

if (total_output_size > 0) {
task.packed_buffer_base = orch->pto2_alloc_packed_buffer(total_output_size);
task.packed_buffer_end = (char*)task.packed_buffer_base + total_output_size;
PTO2ConsumedRingEntry& cr_entry = sched->get_consumed_entry(task_id);
cr_entry.packed_buffer_end = (char*)task.packed_buffer_base + total_output_size;
}
CYCLE_COUNT_LAP_RECORD(g_orch_heap_cycle, AicpuPhaseId::ORCH_HEAP, task_id);
#if PTO2_ORCH_PROFILING
Expand Down Expand Up @@ -452,54 +459,54 @@ void pto2_submit_mixed_task(
// === STEP 5: Finalize fanin list ===
// First build the fanin list
if (sched) {
PTO2TaskSlotState& cur_slot_state = sched->slot_states[slot];
PTO2ConsumedRingEntry& cr_entry = sched->get_consumed_entry(task_id);
PTO2MainSlotState& cur_main_slot = sched->get_main_slot(task_id);
// Initialize scheduler state BEFORE adding to producer fanout lists,
// so concurrent on_mixed_task_complete can safely access task_state/fanout_refcount.
cur_slot_state.task_state.store(PTO2_TASK_PENDING, std::memory_order_relaxed);
cur_slot_state.fanout_refcount.store(0, std::memory_order_relaxed);
cr_entry.task_state.store(PTO2_TASK_PENDING, std::memory_order_relaxed);
cr_entry.fanout_refcount.store(0, std::memory_order_relaxed);

auto& dep_pool = orch->dep_pool;
if (orch->dep_pool_cur_entry == nullptr) {
orch->dep_pool_cur_entry = &dep_pool.alloc();
}

int32_t early_finished = 0;
cur_slot_state.fanin_count = fanin_count + 1; // +1 redundance for not being ready too early
cur_main_slot.fanin_count = fanin_count + 1; // +1 redundance for not being ready too early
payload->fanin_actual_count = fanin_count;
for (int i = 0; i < fanin_count; i++) {
payload->fanin_tasks[i] = fanin_temp[i];
}
for (int i = 0; i < fanin_count; i++) {
int32_t producer_task_id = fanin_temp[i];
// Add this task to producer's fanout list (with spinlock)
int32_t prod_slot = task_ring.get_task_slot(producer_task_id);
PTO2TaskSlotState& producer_slot_state = sched->slot_states[prod_slot];
PTO2ConsumedRingEntry& producer_cr = sched->get_consumed_entry(producer_task_id);
orch->dep_pool_cur_entry->task_id = task_id;
orch->dep_pool_cur_entry->next = producer_slot_state.fanout_head;
orch->dep_pool_cur_entry->next = producer_cr.fanout_head;
#if PTO2_ORCH_PROFILING || PTO2_SCHED_PROFILING
pto2_fanout_lock(producer_slot_state, g_orch_fanin_atomic_count, g_orch_fanin_wait_cycle);
pto2_fanout_lock(producer_cr, g_orch_fanin_atomic_count, g_orch_fanin_wait_cycle);
#else
pto2_fanout_lock(producer_slot_state);
pto2_fanout_lock(producer_cr);
#endif
// Normal path: prepend consumer to producer's fanout list
producer_slot_state.fanout_count += 1;
int32_t prod_state = producer_slot_state.task_state.load(std::memory_order_acquire);
producer_cr.fanout_count += 1;
int32_t prod_state = producer_cr.task_state.load(std::memory_order_acquire);
if (prod_state >= PTO2_TASK_COMPLETED) {
// Early return optimization: if producer already completed, we can skip adding dependency and directly
// decrement fanin_count
early_finished++;
} else {
producer_slot_state.fanout_head = orch->dep_pool_cur_entry;
producer_cr.fanout_head = orch->dep_pool_cur_entry;
}
pto2_fanout_unlock(producer_slot_state);
if (producer_slot_state.fanout_head == orch->dep_pool_cur_entry) {
pto2_fanout_unlock(producer_cr);
if (producer_cr.fanout_head == orch->dep_pool_cur_entry) {
orch->dep_pool_cur_entry = &dep_pool.alloc();
}
}
// Combined release: merge early_finished batch + init_task's +1 release
// into a single atomic fetch_add (saves one acq_rel cache-line bounce per task).
int32_t initial_refcount = early_finished + 1; // +1 for the init release
int32_t new_rc = cur_slot_state.fanin_refcount.fetch_add(initial_refcount, std::memory_order_acq_rel)
int32_t new_rc = cur_main_slot.fanin_refcount.fetch_add(initial_refcount, std::memory_order_acq_rel)
+ initial_refcount;
if (new_rc >= fanin_count + 1) {
PTO2ResourceShape shape = pto2_active_mask_to_shape(active_mask);
Expand All @@ -516,7 +523,10 @@ void pto2_submit_mixed_task(
}

// Record dep pool watermark for this task (used by tail reclamation)
payload->dep_pool_mark = orch->dep_pool.top;
if (sched) {
PTO2ConsumedRingEntry& cr_entry = sched->get_consumed_entry(task_id);
cr_entry.dep_pool_mark = orch->dep_pool.top;
}

CYCLE_COUNT_LAP_RECORD(g_orch_fanin_cycle, AicpuPhaseId::ORCH_FANIN, task_id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ struct PTO2OrchestratorState {
PTO2TaskRing task_ring; // Task slot allocation
PTO2DepListPool dep_pool; // Dependency list storage (per-orchestrator, no atomics needed)
PTO2DepListEntry* dep_pool_cur_entry;
int32_t dep_pool_last_reclaimed; // last_task_alive value at last reclamation
int32_t dep_pool_last_reclaimed; // last_task_released value at last reclamation

// === TENSOR MAP (Private) ===
PTO2TensorMap tensor_map; // Producer lookup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ void pto2_heap_ring_init(PTO2HeapRing* ring, void* base, uint64_t size,
// =============================================================================

void pto2_task_ring_init(PTO2TaskRing* ring, PTO2TaskDescriptor* descriptors,
int32_t window_size, std::atomic<int32_t>* last_alive_ptr,
int32_t window_size, std::atomic<int32_t>* last_released_ptr,
std::atomic<int32_t>* current_index_ptr) {
ring->descriptors = descriptors;
ring->window_size = window_size;
ring->current_index_ptr = current_index_ptr;
ring->last_alive_ptr = last_alive_ptr;
ring->last_released_ptr = last_released_ptr;
}

// =============================================================================
Expand Down
Loading
Loading