diff --git a/examples/a2a3/tensormap_and_ringbuffer/batch_paged_attention/kernels/orchestration/paged_attention_orch.cpp b/examples/a2a3/tensormap_and_ringbuffer/batch_paged_attention/kernels/orchestration/paged_attention_orch.cpp index 2c472f51..96ba0b4c 100644 --- a/examples/a2a3/tensormap_and_ringbuffer/batch_paged_attention/kernels/orchestration/paged_attention_orch.cpp +++ b/examples/a2a3/tensormap_and_ringbuffer/batch_paged_attention/kernels/orchestration/paged_attention_orch.cpp @@ -191,7 +191,7 @@ void aicpu_orchestration_entry(PTO2Runtime* rt, uint64_t* args, int arg_count, i params_up.add_input(oi_new_b); params_up.add_inout(mi_batch); params_up.add_inout(li_batch); - params_up.add_output(oi_batch); + params_up.add_inout(oi_batch); params_up.add_output(out); params_up.add_scalar(is_first); params_up.add_scalar(is_last); diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/docs/MULTI_TENSORMAP_REWRITE.md b/src/a2a3/runtime/tensormap_and_ringbuffer/docs/MULTI_TENSORMAP_REWRITE.md new file mode 100644 index 00000000..64b8087b --- /dev/null +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/docs/MULTI_TENSORMAP_REWRITE.md @@ -0,0 +1,167 @@ +# Multi-TensorMap Rewrite + +Date: 2026-03-19 + +## Background + +The old multi-tensormap direction had two persistent problems: + +- fallback history did not share the same lifecycle model as owner history +- owner and fallback logic were drifting toward separate implementations + +The rewrite fixes both by making producer retirement the only lifecycle +source and forcing owner/fallback to share one shard implementation. + +## Goals + +- Keep same-ring owner history on a ring-local fast path +- Support cross-ring `INOUT` and external tensors through fallback storage +- Bind stale/cleanup semantics to real producer retirement +- Keep `sync_tensormap()` interface unchanged +- Force owner and fallback to share one core implementation + +## Tensor Model + +`Tensor.ring_id` means tensor owner ring. 
+ +- `ring_id in [0, PTO2_MAX_RING_DEPTH)`: internal tensor +- `ring_id == TENSOR_RING_ID_NONE`: external tensor + +Submit rules: + +- internal `OUTPUT` must satisfy `tensor.ring_id == submit_ring` +- `INOUT` and `INPUT` must not rewrite owner ring at submit time +- external tensors stay external; runtime must not silently assign an + internal owner ring + +## Entry Model + +Each tensormap entry stores: + +- `producer_task_id`: the real producer task +- `tensor_owner_ring`: the tensor owner ring, or `TENSOR_RING_ID_NONE` +- `storage_domain`: `OWNER_MAP` or `FALLBACK_MAP` +- overlap metadata: address, version, shape, offsets +- `with_alloc`: whether this history entry came from runtime allocation + +The entry does not store a separate fallback lifecycle key. + +Two derived values drive lifecycle handling: + +- `producer_ring = producer_task_id.ring()` +- `producer_local = producer_task_id.local()` + +## Shared Shard Core + +Owner and fallback both use the same template: + +```cpp +template +struct TensorMapShardImpl; +``` + +Concrete instances: + +- `OwnerTensorMapShard = TensorMapShardImpl<1, true>` +- `FallbackTensorMapShard = TensorMapShardImpl` + +This keeps one method body for: + +- `init` +- `destroy` +- `lookup` +- `insert` +- `remove_entry` +- `cleanup_range` + +Differences are expressed only through template parameters and entry +metadata, not through specialized method bodies. + +## Cleanup Domains + +`cleanup_domain` is a shard-local concept, not a stored field. + +For owner shards: + +- there is exactly one cleanup domain +- every entry maps to cleanup domain `0` + +For fallback shard: + +- there is one cleanup domain per producer ring +- an entry maps to `producer_task_id.ring()` + +This is why fallback mirrors `last_task_alive[ring]` for every producer +ring instead of maintaining a fake global frontier. 
+ +## Routing Rules + +### Lookup + +- internal tensor: query owner shard first, then fallback shard +- external tensor: query fallback shard only + +### Insert + +- internal `OUTPUT`: owner shard of the submit ring +- same-ring internal `INOUT`: owner shard of the submit ring +- cross-ring internal `INOUT`: fallback shard +- external `OUTPUT` / `INOUT`: fallback shard + +### Remove + +`remove_entry()` routes by `storage_domain`: + +- `OWNER_MAP`: remove from the owner shard indexed by `tensor_owner_ring` +- `FALLBACK_MAP`: remove from fallback shard + +## Cleanup Semantics + +Stale is defined only by producer retirement. + +Shared validity rule: + +```cpp +entry.producer_task_id.local() >= + shard.last_task_alives[cleanup_domain_of(entry)] +``` + +Lookup behavior: + +- owner shards may `break` on first stale entry because each owner shard is + a single lifecycle domain +- fallback shard must continue scanning because its bucket chains mix + producer rings + +Cleanup behavior: + +- `sync_tensormap()` reads real `last_task_alive` values from shared memory +- owner shard `R` cleans retired range on domain `0` +- fallback shard cleans retired range on domain `R` + +No fallback-private lifecycle frontier exists. 
+ +## Main Invariants + +Owner shard entry: + +- `storage_domain == OWNER_MAP` +- `tensor_owner_ring == producer_task_id.ring()` + +Fallback shard entry: + +- `storage_domain == FALLBACK_MAP` +- cleanup is driven only by `producer_task_id.ring()` + +Global invariant: + +- owner and fallback share one core implementation +- differences must not grow into two independent algorithms + +## Current Implementation Notes + +The committed implementation also keeps two important user-facing choices: + +- `sync_tensormap(uint8_t ring_id, int32_t sm_last_task_alive)` stays + unchanged +- `with_alloc` follows allocation semantics, not `PTOParamType` alone diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h b/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h index ff7d2b18..560ae6b9 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/orchestration/pto_orchestration_api.h @@ -23,9 +23,18 @@ // Type headers needed by orchestration #include "pto_types.h" // PTOParam, PTOTensorEntry, PTOParamType -#include "tensor.h" // Tensor, make_tensor, make_tensor_external +#include "tensor.h" // Tensor struct #include "pto_submit_types.h" // MixedKernels, INVALID_KERNEL_ID, subtask slots +// Multi-ring: number of independent ring layers (HeapRing + TaskRing + DepPool per layer) +// Scope depth maps to ring index via: min(scope_depth, PTO2_MAX_RING_DEPTH - 1) +#define PTO2_MAX_RING_DEPTH 4 + +// Thread-local scope depth for tensor factory functions. +// Incremented/decremented by PTO2ScopeGuard and standalone scope wrappers. +// Tensor ring selection clamps this depth to the runtime's valid ring range. 
+static thread_local uint8_t __pto2_ring_id = 0; + // ============================================================================= // Ops Table and Opaque Runtime // ============================================================================= @@ -99,10 +108,12 @@ static inline void pto2_rt_submit_aiv_task(PTO2Runtime* rt, int32_t kernel_id, static inline void pto2_rt_scope_begin(PTO2Runtime* rt) { rt->ops->scope_begin(rt); + __pto2_ring_id++; } static inline void pto2_rt_scope_end(PTO2Runtime* rt) { rt->ops->scope_end(rt); + __pto2_ring_id--; } static inline void pto2_rt_orchestration_done(PTO2Runtime* rt) { @@ -113,6 +124,59 @@ static inline bool pto2_rt_is_fatal(PTO2Runtime* rt) { return rt->ops->is_fatal(rt); } +// ============================================================================= +// Tensor Factory Functions +// ============================================================================= + +/** + * Create a Tensor for pre-allocated external memory. + */ +static inline Tensor make_tensor_external(void* addr, + const uint32_t shapes[], + uint32_t ndims, + DataType dtype = DataType::FLOAT32, + bool manual_dep = false, + int32_t version = 0) { + static uint32_t zero_offsets[RUNTIME_MAX_TENSOR_DIMS] = {}; + uint64_t total = 1; + for (uint32_t i = 0; i < ndims; i++) { + total *= shapes[i]; + } + return Tensor(addr, total * get_element_size(dtype), shapes, shapes, zero_offsets, ndims, dtype, version, + /*is_all_offset_zero=*/true, /*is_raw_eq_shapes=*/true, manual_dep, + TENSOR_RING_ID_NONE); +} + +static inline Tensor make_tensor_with_ring(const uint32_t shapes[], + uint32_t ndims, + DataType dtype, + bool manual_dep, + int32_t version, + uint8_t ring_id) { + static uint32_t zero_offsets[RUNTIME_MAX_TENSOR_DIMS] = {}; + uint64_t total = 1; + for (uint32_t i = 0; i < ndims; i++) { + total *= shapes[i]; + } + return Tensor(0, total * get_element_size(dtype), shapes, shapes, zero_offsets, ndims, dtype, version, + /*is_all_offset_zero=*/true, 
/*is_raw_eq_shapes=*/true, manual_dep, ring_id); +} + +static inline uint8_t current_tensor_ring_id() { + return __pto2_ring_id < PTO2_MAX_RING_DEPTH ? __pto2_ring_id : PTO2_MAX_RING_DEPTH - 1; +} + +/** + * Create a Tensor for runtime-allocated output (addr=0). + * Uses the thread-local scope depth set by PTO2ScopeGuard, clamped to the + * runtime ring range to match PTO2OrchestratorState::current_ring_id(). + */ +static inline Tensor make_tensor(const uint32_t shapes[], uint32_t ndims, + DataType dtype = DataType::FLOAT32, bool manual_dep = false, + int32_t version = 0) { + return make_tensor_with_ring(shapes, ndims, dtype, manual_dep, version, current_tensor_ring_id()); +} + // ============================================================================= // Logging Macros for Orchestration (call through ops table) // ============================================================================= @@ -133,10 +197,10 @@ static inline bool pto2_rt_is_fatal(PTO2Runtime* rt) { class PTO2ScopeGuard { public: PTO2ScopeGuard(PTO2Runtime* rt) : rt_(rt) { - rt_->ops->scope_begin(rt_); + pto2_rt_scope_begin(rt_); } ~PTO2ScopeGuard() { - rt_->ops->scope_end(rt_); + pto2_rt_scope_end(rt_); } private: PTO2Runtime* rt_; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index d62e0f9a..1e758fcf 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -390,10 +390,12 @@ void pto2_submit_mixed_task( CYCLE_COUNT_LAP_RECORD(g_orch_alloc_cycle, AicpuPhaseId::ORCH_ALLOC, local_id); // === STEP 2: Calculate output size + heap alloc (read from params only, no GM access) === + bool needs_alloc[PTO2_MAX_TENSOR_PARAMS] = {}; int32_t total_output_size = 0; for (int i = 0; i < params.tensor_count; i++) { if (params.tensor_types[i] == PTOParamType::OUTPUT && 
params.tensors[i]->buffer.addr == 0) { + needs_alloc[i] = true; total_output_size += PTO2_ALIGN_UP(params.tensors[i]->buffer.size, PTO2_PACKED_OUTPUT_ALIGN); } } @@ -491,7 +493,7 @@ void pto2_submit_mixed_task( PTOParamType ptype = params.tensor_types[i]; if (ptype == PTOParamType::OUTPUT || ptype == PTOParamType::INOUT) { if (!params.tensors[i]->manual_dep) { - orch->tensor_map.insert(*params.tensors[i], mixed_task_id, ptype == PTOParamType::OUTPUT); + orch->tensor_map.insert(*params.tensors[i], mixed_task_id, ptype, needs_alloc[i]); } } } diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp index 50cd57b1..32a78a4d 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.cpp @@ -1,16 +1,5 @@ /** * PTO Runtime2 - TensorMap Implementation - * - * Implements TensorMap with ring buffer pool, lazy invalidation, - * and chain truncation optimization. - * - * Key features: - * 1. O(1) insert at bucket head - * 2. O(valid_entries) lookup with chain truncation - * 3. Automatic stale entry cleanup during lookup - * 4. 
Periodic explicit cleanup for long chains - * - * Based on: docs/runtime_buffer_manager_methods.md */ #include "pto_tensormap.h" @@ -22,9 +11,6 @@ #include "common/unified_log.h" #include "pto_orchestrator.h" -// ============================================================================= -// TensorMap Lookup Chain Length Statistics (compile-time toggle) -// ============================================================================= #if PTO2_TENSORMAP_PROFILING uint64_t g_lookup_chain_total = 0; uint64_t g_lookup_count = 0; @@ -34,53 +20,58 @@ uint64_t g_lookup_overlap_hits = 0; uint64_t g_insert_count = 0; #endif -// ============================================================================= -// Initialization and Destruction -// ============================================================================= +namespace { -bool PTO2TensorMap::init(int32_t new_num_buckets, int32_t new_pool_size, const int32_t new_task_window_sizes[PTO2_MAX_RING_DEPTH]) { - // Validate power of 2 for fast modulo - if ((new_num_buckets & (new_num_buckets - 1)) != 0) { - return false; // num_buckets must be power of 2 +bool is_power_of_two(int32_t value) { + return value > 0 && (value & (value - 1)) == 0; +} + +bool tensor_has_valid_owner_ring(const Tensor& tensor) { + return tensor.ring_id == TENSOR_RING_ID_NONE || tensor.ring_id < PTO2_MAX_RING_DEPTH; +} + +} // namespace + +template +bool TensorMapShardImpl::init( + int32_t new_num_buckets, int32_t new_pool_size, const int32_t new_task_window_sizes[]) { + if (!is_power_of_two(new_num_buckets)) { + return false; + } + for (int d = 0; d < NumCleanupDomains; d++) { + if (!is_power_of_two(new_task_window_sizes[d])) { + return false; + } } - // Allocate buckets - buckets = (PTO2TensorMapEntry**)malloc(new_num_buckets * sizeof(PTO2TensorMapEntry*)); + num_buckets = new_num_buckets; + pool_size = new_pool_size; + + buckets = static_cast(malloc(num_buckets * sizeof(PTO2TensorMapEntry*))); if (!buckets) { return false; } - - // Initialize 
all buckets to empty (-1) - for (int32_t i = 0; i < new_num_buckets; i++) { + for (int32_t i = 0; i < num_buckets; i++) { buckets[i] = nullptr; } - num_buckets = new_num_buckets; - - // Allocate entry pool (64-byte aligned for cache-line-aligned entries) - entry_pool = (PTO2TensorMapEntry*)aligned_alloc(alignof(PTO2TensorMapEntry), new_pool_size * sizeof(PTO2TensorMapEntry)); + entry_pool = static_cast( + aligned_alloc(alignof(PTO2TensorMapEntry), pool_size * sizeof(PTO2TensorMapEntry))); if (!entry_pool) { - free(buckets); - buckets = NULL; + destroy(); return false; } - memset(entry_pool, 0, new_pool_size * sizeof(PTO2TensorMapEntry)); + memset(entry_pool, 0, pool_size * sizeof(PTO2TensorMapEntry)); - // Allocate free entry list - free_entry_list = (PTO2TensorMapEntry**)calloc(new_pool_size, sizeof(PTO2TensorMapEntry*)); + free_entry_list = static_cast(calloc(pool_size, sizeof(PTO2TensorMapEntry*))); if (!free_entry_list) { - free(buckets); - free(entry_pool); - buckets = NULL; - entry_pool = NULL; + destroy(); return false; } - pool_size = new_pool_size; next_entry_idx = 0; free_num = 0; - // Initialize all entries as not in bucket for (int32_t i = 0; i < pool_size; i++) { entry_pool[i].bucket_index = -1; entry_pool[i].next_in_bucket = nullptr; @@ -88,72 +79,325 @@ bool PTO2TensorMap::init(int32_t new_num_buckets, int32_t new_pool_size, const i entry_pool[i].next_in_task = nullptr; entry_pool[i].prev_in_task = nullptr; entry_pool[i].producer_task_id = PTO2TaskId{}; + entry_pool[i].tensor_owner_ring = TENSOR_RING_ID_NONE; + entry_pool[i].storage_domain = TensorMapStorageDomain::OWNER_MAP; + entry_pool[i].with_alloc = false; } - // Allocate per-ring per-task entry tracking (each ring has its own window size) - for (int r = 0; r < PTO2_MAX_RING_DEPTH; r++) { - task_entry_heads[r] = (PTO2TensorMapEntry**)malloc(new_task_window_sizes[r] * sizeof(PTO2TensorMapEntry*)); - if (!task_entry_heads[r]) { - // Cleanup previously allocated rings - for (int j = 0; j < r; j++) 
{ - free(task_entry_heads[j]); - task_entry_heads[j] = NULL; - } - free(entry_pool); - free(buckets); - free(free_entry_list); - entry_pool = NULL; - buckets = NULL; - free_entry_list = NULL; + for (int d = 0; d < NumCleanupDomains; d++) { + task_entry_heads[d] = static_cast( + malloc(new_task_window_sizes[d] * sizeof(PTO2TensorMapEntry*))); + if (!task_entry_heads[d]) { + destroy(); return false; } - for (int32_t i = 0; i < new_task_window_sizes[r]; i++) { - task_entry_heads[r][i] = nullptr; + for (int32_t i = 0; i < new_task_window_sizes[d]; i++) { + task_entry_heads[d][i] = nullptr; } - task_window_sizes[r] = new_task_window_sizes[r]; - } - - for (int r = 0; r < PTO2_MAX_RING_DEPTH; r++) { - last_task_alives[r] = 0; - last_cleanup[r] = 0; + task_window_sizes[d] = new_task_window_sizes[d]; + last_task_alives[d] = 0; + last_cleanup[d] = 0; } return true; } -bool PTO2TensorMap::init_default(const int32_t new_task_window_sizes[PTO2_MAX_RING_DEPTH]) { - return init(PTO2_TENSORMAP_NUM_BUCKETS, PTO2_TENSORMAP_POOL_SIZE, new_task_window_sizes); -} - -void PTO2TensorMap::destroy() { +template +void TensorMapShardImpl::destroy() { if (buckets) { free(buckets); - buckets = NULL; + buckets = nullptr; } - if (entry_pool) { free(entry_pool); - entry_pool = NULL; + entry_pool = nullptr; + } + if (free_entry_list) { + free(free_entry_list); + free_entry_list = nullptr; } + for (int d = 0; d < NumCleanupDomains; d++) { + if (task_entry_heads[d]) { + free(task_entry_heads[d]); + task_entry_heads[d] = nullptr; + } + task_window_sizes[d] = 0; + last_task_alives[d] = 0; + last_cleanup[d] = 0; + } + num_buckets = 0; + pool_size = 0; + next_entry_idx = 0; + free_num = 0; +} - for (int r = 0; r < PTO2_MAX_RING_DEPTH; r++) { - if (task_entry_heads[r]) { - free(task_entry_heads[r]); - task_entry_heads[r] = NULL; +template +void TensorMapShardImpl::sync_validity( + int32_t cleanup_domain, int32_t last_task_alive) { + always_assert(cleanup_domain >= 0 && cleanup_domain < 
NumCleanupDomains); + last_task_alives[cleanup_domain] = last_task_alive; +} + +template +bool TensorMapShardImpl::need_cleanup( + int32_t cleanup_domain, int32_t new_last_task_alive) const { + always_assert(cleanup_domain >= 0 && cleanup_domain < NumCleanupDomains); + int32_t old_last_cleanup = last_cleanup[cleanup_domain]; + if (new_last_task_alive <= old_last_cleanup) { + return false; + } + if (new_last_task_alive - old_last_cleanup >= PTO2_TENSORMAP_CLEANUP_INTERVAL) { + return true; + } + return free_num < PTO2_TENSORMAP_CLEANUP_INTERVAL; +} + +template +uint32_t TensorMapShardImpl::hash(uint64_t key) const { + key = key ^ (key >> 16); + key = key ^ (key >> 32); + return static_cast(key & (num_buckets - 1)); +} + +template +int32_t TensorMapShardImpl::cleanup_domain_of( + PTO2TaskId producer_task_id) const { + if constexpr (NumCleanupDomains == 1) { + (void)producer_task_id; + return 0; + } else { + return static_cast(pto2_task_id_ring(producer_task_id)); + } +} + +template +int32_t TensorMapShardImpl::cleanup_domain_of( + const PTO2TensorMapEntry& entry) const { + if constexpr (NumCleanupDomains == 1) { + (void)entry; + return 0; + } else { + return static_cast(entry.producer_ring()); + } +} + +template +int32_t TensorMapShardImpl::lifecycle_local_of( + PTO2TaskId producer_task_id) const { + return static_cast(pto2_task_id_local(producer_task_id)); +} + +template +int32_t TensorMapShardImpl::lifecycle_local_of( + const PTO2TensorMapEntry& entry) const { + return static_cast(entry.producer_local()); +} + +template +bool TensorMapShardImpl::entry_valid( + const PTO2TensorMapEntry& entry) const { + int32_t cleanup_domain = cleanup_domain_of(entry); + always_assert(cleanup_domain >= 0 && cleanup_domain < NumCleanupDomains); + return lifecycle_local_of(entry) >= last_task_alives[cleanup_domain]; +} + +template +PTO2TensorMapEntry* TensorMapShardImpl::new_entry() { + if (free_num > 0) { + PTO2TensorMapEntry* res = free_entry_list[--free_num]; + 
debug_assert(res->bucket_index == -1); + return res; + } + always_assert(next_entry_idx < pool_size); + PTO2TensorMapEntry* res = &entry_pool[next_entry_idx++]; + debug_assert(res->bucket_index == -1); + return res; +} + +template +void TensorMapShardImpl::unlink_from_bucket( + PTO2TensorMapEntry& entry) { + always_assert(entry.bucket_index != -1); + if (entry.prev_in_bucket == nullptr) { + buckets[entry.bucket_index] = entry.next_in_bucket; + } else { + entry.prev_in_bucket->next_in_bucket = entry.next_in_bucket; + } + if (entry.next_in_bucket != nullptr) { + entry.next_in_bucket->prev_in_bucket = entry.prev_in_bucket; + } + entry.next_in_bucket = nullptr; + entry.prev_in_bucket = nullptr; +} + +template +void TensorMapShardImpl::unlink_from_task( + PTO2TensorMapEntry& entry) { + int32_t cleanup_domain = cleanup_domain_of(entry); + always_assert(cleanup_domain >= 0 && cleanup_domain < NumCleanupDomains); + int32_t local_id = lifecycle_local_of(entry); + int32_t task_slot = local_id & (task_window_sizes[cleanup_domain] - 1); + if (entry.prev_in_task == nullptr) { + task_entry_heads[cleanup_domain][task_slot] = entry.next_in_task; + } else { + entry.prev_in_task->next_in_task = entry.next_in_task; + } + if (entry.next_in_task != nullptr) { + entry.next_in_task->prev_in_task = entry.prev_in_task; + } + entry.next_in_task = nullptr; + entry.prev_in_task = nullptr; +} + +template +void TensorMapShardImpl::reclaim_entry( + PTO2TensorMapEntry& entry) { + always_assert(entry.bucket_index != -1); + unlink_from_bucket(entry); + free_entry_list[free_num++] = &entry; + entry.bucket_index = -1; + entry.next_in_bucket = nullptr; + entry.prev_in_bucket = nullptr; + entry.next_in_task = nullptr; + entry.prev_in_task = nullptr; +} + +template +void TensorMapShardImpl::lookup( + const Tensor& tensor, PTO2LookupResult& result) const { + uint32_t bucket_index = hash(tensor.buffer.addr); + PTO2TensorMapEntry* cur_entry = buckets[bucket_index]; + +#if PTO2_TENSORMAP_PROFILING + 
g_lookup_count++; + int32_t chain_len = 0; +#endif + + while (cur_entry != nullptr) { + PTO2TensorMapEntry* next_entry = cur_entry->next_in_bucket; + if (next_entry) __builtin_prefetch(next_entry, 0, 0); + +#if PTO2_TENSORMAP_PROFILING + chain_len++; +#endif + + if (!entry_valid(*cur_entry)) { + if constexpr (BreakOnStale) { + break; + } + cur_entry = next_entry; + continue; + } + + if (tensor.buffer.addr == cur_entry->buffer_addr) { + if (next_entry) { + PTO2TensorMapEntry* next_next = next_entry->next_in_bucket; + if (next_next) __builtin_prefetch(next_next, 0, 0); + } +#if PTO2_TENSORMAP_PROFILING + g_lookup_overlap_checks++; +#endif + OverlapStatus overlap_status = cur_entry->check_overlap(tensor); + if (overlap_status != OverlapStatus::NO_OVERLAP) { + result.push(cur_entry, overlap_status); +#if PTO2_TENSORMAP_PROFILING + g_lookup_overlap_hits++; +#endif + } } + + cur_entry = next_entry; } - if (free_entry_list) { - free(free_entry_list); - free_entry_list = NULL; +#if PTO2_TENSORMAP_PROFILING + g_lookup_chain_total += chain_len; + if (chain_len > g_lookup_chain_max) g_lookup_chain_max = chain_len; +#endif +} + +template +PTO2TensorMapEntry* TensorMapShardImpl::insert( + const Tensor& tensor, PTO2TaskId producer_task_id, const TensorMapInsertMeta& meta) { +#if PTO2_TENSORMAP_PROFILING + g_insert_count++; +#endif + int32_t cleanup_domain = cleanup_domain_of(producer_task_id); + always_assert(cleanup_domain >= 0 && cleanup_domain < NumCleanupDomains); + if (meta.storage_domain == TensorMapStorageDomain::OWNER_MAP) { + always_assert(meta.tensor_owner_ring < PTO2_MAX_RING_DEPTH); + } + + uint32_t bucket_index = hash(tensor.buffer.addr); + __builtin_prefetch(&buckets[bucket_index], 1, 0); + int32_t local_id = lifecycle_local_of(producer_task_id); + int32_t task_slot = local_id & (task_window_sizes[cleanup_domain] - 1); + __builtin_prefetch(&task_entry_heads[cleanup_domain][task_slot], 1, 0); + + PTO2TensorMapEntry* entry = new_entry(); + 
entry->copy_from_tensor(tensor); + entry->producer_task_id = producer_task_id; + entry->tensor_owner_ring = meta.tensor_owner_ring; + entry->storage_domain = meta.storage_domain; + entry->with_alloc = meta.with_alloc; + + entry->bucket_index = static_cast(bucket_index); + entry->next_in_bucket = buckets[bucket_index]; + if (entry->next_in_bucket != nullptr) { + entry->next_in_bucket->prev_in_bucket = entry; + } + buckets[bucket_index] = entry; + entry->prev_in_bucket = nullptr; + + entry->next_in_task = task_entry_heads[cleanup_domain][task_slot]; + entry->prev_in_task = nullptr; + if (entry->next_in_task != nullptr) { + entry->next_in_task->prev_in_task = entry; } + task_entry_heads[cleanup_domain][task_slot] = entry; + return entry; } -// ============================================================================= -// Debug Utilities -// ============================================================================= +template +void TensorMapShardImpl::remove_entry( + PTO2TensorMapEntry& entry) { + always_assert(entry.bucket_index != -1); + unlink_from_task(entry); + reclaim_entry(entry); +} -void PTO2TensorMap::print_stats() { +template +void TensorMapShardImpl::cleanup_range( + int32_t cleanup_domain, int32_t old_last_task_alive, int32_t new_last_task_alive) { + always_assert(cleanup_domain >= 0 && cleanup_domain < NumCleanupDomains); + for (int32_t local_id = old_last_task_alive; local_id < new_last_task_alive; local_id++) { + int32_t task_slot = local_id & (task_window_sizes[cleanup_domain] - 1); + PTO2TensorMapEntry* cur_entry = task_entry_heads[cleanup_domain][task_slot]; + while (cur_entry != nullptr) { + PTO2TensorMapEntry* next_entry = cur_entry->next_in_task; + debug_assert(cleanup_domain_of(*cur_entry) == cleanup_domain); + debug_assert(lifecycle_local_of(*cur_entry) == local_id); + reclaim_entry(*cur_entry); + cur_entry = next_entry; + } + task_entry_heads[cleanup_domain][task_slot] = nullptr; + } + last_cleanup[cleanup_domain] = new_last_task_alive; +} 
+ +template +int32_t TensorMapShardImpl::valid_count() const { + int32_t count = 0; + for (int32_t i = 0; i < pool_size; i++) { + if (entry_pool[i].bucket_index != -1 && entry_valid(entry_pool[i])) { + count++; + } + } + return count; +} + +template +void TensorMapShardImpl::print_stats( + const char* label) const { int32_t valid = 0; int32_t stale = 0; int32_t empty_buckets = 0; @@ -161,7 +405,6 @@ void PTO2TensorMap::print_stats() { int64_t total_chain = 0; int32_t non_empty_buckets = 0; - // Count entries for (int32_t i = 0; i < pool_size; i++) { if (entry_pool[i].bucket_index != -1) { if (entry_valid(entry_pool[i])) { @@ -172,16 +415,13 @@ void PTO2TensorMap::print_stats() { } } - // Count bucket stats for (int32_t b = 0; b < num_buckets; b++) { int32_t chain_len = 0; - auto cur_entry = buckets[b]; - + PTO2TensorMapEntry* cur_entry = buckets[b]; while (cur_entry != nullptr) { chain_len++; cur_entry = cur_entry->next_in_bucket; } - if (chain_len == 0) { empty_buckets++; } else { @@ -193,7 +433,7 @@ void PTO2TensorMap::print_stats() { } } - LOG_INFO("=== TensorMap Statistics ==="); + LOG_INFO("=== TensorMapShard Statistics: %s ===", label); LOG_INFO("Pool size: %d", pool_size); LOG_INFO("Pool next entry idx: %d", next_entry_idx); LOG_INFO("Pool free_num: %d", free_num); @@ -202,38 +442,150 @@ void PTO2TensorMap::print_stats() { LOG_INFO("Stale entries: %d", stale); LOG_INFO("Empty buckets: %d", empty_buckets); LOG_INFO("Max chain len: %d", max_chain); - LOG_INFO("Avg chain len: %.2f", non_empty_buckets > 0 ? (float)total_chain / non_empty_buckets : 0); - for (int r = 0; r < PTO2_MAX_RING_DEPTH; r++) { - LOG_INFO("Last task alive[%d]: %d", r, last_task_alives[r]); + LOG_INFO("Avg chain len: %.2f", non_empty_buckets > 0 ? 
static_cast(total_chain) / non_empty_buckets : 0); + for (int d = 0; d < NumCleanupDomains; d++) { + LOG_INFO("last_task_alive[%d]: %d", d, last_task_alives[d]); + LOG_INFO("last_cleanup[%d]: %d", d, last_cleanup[d]); } - LOG_INFO("============================"); } -int32_t PTO2TensorMap::valid_count() { - int32_t count = 0; +template struct TensorMapShardImpl<1, true>; +template struct TensorMapShardImpl; - for (int32_t i = 0; i < pool_size; i++) { - if (entry_pool[i].bucket_index != -1 && entry_valid(entry_pool[i])) { - count++; +bool PTO2TensorMap::init( + int32_t num_buckets, int32_t pool_size, const int32_t task_window_sizes[PTO2_MAX_RING_DEPTH]) { + destroy(); + + for (int r = 0; r < PTO2_MAX_RING_DEPTH; r++) { + int32_t owner_task_window_sizes[1] = {task_window_sizes[r]}; + if (!owner_shards[r].init(num_buckets, pool_size, owner_task_window_sizes)) { + destroy(); + return false; } } + if (!fallback_shard.init(num_buckets, pool_size, task_window_sizes)) { + destroy(); + return false; + } + + return true; +} + +bool PTO2TensorMap::init_default(const int32_t task_window_sizes[PTO2_MAX_RING_DEPTH]) { + return init(PTO2_TENSORMAP_NUM_BUCKETS, PTO2_TENSORMAP_POOL_SIZE, task_window_sizes); +} + +void PTO2TensorMap::destroy() { + for (int r = 0; r < PTO2_MAX_RING_DEPTH; r++) { + owner_shards[r].destroy(); + } + fallback_shard.destroy(); +} + +void PTO2TensorMap::lookup(const Tensor& tensor, PTO2LookupResult& result) const { + always_assert(tensor_has_valid_owner_ring(tensor)); + result.count = 0; + if (tensor.ring_id == TENSOR_RING_ID_NONE) { + fallback_shard.lookup(tensor, result); + return; + } + owner_shards[tensor.ring_id].lookup(tensor, result); + fallback_shard.lookup(tensor, result); +} + +void PTO2TensorMap::insert( + const Tensor& tensor, PTO2TaskId producer_task_id, PTOParamType param_type, bool with_alloc) { + always_assert(param_type == PTOParamType::OUTPUT || param_type == PTOParamType::INOUT); + always_assert(tensor_has_valid_owner_ring(tensor)); + 
uint8_t producer_ring = producer_task_id.ring(); + always_assert(producer_ring < PTO2_MAX_RING_DEPTH); + + TensorMapInsertMeta meta; + meta.tensor_owner_ring = tensor.ring_id; + meta.with_alloc = with_alloc; + + if (tensor.ring_id == TENSOR_RING_ID_NONE) { + meta.storage_domain = TensorMapStorageDomain::FALLBACK_MAP; + fallback_shard.insert(tensor, producer_task_id, meta); + return; + } + + if (param_type == PTOParamType::OUTPUT) { + always_assert(tensor.ring_id == producer_ring); + meta.storage_domain = TensorMapStorageDomain::OWNER_MAP; + owner_shards[producer_ring].insert(tensor, producer_task_id, meta); + return; + } + + if (tensor.ring_id == producer_ring) { + meta.storage_domain = TensorMapStorageDomain::OWNER_MAP; + owner_shards[producer_ring].insert(tensor, producer_task_id, meta); + } else { + meta.storage_domain = TensorMapStorageDomain::FALLBACK_MAP; + fallback_shard.insert(tensor, producer_task_id, meta); + } +} + +void PTO2TensorMap::remove_entry(PTO2TensorMapEntry& entry) { + switch (entry.storage_domain) { + case TensorMapStorageDomain::OWNER_MAP: + always_assert(entry.tensor_owner_ring < PTO2_MAX_RING_DEPTH); + owner_shards[entry.storage_ring()].remove_entry(entry); + return; + case TensorMapStorageDomain::FALLBACK_MAP: + fallback_shard.remove_entry(entry); + return; + } + always_assert(false); +} + +void PTO2TensorMap::print_stats() const { + for (int r = 0; r < PTO2_MAX_RING_DEPTH; r++) { + char label[32]; + snprintf(label, sizeof(label), "owner[%d]", r); + owner_shards[r].print_stats(label); + } + fallback_shard.print_stats("fallback"); +} + +int32_t PTO2TensorMap::valid_count() const { + int32_t count = fallback_shard.valid_count(); + for (int r = 0; r < PTO2_MAX_RING_DEPTH; r++) { + count += owner_shards[r].valid_count(); + } return count; } void PTO2TensorMap::sync_tensormap(uint8_t ring_id, int32_t sm_last_task_alive) { - sync_validity(ring_id, sm_last_task_alive); - // Only attempt cleanup when last_task_alive has actually advanced; - // 
otherwise cleanup_retired would empty-loop and we'd spin forever. - if (sm_last_task_alive - last_cleanup[ring_id] >= PTO2_TENSORMAP_CLEANUP_INTERVAL) { - cleanup_retired(ring_id, last_cleanup[ring_id], sm_last_task_alive); - last_cleanup[ring_id] = sm_last_task_alive; + auto sync_one_ring = [&](uint8_t current_ring, int32_t new_alive) { + owner_shards[current_ring].sync_validity(0, new_alive); + fallback_shard.sync_validity(current_ring, new_alive); + + if (owner_shards[current_ring].need_cleanup(0, new_alive)) { + owner_shards[current_ring].cleanup_range( + 0, owner_shards[current_ring].last_cleanup[0], new_alive); + } + if (fallback_shard.need_cleanup(current_ring, new_alive)) { + fallback_shard.cleanup_range( + current_ring, fallback_shard.last_cleanup[current_ring], new_alive); + } + }; + + if (orch == nullptr || orch->sm_handle == nullptr) { + always_assert(ring_id < PTO2_MAX_RING_DEPTH); + sync_one_ring(ring_id, sm_last_task_alive); + return; + } + + for (uint8_t r = 0; r < PTO2_MAX_RING_DEPTH; r++) { + int32_t new_alive = (r == ring_id) + ? 
sm_last_task_alive + : orch->sm_handle->header->rings[r].fc.last_task_alive.load(std::memory_order_acquire); + sync_one_ring(r, new_alive); } } -// ============================================================================= -// TensorMap Lookup Profiling -// ============================================================================= #if PTO2_TENSORMAP_PROFILING PTO2TensorMapProfilingData pto2_tensormap_get_profiling() { PTO2TensorMapProfilingData d; @@ -244,7 +596,6 @@ PTO2TensorMapProfilingData pto2_tensormap_get_profiling() { d.overlap_hits = g_lookup_overlap_hits; d.insert_count = g_insert_count; - // Reset g_lookup_chain_total = 0; g_lookup_count = 0; g_lookup_chain_max = 0; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h index 9d1bb56a..e427a011 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h @@ -3,18 +3,18 @@ * * TensorMap provides producer lookup for dependency discovery: * - Maps Tensor -> producer task ID - * - Used by pto_submit_task() to find dependencies + * - Used by pto2_submit_task() to find dependencies * - * Key design features: - * 1. Ring buffer pool for entries (no malloc/free) - * 2. Lazy invalidation (entries become stale when producer retires) - * 3. Per-task per-ring entry tracking for efficient cleanup - * 4. OVERLAP DETECTION: Detects dependencies for overlapping sub-regions + * The facade owns: + * - one owner shard per ring for same-ring history + * - one fallback shard for external tensors and cross-ring modifiers + * + * Owner and fallback both execute the same template implementation. * - * Hash table with chaining: - * - buckets[] array of head offsets - * - Entries linked via next_in_bucket - * - Insert at head (newest first) for sorted chains + * Key design features: + * 1. 
Ring-buffer entry pool for O(1) append/free-list reuse + * 2. Producer-driven lazy invalidation and cleanup + * 3. OVERLAP DETECTION: detects dependencies for overlapping sub-regions * * CRITICAL: Hash only by base_ptr * ============================== @@ -24,9 +24,7 @@ * * Overlap detection: Two regions create a dependency if: * 1. Same base_ptr (raw tensor pointer) - * 2. Byte ranges [offset, offset+size) intersect - * - * Based on: docs/runtime_buffer_manager_methods.md + * 2. Their multi-dimensional ranges intersect */ #pragma once @@ -35,11 +33,8 @@ #include "pto_runtime2_types.h" #include "tensor.h" -struct PTO2OrchestratorState; // forward declare +struct PTO2OrchestratorState; -// ============================================================================= -// TensorMap Lookup Profiling (must precede inline lookup/insert methods) -// ============================================================================= #ifndef PTO2_TENSORMAP_PROFILING #define PTO2_TENSORMAP_PROFILING 0 #endif @@ -53,45 +48,39 @@ extern uint64_t g_lookup_overlap_hits; extern uint64_t g_insert_count; #endif -// ============================================================================= -// TensorMap Structure -// ============================================================================= +enum class TensorMapStorageDomain : uint8_t { + OWNER_MAP = 0, + FALLBACK_MAP = 1, +}; /** - * TensorMap entry structure — cache-line optimized for lookup - * - * Cache line 1 (64B, lookup hot path): - * next_in_bucket, producer_task_id, buffer_addr — chain traversal + validity + hash match - * version, ndims, is_all_offset_zero, with_alloc, bucket_index — overlap fast path - * shapes[5] — overlap comparison + * TensorMap entry structure. * - * Cache line 2 (64B, insert/remove/slow-path only): - * prev_in_bucket, next_in_task, prev_in_task — chain manipulation - * offsets[5] — only read when !is_all_offset_zero + * Cache line 1 keeps lookup-hot metadata plus overlap info. 
+ * Cache line 2 keeps intrusive links and slow-path offsets. * * When is_all_offset_zero is true, lookup touches only cache line 1. - * Entry size: 128B (2 cache lines) vs previous 192B (3 cache lines with embedded Tensor). + * Entry size: 128B (2 cache lines). */ struct alignas(64) PTO2TensorMapEntry { // === Cache line 1 (64B) — lookup hot path === - PTO2TensorMapEntry* next_in_bucket; // 8B: next entry in hash bucket chain - PTO2TaskId producer_task_id; // 8B: raw (ring_id << 32) | local_id - uint64_t buffer_addr; // 8B: tensor base address (hash key) - int32_t version; // 4B: tensor version for overlap detection - uint32_t ndims; // 4B: number of dimensions - int32_t bucket_index; // 4B: bucket index (-1 if unlinked) - bool is_all_offset_zero; // 1B: fast-path flag - bool with_alloc; // 1B: true=OUTPUT, false=INOUT - // padding: 2B - uint32_t shapes[RUNTIME_MAX_TENSOR_DIMS]; // 20B: shape per dimension - // padding: 4B to fill 64B + PTO2TensorMapEntry* next_in_bucket; // 8B: next entry in hash bucket chain + PTO2TaskId producer_task_id; // 8B: raw (ring_id << 32) | local_id + uint64_t buffer_addr; // 8B: tensor base address (hash key) + int32_t version; // 4B: tensor version for overlap detection + int32_t bucket_index; // 4B: bucket index (-1 if unlinked) + uint16_t ndims; // 2B: number of dimensions + uint8_t tensor_owner_ring; // 1B: tensor owner ring, or TENSOR_RING_ID_NONE + TensorMapStorageDomain storage_domain; // 1B: owner shard or fallback shard + bool is_all_offset_zero; // 1B: fast-path flag + bool with_alloc; // 1B: true=producer created a new runtime allocation; false=modifier or preallocated-output history + uint32_t shapes[RUNTIME_MAX_TENSOR_DIMS]; // 20B: shape per dimension // === Cache line 2 (64B) — insert/remove/slow-path === - PTO2TensorMapEntry* prev_in_bucket; // 8B: prev in hash bucket chain - PTO2TensorMapEntry* next_in_task; // 8B: next entry for same task - PTO2TensorMapEntry* prev_in_task; // 8B: prev entry for same task + 
PTO2TensorMapEntry* prev_in_bucket;           // 8B: prev in hash bucket chain
+    PTO2TensorMapEntry* next_in_task;             // 8B: next entry for same cleanup task
+    PTO2TensorMapEntry* prev_in_task;             // 8B: prev entry for same cleanup task
     uint32_t offsets[RUNTIME_MAX_TENSOR_DIMS];    // 20B: only when !is_all_offset_zero
-    // padding: 20B to fill 64B
 
     /**
      * Copy overlap-relevant fields from a Tensor into this entry.
@@ -99,7 +88,7 @@ struct alignas(64) PTO2TensorMapEntry {
     void copy_from_tensor(const Tensor& t) {
         buffer_addr = t.buffer.addr;
         version = t.version;
-        ndims = t.ndims;
+        ndims = static_cast<uint16_t>(t.ndims);
         is_all_offset_zero = t.is_all_offset_zero;
         for (uint32_t i = 0; i < t.ndims; i++) {
             shapes[i] = t.shapes[i];
@@ -111,8 +100,21 @@ struct alignas(64) PTO2TensorMapEntry {
         }
     }
 
+    uint8_t producer_ring() const {
+        return pto2_task_id_ring(producer_task_id);
+    }
+
+    uint32_t producer_local() const {
+        return pto2_task_id_local(producer_task_id);
+    }
+
+    uint8_t storage_ring() const {
+        debug_assert(storage_domain == TensorMapStorageDomain::OWNER_MAP);
+        return tensor_owner_ring;
+    }
+
     /**
-     * Check overlap between input tensor and this entry (the producer output).
+     * Check overlap between the input tensor and this entry.
      * Mirrors Tensor::is_overlap() logic but operates on entry fields directly.
      */
     OverlapStatus check_overlap(const Tensor& input) const {
@@ -121,7 +123,7 @@ struct alignas(64) PTO2TensorMapEntry {
         if (input.version > version) {
             return OverlapStatus::OTHER;
         }
-        // Fast path: both have zero offsets → ranges are [0, shape[i])
+
         if (input.is_all_offset_zero && is_all_offset_zero) {
             bool contains = true;
             for (uint32_t i = 0; i < ndims; i++) {
@@ -132,16 +134,17 @@ struct alignas(64) PTO2TensorMapEntry {
             }
             return contains ? OverlapStatus::COVERED : OverlapStatus::OTHER;
         }
-        // Slow path: at least one has non-zero offsets
+
         bool contains = true;
         for (uint32_t i = 0; i < ndims; i++) {
            uint64_t in_off = input.is_all_offset_zero ? 0 : input.offsets[i];
             uint64_t ent_off = is_all_offset_zero ? 0 : offsets[i];
-            Segment in_range{in_off, in_off + (uint64_t)input.shapes[i]};
-            Segment ent_range{ent_off, ent_off + (uint64_t)shapes[i]};
+            Segment in_range{in_off, in_off + static_cast<uint64_t>(input.shapes[i])};
+            Segment ent_range{ent_off, ent_off + static_cast<uint64_t>(shapes[i])};
             if (!in_range.line_segment_intersection(ent_range)) {
                 return OverlapStatus::NO_OVERLAP;
-            } else if (!in_range.contains(ent_range)) {
+            }
+            if (!in_range.contains(ent_range)) {
                 contains = false;
             }
         }
@@ -151,13 +154,13 @@ struct alignas(64) PTO2TensorMapEntry {
 static_assert(sizeof(PTO2TensorMapEntry) == 128,
               "TensorMapEntry must be exactly 2 cache lines (128 bytes)");
 
+#define PTO2_LOOKUP_MAX_RESULTS 16
+
 /**
- * Stack-allocated lookup result (avoids heap allocation per lookup)
+ * Stack-allocated lookup result buffer.
+ *
+ * Facade lookup appends results from owner shard first and fallback shard next.
  */
-#define PTO2_LOOKUP_MAX_RESULTS 16
-// =============================================================================
-// TensorMap Lookup Chain Length Statistics (compile-time toggle)
-// =============================================================================
 struct PTO2LookupResult {
     struct Entry {
         PTO2TensorMapEntry* entry;
@@ -173,335 +176,159 @@ struct PTO2LookupResult {
     }
 };
 
-/**
- * TensorMap structure
- *
- * Hash table with ring buffer entry pool and lazy invalidation.
- */ -struct PTO2TensorMap { - // Hash table buckets (fixed size, power of 2) - PTO2TensorMapEntry** buckets; // Array of offsets into entry_pool (-1 = empty) - int32_t num_buckets; // Must be power of 2 for fast modulo - - // Entry pool as ring buffer - PTO2TensorMapEntry* entry_pool; // Ring buffer of entries - PTO2TensorMapEntry** free_entry_list; // free entry ids - int32_t pool_size; // Total pool capacity - int32_t next_entry_idx; // id when next entry insert - int32_t free_num; // free entry number in entry pool - - // Per-ring per-task entry tracking (for efficient bucket cleanup) - // Indexed by [ring_id][local_id & (task_window_sizes[ring_id] - 1)] - PTO2TensorMapEntry** task_entry_heads[PTO2_MAX_RING_DEPTH]; - int32_t task_window_sizes[PTO2_MAX_RING_DEPTH]; // Per-ring task window size (for slot masking) - - // Per-ring validity threshold (for lazy invalidation) - int32_t last_task_alives[PTO2_MAX_RING_DEPTH]; // Cached from shared memory per ring +struct TensorMapInsertMeta { + // Original tensor owner ring, or TENSOR_RING_ID_NONE for external tensors. + uint8_t tensor_owner_ring{TENSOR_RING_ID_NONE}; - // Per-ring cleanup progress (for periodic cleanup_retired) - int32_t last_cleanup[PTO2_MAX_RING_DEPTH]{}; + // Which shard actually stores this entry. 
+ TensorMapStorageDomain storage_domain{TensorMapStorageDomain::OWNER_MAP}; - PTO2OrchestratorState* orch{nullptr}; - - // new_entry目前不负责分配属性,仅分配内存 - PTO2TensorMapEntry* new_entry() { - if (free_num > 0) { - PTO2TensorMapEntry* res = free_entry_list[--free_num]; - debug_assert(res->bucket_index == -1); - return res; - } - always_assert(next_entry_idx < pool_size); - PTO2TensorMapEntry* res = &entry_pool[next_entry_idx++]; - debug_assert(res->bucket_index == -1); - return res; - } - - void free_entry(PTO2TensorMapEntry& entry) { - always_assert(entry.bucket_index != -1); // 必须保证仍在桶中 - - // Update predecessor's next pointer (O(1) via prev_in_bucket) - if (entry.prev_in_bucket == nullptr) { - // Entry is the head of its bucket chain, update bucket head - // Must compute hash BEFORE clearing tensor - buckets[entry.bucket_index] = entry.next_in_bucket; - } else { - entry.prev_in_bucket->next_in_bucket = entry.next_in_bucket; - } - - // Update successor's prev pointer - if (entry.next_in_bucket != nullptr) { - entry.next_in_bucket->prev_in_bucket = entry.prev_in_bucket; - } - - free_entry_list[free_num++] = &entry; - entry.bucket_index = -1; - entry.next_in_bucket = nullptr; - entry.prev_in_bucket = nullptr; - entry.next_in_task = nullptr; - entry.prev_in_task = nullptr; - } + // True when producer created a new runtime allocation for this history entry. + bool with_alloc{false}; +}; - // ============================================================================= - // TensorMap API - // ============================================================================= +/** + * Shared core for both owner and fallback storage. 
+ *
+ * OwnerTensorMapShard:
+ *   - NumCleanupDomains = 1
+ *   - BreakOnStale = true
+ *
+ * FallbackTensorMapShard:
+ *   - NumCleanupDomains = PTO2_MAX_RING_DEPTH
+ *   - BreakOnStale = false
+ */
+template <int32_t NumCleanupDomains, bool BreakOnStale>
+struct TensorMapShardImpl {
+    static_assert(NumCleanupDomains > 0, "TensorMapShardImpl must have at least one cleanup domain");
+    static constexpr int32_t kNumCleanupDomains = NumCleanupDomains;
+    static constexpr bool kBreakOnStale = BreakOnStale;
+
+    PTO2TensorMapEntry** buckets{nullptr};
+    int32_t num_buckets{0};
+
+    PTO2TensorMapEntry* entry_pool{nullptr};
+    PTO2TensorMapEntry** free_entry_list{nullptr};
+    int32_t pool_size{0};
+    int32_t next_entry_idx{0};
+    int32_t free_num{0};
+
+    // task_entry_heads[domain][local & (window_size - 1)] -> intrusive list head
+    PTO2TensorMapEntry** task_entry_heads[NumCleanupDomains]{};
+    int32_t task_window_sizes[NumCleanupDomains]{};
+    int32_t last_task_alives[NumCleanupDomains]{};
+    int32_t last_cleanup[NumCleanupDomains]{};
 
     /**
-     * Initialize TensorMap
+     * Initialize shard storage.
      *
-     * @param num_buckets Number of hash buckets (must be power of 2)
-     * @param pool_size Size of entry pool
-     * @return true on success, false on allocation failure
+     * num_buckets and each task_window_sizes[d] must be powers of two.
      */
-    bool init(int32_t num_buckets, int32_t pool_size, const int32_t task_window_sizes[PTO2_MAX_RING_DEPTH]);
+    bool init(int32_t num_buckets, int32_t pool_size, const int32_t task_window_sizes[]);
+    void destroy();
 
     /**
-     * Initialize TensorMap with default sizes
+     * Mirror the latest last_task_alive for one cleanup domain.
      */
-    bool init_default(const int32_t task_window_sizes[PTO2_MAX_RING_DEPTH]);
+    void sync_validity(int32_t cleanup_domain, int32_t last_task_alive);
 
     /**
-     * Destroy TensorMap and free resources
+     * Decide whether cleanup should run for [last_cleanup, new_last_task_alive).
*/ - void destroy(); + bool need_cleanup(int32_t cleanup_domain, int32_t new_last_task_alive) const; /** - * Update validity threshold from shared memory - * Called periodically to refresh the lazy invalidation threshold. + * Lookup producer history for one shard. * - * @param last_task_alive Current value from shared memory + * Results are appended into the caller-provided buffer. */ - void sync_validity(int32_t ring_id, int32_t last_task_alive) { - this->last_task_alives[ring_id] = last_task_alive; - } + void lookup(const Tensor& tensor, PTO2LookupResult& result) const; /** - * Lookup producer for a tensor region - * - * Searches the hash table for a matching region. - * Returns producer entry if found and valid. - * Stale entries from different rings are skipped (not truncated). - * - * @param tensor Tensor to look up - * @param result Output: stack-allocated result buffer + * Insert a new history entry into the shard selected by the facade. */ - void lookup(const Tensor& tensor, PTO2LookupResult& result) { - uint32_t bucket_index = hash(tensor.buffer.addr); - PTO2TensorMapEntry* cur_entry = buckets[bucket_index]; - - result.count = 0; -#if PTO2_TENSORMAP_PROFILING - g_lookup_count++; - int32_t chain_len = 0; -#endif - - while (cur_entry != nullptr) { - // Prefetch next entry to hide pointer-chasing latency. - // entry_valid() + is_overlap() computation provides hide time. 
- PTO2TensorMapEntry* next_entry = cur_entry->next_in_bucket; - if (next_entry) __builtin_prefetch(next_entry, 0, 0); - -#if PTO2_TENSORMAP_PROFILING - chain_len++; -#endif - // Skip stale entries (no chain truncation — entries from different - // rings can be interleaved, so a stale entry from one ring does NOT - // imply subsequent entries from other rings are also stale) - if (!entry_valid(*cur_entry)) { - cur_entry = next_entry; - continue; - } - - // Entry is valid - check if regions OVERLAP (not just exact match) - // Since we hash only by base_ptr, all entries in this bucket have - // potential to overlap. We must check actual byte-range overlap. - if (tensor.buffer.addr == cur_entry->buffer_addr) { - // Double prefetch: check_overlap provides enough hide time - // to also warm up the entry after next. - if (next_entry) { - PTO2TensorMapEntry* next_next = next_entry->next_in_bucket; - if (next_next) __builtin_prefetch(next_next, 0, 0); - } -#if PTO2_TENSORMAP_PROFILING - g_lookup_overlap_checks++; -#endif - auto overlap_status = cur_entry->check_overlap(tensor); - if (overlap_status != OverlapStatus::NO_OVERLAP) { - result.push(cur_entry, overlap_status); -#if PTO2_TENSORMAP_PROFILING - g_lookup_overlap_hits++; -#endif - } - } - - // Move to next entry - cur_entry = next_entry; - } -#if PTO2_TENSORMAP_PROFILING - g_lookup_chain_total += chain_len; - if (chain_len > g_lookup_chain_max) g_lookup_chain_max = chain_len; -#endif - } + PTO2TensorMapEntry* insert(const Tensor& tensor, PTO2TaskId producer_task_id, const TensorMapInsertMeta& meta); /** - * Insert a new entry (called when task produces output) - * - * Allocates from ring buffer pool, may overwrite stale entries. - * Inserts at head of hash bucket chain (maintains task_id ordering). - * - * @param tensor Tensor produced - * @param producer_task_id Task ID of producer + * Remove one entry from both bucket chain and task chain in O(1). 
*/ - void insert(const Tensor& tensor, PTO2TaskId producer_task_id, bool with_alloc) { -#if PTO2_TENSORMAP_PROFILING - g_insert_count++; -#endif - // Prefetch bucket head and task_entry_head early; new_entry() + field - // initialization below provides hide time for these RFOs. - uint32_t bucket_index = hash(tensor.buffer.addr); - __builtin_prefetch(&buckets[bucket_index], 1, 0); - auto ring_id = producer_task_id.ring(); - auto local_id = producer_task_id.local(); - int32_t task_slot = local_id & (task_window_sizes[ring_id] - 1); - __builtin_prefetch(&task_entry_heads[ring_id][task_slot], 1, 0); - - // Allocate entry from ring buffer pool - PTO2TensorMapEntry* entry = new_entry(); - - // Initialize new entry - entry->copy_from_tensor(tensor); - entry->producer_task_id = producer_task_id; - entry->with_alloc = with_alloc; - - // Insert at head of hash bucket (maintains task_id descending order) - entry->bucket_index = bucket_index; - entry->next_in_bucket = buckets[bucket_index]; - // Update old head's prev pointer - if (entry->next_in_bucket != nullptr) { - entry->next_in_bucket->prev_in_bucket = entry; - } - buckets[entry->bucket_index] = entry; - entry->prev_in_bucket = nullptr; // New head has no predecessor - - // Link to task's entry list (for cleanup), indexed by ring and local slot - entry->next_in_task = task_entry_heads[ring_id][task_slot]; - entry->prev_in_task = nullptr; // New head has no predecessor - // Update old head's prev pointer - if (entry->next_in_task != nullptr) { - entry->next_in_task->prev_in_task = entry; - } - task_entry_heads[ring_id][task_slot] = entry; - } + void remove_entry(PTO2TensorMapEntry& entry); /** - * Cleanup stale entries for retired tasks - * - * Called periodically by Orchestrator when last_task_alive advances. - * Removes entries from bucket chains for tasks in [old, new) range. 
- * - * @param old_last_task_alive Previous threshold - * @param new_last_task_alive New threshold + * Cleanup retired producer tasks in one cleanup domain. */ - void cleanup_retired(int32_t ring_id, int32_t old_last_task_alive, int32_t new_last_task_alive) { - // Iterate through retired tasks on this ring and remove their entries - for (int32_t local_id = old_last_task_alive; local_id < new_last_task_alive; local_id++) { - int32_t task_slot = local_id & (task_window_sizes[ring_id] - 1); - PTO2TensorMapEntry* cur_entry = task_entry_heads[ring_id][task_slot]; - - while (cur_entry != nullptr) { - PTO2TensorMapEntry* next_entry = cur_entry->next_in_task; // Save before clearing - // Only remove if this entry belongs to the retiring task - // (slot may have been reused by a newer task) - debug_assert(cur_entry->producer_task_id == - pto2_make_task_id(static_cast(ring_id), - static_cast(local_id))); - free_entry(*cur_entry); - cur_entry = next_entry; - } - - // Clear task's entry head (slot will be reused by local_id + task_window_sizes[ring_id]) - task_entry_heads[ring_id][task_slot] = nullptr; - } - } - - // ============================================================================= - // Internal Helpers (exposed for testing) - // ============================================================================= + void cleanup_range(int32_t cleanup_domain, int32_t old_last_task_alive, int32_t new_last_task_alive); + int32_t valid_count() const; + void print_stats(const char* label) const; +private: /** - * Compute hash for tensor addr + * Compute hash for tensor base address. 
*/
-    uint32_t hash(uint64_t key) {
-        // Improve distribution by mixing bits (pointers often have aligned low bits)
-        key = key ^ (key >> 16);
-        key = key ^ (key >> 32);
+    uint32_t hash(uint64_t key) const;
 
-        // Use bitwise AND for power-of-2 modulo (faster than %)
-        return (uint32_t)(key & (num_buckets - 1));
-    }
+    int32_t cleanup_domain_of(PTO2TaskId producer_task_id) const;
+    int32_t cleanup_domain_of(const PTO2TensorMapEntry& entry) const;
+    int32_t lifecycle_local_of(PTO2TaskId producer_task_id) const;
+    int32_t lifecycle_local_of(const PTO2TensorMapEntry& entry) const;
 
     /**
-     * Check if entry is valid (producer has not retired)
+     * Shared validity rule:
+     *   producer_local >= last_task_alives[cleanup_domain_of(entry)]
      */
-    bool entry_valid(const PTO2TensorMapEntry& entry) const {
-        int32_t ring_id = pto2_task_id_ring(entry.producer_task_id);
-        int32_t local_id = static_cast<int32_t>(pto2_task_id_local(entry.producer_task_id));
-        return local_id >= last_task_alives[ring_id];
-    }
+    bool entry_valid(const PTO2TensorMapEntry& entry) const;
 
+    PTO2TensorMapEntry* new_entry();
+    void unlink_from_bucket(PTO2TensorMapEntry& entry);
+    void unlink_from_task(PTO2TensorMapEntry& entry);
+    void reclaim_entry(PTO2TensorMapEntry& entry);
+};
 
-    void remove_entry(PTO2TensorMapEntry& entry) {
-        remove_from_task(entry);
-        free_entry(entry);
-    }
+using OwnerTensorMapShard = TensorMapShardImpl<1, true>;
+using FallbackTensorMapShard = TensorMapShardImpl<PTO2_MAX_RING_DEPTH, false>;
 
-    /**
-     * Remove entry from its task chain (O(1) with prev pointer)
-     * Called during pool wrap-around to unlink reused entries.
- */ - void remove_from_task(PTO2TensorMapEntry& entry) { - always_assert(entry.bucket_index != -1); // 必须保证仍在桶中 - // Update predecessor's next pointer (O(1) via prev_in_task) - if (entry.prev_in_task == nullptr) { - // Entry is the head of its task chain, update task_entry_heads - int32_t ring_id = pto2_task_id_ring(entry.producer_task_id); - int32_t local_id = static_cast(pto2_task_id_local(entry.producer_task_id)); - int32_t task_slot = local_id & (task_window_sizes[ring_id] - 1); - task_entry_heads[ring_id][task_slot] = entry.next_in_task; - } else { - entry.prev_in_task->next_in_task = entry.next_in_task; - } +/** + * Facade that routes requests across owner shards and the fallback shard. + */ +struct PTO2TensorMap { + OwnerTensorMapShard owner_shards[PTO2_MAX_RING_DEPTH]; + FallbackTensorMapShard fallback_shard; - // Update successor's prev pointer - if (entry.next_in_task != nullptr) { - entry.next_in_task->prev_in_task = entry.prev_in_task; - } + PTO2OrchestratorState* orch{nullptr}; - entry.next_in_task = nullptr; - entry.prev_in_task = nullptr; - } + bool init(int32_t num_buckets, int32_t pool_size, const int32_t task_window_sizes[PTO2_MAX_RING_DEPTH]); + bool init_default(const int32_t task_window_sizes[PTO2_MAX_RING_DEPTH]); + void destroy(); - // ============================================================================= - // Debug Utilities - // ============================================================================= + /** + * internal tensor: owner shard first, then fallback shard + * external tensor: fallback shard only + */ + void lookup(const Tensor& tensor, PTO2LookupResult& result) const; /** - * Print TensorMap statistics + * OUTPUT or INOUT routing: + * - same-ring internal history -> owner shard + * - cross-ring internal INOUT -> fallback shard + * - external history -> fallback shard + * + * Internal OUTPUT is fail-fast if tensor owner ring != producer ring. 
*/ - void print_stats(); + void insert(const Tensor& tensor, PTO2TaskId producer_task_id, PTOParamType param_type, bool with_alloc); /** - * Get count of valid entries + * Remove by real storage location, not by producer ring or tensor owner alone. */ - int32_t valid_count(); + void remove_entry(PTO2TensorMapEntry& entry); - // ============================================================================= - // TensorMap Synchronization - // ============================================================================= + void print_stats() const; + int32_t valid_count() const; /** - * Sync TensorMap validity threshold from shared memory + * Sync TensorMap validity threshold from shared memory. * - * Called periodically to refresh the lazy invalidation threshold. - * Also triggers cleanup if threshold has advanced significantly. + * Signature is kept stable for orchestrator call sites. The implementation + * now refreshes both owner shards and the fallback shard from real producer + * ring retirement watermarks. 
*/ void sync_tensormap(uint8_t ring_id, int32_t sm_last_task_alive); }; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/tensor.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/tensor.h index 10b5b582..e2d6c7b8 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/tensor.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/tensor.h @@ -9,6 +9,7 @@ #include "data_type.h" constexpr int RUNTIME_MAX_TENSOR_DIMS = 5; +constexpr uint8_t TENSOR_RING_ID_NONE = 0xFF; // No ring assigned (external tensor) /** * Buffer Handle @@ -66,6 +67,7 @@ struct alignas(64) Tensor { bool is_all_offset_zero; // True when all offsets[] are zero (skip offset read/write) bool is_raw_eq_shapes; // True when raw_shapes[] == shapes[] (skip raw_shapes read/write) bool manual_dep; // True when dependency is managed manually (skip tensormap lookup/insert) + uint8_t ring_id; // Ring that owns this tensor (TENSOR_RING_ID_NONE = unassigned) uint32_t shapes[RUNTIME_MAX_TENSOR_DIMS]; // Current view shape per dimension uint32_t __padding__; @@ -96,9 +98,10 @@ struct alignas(64) Tensor { int32_t version, bool is_all_offset_zero = false, bool is_raw_eq_shapes = false, - bool manual_dep = false) { + bool manual_dep = false, + uint8_t in_ring_id = TENSOR_RING_ID_NONE) { init(addr, buffer_size_bytes, raw_shapes, shapes, offsets, ndims, dtype, version, - is_all_offset_zero, is_raw_eq_shapes, manual_dep); + is_all_offset_zero, is_raw_eq_shapes, manual_dep, in_ring_id); } // --- Initialization --- @@ -112,7 +115,8 @@ struct alignas(64) Tensor { int32_t in_version, bool in_is_all_offset_zero = false, bool in_is_raw_eq_shapes = false, - bool in_manual_dep = false) { + bool in_manual_dep = false, + uint8_t in_ring_id = TENSOR_RING_ID_NONE) { buffer = {reinterpret_cast(addr), buffer_size_bytes}; ndims = in_ndims; dtype = in_dtype; @@ -120,6 +124,7 @@ struct alignas(64) Tensor { is_all_offset_zero = in_is_all_offset_zero; is_raw_eq_shapes = in_is_raw_eq_shapes; manual_dep = 
in_manual_dep; + ring_id = in_ring_id; for (uint32_t i = 0; i < in_ndims; i++) { shapes[i] = in_shapes[i]; } @@ -155,6 +160,7 @@ struct alignas(64) Tensor { dtype = other.dtype; version = other.version; manual_dep = in_manual_dep; + ring_id = other.ring_id; // view always diverges shapes from raw_shapes, so is_raw_eq_shapes = false. // Read parent's effective raw_shapes (avoids parent cache line 2 when parent is_raw_eq_shapes). is_raw_eq_shapes = false; @@ -285,6 +291,7 @@ struct alignas(64) Tensor { ss << indent << "dtype: " << get_dtype_name(dtype) << std::endl; ss << indent << "ndims: " << ndims << std::endl; ss << indent << "version: " << version << std::endl; + ss << indent << "ring_id: " << (unsigned)ring_id << std::endl; const uint32_t* rs = get_raw_shapes(); ss << indent << "raw_shapes: ["; @@ -320,44 +327,3 @@ static_assert(sizeof(Tensor) == 128, "Tensor must be exactly 2 cache lines (128 static_assert(offsetof(Tensor, raw_shapes) == 64); using TensorData = Tensor; - -// ============================================================================= -// Factory Helpers -// ============================================================================= -/** - * Create a Tensor for pre-allocated external memory. - */ -static inline Tensor make_tensor_external(void* addr, - const uint32_t shapes[], - uint32_t ndims, - DataType dtype = DataType::FLOAT32, - bool manual_dep = false, - int32_t version = 0) { - static uint32_t zero_offsets[RUNTIME_MAX_TENSOR_DIMS] = {}; - uint64_t total = 1; - for (uint32_t i = 0; i < ndims; i++) { - total *= shapes[i]; - } - return Tensor(addr, total * get_element_size(dtype), shapes, shapes, zero_offsets, ndims, dtype, version, - /*is_all_offset_zero=*/true, /*is_raw_eq_shapes=*/true, manual_dep); -} - -/** - * Create a Tensor for runtime-allocated output (addr=0). - * NO memory allocation: only records dtype, shape, and buffer.size in the Tensor struct. 
- * The runtime allocates from the heap ring and fills buffer.addr during pto2_submit_task - * when this tensor is passed as OUTPUT param. No buffer content is ever copied. - */ -static inline Tensor make_tensor(const uint32_t shapes[], - uint32_t ndims, - DataType dtype = DataType::FLOAT32, - bool manual_dep = false, - int32_t version = 0) { - static uint32_t zero_offsets[RUNTIME_MAX_TENSOR_DIMS] = {}; - uint64_t total = 1; - for (uint32_t i = 0; i < ndims; i++) { - total *= shapes[i]; - } - return Tensor(0, total * get_element_size(dtype), shapes, shapes, zero_offsets, ndims, dtype, version, - /*is_all_offset_zero=*/true, /*is_raw_eq_shapes=*/true, manual_dep); -} diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/batch_paged_attention/kernels/orchestration/paged_attention_orch.cpp b/tests/device_tests/a2a3/tensormap_and_ringbuffer/batch_paged_attention/kernels/orchestration/paged_attention_orch.cpp index 8a13111b..6a2a7952 100644 --- a/tests/device_tests/a2a3/tensormap_and_ringbuffer/batch_paged_attention/kernels/orchestration/paged_attention_orch.cpp +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/batch_paged_attention/kernels/orchestration/paged_attention_orch.cpp @@ -192,7 +192,7 @@ void aicpu_orchestration_entry(PTO2Runtime* rt, uint64_t* args, int arg_count, i params_up.add_input(oi_new_b); params_up.add_inout(mi_batch); params_up.add_inout(li_batch); - params_up.add_output(oi_batch); + params_up.add_inout(oi_batch); params_up.add_output(out); params_up.add_scalar(is_first); params_up.add_scalar(is_last);