Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ void aicpu_orchestration_entry(PTO2Runtime* rt, uint64_t* args, int arg_count, i
params_up.add_input(oi_new_b);
params_up.add_inout(mi_batch);
params_up.add_inout(li_batch);
params_up.add_output(oi_batch);
params_up.add_inout(oi_batch);
params_up.add_output(out);
params_up.add_scalar(is_first);
params_up.add_scalar(is_last);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,18 @@

// Type headers needed by orchestration
#include "pto_types.h" // PTOParam, PTOTensorEntry, PTOParamType
#include "tensor.h" // Tensor, make_tensor, make_tensor_external
#include "tensor.h" // Tensor struct
#include "pto_submit_types.h" // MixedKernels, INVALID_KERNEL_ID, subtask slots

// Multi-ring: number of independent ring layers (HeapRing + TaskRing + DepPool per layer)
// Scope depth maps to ring index via: min(scope_depth, PTO2_MAX_RING_DEPTH - 1)
#define PTO2_MAX_RING_DEPTH 4

// Thread-local scope depth for tensor factory functions.
// Incremented/decremented by PTO2ScopeGuard and standalone scope wrappers.
// Tensor ring selection clamps this depth to the runtime's valid ring range.
static thread_local uint8_t __pto2_ring_id = 0;

// =============================================================================
// Ops Table and Opaque Runtime
// =============================================================================
Expand Down Expand Up @@ -99,10 +108,12 @@ static inline void pto2_rt_submit_aiv_task(PTO2Runtime* rt, int32_t kernel_id,

// Open a new dependency scope on the runtime and bump the thread-local
// ring depth, so tensors created inside this scope select the next ring
// (see current_tensor_ring_id, which clamps this depth).
// NOTE(review): `__pto2_ring_id` begins with a double underscore, which is
// reserved for the implementation in C++ — consider renaming. There is also
// no overflow guard on the uint8_t counter; presumably nesting is bounded
// elsewhere (PTO2_MAX_SCOPE_DEPTH) — confirm.
static inline void pto2_rt_scope_begin(PTO2Runtime* rt) {
    rt->ops->scope_begin(rt);
    __pto2_ring_id++;
}

// Close the current dependency scope and restore the thread-local ring
// depth. Must be paired with pto2_rt_scope_begin (PTO2ScopeGuard does this
// via RAII).
// NOTE(review): an unbalanced call underflows the uint8_t counter (wraps to
// 255) — relies on callers keeping begin/end strictly paired; confirm no
// standalone end paths exist.
static inline void pto2_rt_scope_end(PTO2Runtime* rt) {
    rt->ops->scope_end(rt);
    __pto2_ring_id--;
}

static inline void pto2_rt_orchestration_done(PTO2Runtime* rt) {
Expand All @@ -113,6 +124,59 @@ static inline bool pto2_rt_is_fatal(PTO2Runtime* rt) {
return rt->ops->is_fatal(rt);
}

// =============================================================================
// Tensor Factory Functions
// =============================================================================

/**
 * Create a Tensor that wraps caller-owned, pre-allocated memory.
 *
 * The byte size is derived from the logical shape and element size of
 * `dtype`. Offsets are all zero and raw shapes equal logical shapes.
 * The tensor carries TENSOR_RING_ID_NONE: its storage is external and is
 * not managed by any runtime ring.
 */
static inline Tensor make_tensor_external(void* addr,
                                          const uint32_t shapes[],
                                          uint32_t ndims,
                                          DataType dtype = DataType::FLOAT32,
                                          bool manual_dep = false,
                                          int32_t version = 0) {
    // Shared all-zero offset array; safe because it is never written.
    static uint32_t zero_offsets[RUNTIME_MAX_TENSOR_DIMS] = {};
    uint64_t elem_count = 1;
    for (uint32_t d = 0; d < ndims; ++d) {
        elem_count *= shapes[d];
    }
    const uint64_t byte_size = elem_count * get_element_size(dtype);
    return Tensor(addr, byte_size, shapes, shapes, zero_offsets, ndims, dtype, version,
                  /*is_all_offset_zero=*/true, /*is_raw_eq_shapes=*/true, manual_dep,
                  TENSOR_RING_ID_NONE);
}

// Build a runtime-allocated tensor (addr = 0; the runtime fills in the
// address later) bound to an explicit ring. Mirrors make_tensor_external's
// layout conventions: zero offsets, raw shapes == logical shapes.
static inline Tensor make_tensor_with_ring(const uint32_t shapes[],
                                           uint32_t ndims,
                                           DataType dtype,
                                           bool manual_dep,
                                           int32_t version,
                                           uint8_t ring_id) {
    // Shared all-zero offset array; safe because it is never written.
    static uint32_t zero_offsets[RUNTIME_MAX_TENSOR_DIMS] = {};
    uint64_t elem_count = 1;
    for (uint32_t d = 0; d < ndims; ++d) {
        elem_count *= shapes[d];
    }
    const uint64_t byte_size = elem_count * get_element_size(dtype);
    return Tensor(0, byte_size, shapes, shapes, zero_offsets, ndims, dtype, version,
                  /*is_all_offset_zero=*/true, /*is_raw_eq_shapes=*/true, manual_dep,
                  ring_id);
}

// Map the thread-local scope depth to a valid ring index, clamping depths
// beyond the last ring onto PTO2_MAX_RING_DEPTH - 1.
static inline uint8_t current_tensor_ring_id() {
    uint8_t depth = __pto2_ring_id;
    if (depth >= PTO2_MAX_RING_DEPTH) {
        depth = PTO2_MAX_RING_DEPTH - 1;
    }
    return depth;
}

/**
 * Create a Tensor for runtime-allocated output (addr = 0).
 *
 * The destination ring is derived from the thread-local scope depth
 * maintained by PTO2ScopeGuard, clamped to the valid ring range so it
 * matches PTO2OrchestratorState::current_ring_id().
 */
static inline Tensor make_tensor(const uint32_t shapes[], uint32_t ndims,
                                 DataType dtype = DataType::FLOAT32, bool manual_dep = false,
                                 int32_t version = 0) {
    const uint8_t ring = current_tensor_ring_id();
    return make_tensor_with_ring(shapes, ndims, dtype, manual_dep, version, ring);
}

// =============================================================================
// Logging Macros for Orchestration (call through ops table)
// =============================================================================
Expand All @@ -133,10 +197,10 @@ static inline bool pto2_rt_is_fatal(PTO2Runtime* rt) {
class PTO2ScopeGuard {
public:
PTO2ScopeGuard(PTO2Runtime* rt) : rt_(rt) {
rt_->ops->scope_begin(rt_);
pto2_rt_scope_begin(rt_);
}
~PTO2ScopeGuard() {
rt_->ops->scope_end(rt_);
pto2_rt_scope_end(rt_);
}
private:
PTO2Runtime* rt_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,16 @@ bool pto2_orchestrator_init(
int32_t init_cap = PTO2_SCOPE_TASKS_INIT_CAP;
orch->scope_tasks = (PTO2TaskSlotState**)malloc(init_cap * sizeof(PTO2TaskSlotState*));
orch->scope_begins = (int32_t*)malloc(max_depth * sizeof(int32_t));
if (!orch->scope_tasks || !orch->scope_begins) {
orch->scope_escape_tasks = (PTO2TaskSlotState***)calloc(max_depth, sizeof(PTO2TaskSlotState**));
orch->scope_escape_counts = (int32_t*)calloc(max_depth, sizeof(int32_t));
orch->scope_escape_capacities = (int32_t*)calloc(max_depth, sizeof(int32_t));
if (!orch->scope_tasks || !orch->scope_begins || !orch->scope_escape_tasks ||
!orch->scope_escape_counts || !orch->scope_escape_capacities) {
free(orch->scope_tasks);
free(orch->scope_begins);
free(orch->scope_escape_tasks);
free(orch->scope_escape_counts);
free(orch->scope_escape_capacities);
for (int r = 0; r < PTO2_MAX_RING_DEPTH; r++) {
free(orch->rings[r].dep_pool.base);
}
Expand All @@ -190,6 +197,17 @@ void pto2_orchestrator_destroy(PTO2OrchestratorState* orch) {
orch->scope_tasks = NULL;
free(orch->scope_begins);
orch->scope_begins = NULL;
if (orch->scope_escape_tasks) {
for (uint64_t i = 0; i < orch->scope_stack_capacity; ++i) {
free(orch->scope_escape_tasks[i]);
}
free(orch->scope_escape_tasks);
orch->scope_escape_tasks = NULL;
}
free(orch->scope_escape_counts);
orch->scope_escape_counts = NULL;
free(orch->scope_escape_capacities);
orch->scope_escape_capacities = NULL;
}

void pto2_orchestrator_set_scheduler(PTO2OrchestratorState* orch, PTO2SchedulerState* scheduler) {
Expand All @@ -211,12 +229,29 @@ static void scope_tasks_push(PTO2OrchestratorState* orch, PTO2TaskSlotState *tas
orch->scope_tasks[orch->scope_tasks_size++] = task_slot_state;
}

/**
 * Record a cross-scope INOUT reference: pin `task_slot_state` to the owner
 * scope at `scope_idx`, so that owner scope's end releases the extra fanout
 * reference taken at submit time.
 *
 * The per-scope buffer grows geometrically on demand (doubling, seeded with
 * PTO2_SCOPE_TASKS_INIT_CAP).
 */
static void scope_escape_tasks_push(PTO2OrchestratorState* orch, int32_t scope_idx,
                                    PTO2TaskSlotState* task_slot_state) {
    always_assert(scope_idx >= 0 && static_cast<uint64_t>(scope_idx) < orch->scope_stack_capacity);
    int32_t count = orch->scope_escape_counts[scope_idx];
    int32_t capacity = orch->scope_escape_capacities[scope_idx];
    if (count >= capacity) {
        int32_t new_cap = capacity > 0 ? capacity * 2 : PTO2_SCOPE_TASKS_INIT_CAP;
        // realloc into a temporary so the original buffer stays valid on failure.
        PTO2TaskSlotState** new_buf = (PTO2TaskSlotState**)realloc(
            orch->scope_escape_tasks[scope_idx], new_cap * sizeof(PTO2TaskSlotState*));
        // always_assert (not plain assert): a failed realloc must abort even in
        // NDEBUG builds — the store below would otherwise write through NULL.
        // This also matches the file's convention (always_assert is used for
        // the bounds check above).
        always_assert(new_buf && "Failed to grow scope escape task buffer");
        orch->scope_escape_tasks[scope_idx] = new_buf;
        orch->scope_escape_capacities[scope_idx] = new_cap;
    }
    orch->scope_escape_tasks[scope_idx][orch->scope_escape_counts[scope_idx]++] = task_slot_state;
}

// Push a new scope: record where its tasks begin in the flat scope_tasks
// buffer and clear its cross-scope escape list. Becomes a no-op once a
// fatal error has been flagged on the orchestrator.
void pto2_scope_begin(PTO2OrchestratorState* orch) {
    if (orch->fatal) {
        return;
    }
    assert(orch->scope_stack_top < (int32_t)(orch->scope_stack_capacity - 1) && "Scope stack overflow");

    const int32_t top = ++orch->scope_stack_top;
    orch->scope_begins[top] = orch->scope_tasks_size;
    orch->scope_escape_counts[top] = 0;
}

void pto2_scope_end(PTO2OrchestratorState* orch) {
Expand All @@ -227,12 +262,18 @@ void pto2_scope_end(PTO2OrchestratorState* orch) {
uint64_t _se0 = get_sys_cnt_aicpu();
#endif

int32_t scope_idx = orch->scope_stack_top;
int32_t begin = orch->scope_begins[orch->scope_stack_top--];
int32_t count = orch->scope_tasks_size - begin;

if (orch->scheduler && count > 0) {
orch->scheduler->on_scope_end(&orch->scope_tasks[begin], count);
}
if (orch->scheduler && orch->scope_escape_counts[scope_idx] > 0) {
orch->scheduler->on_scope_end(
orch->scope_escape_tasks[scope_idx], orch->scope_escape_counts[scope_idx]);
}
orch->scope_escape_counts[scope_idx] = 0;

// Rewind the task buffer — these entries are no longer needed
orch->scope_tasks_size = begin;
Expand Down Expand Up @@ -271,7 +312,7 @@ void pto2_submit_mixed_task(
return;
}


// Determine which ring this task belongs to
uint8_t ring_id = orch->current_ring_id();
auto& task_ring = orch->rings[ring_id].task_ring;
Expand Down Expand Up @@ -343,7 +384,23 @@ void pto2_submit_mixed_task(
PTO2TaskId mixed_task_id = pto2_make_task_id(ring_id, static_cast<uint32_t>(local_id));

PTO2TaskDescriptor& task = task_ring.get_task_by_slot(slot);
PTO2TaskPayload* payload = &orch->sm_handle->task_payloads[ring_id][slot];
PTO2TaskPayload* payload = &orch->sm_handle->task_payloads[ring_id][slot];
bool extra_scope_refs[PTO2_MAX_RING_DEPTH] = {};
int32_t extra_scope_ref_count = 0;
for (int i = 0; i < params.tensor_count; i++) {
if (params.tensor_types[i] != PTOParamType::INOUT || params.tensors[i]->manual_dep) {
continue;
}
uint8_t owner_ring = params.tensors[i]->ring_id;
if (owner_ring == TENSOR_RING_ID_NONE || owner_ring >= PTO2_MAX_RING_DEPTH || owner_ring >= ring_id) {
continue;
}
if (!extra_scope_refs[owner_ring]) {
always_assert(static_cast<int32_t>(owner_ring) <= orch->scope_stack_top);
extra_scope_refs[owner_ring] = true;
extra_scope_ref_count++;
}
}

// Early write-prefetch payload GM cache lines to issue RFO in background.
// ~130 lines of computation (output_size, lookup, insert) follow before
Expand All @@ -367,8 +424,9 @@ void pto2_submit_mixed_task(
slot_state.fanin_count = 0;
slot_state.fanout_head = nullptr;
slot_state.fanout_lock.store(0, std::memory_order_relaxed);
// Initial fanout_count = 1 (the owning scope holds one reference)
slot_state.fanout_count = 1;
// Direct owner scope always holds one reference. Cross-scope INOUT
// additionally pins the task to each escaped owner scope.
slot_state.fanout_count = 1 + extra_scope_ref_count;
slot_state.fanout_refcount.store(0, std::memory_order_release);
slot_state.fanin_refcount.store(0, std::memory_order_release);
slot_state.payload = payload;
Expand All @@ -377,6 +435,11 @@ void pto2_submit_mixed_task(
slot_state.subtask_done_mask.store(0, std::memory_order_relaxed);
slot_state.ring_id = ring_id;
scope_tasks_push(orch, &slot_state);
for (int32_t owner_scope = 0; owner_scope < PTO2_MAX_RING_DEPTH; ++owner_scope) {
if (extra_scope_refs[owner_scope]) {
scope_escape_tasks_push(orch, owner_scope, &slot_state);
}
}
} else {
scope_tasks_push(orch, nullptr);
}
Expand Down Expand Up @@ -416,7 +479,7 @@ void pto2_submit_mixed_task(
// Read current last_task_alive from shared memory for this ring
int32_t sm_last_task_alive = fc.last_task_alive.load(std::memory_order_acquire);

orch->tensor_map.sync_tensormap(ring_id, sm_last_task_alive);
orch->tensor_map.sync_tensormap();

if (sched) {
orch->rings[ring_id].dep_pool.reclaim(*sched, ring_id, sm_last_task_alive);
Expand Down Expand Up @@ -474,11 +537,20 @@ void pto2_submit_mixed_task(

case PTOParamType::OUTPUT: {
Tensor& tensor = *params.tensors[i];
if (tensor.buffer.addr == 0) {
bool needs_alloc = tensor.buffer.addr == 0;
if (needs_alloc) {
uint64_t alloc_addr = reinterpret_cast<uint64_t>((char*)local_packed_base + offset);
tensor.buffer.addr = alloc_addr;
offset += PTO2_ALIGN_UP(tensor.buffer.size, PTO2_PACKED_OUTPUT_ALIGN);
}
if (tensor.ring_id == TENSOR_RING_ID_NONE) {
always_assert(!needs_alloc &&
"Internal OUTPUT tensor must have ring_id assigned before submit");
} else {
always_assert(tensor.ring_id < PTO2_MAX_RING_DEPTH);
always_assert(static_cast<int32_t>(tensor.ring_id) == ring_id &&
"OUTPUT tensor ring_id must match submit ring");
}
break;
}
}
Expand All @@ -491,7 +563,7 @@ void pto2_submit_mixed_task(
PTOParamType ptype = params.tensor_types[i];
if (ptype == PTOParamType::OUTPUT || ptype == PTOParamType::INOUT) {
if (!params.tensors[i]->manual_dep) {
orch->tensor_map.insert(*params.tensors[i], mixed_task_id, ptype == PTOParamType::OUTPUT);
orch->tensor_map.insert(*params.tensors[i], mixed_task_id, ptype == PTOParamType::OUTPUT, ring_id);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ struct PTO2OrchestratorState {
int32_t scope_tasks_size; // Number of task IDs currently in the buffer
int32_t scope_tasks_capacity; // Allocated capacity of scope_tasks
int32_t* scope_begins; // scope_begins[i] = start index of scope i in scope_tasks
PTO2TaskSlotState*** scope_escape_tasks; // Cross-scope INOUT refs held by owner scope
int32_t* scope_escape_counts; // Number of escape refs per scope
int32_t* scope_escape_capacities; // Allocated capacity per scope
int32_t scope_stack_top; // Current top of stack (-1 = no scope open)
uint64_t scope_stack_capacity; // Max nesting depth (PTO2_MAX_SCOPE_DEPTH)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@
#define PTO2_TENSORMAP_POOL_SIZE (65536) // TensorMap entry pool
#define PTO2_TENSORMAP_NUM_BUCKETS 65536 // Power of 2 for fast hash

// Fallback TensorMap (for cross-ring INOUT and external tensor entries)
#define PTO2_FALLBACK_POOL_SIZE 4096 // Fallback entry pool (rare path)
#define PTO2_FALLBACK_NUM_BUCKETS 4096 // Power of 2 for fast hash

// Scope management
#define PTO2_MAX_SCOPE_DEPTH 64 // Maximum nesting depth
#define PTO2_SCOPE_TASKS_INIT_CAP 65536 // Initial capacity for scope task buffer
Expand Down
Loading
Loading