Support for pending and running task states#297
Support for pending and running task states#297weiyanggongxia wants to merge 1 commit intohw-native-sys:mainfrom
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances the AI CPU scheduling mechanism by introducing explicit 'pending' and 'running' states for tasks. This change allows the AI Core to accept new tasks even when it's already executing another, thereby optimizing resource utilization and reducing scheduling overhead. The core state tracking and task assignment/completion logic have been updated to support this new two-stage task lifecycle. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
The pull request refactors the AICPU executor's core state tracking and task completion logic by introducing new pending_id and running_id states for cores, replacing the previous core_idle and executing_task_ids arrays. This involves updating CoreStateTracker with new members and helper functions, refactoring find_cluster_for_shape to prioritize 'run ready' cores, and significantly overhauling check_running_cores_for_completion into a new finalize_core_task_execution function to handle task state transitions. Initialization and dispatch logic across assign_cores_to_threads, reassign_cores_for_all_threads, and resolve_and_dispatch_pto2 are updated to align with the new tracking mechanism. Review comments highlight a critical logic bug in the core idle check loop, an inefficient pass-by-value for CoreStateTracker, unused members and parameters, significant code duplication in task finalization, and complex/redundant AIV core selection logic, all of which require further refinement.
| for (int i = 0; i < tracker.cluster_count; i++) { | ||
| int32_t core_ids[3]; | ||
| core_ids[0] = tracker.clusters[i].aic_core_id; | ||
| core_ids[1] = tracker.clusters[i].aiv_core_ids[0]; | ||
| core_ids[2] = tracker.clusters[i].aiv_core_ids[1]; | ||
| for (int k = 0; k < 3; k++) { | ||
| if (tracker.pending_id[core_ids[k]] != AICPU_TASK_INVALID) { | ||
| aicore_end = false; | ||
| break; | ||
| } | ||
| if (tracker.running_id[core_ids[k]] != AICPU_TASK_INVALID) { | ||
| aicore_end = false; | ||
| break; | ||
| } | ||
| if (!aicore_end) { | ||
| break; | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
There is a critical logic bug in this loop that checks if all cores are idle. The break statement inside the inner loop only exits the inner loop. The outer loop continues to iterate even after a busy core has been found (aicore_end is set to false). This will cause unnecessary work and could lead to incorrect behavior if the final value of aicore_end is relied upon for scheduler termination.
Additionally, the check if (!aicore_end) at line 1176 is unreachable and should be removed. The logic can be simplified and corrected by adding a break for the outer loop.
for (int i = 0; i < tracker.cluster_count; i++) {
int32_t core_ids[3];
core_ids[0] = tracker.clusters[i].aic_core_id;
core_ids[1] = tracker.clusters[i].aiv_core_ids[0];
core_ids[2] = tracker.clusters[i].aiv_core_ids[1];
for (int k = 0; k < 3; k++) {
if (tracker.pending_id[core_ids[k]] != AICPU_TASK_INVALID ||
tracker.running_id[core_ids[k]] != AICPU_TASK_INVALID) {
aicore_end = false;
break;
}
}
if (!aicore_end) {
break;
}
}| } | ||
| for (int32_t j = 0; j < trackers_[i].aiv().idle_count; j++) { | ||
| was_idle[trackers_[i].aiv().idle[j]] = true; | ||
| auto tracker = trackers_[i]; |
| int32_t running_id[MAX_CORES_PER_THREAD]; | ||
| int32_t core_pend_ready_cnt[AICORE_TYPE_NUM]; | ||
| int32_t core_run_ready_cnt[AICORE_TYPE_NUM]; | ||
| uint32_t run_ready_core_idx[AICORE_TYPE_NUM][MAX_AIV_PER_THREAD]; |
| void check_running_cores_for_completion(int32_t thread_idx, | ||
| CoreTypeTracker& ct, | ||
| Handshake* hank, | ||
| int32_t* executing_task_ids, | ||
| int32_t& completed_this_turn, | ||
| int32_t& cur_thread_completed, | ||
| bool& made_progress, | ||
| PTO2TaskSlotState* deferred_release_slot_states[], | ||
| int32_t& deferred_release_count, | ||
| PTO2LocalReadyBuffer* local_bufs | ||
| #if PTO2_PROFILING | ||
| , | ||
| bool profiling_enabled, | ||
| uint64_t& complete_probe_count, | ||
| uint64_t& complete_hit_count, | ||
| uint32_t& phase_complete_count, | ||
| uint64_t& notify_edges_total, | ||
| int32_t& notify_max_degree, | ||
| uint64_t& notify_tasks_enqueued, | ||
| uint64_t& fanin_edges_total, | ||
| int32_t& fanin_max_degree | ||
| #endif | ||
| #if PTO2_SCHED_PROFILING | ||
| , | ||
| uint64_t& sched_complete_perf_cycle | ||
| #endif | ||
| ) { |
There was a problem hiding this comment.
The function check_running_cores_for_completion has several unused parameters in its signature, including ct, executing_task_ids, complete_probe_count, and complete_hit_count. These appear to be leftovers from a refactoring where their logic was moved to finalize_core_task_execution.
These unused parameters should be removed from the function signature and its call sites to improve code clarity and maintainability.
| if (pending_id_ref == reg_task_id && reg_state == TASK_FIN_STATE) { | ||
| // pending task finish | ||
| // running task finish if it exists | ||
| int32_t running_id_val = running_id_ref; | ||
| int32_t pending_id_val = pending_id_ref; | ||
| pending_id_ref = AICPU_TASK_INVALID; | ||
| running_id_ref = AICPU_TASK_INVALID; | ||
| if (running_id_val != AICPU_TASK_INVALID) { | ||
| // running task fishish | ||
| finalize_core_task_execution<CT>(thread_idx, hank, completed_this_turn, | ||
| cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count, | ||
| local_bufs, running_id_val, core_id, profiling_enabled, | ||
| phase_complete_count, notify_edges_total, notify_max_degree, notify_tasks_enqueued, | ||
| fanin_edges_total, fanin_max_degree); | ||
| } | ||
| // pending task finish | ||
| finalize_core_task_execution<CT>(thread_idx, hank, completed_this_turn, | ||
| cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count, | ||
| local_bufs, pending_id_val, core_id, profiling_enabled, | ||
| phase_complete_count, notify_edges_total, notify_max_degree, notify_tasks_enqueued, | ||
| fanin_edges_total, fanin_max_degree); | ||
| LOG_INFO("Thread %d: Core %d completed task %d (runnung_id = %d)", | ||
| thread_idx, core_id, pending_id_val, running_id_val); | ||
| } else if (pending_id_ref == reg_task_id && reg_state == TASK_ACK_STATE) { | ||
| int32_t running_id_value_ack = running_id_ref; | ||
| running_id_ref = reg_task_id; | ||
| pending_id_ref = AICPU_TASK_INVALID; | ||
| if (running_id_value_ack != AICPU_TASK_INVALID) { | ||
| // running task finish | ||
| finalize_core_task_execution<CT>(thread_idx, hank, completed_this_turn, | ||
| cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count, | ||
| local_bufs, running_id_value_ack, core_id, profiling_enabled, | ||
| phase_complete_count, notify_edges_total, notify_max_degree, notify_tasks_enqueued, | ||
| fanin_edges_total, fanin_max_degree); | ||
| } | ||
| LOG_INFO("Thread %d: Core %d completed task %d (pending_id_ref == reg_task_id && reg_state == TASK_ACK_STATE)", | ||
| thread_idx, core_id, reg_task_id); | ||
| } else if (running_id_ref == reg_task_id && reg_state == TASK_FIN_STATE) { | ||
| // running task finish | ||
| int32_t running_id_val = running_id_ref; | ||
| running_id_ref = AICPU_TASK_INVALID; | ||
| finalize_core_task_execution<CT>(thread_idx, hank, completed_this_turn, | ||
| cur_thread_completed, made_progress, deferred_release_slot_states, deferred_release_count, | ||
| local_bufs, running_id_val, core_id, profiling_enabled, | ||
| phase_complete_count, notify_edges_total, notify_max_degree, notify_tasks_enqueued, | ||
| fanin_edges_total, fanin_max_degree); | ||
| LOG_INFO("Thread %d: Core %d completed task %d (running_id_ref == reg_task_id && reg_state == TASK_FIN_STATE)", | ||
| thread_idx, core_id, running_id_val); | ||
| } |
There was a problem hiding this comment.
There is significant code duplication within the if-else if chain. The logic for finalizing a running task by calling finalize_core_task_execution is repeated in multiple branches. This makes the code harder to maintain and increases the risk of introducing inconsistencies if one branch is updated but others are not.
Consider refactoring this logic to avoid repetition. You could, for example, determine which tasks have finished (both running and pending) within the conditional blocks, and then call finalize_core_task_execution once for each completed task after the if-else if chain.
| // CoreTypeTracker& ct = tracker.by_type[static_cast<int32_t>(core_type)]; | ||
| // int32_t idle_idx = ct.find_idle_index(core_id); | ||
| // ct.move_idle_to_running(idle_idx); | ||
| tracker.pending_id[core_id] = task.mixed_task_id; | ||
| // tracker.core_idle[core_id] = false; | ||
| // executing_task_ids[core_id] = task.mixed_task_id; |
| int32_t aiv0 = c.aiv_core_ids[0]; | ||
| if (tracker.is_core_run_ready(c.aiv_core_ids[0]) || tracker.is_core_run_ready(c.aiv_core_ids[1])) { | ||
| aiv0 = tracker.is_core_run_ready(c.aiv_core_ids[0]) ? c.aiv_core_ids[0] : c.aiv_core_ids[1]; | ||
| }else if (tracker.is_core_pend_ready(c.aiv_core_ids[0]) || tracker.is_core_pend_ready(c.aiv_core_ids[1])) { | ||
| aiv0 = tracker.is_core_pend_ready(c.aiv_core_ids[0]) ? c.aiv_core_ids[0] : c.aiv_core_ids[1]; | ||
| } |
There was a problem hiding this comment.
The logic for selecting an available AIV core is complex and contains redundant checks. For example, tracker.is_core_run_ready(c.aiv_core_ids[0]) is evaluated multiple times within the nested conditional. This logic is also duplicated in another part of the file (lines 1410-1415).
This can be refactored into a clearer and more efficient if-else if chain to avoid re-evaluating the same conditions.
int32_t aiv0;
if (tracker.is_core_run_ready(c.aiv_core_ids[0])) {
aiv0 = c.aiv_core_ids[0];
} else if (tracker.is_core_run_ready(c.aiv_core_ids[1])) {
aiv0 = c.aiv_core_ids[1];
} else if (tracker.is_core_pend_ready(c.aiv_core_ids[0])) {
aiv0 = c.aiv_core_ids[0];
} else {
aiv0 = c.aiv_core_ids[1];
}7a79e21 to
1b3ef9f
Compare
In the AI CPU scheduling for a2 and a3, two new states (pending and running) are added:
This implementation enables the AI Core to receive a new task while it is executing an existing one, thereby reducing scheduling overhead.