fix: Fix PythonGenerateNode and PythonExecuteNode reading from SQL_RESULT_LIST_MEMORY (only the latest SQL's execution result was retrieved) #336
lijialun-leo wants to merge 4 commits into spring-ai-alibaba:main
Conversation
}

// Extract sample SQL results from executionResults
private List<List<Map<String, String>>> convertExecutionResultsToList(HashMap<String, String> executionResults) {
Can I extract this into the PlanProcessUtil class?

Either works; whatever you think is reasonable is fine.
public static List<List<Map<String, String>>> convertExecutionResultsToList(
        HashMap<String, String> executionResults, Integer sampleDataNumber) {
    List<List<Map<String, String>>> convertedResults = new ArrayList<>();
The core reason is that the SQL_EXECUTE_NODE_OUTPUT result is complex, and that object is already used in several places.

My suggestion is to define an entity class, for example:

record SqlResultRow(Map<String, ?> data) {}
record SqlResult(List<SqlResultRow> rows) {}

so this method can return List<SqlResult>. Alternatively, since the Maps in one query result all share the same keys, you could define:

record SqlResult(Map<String, List<?>> data) {}

You could write a separate utility class to convert the SQL_EXECUTE_NODE_OUTPUT result into such values. If it is inconvenient to make this change now, mark this code with a TODO and open an issue explaining it.
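The column-oriented variant of this suggestion could be sketched as follows. This is a minimal illustration only; the class and method names (`SqlResultSketch`, `fromRows`) are assumptions, not the project's actual API, and it operates on already-parsed rows rather than the raw JSON strings:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: model one SQL query's result as a column-oriented
// record instead of List<Map<String, String>>.
public class SqlResultSketch {

    // One query result: every row shares the same keys, so store columns.
    public record SqlResult(Map<String, List<String>> data) {}

    // Convert row-oriented data (as parsed from SQL_EXECUTE_NODE_OUTPUT)
    // into the column-oriented record.
    public static SqlResult fromRows(List<Map<String, String>> rows) {
        Map<String, List<String>> columns = new LinkedHashMap<>();
        for (Map<String, String> row : rows) {
            for (Map.Entry<String, String> e : row.entrySet()) {
                columns.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                    .add(e.getValue());
            }
        }
        return new SqlResult(columns);
    }
}
```

A record keeps the shape explicit at call sites, which is the reviewer's point: callers would see `List<SqlResult>` instead of the opaque `List<List<Map<String, String>>>`.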
        ? StateUtil.getListValue(state, SQL_RESULT_LIST_MEMORY) : new ArrayList<>();
HashMap<String, String> executionResults = StateUtil.getObjectValue(state, SQL_EXECUTE_NODE_OUTPUT,
        HashMap.class, new HashMap<>());
List<List<Map<String, String>>> sqlResults = PlanProcessUtil.convertExecutionResultsToList(executionResults,
Pull request overview
This PR fixes an issue where PythonGenerateNode and PythonExecuteNode were only retrieving the most recent SQL execution result instead of all step results from a multi-step SQL plan.
Changes:
- Modified both Python nodes to retrieve all SQL execution results from SQL_EXECUTE_NODE_OUTPUT (a HashMap) instead of SQL_RESULT_LIST_MEMORY (single result)
- Added a new utility method convertExecutionResultsToList in PlanProcessUtil to convert the execution results HashMap into a list-of-lists structure
- Updated the Python code generation prompt template to handle multi-step SQL results as a List[List[Dict]] input format
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| python-generator.txt | Updated prompt to specify input format as List[List[Dict]] for multi-step results and provided example code templates for processing multiple steps |
| PythonGenerateNode.java | Changed to retrieve all SQL results from SQL_EXECUTE_NODE_OUTPUT and convert to limited sample data using new utility method |
| PythonExecuteNode.java | Changed to retrieve all SQL results from SQL_EXECUTE_NODE_OUTPUT and convert to full list using new utility method |
| PlanProcessUtil.java | Added convertExecutionResultsToList method to parse JSON results from HashMap into structured list format |
result = \{
    "summary": \{\},
    "details": []
\}

# Example logic: compute statistics for each numeric column (adjust as needed)
for column in df.columns:
    if pd.api.types.is_numeric_dtype(df[column]):
        result["summary"][column] = \{
            "mean": df[column].mean(),
            "min": df[column].min(),
            "max": df[column].max()
        \}

# Example logic: group the raw data and count (adjust as needed)
grouped = df.groupby(list(df.columns[:2])).size().reset_index(name="count")
result["details"] = grouped.to_dict(orient="records")

# Process each step's data
for step_index, step_data in enumerate(input_data):
    if not step_data:
        continue

    step_df = pd.DataFrame(step_data)
    # Automatic type inference: convert numeric-looking strings to actual
    # numeric types (the backend returns all values as strings)
    for col in step_df.columns:
        step_df[col] = pd.to_numeric(step_df[col], errors='ignore')

    step_result = \{
        "step_number": step_index + 1,
        "statistics": \{\}
    \}

    # Compute statistics for numeric columns
    for col in step_df.columns:
        if pd.api.types.is_numeric_dtype(step_df[col]):
            step_result["statistics"][col] = \{
                "mean": step_df[col].mean(),
                "min": step_df[col].min(),
                "max": step_df[col].max(),
                "std": step_df[col].std()
            \}

    # Compute frequency distributions for categorical columns
    for col in step_df.columns:
        if not pd.api.types.is_numeric_dtype(step_df[col]):
            frequency = step_df[col].value_counts().to_dict()
            step_result["statistics"][col] = \{
                "frequency": frequency
            \}

    result["step_analyses"].append(step_result)
In the example code template, the variable "step_analyses" is referenced but it was never initialized in the result dictionary. The result dict only has "summary" and "details" fields, but the code attempts to append to "result['step_analyses']" which will cause a KeyError at runtime.
for (int i = 1; i <= executionResults.size(); i++) {
    String stepKey = "step_" + i;
    String jsonResult = executionResults.get(stepKey);
The loop assumes that step keys are numbered consecutively starting from 1, using executionResults.size() as the upper bound. However, if there are gaps in the step numbering (e.g., step_1, step_3, step_5), this will miss some steps and might attempt to access non-existent keys. The loop should iterate over the actual keys in the HashMap rather than assuming consecutive numbering.
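A sketch of the key-driven iteration this comment asks for might look like the following. Class and method names are illustrative, not the project's API; it assumes only the `step_N` key convention visible in the diff:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: iterate the keys actually present in the map
// instead of assuming consecutive step_1..step_N numbering.
public class StepKeyIteration {

    private static final Pattern STEP_KEY = Pattern.compile("step_(\\d+)");

    // Returns step results ordered by their numeric suffix, tolerating
    // gaps in the numbering (e.g. step_1, step_3, step_10).
    public static List<String> orderedStepResults(HashMap<String, String> executionResults) {
        List<Integer> stepNumbers = new ArrayList<>();
        for (String key : executionResults.keySet()) {
            Matcher m = STEP_KEY.matcher(key);
            if (m.matches()) {
                stepNumbers.add(Integer.parseInt(m.group(1)));
            }
        }
        // Sort numerically so step_10 comes after step_2, not after step_1
        stepNumbers.sort(Comparator.naturalOrder());
        List<String> ordered = new ArrayList<>();
        for (int n : stepNumbers) {
            ordered.add(executionResults.get("step_" + n));
        }
        return ordered;
    }
}
```

Sorting the parsed integers rather than the raw strings also avoids the lexicographic trap where "step_10" would sort before "step_2".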
    else {
        convertedResults.add(stepData);
    }
}
When a step's data is successfully parsed but stepData is null, it is silently skipped without being added to the result list. This creates inconsistency in the output where step indices in the returned list may not match the actual step numbers. Consider adding an empty list for steps with null data to maintain positional consistency, or document this behavior clearly.
Suggested change:

        }
    }
    else {
        // Maintain positional consistency: add an empty list when step data is null
        convertedResults.add(new ArrayList<>());
    }
converter = new BeanOutputConverter<>(new ParameterizedTypeReference<>() {
});

objectMapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
// Process results in step order
for (int i = 1; i <= executionResults.size(); i++) {
    String stepKey = "step_" + i;
The logic here for retrieving previous results could be moved into the PlanProcessUtil utility class, so that the key construction is unified in one place.
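A minimal sketch of centralizing the key convention, assuming hypothetical helper names (`stepKey`, `getStepResult`) rather than the project's actual API:

```java
import java.util.Map;

// Hypothetical sketch: keep the "step_" + i spelling in one place so call
// sites never build the key inline.
public class PlanProcessUtilSketch {

    private static final String STEP_KEY_PREFIX = "step_";

    // Single place that knows how step keys are spelled.
    public static String stepKey(int stepNumber) {
        return STEP_KEY_PREFIX + stepNumber;
    }

    // Retrieval goes through the same helper.
    public static String getStepResult(Map<String, String> executionResults, int stepNumber) {
        return executionResults.get(stepKey(stepNumber));
    }
}
```

With this in place, a later change to the key format (say, zero-padded step numbers) touches one method instead of every node that reads SQL_EXECUTE_NODE_OUTPUT.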
Describe what this PR does / why we need it
In a multi-step SQL plan, the downstream PythonGenerateNode and PythonExecuteNode nodes were fed SQL_RESULT_LIST_MEMORY; they should instead take the SQL results and sample data of all steps from SQL_EXECUTE_NODE_OUTPUT.
Does this pull request fix one issue?
Describe how you did it
Modified PythonGenerateNode and PythonExecuteNode to take all steps' SQL results and sample data from SQL_EXECUTE_NODE_OUTPUT.
Also updated the prompt so the generated code reads the multi-step results for analysis.
Describe how to verify it
Special notes for reviews