Skip to content

Support for pending and running task states#297

Draft
weiyanggongxia wants to merge 1 commit intohw-native-sys:mainfrom
weiyanggongxia:main
Draft

Support for pending and running task states#297
weiyanggongxia wants to merge 1 commit intohw-native-sys:mainfrom
weiyanggongxia:main

Conversation

@weiyanggongxia
Copy link

In the AI CPU scheduling for a2 and a3, two new states (pending and running) are added:

  1. The "pending" state indicates that the AI Core has received the task and returned an ACK flag.
  2. The "running" state indicates that the AI Core has completed the task execution and returned a task completion flag.
    This implementation enables the AI Core to receive a new task while it is executing an existing one, thereby reducing scheduling overhead.

@gemini-code-assist
Copy link

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the AI CPU scheduling mechanism by introducing explicit 'pending' and 'running' states for tasks. This change allows the AI Core to accept new tasks even when it's already executing another, thereby optimizing resource utilization and reducing scheduling overhead. The core state tracking and task assignment/completion logic have been updated to support this new two-stage task lifecycle.

Highlights

  • New Task States: Introduced 'pending' and 'running' states for AI CPU tasks to allow for more efficient scheduling. A 'pending' task has been acknowledged by the AI Core, while a 'running' task is actively being executed.
  • Core State Management: The CoreStateTracker struct was updated to include pending_id and running_id arrays for each core, along with counters for pending and running ready cores, replacing the previous core_idle and executing_task_ids flags for more granular state tracking.
  • Task Assignment Logic: Modified the find_cluster_for_shape method to prioritize assigning tasks to 'run ready' cores (completely idle) before considering 'pend ready' cores (cores with no currently running task but potentially a pending one).
  • Task Completion Flow: Refactored the task completion logic into a new finalize_core_task_execution template method. The check_running_cores_for_completion method now iterates through cores and handles state transitions based on task ACK and FIN states, potentially completing both running and pending tasks sequentially on a single core.
  • Core Reassignment: Updated the reassign_cores_for_all_threads function to correctly preserve and restore the new pending_id and running_id states of tasks across core reassignments.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp
    • Added AICORE_TYPE_NUM constant.
    • Introduced pending_id, running_id, core_pend_ready_cnt, core_run_ready_cnt, and run_ready_core_idx members to CoreStateTracker.
    • Added is_core_pend_ready and is_core_run_ready helper methods to CoreStateTracker.
    • Modified find_cluster_for_shape to first search for 'run ready' cores, then 'pend ready' cores.
    • Refactored task completion logic into a new finalize_core_task_execution template method.
    • Updated check_running_cores_for_completion to use the new pending/running states and call finalize_core_task_execution.
    • Removed executing_task_ids parameter from dispatch_subtask_to_core and updated core state assignment to use pending_id.
    • Initialized pending_id and running_id arrays to AICPU_TASK_INVALID in assign_cores_to_threads.
    • Modified reassign_cores_for_all_threads to collect and restore pending_task_ids and running_task_ids.
    • Updated resolve_and_dispatch_pto2 to check for pending_id and running_id to determine core availability and dispatch tasks using the new is_core_run_ready and is_core_pend_ready logic.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

The pull request refactors the AICPU executor's core state tracking and task completion logic by introducing new pending_id and running_id states for cores, replacing the previous core_idle and executing_task_ids arrays. This involves updating CoreStateTracker with new members and helper functions, refactoring find_cluster_for_shape to prioritize 'run ready' cores, and significantly overhauling check_running_cores_for_completion into a new finalize_core_task_execution function to handle task state transitions. Initialization and dispatch logic across assign_cores_to_threads, reassign_cores_for_all_threads, and resolve_and_dispatch_pto2 are updated to align with the new tracking mechanism. Review comments highlight a critical logic bug in the core idle check loop, an inefficient pass-by-value for CoreStateTracker, unused members and parameters, significant code duplication in task finalization, and complex/redundant AIV core selection logic, all of which require further refinement.

