Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 28 additions & 43 deletions src/a2a3/runtime/host_build_graph/aicpu/aicpu_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ struct AicpuExecutor {

int thread_num_{0};
int cores_total_num_{0};
int thread_cores_num_{0};
int aic_per_thread_{0}; // Fixed number of AIC cores per thread
int aiv_per_thread_{0}; // Fixed number of AIV cores per thread
int thread_cores_num_[MAX_AICPU_THREADS]{}; // Total cores (AIC+AIV) assigned to each thread
int aic_per_thread_{0}; // Max AIC cores per thread (ceil), used as local queue cap
int aiv_per_thread_{0}; // Max AIV cores per thread (ceil), used as local queue cap
int core_assignments_[MAX_AICPU_THREADS][MAX_CORES_PER_THREAD];

// Core discovery arrays (space-time tradeoff: avoid sorting)
Expand Down Expand Up @@ -251,10 +251,7 @@ int AicpuExecutor::init(Runtime* runtime) {
return -1;
}

// Calculate cores per thread
thread_cores_num_ = cores_total_num_ / thread_num_;

LOG_INFO("Config: threads=%d, cores=%d, cores_per_thread=%d", thread_num_, cores_total_num_, thread_cores_num_);
LOG_INFO("Config: threads=%d, cores=%d", thread_num_, cores_total_num_);

for (int i = 0; i < cores_total_num_; i++) {
pending_task_ids_[i] = AICPU_TASK_INVALID;
Expand Down Expand Up @@ -389,60 +386,48 @@ int AicpuExecutor::handshake_all_cores(Runtime* runtime) {
return 0;
}

// Assign discovered cores to threads (requires even distribution)
// Assign discovered cores to threads using round-robin
void AicpuExecutor::assign_cores_to_threads() {
if (aic_count_ % thread_num_ != 0) {
LOG_ERROR("AIC cores (%d) cannot be evenly distributed to %d threads", aic_count_, thread_num_);
init_failed_.store(true, std::memory_order_release);
return;
}

if (aiv_count_ % thread_num_ != 0) {
LOG_ERROR("AIV cores (%d) cannot be evenly distributed to %d threads", aiv_count_, thread_num_);
init_failed_.store(true, std::memory_order_release);
return;
}
// Round-robin: AIC core i → thread (i % thread_num_), AIV core i → thread (i % thread_num_).
// AIC and AIV are assigned independently; no cluster pairing is required.
// aic_per_thread_ / aiv_per_thread_ store the ceiling value and serve as local queue caps.
aic_per_thread_ = (aic_count_ + thread_num_ - 1) / thread_num_;
aiv_per_thread_ = (aiv_count_ + thread_num_ - 1) / thread_num_;

aic_per_thread_ = aic_count_ / thread_num_;
aiv_per_thread_ = aiv_count_ / thread_num_;

LOG_INFO("Core Assignment: %d AIC/thread, %d AIV/thread", aic_per_thread_, aiv_per_thread_);
LOG_INFO("Core Assignment: %d AIC cores, %d AIV cores across %d threads (max %d AIC/thread, %d AIV/thread)",
aic_count_, aiv_count_, thread_num_, aic_per_thread_, aiv_per_thread_);

for (int t = 0; t < thread_num_; t++) {
int core_idx = 0;

// Assign AIC cores to this thread
int aic_start = t * aic_per_thread_;
int aic_end = (t + 1) * aic_per_thread_;
for (int i = aic_start; i < aic_end; i++) {
// Assign AIC cores: cores at indices t, t+thread_num_, t+2*thread_num_, ...
for (int i = t; i < aic_count_; i += thread_num_) {
core_assignments_[t][core_idx++] = aic_cores_[i].worker_id;
}

// Assign AIV cores to this thread
int aiv_start = t * aiv_per_thread_;
int aiv_end = (t + 1) * aiv_per_thread_;
for (int i = aiv_start; i < aiv_end; i++) {
// Assign AIV cores after AIC cores
for (int i = t; i < aiv_count_; i += thread_num_) {
core_assignments_[t][core_idx++] = aiv_cores_[i].worker_id;
}

thread_cores_num_[t] = core_idx;

char log_buffer[256];
int offset = 0;

offset += snprintf(
log_buffer + offset, sizeof(log_buffer) - offset, "Thread %d: assigned %d cores - AIC[", t, core_idx);

for (int i = 0; i < aic_per_thread_; i++) {
if (i > 0) offset += snprintf(log_buffer + offset, sizeof(log_buffer) - offset, ",");
offset +=
snprintf(log_buffer + offset, sizeof(log_buffer) - offset, "%d", aic_cores_[aic_start + i].worker_id);
for (int k = 0, i = t; i < aic_count_; i += thread_num_, k++) {
if (k > 0) offset += snprintf(log_buffer + offset, sizeof(log_buffer) - offset, ",");
offset += snprintf(log_buffer + offset, sizeof(log_buffer) - offset, "%d", aic_cores_[i].worker_id);
}

offset += snprintf(log_buffer + offset, sizeof(log_buffer) - offset, "] AIV[");

for (int i = 0; i < aiv_per_thread_; i++) {
if (i > 0) offset += snprintf(log_buffer + offset, sizeof(log_buffer) - offset, ",");
offset +=
snprintf(log_buffer + offset, sizeof(log_buffer) - offset, "%d", aiv_cores_[aiv_start + i].worker_id);
for (int k = 0, i = t; i < aiv_count_; i += thread_num_, k++) {
if (k > 0) offset += snprintf(log_buffer + offset, sizeof(log_buffer) - offset, ",");
offset += snprintf(log_buffer + offset, sizeof(log_buffer) - offset, "%d", aiv_cores_[i].worker_id);
}

offset += snprintf(log_buffer + offset, sizeof(log_buffer) - offset, "]");
Expand Down Expand Up @@ -554,9 +539,9 @@ void AicpuExecutor::classify_and_distribute_initial_tasks(Runtime* runtime) {
int AicpuExecutor::shutdown_aicore(Runtime* runtime, int thread_idx, const int* cur_thread_cores) {
Handshake* all_handshakes = (Handshake*)runtime->workers;

LOG_INFO("Thread %d: Shutting down %d cores", thread_idx, thread_cores_num_);
LOG_INFO("Thread %d: Shutting down %d cores", thread_idx, thread_cores_num_[thread_idx]);

for (int i = 0; i < thread_cores_num_; i++) {
for (int i = 0; i < thread_cores_num_[thread_idx]; i++) {
int core_id = cur_thread_cores[i];
Handshake* hank = &all_handshakes[core_id];
LOG_INFO("Thread %d: AICPU hank addr = 0x%lx", thread_idx, (uint64_t)hank);
Expand Down Expand Up @@ -999,7 +984,7 @@ int AicpuExecutor::run(Runtime* runtime) {
const int* cur_thread_cores = core_assignments_[thread_idx];

LOG_INFO("Thread %d: Runtime has %d tasks", thread_idx, runtime->get_task_count());
int completed = resolve_and_dispatch(*runtime, thread_idx, cur_thread_cores, thread_cores_num_);
int completed = resolve_and_dispatch(*runtime, thread_idx, cur_thread_cores, thread_cores_num_[thread_idx]);
LOG_INFO("Thread %d: Executed %d tasks from runtime", thread_idx, completed);

int rc = shutdown_aicore(runtime, thread_idx, cur_thread_cores);
Expand All @@ -1009,7 +994,7 @@ int AicpuExecutor::run(Runtime* runtime) {

// Flush performance buffers for cores managed by this thread
if (runtime->enable_profiling) {
perf_aicpu_flush_buffers(runtime, thread_idx, cur_thread_cores, thread_cores_num_);
perf_aicpu_flush_buffers(runtime, thread_idx, cur_thread_cores, thread_cores_num_[thread_idx]);
}

LOG_INFO("Thread %d: Completed", thread_idx);
Expand Down
70 changes: 37 additions & 33 deletions src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -638,13 +638,13 @@ int32_t AicpuExecutor::handshake_all_cores(Runtime* runtime) {
* (Aligned with host_build_graph mechanism)
*/
void AicpuExecutor::assign_cores_to_threads() {
// Cluster-aligned assignment: each cluster = 1 AIC + 2 AIV (adjacent pair)
// Cluster-aligned round-robin assignment: cluster ci -> sched thread ci % divisor.
// Each cluster = 1 AIC + 2 adjacent AIV; the triple is always kept together.
int32_t divisor = (sched_thread_num_ > 0) ? sched_thread_num_ : thread_num_;
int32_t cluster_count = aic_count_;
int32_t clusters_per_thread = cluster_count / divisor;

DEV_INFO("Assigning cores: %d clusters, %d per thread (%d AIC, %d AIV)",
cluster_count, clusters_per_thread, aic_count_, aiv_count_);
DEV_INFO("Assigning cores (round-robin): %d clusters across %d sched threads (%d AIC, %d AIV)",
cluster_count, divisor, aic_count_, aiv_count_);

for (int32_t i = 0; i < thread_num_; i++) {
for (int32_t j = 0; j < MAX_CORES_PER_THREAD; j++) {
Expand All @@ -656,47 +656,51 @@ void AicpuExecutor::assign_cores_to_threads() {
trackers_[i].aiv().idle_count = 0;
trackers_[i].cluster_count = 0;
memset(trackers_[i].core_idle, 0, sizeof(trackers_[i].core_idle));
core_count_per_thread_[i] = 0;
}

for (int32_t t = 0; t < thread_num_; t++) {
if (sched_thread_num_ > 0 && t >= sched_thread_num_) {
// Orchestrator thread: no cores
core_count_per_thread_[t] = 0;
DEV_INFO("Thread %d: orchestrator (0 cores)", t);
continue;
}
// Mark orchestrator threads explicitly (no cores).
for (int32_t t = divisor; t < thread_num_; t++) {
DEV_INFO("Thread %d: orchestrator (0 cores)", t);
}

// Per-sched-thread running core index used while filling core_assignments_.
int32_t core_idx[MAX_AICPU_THREADS] = {};

int32_t core_idx = 0;
for (int32_t ci = 0; ci < cluster_count; ci++) {
int32_t t = ci % divisor;
CoreStateTracker& tracker = trackers_[t];
int32_t& idx = core_idx[t];

for (int32_t c = 0; c < clusters_per_thread; c++) {
int32_t ci = t * clusters_per_thread + c;
int32_t aic_wid = aic_cores_[ci].worker_id;
int32_t aiv0_wid = aiv_cores_[2 * ci].worker_id;
int32_t aiv1_wid = aiv_cores_[2 * ci + 1].worker_id;
int32_t aic_wid = aic_cores_[ci].worker_id;
int32_t aiv0_wid = aiv_cores_[2 * ci].worker_id;
int32_t aiv1_wid = aiv_cores_[2 * ci + 1].worker_id;

tracker.clusters[tracker.cluster_count++] = {aic_wid, {aiv0_wid, aiv1_wid}};
tracker.clusters[tracker.cluster_count++] = {aic_wid, {aiv0_wid, aiv1_wid}};

core_assignments_[t][core_idx++] = aic_wid;
tracker.aic().idle[tracker.aic().idle_count++] = aic_wid;
tracker.core_idle[aic_wid] = true;
core_assignments_[t][idx++] = aic_wid;
tracker.aic().idle[tracker.aic().idle_count++] = aic_wid;
tracker.core_idle[aic_wid] = true;

core_assignments_[t][core_idx++] = aiv0_wid;
core_assignments_[t][core_idx++] = aiv1_wid;
tracker.aiv().idle[tracker.aiv().idle_count++] = aiv0_wid;
tracker.aiv().idle[tracker.aiv().idle_count++] = aiv1_wid;
tracker.core_idle[aiv0_wid] = true;
tracker.core_idle[aiv1_wid] = true;
core_assignments_[t][idx++] = aiv0_wid;
core_assignments_[t][idx++] = aiv1_wid;
tracker.aiv().idle[tracker.aiv().idle_count++] = aiv0_wid;
tracker.aiv().idle[tracker.aiv().idle_count++] = aiv1_wid;
tracker.core_idle[aiv0_wid] = true;
tracker.core_idle[aiv1_wid] = true;

DEV_INFO("Thread %d: cluster %d (AIC=%d, AIV0=%d, AIV1=%d)",
t, ci, aic_wid, aiv0_wid, aiv1_wid);
}
DEV_INFO("Thread %d: cluster %d (AIC=%d, AIV0=%d, AIV1=%d)",
t, ci, aic_wid, aiv0_wid, aiv1_wid);
}

core_count_per_thread_[t] = core_idx;
DEV_INFO("Thread %d: total %d cores (%d clusters)", t, core_idx, clusters_per_thread);
for (int32_t t = 0; t < divisor; t++) {
core_count_per_thread_[t] = core_idx[t];
DEV_INFO("Thread %d: total %d cores (%d clusters)", t, core_idx[t], trackers_[t].cluster_count);
}

thread_cores_num_ = clusters_per_thread * 3;
// Max clusters any single sched thread can hold: ceil(cluster_count / divisor).
int32_t max_clusters_per_thread = (cluster_count + divisor - 1) / divisor;
thread_cores_num_ = max_clusters_per_thread * 3;
}

/**
Expand Down
Loading