Comment on lines +1162 to +1180
for (int i = 0; i < tracker.cluster_count; i++) {
int32_t core_ids[3];
core_ids[0] = tracker.clusters[i].aic_core_id;
core_ids[1] = tracker.clusters[i].aiv_core_ids[0];
core_ids[2] = tracker.clusters[i].aiv_core_ids[1];
for (int k = 0; k < 3; k++) {
if (tracker.pending_id[core_ids[k]] != AICPU_TASK_INVALID) {
aicore_end = false;
break;
}
if (tracker.running_id[core_ids[k]] != AICPU_TASK_INVALID) {
aicore_end = false;
break;
}
if (!aicore_end) {
break;
}
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There is a critical logic bug in this loop that checks if all cores are idle. The break statement inside the inner loop only exits the inner loop. The outer loop continues to iterate even after a busy core has been found (aicore_end is set to false). This will cause unnecessary work and could lead to incorrect behavior if the final value of aicore_end is relied upon for scheduler termination.

Additionally, the check if (!aicore_end) at line 1176 is unreachable and should be removed. The logic can be simplified and corrected by adding a break for the outer loop.

        for (int i = 0; i < tracker.cluster_count; i++) {
            int32_t core_ids[3];
            core_ids[0] = tracker.clusters[i].aic_core_id;
            core_ids[1] = tracker.clusters[i].aiv_core_ids[0];
            core_ids[2] = tracker.clusters[i].aiv_core_ids[1];
            for (int k = 0; k < 3; k++) {
                if (tracker.pending_id[core_ids[k]] != AICPU_TASK_INVALID ||
                    tracker.running_id[core_ids[k]] != AICPU_TASK_INVALID) {
                    aicore_end = false;
                    break;
                }
            }
            if (!aicore_end) {
                break;
            }
        }

}
for (int32_t j = 0; j < trackers_[i].aiv().idle_count; j++) {
was_idle[trackers_[i].aiv().idle[j]] = true;
auto tracker = trackers_[i];

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The CoreStateTracker struct is copied by value inside this loop. This is a large struct, and copying it in every iteration is inefficient and can impact performance. It should be taken by reference instead.

        auto& tracker = trackers_[i];

int32_t running_id[MAX_CORES_PER_THREAD];
int32_t core_pend_ready_cnt[AICORE_TYPE_NUM];
int32_t core_run_ready_cnt[AICORE_TYPE_NUM];
uint32_t run_ready_core_idx[AICORE_TYPE_NUM][MAX_AIV_PER_THREAD];

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The member run_ready_core_idx is added to CoreStateTracker but it appears to be unused throughout the file. This seems to be dead code and should be removed to keep the struct clean.

Comment on lines +437 to +463
void check_running_cores_for_completion(int32_t thread_idx,
CoreTypeTracker& ct,
Handshake* hank,
int32_t* executing_task_ids,
int32_t& completed_this_turn,
int32_t& cur_thread_completed,
bool& made_progress,
PTO2TaskSlotState* deferred_release_slot_states[],
int32_t& deferred_release_count,
PTO2LocalReadyBuffer* local_bufs
#if PTO2_PROFILING
,
bool profiling_enabled,
uint64_t& complete_probe_count,
uint64_t& complete_hit_count,
uint32_t& phase_complete_count,
uint64_t& notify_edges_total,
int32_t& notify_max_degree,
uint64_t& notify_tasks_enqueued,
uint64_t& fanin_edges_total,
int32_t& fanin_max_degree
#endif
#if PTO2_SCHED_PROFILING
,
uint64_t& sched_complete_perf_cycle
#endif
) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The function check_running_cores_for_completion has several unused parameters in its signature, including ct, executing_task_ids, complete_probe_count, and complete_hit_count. These appear to be leftovers from a refactoring where their logic was moved to finalize_core_task_execution.

These unused parameters should be removed from the function signature and its call sites to improve code clarity and maintainability.

Comment on lines +478 to 526
if (pending_id_ref == reg_task_id && reg_state == TASK_FIN_STATE) {
// pending task finish
// running task finish if it exists
int32_t running_id_val = running_id_ref;
int32_t pending_id_val = pending_id_ref;
pending_id_ref = AICPU_TASK_INVALID;
running_id_ref = AICPU_TASK_INVALID;
if (running_id_val != AICPU_TASK_INVALID) {
// running task fishish
finalize_core_task_execution<CT>(thread_idx, hank, completed_this_turn,
cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count,
local_bufs, running_id_val, core_id, profiling_enabled,
phase_complete_count, notify_edges_total, notify_max_degree, notify_tasks_enqueued,
fanin_edges_total, fanin_max_degree);
}
// pending task finish
finalize_core_task_execution<CT>(thread_idx, hank, completed_this_turn,
cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count,
local_bufs, pending_id_val, core_id, profiling_enabled,
phase_complete_count, notify_edges_total, notify_max_degree, notify_tasks_enqueued,
fanin_edges_total, fanin_max_degree);
LOG_INFO("Thread %d: Core %d completed task %d (runnung_id = %d)",
thread_idx, core_id, pending_id_val, running_id_val);
} else if (pending_id_ref == reg_task_id && reg_state == TASK_ACK_STATE) {
int32_t running_id_value_ack = running_id_ref;
running_id_ref = reg_task_id;
pending_id_ref = AICPU_TASK_INVALID;
if (running_id_value_ack != AICPU_TASK_INVALID) {
// running task finish
finalize_core_task_execution<CT>(thread_idx, hank, completed_this_turn,
cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count,
local_bufs, running_id_value_ack, core_id, profiling_enabled,
phase_complete_count, notify_edges_total, notify_max_degree, notify_tasks_enqueued,
fanin_edges_total, fanin_max_degree);
}
LOG_INFO("Thread %d: Core %d completed task %d (pending_id_ref == reg_task_id && reg_state == TASK_ACK_STATE)",
thread_idx, core_id, reg_task_id);
} else if (running_id_ref == reg_task_id && reg_state == TASK_FIN_STATE) {
// running task finish
int32_t running_id_val = running_id_ref;
running_id_ref = AICPU_TASK_INVALID;
finalize_core_task_execution<CT>(thread_idx, hank, completed_this_turn,
cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count,
local_bufs, running_id_val, core_id, profiling_enabled,
phase_complete_count, notify_edges_total, notify_max_degree, notify_tasks_enqueued,
fanin_edges_total, fanin_max_degree);
LOG_INFO("Thread %d: Core %d completed task %d (running_id_ref == reg_task_id && reg_state == TASK_FIN_STATE)",
thread_idx, core_id, running_id_val);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

There is significant code duplication within the if-else if chain. The logic for finalizing a running task by calling finalize_core_task_execution is repeated in multiple branches. This makes the code harder to maintain and increases the risk of introducing inconsistencies if one branch is updated but others are not.

Consider refactoring this logic to avoid repetition. You could, for example, determine which tasks have finished (both running and pending) within the conditional blocks, and then call finalize_core_task_execution once for each completed task after the if-else if chain.

Comment on lines +645 to +650
// CoreTypeTracker& ct = tracker.by_type[static_cast<int32_t>(core_type)];
// int32_t idle_idx = ct.find_idle_index(core_id);
// ct.move_idle_to_running(idle_idx);
tracker.pending_id[core_id] = task.mixed_task_id;
// tracker.core_idle[core_id] = false;
// executing_task_ids[core_id] = task.mixed_task_id;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This block contains commented-out code from a previous implementation. To improve code readability and maintainability, this dead code should be removed.

Comment on lines +1322 to +1327
int32_t aiv0 = c.aiv_core_ids[0];
if (tracker.is_core_run_ready(c.aiv_core_ids[0]) || tracker.is_core_run_ready(c.aiv_core_ids[1])) {
aiv0 = tracker.is_core_run_ready(c.aiv_core_ids[0]) ? c.aiv_core_ids[0] : c.aiv_core_ids[1];
}else if (tracker.is_core_pend_ready(c.aiv_core_ids[0]) || tracker.is_core_pend_ready(c.aiv_core_ids[1])) {
aiv0 = tracker.is_core_pend_ready(c.aiv_core_ids[0]) ? c.aiv_core_ids[0] : c.aiv_core_ids[1];
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for selecting an available AIV core is complex and contains redundant checks. For example, tracker.is_core_run_ready(c.aiv_core_ids[0]) is evaluated multiple times within the nested conditional. This logic is also duplicated in another part of the file (lines 1410-1415).

This can be refactored into a clearer and more efficient if-else if chain to avoid re-evaluating the same conditions.

                        int32_t aiv0;
                        if (tracker.is_core_run_ready(c.aiv_core_ids[0])) {
                            aiv0 = c.aiv_core_ids[0];
                        } else if (tracker.is_core_run_ready(c.aiv_core_ids[1])) {
                            aiv0 = c.aiv_core_ids[1];
                        } else if (tracker.is_core_pend_ready(c.aiv_core_ids[0])) {
                            aiv0 = c.aiv_core_ids[0];
                        } else {
                            aiv0 = c.aiv_core_ids[1];
                        }

@weiyanggongxia weiyanggongxia force-pushed the main branch 17 times, most recently from 7a79e21 to 1b3ef9f Compare March 18, 2026 11:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant