From b53978791876484c9f8a5deea2b501f99236079b Mon Sep 17 00:00:00 2001 From: jerry609 <1772030600@qq.com> Date: Sun, 15 Mar 2026 13:12:20 +0800 Subject: [PATCH 1/4] refactor: align python code analysis contracts --- src/paperbot/agents/code_analysis/agent.py | 9 +- src/paperbot/agents/quality/agent.py | 314 +++++++---- src/paperbot/core/workflow_coordinator.py | 18 +- src/paperbot/utils/analyzer.py | 573 +++++++++++++++++++-- tests/test_code_analysis_fallback.py | 16 - tests/unit/test_code_analysis_contracts.py | 136 +++++ tests/unit/test_code_analyzer_contracts.py | 56 ++ tests/unit/test_workflow_coordinator.py | 93 ++++ 8 files changed, 1039 insertions(+), 176 deletions(-) delete mode 100644 tests/test_code_analysis_fallback.py create mode 100644 tests/unit/test_code_analysis_contracts.py create mode 100644 tests/unit/test_code_analyzer_contracts.py diff --git a/src/paperbot/agents/code_analysis/agent.py b/src/paperbot/agents/code_analysis/agent.py index 18471e20..bf43cd30 100644 --- a/src/paperbot/agents/code_analysis/agent.py +++ b/src/paperbot/agents/code_analysis/agent.py @@ -43,6 +43,8 @@ async def process(self, *args, **kwargs) -> Dict[str, Any]: # 如果是单仓库模式,尝试返回扁平化结果适配 Coordinator if "repo_url" in kwargs and result['analysis_results']: repo_result = result['analysis_results'][0] + if repo_result.get("placeholder"): + return repo_result return self._flatten_result(repo_result) return result @@ -64,7 +66,8 @@ def _flatten_result(self, repo_result: Dict[str, Any]) -> Dict[str, Any]: "stars": meta.get("stars"), "forks": meta.get("forks"), "language": structure.get('primary_language', 'Unknown'), - "updated_at": meta.get("last_commit_at"), + "updated_at": meta.get("updated_at") or meta.get("last_commit_at"), + "last_commit_date": meta.get("last_commit_date") or meta.get("last_commit_at"), "has_readme": structure.get('documentation', {}).get('has_readme', False), "reproducibility_score": quality.get('overall_score', 0) * 100, "quality_notes": str(quality.get('recommendations', [])), @@ -125,6 +128,7 @@ def _placeholder(self, repo_url: Optional[str], reason: str) -> Dict[str, Any]: "forks": None, "language": None, "updated_at": None, + "last_commit_date": None, "has_readme": False, "reproducibility_score": None, "quality_notes": f"Repository unavailable: {reason}", @@ -226,6 +230,7 @@ def _extract_repo_meta(self, repo_path: Path, repo_url: str) -> Dict[str, Any]: last_commit = next(repo.iter_commits(max_count=1), None) if last_commit: meta["last_commit_at"] = last_commit.committed_datetime.isoformat() + meta["last_commit_date"] = meta["last_commit_at"] except Exception as e: self.log_error(e, {"repo_meta": "commit_time"}) @@ -245,8 +250,8 @@ def _extract_repo_meta(self, repo_path: Path, repo_url: str) -> Dict[str, Any]: data = resp.json() meta["stars"] = data.get("stargazers_count") meta["forks"] = data.get("forks_count") + meta["updated_at"] = data.get("updated_at") except Exception as e: self.log_error(e, {"repo_meta": "github_api"}) return meta - diff --git a/src/paperbot/agents/quality/agent.py b/src/paperbot/agents/quality/agent.py index ac5abc1a..fe555ca4 100644 --- a/src/paperbot/agents/quality/agent.py +++ b/src/paperbot/agents/quality/agent.py @@ -5,7 +5,6 @@ from typing import Dict, List, Any, Optional from enum import Enum -from pathlib import Path from ..base import BaseAgent @@ -37,45 +36,50 @@ async def _execute(self, *args, **kwargs) -> Dict[str, Any]: async def process(self, *args, **kwargs) -> Dict[str, Any]: """处理代码质量评估""" - # 适配 Coordinator 调用:process(context) - analysis_results = None - if args and isinstance(args[0], dict): - context = args[0] - # 如果上下文中有 code_analysis 结果,使用它 - if "code_analysis" in context and isinstance(context["code_analysis"], dict): - # 如果是扁平化结果,没法做深入分析,只能返回空或模拟值 - if "analysis_results" not in context["code_analysis"]: - return self._process_flat_result(context) - analysis_results = context["code_analysis"] - elif "analysis_results" in context: - analysis_results = context - elif kwargs and "analysis_results" in kwargs: - analysis_results = kwargs["analysis_results"] - - if not analysis_results: - # 无法进行深入分析,返回基础结构 - return { - 'quality_scores': {}, - 'summary': "无法进行深入质量评估(缺少代码分析详情)", - 'overall_assessment': "暂无代码详情,无法评估。", - 'strengths': [], - 'weaknesses': [] - } + analysis_input = self._extract_analysis_input(*args, **kwargs) + if not analysis_input: + return { + 'quality_score': 0.0, + 'quality_scores': {}, + 'summary': "无法进行深入质量评估(缺少代码分析详情)", + 'overall_assessment': "暂无代码详情,无法评估。", + 'strengths': [], + 'weaknesses': [], + } + + if self._is_flat_result(analysis_input): + return self._process_flat_result(analysis_input) try: quality_scores = {} - for repo_result in analysis_results.get('analysis_results', []): - quality_scores[repo_result['repo_url']] = await self._evaluate_quality( - repo_result['analysis'] - ) + repo_entries = self._normalize_repo_entries(analysis_input) + for repo_result in repo_entries: + repo_url = repo_result.get('repo_url') or repo_result.get('repo_name') or 'unknown' + if repo_result.get('placeholder'): + quality_scores[repo_url] = self._placeholder_quality(repo_result) + continue + + analysis = repo_result.get('analysis', repo_result) + quality_scores[repo_url] = await self._evaluate_quality(analysis) + + overall_values = [ + score['overall_score'] + for score in quality_scores.values() + if isinstance(score, dict) + ] + overall_score = ( + sum(overall_values) / len(overall_values) + if overall_values + else 0.0 + ) return { + 'quality_score': overall_score, 'quality_scores': quality_scores, 'summary': await self._generate_quality_summary(quality_scores), - # 添加 Coordinator 需要的字段 'overall_assessment': await self._generate_quality_summary(quality_scores), - 'strengths': [], # TODO: 从 scores 中提取 - 'weaknesses': [] + 'strengths': self._collect_strengths(quality_scores), + 'weaknesses': self._collect_weaknesses(quality_scores), } except Exception as e: self.log_error(e) @@ -83,12 +87,32 @@ async def process(self, *args, **kwargs) -> Dict[str, Any]: def _process_flat_result(self, context: Dict[str, Any]) -> Dict[str, Any]: """处理扁平化的上下文(通常只有元数据)""" + raw_score = context.get('reproducibility_score') + quality_score = float(raw_score or 0.0) + if quality_score > 1.0: + quality_score /= 100.0 + + strengths = [] + if context.get('has_readme'): + strengths.append("仓库包含 README") + if raw_score is not None: + strengths.append(f"代码复现度信号: {float(raw_score):.0f}/100") + + weaknesses = [] + if context.get('placeholder'): + weaknesses.append(f"仓库分析不可用: {context.get('reason', 'unknown')}") + else: + weaknesses.append("当前仅有仓库元数据,缺少静态分析细节") + return { + 'quality_score': quality_score, 'quality_scores': {}, 'summary': "基于元数据的基础评估", - 'overall_assessment': "代码已公开,但未进行深度静态分析。", - 'strengths': ["代码开源"], - 'weaknesses': ["缺少深度质量指标"] + 'overall_assessment': ( + "代码仓库已发现,但当前链路只拿到了元数据,尚未建立完整静态分析画像。" + ), + 'strengths': strengths or ["代码开源"], + 'weaknesses': weaknesses, } async def _generate_quality_summary(self, quality_scores: Dict[str, Any]) -> str: @@ -129,21 +153,15 @@ async def _evaluate_quality(self, repo_analysis: Dict[str, Any]) -> Optional[Dic async def _analyze_complexity(self, analysis: Dict[str, Any]) -> float: """分析代码复杂度""" try: - complexity_metrics = analysis['structure_analysis'].get('complexity', {}) - - # 计算加权分数 - weights = { - 'cyclomatic_complexity': 0.4, - 'cognitive_complexity': 0.3, - 'nesting_depth': 0.3 - } - - score = sum( - weights[metric] * value - for metric, value in complexity_metrics.items() - ) - - return min(1.0, max(0.0, 1.0 - score / 10.0)) + quality_metrics = analysis.get('quality_analysis', {}) + if 'complexity_score' in quality_metrics: + return min(1.0, max(0.0, float(quality_metrics.get('complexity_score', 0.0)))) + + complexity_metrics = analysis.get('structure_analysis', {}).get('complexity', {}) + total_complexity = float(complexity_metrics.get('overall_complexity', 0.0) or 0.0) + file_count = max(1, len(complexity_metrics.get('file_complexity', {}))) + average_complexity = total_complexity / file_count + return min(1.0, max(0.0, 1.0 - (average_complexity / 20.0))) except Exception as e: self.log_error(e) return 0.0 @@ -151,20 +169,15 @@ async def _analyze_complexity(self, analysis: Dict[str, Any]) -> float: async def _analyze_maintainability(self, analysis: Dict[str, Any]) -> float: """分析可维护性""" try: - maintainability_metrics = analysis['quality_analysis'].get('maintainability', {}) - - factors = { - 'code_duplication': 0.3, - 'comment_ratio': 0.2, - 'function_length': 0.2, - 'naming_convention': 0.3 - } - - score = sum( - factors[metric] * value - for metric, value in maintainability_metrics.items() - ) - + quality_metrics = analysis.get('quality_analysis', {}) + if 'maintainability_score' in quality_metrics: + return min(1.0, max(0.0, float(quality_metrics.get('maintainability_score', 0.0)))) + + overall = float(quality_metrics.get('overall_score', 0.0) or 0.0) + documentation = float(quality_metrics.get('documentation_score', 0.0) or 0.0) + complexity = float(quality_metrics.get('complexity_score', 0.0) or 0.0) + has_readme = 1.0 if quality_metrics.get('has_readme') else 0.0 + score = (overall * 0.5) + (documentation * 0.2) + (complexity * 0.2) + (has_readme * 0.1) return min(1.0, max(0.0, score)) except Exception as e: self.log_error(e) @@ -173,20 +186,19 @@ async def _analyze_maintainability(self, analysis: Dict[str, Any]) -> float: async def _analyze_security(self, analysis: Dict[str, Any]) -> float: """分析安全性""" try: - security_metrics = analysis['security_analysis'] - - # 评估各个安全指标 - weights = { - 'vulnerability_count': 0.4, - 'security_best_practices': 0.3, - 'dependency_security': 0.3 - } - - score = sum( - weights[metric] * (1.0 - value / 10.0) - for metric, value in security_metrics.items() - ) - + security_metrics = analysis.get('security_analysis', {}) + vulnerabilities = security_metrics.get('vulnerabilities', []) or [] + dependency_security = security_metrics.get('dependency_security', {}) or {} + dependency_vulns = float(dependency_security.get('total_vulnerabilities', 0.0) or 0.0) + + vulnerability_score = max(0.0, 1.0 - ((len(vulnerabilities) + dependency_vulns) / 5.0)) + measures = security_metrics.get('security_measures', {}) or {} + coverage = [ + 1.0 if self._security_measure_present(value) else 0.0 + for value in measures.values() + ] + measures_score = sum(coverage) / len(coverage) if coverage else 0.0 + score = (vulnerability_score * 0.7) + (measures_score * 0.3) return min(1.0, max(0.0, score)) except Exception as e: self.log_error(e) @@ -195,19 +207,16 @@ async def _analyze_security(self, analysis: Dict[str, Any]) -> float: async def _analyze_documentation(self, analysis: Dict[str, Any]) -> float: """分析文档质量""" try: - doc_metrics = analysis['structure_analysis'].get('documentation', {}) - - weights = { - 'docstring_coverage': 0.4, - 'readme_quality': 0.3, - 'api_documentation': 0.3 - } - - score = sum( - weights[metric] * value - for metric, value in doc_metrics.items() + doc_metrics = analysis.get('structure_analysis', {}).get('documentation', {}) + docstring_coverage = float(doc_metrics.get('docstring_coverage', 0.0) or 0.0) + readme_quality = float(doc_metrics.get('readme_quality', 0.0) or 0.0) + api_docs = doc_metrics.get('api_documentation', {}) or {} + api_coverage = float(api_docs.get('coverage', 0.0) or 0.0) + score = ( + (docstring_coverage * 0.5) + + (readme_quality * 0.3) + + (api_coverage * 0.2) ) - return min(1.0, max(0.0, score)) except Exception as e: self.log_error(e) @@ -216,20 +225,36 @@ async def _analyze_documentation(self, analysis: Dict[str, Any]) -> float: async def _analyze_test_coverage(self, analysis: Dict[str, Any]) -> float: """分析测试覆盖率""" try: - test_metrics = analysis['quality_analysis'].get('testing', {}) - - weights = { - 'line_coverage': 0.4, - 'branch_coverage': 0.3, - 'test_quality': 0.3 - } - - score = sum( - weights[metric] * value - for metric, value in test_metrics.items() - ) - - return min(1.0, max(0.0, score)) + quality_metrics = analysis.get('quality_analysis', {}) + if 'test_coverage_score' in quality_metrics: + return min(1.0, max(0.0, float(quality_metrics.get('test_coverage_score', 0.0)))) + + files = analysis.get('structure_analysis', {}).get('files', {}) + file_paths = files.get('file_paths', []) + if not file_paths: + return 0.0 + + test_files = [ + path for path in file_paths + if path.startswith('tests/') + or '/tests/' in path + or path.endswith('_test.py') + or path.endswith('.spec.ts') + or path.endswith('.test.ts') + or path.endswith('.test.tsx') + or path.endswith('.spec.js') + or path.endswith('.test.js') + ] + code_files = [ + path for path in file_paths + if path.endswith(('.py', '.ts', '.tsx', '.js', '.jsx')) + and path not in test_files + ] + if not code_files: + return 0.0 + + ratio = len(test_files) / max(1, len(code_files)) + return min(1.0, max(0.0, ratio * 2.0)) except Exception as e: self.log_error(e) return 0.0 @@ -258,6 +283,86 @@ def _generate_recommendations(self, scores: Dict[str, float]) -> List[str]: return recommendations + def _extract_analysis_input(self, *args, **kwargs) -> Optional[Dict[str, Any]]: + if args and isinstance(args[0], dict): + context = args[0] + for key in ('code_analysis_result', 'code_analysis', 'analysis_results'): + value = context.get(key) + if isinstance(value, dict): + return value + if any(key in context for key in ('structure_analysis', 'repo_url', 'placeholder')): + return context + + for key in ('code_analysis_result', 'analysis_results'): + value = kwargs.get(key) + if isinstance(value, dict): + return value + + return None + + def _is_flat_result(self, analysis_input: Dict[str, Any]) -> bool: + if 'analysis_results' in analysis_input or 'structure_analysis' in analysis_input: + return False + return any( + key in analysis_input + for key in ('repo_url', 'repo_name', 'reproducibility_score', 'placeholder') + ) + + def _normalize_repo_entries(self, analysis_input: Dict[str, Any]) -> List[Dict[str, Any]]: + if 'analysis_results' in analysis_input: + entries = analysis_input.get('analysis_results') or [] + return [entry for entry in entries if isinstance(entry, dict)] + + if 'structure_analysis' in analysis_input: + return [{'repo_url': analysis_input.get('repo_url', 'unknown'), 'analysis': analysis_input}] + + return [] + + def _placeholder_quality(self, repo_result: Dict[str, Any]) -> Dict[str, Any]: + return { + 'scores': { + QualityMetric.CODE_COMPLEXITY.value: 0.0, + QualityMetric.MAINTAINABILITY.value: 0.0, + QualityMetric.SECURITY.value: 0.0, + QualityMetric.DOCUMENTATION.value: 0.0, + QualityMetric.TEST_COVERAGE.value: 0.0, + }, + 'overall_score': 0.0, + 'recommendations': [f"Repository unavailable: {repo_result.get('reason', 'unknown')}"], + 'status': "需要改进", + } + + def _collect_strengths(self, quality_scores: Dict[str, Any]) -> List[str]: + strengths: List[str] = [] + for score in quality_scores.values(): + if not isinstance(score, dict): + continue + metric_scores = score.get('scores', {}) + if metric_scores.get(QualityMetric.DOCUMENTATION.value, 0.0) >= 0.7: + strengths.append("文档完整度较好") + if metric_scores.get(QualityMetric.SECURITY.value, 0.0) >= 0.7: + strengths.append("安全基线较稳定") + if metric_scores.get(QualityMetric.TEST_COVERAGE.value, 0.0) >= 0.6: + strengths.append("测试信号较强") + + return list(dict.fromkeys(strengths)) + + def _collect_weaknesses(self, quality_scores: Dict[str, Any]) -> List[str]: + weaknesses: List[str] = [] + for score in quality_scores.values(): + if not isinstance(score, dict): + continue + weaknesses.extend(score.get('recommendations', [])) + return list(dict.fromkeys(weaknesses)) + + def _security_measure_present(self, value: Any) -> bool: + if isinstance(value, dict): + if value.get('present'): + return True + matches = value.get('matches') + return isinstance(matches, list) and bool(matches) + return bool(value) + def _determine_status(self, overall_score: float) -> str: """确定代码质量状态""" if overall_score >= 0.8: @@ -298,4 +403,3 @@ def _get_recommendation_for_metric(self, metric: str, score: float) -> str: } return recommendations.get(metric, "一般性改进建议") - diff --git a/src/paperbot/core/workflow_coordinator.py b/src/paperbot/core/workflow_coordinator.py index b9e5c03d..35004892 100644 --- a/src/paperbot/core/workflow_coordinator.py +++ b/src/paperbot/core/workflow_coordinator.py @@ -286,7 +286,12 @@ def _publish_code_score(self, ctx: PipelineContext) -> None: code_result = ctx.code_analysis_result has_code = bool(code_result) - health_score = code_result.get("health_score", 0.0) if code_result else 0.0 + raw_score = code_result.get("reproducibility_score") if code_result else None + if raw_score is None: + raw_score = code_result.get("health_score", 0.0) if code_result else 0.0 + health_score = float(raw_score or 0.0) + if health_score <= 1.0: + health_score *= 100.0 is_empty = code_result.get("is_empty_repo", False) if code_result else False score = create_code_score(has_code, health_score, is_empty) @@ -429,10 +434,15 @@ async def _run_influence_stage(self, ctx: PipelineContext) -> PipelineContext: from paperbot.domain.paper import CodeMeta code_meta = CodeMeta( repo_url=getattr(ctx.paper, 'github_url', '') or '', - stars=ctx.code_analysis_result.get('stars', 0), - forks=ctx.code_analysis_result.get('forks', 0), + stars=ctx.code_analysis_result.get('stars', 0) or 0, + forks=ctx.code_analysis_result.get('forks', 0) or 0, has_readme=ctx.code_analysis_result.get('has_readme', False), - last_commit_date=ctx.code_analysis_result.get('last_commit_date'), + updated_at=ctx.code_analysis_result.get('updated_at'), + last_commit_date=( + ctx.code_analysis_result.get('last_commit_date') + or ctx.code_analysis_result.get('updated_at') + ), + reproducibility_score=ctx.code_analysis_result.get('reproducibility_score'), ) except ImportError: pass diff --git a/src/paperbot/utils/analyzer.py b/src/paperbot/utils/analyzer.py index 907c8dc7..f4605551 100644 --- a/src/paperbot/utils/analyzer.py +++ b/src/paperbot/utils/analyzer.py @@ -1,14 +1,22 @@ # paperbot/utils/analyzer.py -from typing import Dict, List, Any, Optional -from pathlib import Path import ast +import json import re +import shutil import subprocess -from dataclasses import dataclass from concurrent.futures import ThreadPoolExecutor -import threading +from dataclasses import dataclass from datetime import datetime +from pathlib import Path +import threading +from typing import Any, Dict, List, Optional +import xml.etree.ElementTree as ET + +try: + import tomllib +except ModuleNotFoundError: # pragma: no cover + tomllib = None try: import radon.complexity as radon @@ -59,20 +67,25 @@ def __init__(self, config: Optional[Dict[str, Any]] = None): async def analyze_structure(self, repo_path: Path) -> Dict[str, Any]: """分析代码结构""" + structure = self._empty_structure() try: + files = self._analyze_files(repo_path) + documentation = self._analyze_documentation(repo_path) structure = { - 'files': self._analyze_files(repo_path), + 'files': files, 'dependencies': self._analyze_dependencies(repo_path), 'complexity': self._analyze_code_complexity(repo_path), - 'documentation': self._analyze_documentation(repo_path) + 'documentation': documentation, + 'primary_language': self._detect_primary_language(files.get('file_types', {})), } return structure except Exception as e: self.logger.error(f"Structure analysis failed: {str(e)}") - return {} + return structure async def analyze_security(self, repo_path: Path) -> Dict[str, Any]: """分析安全性""" + security_report = self._empty_security_report() try: security_report = { 'vulnerabilities': await self._find_vulnerabilities(repo_path), @@ -82,13 +95,14 @@ async def analyze_security(self, repo_path: Path) -> Dict[str, Any]: return security_report except Exception as e: self.logger.error(f"Security analysis failed: {str(e)}") - return {} + return security_report async def analyze_quality(self, repo_path: Path) -> Dict[str, Any]: """分析代码质量""" try: complexity = self._analyze_code_complexity(repo_path) documentation = self._analyze_documentation(repo_path) + files = self._analyze_files(repo_path) # 计算总体质量分数 overall_score = 0.0 @@ -96,10 +110,9 @@ async def analyze_quality(self, repo_path: Path) -> Dict[str, Any]: # 复杂度评分 (0-1) total_complexity = complexity.get('overall_complexity', 0) - if total_complexity > 0: - complexity_score = max(0, 1 - (total_complexity / 100)) - else: - complexity_score = 1.0 + file_count = max(1, len(complexity.get('file_complexity', {}))) + average_complexity = total_complexity / file_count + complexity_score = max(0.0, 1.0 - (average_complexity / 20.0)) overall_score += complexity_score * self.quality_weights['complexity'] if complexity_score < 0.5: @@ -113,15 +126,29 @@ async def analyze_quality(self, repo_path: Path) -> Dict[str, Any]: recommendations.append("Improve documentation coverage") # 检查是否有 README - has_readme = any( - 'readme' in f.get('path', '').lower() - for f in documentation.get('documentation_files', []) + has_readme = documentation.get('has_readme', False) + if not has_readme: + recommendations.append("Add a repository README") + + test_coverage_score = self._estimate_test_signal(files) + overall_score += test_coverage_score * self.quality_weights['test_coverage'] + if test_coverage_score < 0.4: + recommendations.append("Add or expand automated tests") + + maintainability_score = min( + 1.0, + (complexity_score * 0.6) + + (doc_coverage * 0.2) + + ((1.0 if has_readme else 0.0) * 0.2), ) + overall_score += maintainability_score * self.quality_weights['maintainability'] return { - 'overall_score': overall_score, + 'overall_score': min(1.0, overall_score), 'complexity_score': complexity_score, + 'maintainability_score': maintainability_score, 'documentation_score': doc_coverage, + 'test_coverage_score': test_coverage_score, 'has_readme': has_readme, 'recommendations': recommendations, 'complexity_metrics': complexity, @@ -129,7 +156,17 @@ async def analyze_quality(self, repo_path: Path) -> Dict[str, Any]: } except Exception as e: self.logger.error(f"Quality analysis failed: {str(e)}") - return {'overall_score': 0, 'recommendations': []} + return { + 'overall_score': 0.0, + 'complexity_score': 0.0, + 'maintainability_score': 0.0, + 'documentation_score': 0.0, + 'test_coverage_score': 0.0, + 'has_readme': False, + 'recommendations': [], + 'complexity_metrics': self._empty_complexity_metrics(), + 'documentation_metrics': self._empty_documentation_metrics(), + } async def analyze_dependencies(self, repo_path: Path) -> Dict[str, Any]: """分析项目依赖(异步版本)""" @@ -137,7 +174,7 @@ async def analyze_dependencies(self, repo_path: Path) -> Dict[str, Any]: return self._analyze_dependencies(repo_path) except Exception as e: self.logger.error(f"Dependency analysis failed: {str(e)}") - return {} + return self._empty_dependency_metrics() def _analyze_files(self, repo_path: Path) -> Dict[str, Any]: """分析文件结构""" @@ -145,7 +182,8 @@ def _analyze_files(self, repo_path: Path) -> Dict[str, Any]: 'total_files': 0, 'file_types': {}, 'size_distribution': {}, - 'file_structure': {} + 'file_structure': {}, + 'file_paths': [], } for file_path in repo_path.rglob('*'): @@ -165,17 +203,14 @@ def _analyze_files(self, repo_path: Path) -> Dict[str, Any]: # 构建文件结构树 relative_path = file_path.relative_to(repo_path) self._update_file_structure(file_stats['file_structure'], relative_path) + file_stats['file_paths'].append(str(relative_path)) + file_stats['file_paths'].sort() return file_stats def _analyze_dependencies(self, repo_path: Path) -> Dict[str, Any]: """分析项目依赖""" - dependencies = { - 'direct_dependencies': {}, - 'dev_dependencies': {}, - 'dependency_graph': {}, - 'outdated_dependencies': [] - } + dependencies = self._empty_dependency_metrics() # 检查各种依赖文件 dependency_files = { @@ -188,18 +223,14 @@ def _analyze_dependencies(self, repo_path: Path) -> Dict[str, Any]: for dep_file, parser in dependency_files.items(): dep_path = repo_path / dep_file if dep_path.exists(): - dependencies.update(parser(dep_path)) + parsed = parser(dep_path) + self._merge_dependency_metrics(dependencies, parsed) return dependencies def _analyze_code_complexity(self, repo_path: Path) -> Dict[str, Any]: """分析代码复杂度""" - complexity_metrics = { - 'overall_complexity': 0, - 'file_complexity': {}, - 'function_complexity': {}, - 'complexity_distribution': {} - } + complexity_metrics = self._empty_complexity_metrics() python_files = list(repo_path.rglob('*.py')) @@ -210,6 +241,7 @@ def _analyze_code_complexity(self, repo_path: Path) -> Dict[str, Any]: relative_path = str(file_path.relative_to(repo_path)) complexity_metrics['file_complexity'][relative_path] = result complexity_metrics['overall_complexity'] += result['total_complexity'] + complexity_metrics['function_complexity'][relative_path] = result.get('functions', {}) # 更新复杂度分布 complexity_level = self._categorize_complexity(result['total_complexity']) @@ -220,12 +252,7 @@ def _analyze_code_complexity(self, repo_path: Path) -> Dict[str, Any]: def _analyze_documentation(self, repo_path: Path) -> Dict[str, Any]: """分析文档质量""" - doc_metrics = { - 'docstring_coverage': 0, - 'documentation_files': [], - 'documentation_quality': {}, - 'api_documentation': {} - } + doc_metrics = self._empty_documentation_metrics() # 分析Python文件的文档字符串 python_files = list(repo_path.rglob('*.py')) @@ -246,17 +273,31 @@ def _analyze_documentation(self, repo_path: Path) -> Dict[str, Any]: if total_functions > 0: doc_metrics['docstring_coverage'] = documented_functions / total_functions + doc_metrics['api_documentation'] = { + 'documented_symbols': documented_functions, + 'total_symbols': total_functions, + 'coverage': doc_metrics['docstring_coverage'], + } # 检查文档文件 doc_patterns = ['*.md', '*.rst', '*.txt'] for pattern in doc_patterns: doc_files = list(repo_path.rglob(pattern)) for doc_file in doc_files: + quality_score = self._assess_doc_quality(doc_file) + relative_path = str(doc_file.relative_to(repo_path)) doc_metrics['documentation_files'].append({ - 'path': str(doc_file.relative_to(repo_path)), + 'path': relative_path, 'size': doc_file.stat().st_size, - 'quality_score': self._assess_doc_quality(doc_file) + 'quality_score': quality_score, }) + doc_metrics['documentation_quality'][relative_path] = quality_score + if doc_file.name.lower().startswith('readme'): + doc_metrics['has_readme'] = True + doc_metrics['readme_quality'] = max( + doc_metrics['readme_quality'], + quality_score, + ) return doc_metrics @@ -300,29 +341,48 @@ def _analyze_security_measures(self, repo_path: Path) -> Dict[str, Any]: async def _check_dependency_security(self, repo_path: Path) -> Dict[str, Any]: """检查依赖的安全性""" + report = { + 'vulnerable_dependencies': [], + 'total_vulnerabilities': 0, + 'scan_timestamp': datetime.now().isoformat(), + 'status': 'skipped', + 'scanner': 'safety', + } + + requirements_path = repo_path / 'requirements.txt' + if not requirements_path.exists(): + return report + + if shutil.which('safety') is None: + report['status'] = 'unavailable' + return report + try: # 使用safety检查Python依赖 result = subprocess.run( ['safety', 'check', '-r', 'requirements.txt'], cwd=repo_path, capture_output=True, - text=True + text=True, + timeout=30, ) vulnerabilities = [] if result.returncode != 0: - for line in result.stdout.splitlines(): + output = "\n".join([result.stdout, result.stderr]) + for line in output.splitlines(): if 'Found vulnerability' in line: vulnerabilities.append(self._parse_safety_output(line)) - return { - 'vulnerable_dependencies': vulnerabilities, - 'total_vulnerabilities': len(vulnerabilities), - 'scan_timestamp': datetime.now().isoformat() - } + report['vulnerable_dependencies'] = vulnerabilities + report['total_vulnerabilities'] = len(vulnerabilities) + report['status'] = 'issues_found' if vulnerabilities else 'clean' + return report except Exception as e: self.logger.error(f"Dependency security check failed: {str(e)}") - return {} + report['status'] = 'error' + report['scan_error'] = str(e) + return report def _categorize_file_size(self, size: int) -> str: """对文件大小进行分类""" @@ -356,4 +416,419 @@ def _assess_vulnerability_severity(self, vuln_type: str) -> str: 'xss': 'high', 'path_traversal': 'medium' } - return severity_levels.get(vuln_type, 'low') \ No newline at end of file + return severity_levels.get(vuln_type, 'low') + + def _empty_structure(self) -> Dict[str, Any]: + return { + 'files': { + 'total_files': 0, + 'file_types': {}, + 'size_distribution': {}, + 'file_structure': {}, + 'file_paths': [], + }, + 'dependencies': self._empty_dependency_metrics(), + 'complexity': self._empty_complexity_metrics(), + 'documentation': self._empty_documentation_metrics(), + 'primary_language': 'Unknown', + } + + def _empty_security_report(self) -> Dict[str, Any]: + return { + 'vulnerabilities': [], + 'security_measures': { + 'input_validation': self._empty_security_measure(), + 'authentication': self._empty_security_measure(), + 'encryption': self._empty_security_measure(), + 'secure_headers': self._empty_security_measure(), + 'csrf_protection': self._empty_security_measure(), + }, + 'dependency_security': { + 'vulnerable_dependencies': [], + 'total_vulnerabilities': 0, + 'scan_timestamp': datetime.now().isoformat(), + 'status': 'skipped', + 'scanner': 'safety', + }, + } + + def _empty_dependency_metrics(self) -> Dict[str, Any]: + return { + 'direct_dependencies': {}, + 'dev_dependencies': {}, + 'dependency_graph': {}, + 'outdated_dependencies': [], + } + + def _empty_complexity_metrics(self) -> Dict[str, Any]: + return { + 'overall_complexity': 0, + 'file_complexity': {}, + 'function_complexity': {}, + 'complexity_distribution': {}, + } + + def _empty_documentation_metrics(self) -> Dict[str, Any]: + return { + 'docstring_coverage': 0.0, + 'documentation_files': [], + 'documentation_quality': {}, + 'api_documentation': { + 'documented_symbols': 0, + 'total_symbols': 0, + 'coverage': 0.0, + }, + 'has_readme': False, + 'readme_quality': 0.0, + } + + def _empty_security_measure(self) -> Dict[str, Any]: + return {'present': False, 'matches': []} + + def _detect_primary_language(self, file_types: Dict[str, int]) -> str: + language_map = { + '.py': 'Python', + '.ts': 'TypeScript', + '.tsx': 'TypeScript', + '.js': 'JavaScript', + '.jsx': 'JavaScript', + '.java': 'Java', + '.rs': 'Rust', + '.go': 'Go', + '.php': 'PHP', + } + ranked = [ + (language_map.get(ext, ext or 'unknown'), count) + for ext, count in file_types.items() + if ext in language_map + ] + if not ranked: + return 'Unknown' + ranked.sort(key=lambda item: item[1], reverse=True) + return ranked[0][0] + + def _update_file_structure(self, file_structure: Dict[str, Any], relative_path: Path) -> None: + current = file_structure + parts = list(relative_path.parts) + for index, part in enumerate(parts): + is_leaf = index == len(parts) - 1 + if is_leaf: + current.setdefault(part, {}) + else: + current = current.setdefault(part, {}) + + def _merge_dependency_metrics( + self, + target: Dict[str, Any], + parsed: Dict[str, Any], + ) -> None: + for section in ('direct_dependencies', 'dev_dependencies', 'dependency_graph'): + values = parsed.get(section, {}) + if not isinstance(values, dict): + continue + for ecosystem, deps in values.items(): + target[section].setdefault(ecosystem, []) + target[section][ecosystem].extend(deps) + + outdated = parsed.get('outdated_dependencies', []) + if isinstance(outdated, list): + target['outdated_dependencies'].extend(outdated) + + def _parse_python_requirements(self, dep_path: Path) -> Dict[str, Any]: + dependencies = [] + for raw_line in dep_path.read_text(encoding='utf-8').splitlines(): + line = raw_line.strip() + if not line or line.startswith('#'): + continue + name, version = self._split_dependency_spec(line) + dependencies.append({'name': name, 'version': version}) + + names = [dep['name'] for dep in dependencies] + return { + 'direct_dependencies': {'python': dependencies}, + 'dev_dependencies': {}, + 'dependency_graph': {'python': names}, + 'outdated_dependencies': [], + } + + def _parse_node_dependencies(self, dep_path: Path) -> Dict[str, Any]: + try: + data = json.loads(dep_path.read_text(encoding='utf-8')) + except json.JSONDecodeError: + return self._empty_dependency_metrics() + + dependencies = [ + {'name': name, 'version': version} + for name, version in (data.get('dependencies') or {}).items() + ] + dev_dependencies = [ + {'name': name, 'version': version} + for name, version in (data.get('devDependencies') or {}).items() + ] + return { + 'direct_dependencies': {'node': dependencies}, + 'dev_dependencies': {'node': dev_dependencies}, + 'dependency_graph': { + 'node': [dep['name'] for dep in dependencies + dev_dependencies], + }, + 'outdated_dependencies': [], + } + + def _parse_rust_dependencies(self, dep_path: Path) -> Dict[str, Any]: + if tomllib is None: + return self._empty_dependency_metrics() + + data = tomllib.loads(dep_path.read_text(encoding='utf-8')) + dependencies = [ + {'name': name, 'version': self._normalize_dependency_version(version)} + for name, version in (data.get('dependencies') or {}).items() + ] + dev_dependencies = [ + {'name': name, 'version': self._normalize_dependency_version(version)} + for name, version in (data.get('dev-dependencies') or {}).items() + ] + return { + 'direct_dependencies': {'rust': dependencies}, + 'dev_dependencies': {'rust': dev_dependencies}, + 'dependency_graph': { + 'rust': [dep['name'] for dep in dependencies + dev_dependencies], + }, + 'outdated_dependencies': [], + } + + def _parse_maven_dependencies(self, dep_path: Path) -> Dict[str, Any]: + dependencies = [] + dev_dependencies = [] + + try: + tree = ET.parse(dep_path) + except ET.ParseError: + return self._empty_dependency_metrics() + + for dependency in tree.iterfind('.//{*}dependency'): + group_id = self._xml_text(dependency, 'groupId') + artifact_id = self._xml_text(dependency, 'artifactId') + version = self._xml_text(dependency, 'version') + scope = (self._xml_text(dependency, 'scope') or '').lower() + entry = { + 'name': f"{group_id}:{artifact_id}".strip(':'), + 'version': version, + } + if scope == 'test': + dev_dependencies.append(entry) + else: + dependencies.append(entry) + + return { + 'direct_dependencies': {'maven': dependencies}, + 'dev_dependencies': {'maven': dev_dependencies}, + 'dependency_graph': { + 'maven': [dep['name'] for dep in dependencies + dev_dependencies], + }, + 'outdated_dependencies': [], + } + + def _analyze_file_complexity(self, file_path: Path) -> Dict[str, Any]: + try: + content = file_path.read_text(encoding='utf-8') + tree = ast.parse(content) + except Exception: + return { + 'total_complexity': 0, + 'average_complexity': 0.0, + 'max_complexity': 0, + 'functions': {}, + } + + decision_nodes = ( + ast.If, + ast.For, + ast.AsyncFor, + ast.While, + ast.Try, + ast.BoolOp, + ast.With, + ast.AsyncWith, + ast.comprehension, + ) + match_node = getattr(ast, 'Match', None) + if match_node is not None: + decision_nodes = decision_nodes + (match_node,) + + functions: Dict[str, int] = {} + for node in ast.walk(tree): + if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): + branch_points = sum( + 1 + for child in ast.walk(node) + if isinstance( + child, + decision_nodes, + ) + ) + functions[node.name] = 1 + branch_points + + total_complexity = sum(functions.values()) + average_complexity = total_complexity / max(1, len(functions)) + return { + 'total_complexity': total_complexity, + 'average_complexity': average_complexity, + 'max_complexity': max(functions.values(), default=0), + 'functions': functions, + } + + def _assess_doc_quality(self, doc_file: Path) -> float: + try: + content = doc_file.read_text(encoding='utf-8') + except Exception: + return 0.0 + + words = len(re.findall(r'\w+', content)) + headings = len(re.findall(r'^\s*#+\s+', content, re.MULTILINE)) + code_blocks = content.count('```') + score = 0.0 + if words >= 50: + score += 0.4 + elif words >= 15: + score += 0.2 + if headings > 0: + score += 0.3 + if code_blocks > 0: + score += 0.3 + return round(min(1.0, score), 2) + + def _check_input_validation(self, repo_path: Path) -> Dict[str, Any]: + return self._scan_security_pattern( + repo_path, + [ + r'\b(BaseModel|pydantic|validator|validate\(|marshmallow|joi|zod)\b', + ], + ) + + def _check_authentication_mechanisms(self, repo_path: Path) -> Dict[str, Any]: + return self._scan_security_pattern( + repo_path, + [ + r'\b(jwt|oauth|nextauth|authenticate|authorization|bearer)\b', + ], + ) + + def _check_encryption_usage(self, repo_path: Path) -> Dict[str, Any]: + return self._scan_security_pattern( + repo_path, + [ + r'\b(hashlib|bcrypt|argon2|cryptography|fernet|ssl|tls)\b', + ], + ) + + def _check_secure_headers(self, repo_path: Path) -> Dict[str, Any]: + return self._scan_security_pattern( + repo_path, + [ + r'Content-Security-Policy', + r'X-Frame-Options', + r'Strict-Transport-Security', + r'helmet\(', + ], + ) + + def _check_csrf_protection(self, repo_path: Path) -> Dict[str, Any]: + return self._scan_security_pattern( + repo_path, + [ + r'\bcsrf\b', + r'\bxsrf\b', + ], + ) + + def _parse_safety_output(self, line: str) -> Dict[str, Any]: + package_match = re.search(r'vulnerability in ([A-Za-z0-9_.-]+)', line, re.IGNORECASE) + severity_match = re.search(r'\b(critical|high|medium|low)\b', line, re.IGNORECASE) + return { + 'package': package_match.group(1) if package_match else 'unknown', + 'severity': severity_match.group(1).lower() if severity_match else 'unknown', + 'advisory': line.strip(), + } + + def _estimate_test_signal(self, file_stats: Dict[str, Any]) -> float: + file_paths = file_stats.get('file_paths', []) + if not file_paths: + return 0.0 + + test_files = [ + path for path in file_paths + if path.startswith('tests/') + or '/tests/' in path + or path.endswith('_test.py') + or path.endswith('.spec.ts') + or path.endswith('.test.ts') + or path.endswith('.test.tsx') + or path.endswith('.spec.js') + or path.endswith('.test.js') + ] + code_files = [ + path for path in file_paths + if path.endswith(('.py', '.ts', '.tsx', '.js', '.jsx')) + and path not in test_files + ] + if not code_files: + return 0.0 + + ratio = len(test_files) / max(1, len(code_files)) + return round(min(1.0, ratio * 2.0), 2) + + def _split_dependency_spec(self, spec: str) -> tuple[str, Optional[str]]: + match = re.match(r'^([A-Za-z0-9_.-]+)\s*(?:==|>=|<=|~=|!=|>|<)?\s*(.*)$', spec) + if not match: + return spec, None + version = match.group(2).strip() or None + return match.group(1), version + + def _normalize_dependency_version(self, value: Any) -> Optional[str]: + if isinstance(value, str): + return value + if isinstance(value, dict): + version = value.get('version') + return str(version) if version else None + return None + + def _xml_text(self, dependency: ET.Element, tag_name: str) -> Optional[str]: + for child in dependency: + if child.tag.endswith(tag_name): + return (child.text or '').strip() or None + return None + + def _scan_security_pattern( + self, + repo_path: Path, + patterns: List[str], + ) -> Dict[str, Any]: + matches: List[Dict[str, Any]] = [] + for file_path in repo_path.rglob('*'): + if not file_path.is_file(): + continue + if file_path.suffix not in {'.py', '.js', '.ts', '.tsx', '.jsx', '.json', '.yml', '.yaml'}: + continue + + try: + content = file_path.read_text(encoding='utf-8', errors='ignore') + except Exception: + continue + + for pattern in patterns: + matched = re.search(pattern, content, re.IGNORECASE) + if not matched: + continue + matches.append( + { + 'file': str(file_path.relative_to(repo_path)), + 'line': content.count('\n', 0, matched.start()) + 1, + 'pattern': pattern, + } + ) + break + + return { + 'present': bool(matches), + 'matches': matches[:20], + } diff --git a/tests/test_code_analysis_fallback.py b/tests/test_code_analysis_fallback.py deleted file mode 100644 index 4ce93cdb..00000000 --- a/tests/test_code_analysis_fallback.py +++ /dev/null @@ -1,16 +0,0 @@ -from agents.code_analysis_agent import CodeAnalysisAgent - - -def test_code_analysis_placeholder_no_repo(): - agent = CodeAnalysisAgent({}) - res = agent._placeholder(None, "no_repository_provided") - assert res["placeholder"] is True - assert res["repo_url"] is None - assert res["stars"] is None - - -def test_code_analysis_process_without_repo_returns_placeholder(): - agent = CodeAnalysisAgent({}) - res = agent.process() - assert res["placeholder"] is True - diff --git a/tests/unit/test_code_analysis_contracts.py b/tests/unit/test_code_analysis_contracts.py new file mode 100644 index 00000000..17a86f27 --- /dev/null +++ b/tests/unit/test_code_analysis_contracts.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +import pytest + +from paperbot.agents.code_analysis.agent import CodeAnalysisAgent +from paperbot.agents.quality.agent import QualityAgent + + +@pytest.mark.asyncio +async def test_single_repo_mode_preserves_placeholder_result(): + agent = CodeAnalysisAgent({}) + placeholder = agent._placeholder("https://github.com/example/repo", "clone_failed") + + async def fake_process_batch(_links): + return { + "repositories_analyzed": 1, + "analysis_results": [placeholder], + } + + agent._process_batch = fake_process_batch # type: ignore[method-assign] + + result = await agent.process(repo_url="https://github.com/example/repo") + + assert result["placeholder"] is True + assert result["reason"] == "clone_failed" + assert result["repo_url"] == "https://github.com/example/repo" + + +def test_flatten_result_maps_code_meta_contract_fields(): + agent = CodeAnalysisAgent({}) + + flattened = agent._flatten_result( + { + "repo_url": "https://github.com/example/repo", + "analysis": { + "structure_analysis": { + "primary_language": "Python", + "documentation": {"has_readme": True}, + }, + "quality_analysis": { + "overall_score": 0.82, + "recommendations": ["Add more tests"], + }, + }, + "meta": { + "stars": 12, + "forks": 3, + "updated_at": "2026-03-01T00:00:00+00:00", + "last_commit_at": "2026-03-02T00:00:00+00:00", + }, + } + ) + + assert flattened["updated_at"] == "2026-03-01T00:00:00+00:00" + assert flattened["last_commit_date"] == "2026-03-02T00:00:00+00:00" + assert flattened["reproducibility_score"] == 82.0 + assert flattened["has_readme"] is True + + +@pytest.mark.asyncio +async def test_quality_agent_accepts_flat_workflow_code_analysis_result(): + agent = QualityAgent({}) + + result = await agent.process( + code_analysis_result={ + "repo_url": "https://github.com/example/repo", + "reproducibility_score": 72.0, + "has_readme": True, + } + ) + + assert result["quality_score"] == 0.72 + assert "README" in " ".join(result["strengths"]) + + +@pytest.mark.asyncio +async def test_quality_agent_scores_nested_analysis_contract(): + agent = QualityAgent({}) + + result = await agent.process( + analysis_results={ + "analysis_results": [ + { + "repo_url": "https://github.com/example/repo", + "analysis": { + "structure_analysis": { + "files": { + "file_paths": [ + "src/app.py", + "tests/test_app.py", + ] + }, + "complexity": { + "overall_complexity": 4, + "file_complexity": { + "src/app.py": {"total_complexity": 4} + }, + }, + "documentation": { + "docstring_coverage": 0.8, + "readme_quality": 0.9, + "api_documentation": {"coverage": 0.75}, + }, + }, + "quality_analysis": { + "overall_score": 0.8, + "complexity_score": 0.9, + "maintainability_score": 0.85, + "documentation_score": 0.8, + "test_coverage_score": 0.7, + "has_readme": True, + }, + "security_analysis": { + "vulnerabilities": [], + "security_measures": { + "input_validation": { + "present": True, + "matches": [{"file": "src/app.py", "line": 1}], + }, + "authentication": {"present": False, "matches": []}, + "encryption": {"present": False, "matches": []}, + "secure_headers": {"present": False, "matches": []}, + "csrf_protection": {"present": False, "matches": []}, + }, + "dependency_security": {"total_vulnerabilities": 0}, + }, + }, + } + ] + } + ) + + repo_score = result["quality_scores"]["https://github.com/example/repo"] + assert result["quality_score"] > 0.0 + assert repo_score["overall_score"] > 0.0 + assert repo_score["scores"]["documentation"] >= 0.8 diff --git a/tests/unit/test_code_analyzer_contracts.py b/tests/unit/test_code_analyzer_contracts.py new file mode 100644 index 00000000..d53587bc --- /dev/null +++ b/tests/unit/test_code_analyzer_contracts.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +import pytest + +from paperbot.utils.analyzer import CodeAnalyzer + + +@pytest.mark.asyncio +async def test_code_analyzer_returns_stable_contracts(tmp_path): + (tmp_path / "src").mkdir() + (tmp_path / "tests").mkdir() + + (tmp_path / "README.md").write_text( + "# Sample Repo\n\n## Usage\n\n```bash\npytest\n```\n", + encoding="utf-8", + ) + (tmp_path / "requirements.txt").write_text( + "fastapi==0.109.0\npytest==8.0.0\n", + encoding="utf-8", + ) + (tmp_path / "src" / "app.py").write_text( + '"""App module."""\n' + "from pydantic import BaseModel\n\n" + "class InputPayload(BaseModel):\n" + " name: str\n\n" + "def handle(payload):\n" + ' """Handle payload."""\n' + " if payload:\n" + " return 1\n" + " return 0\n", + encoding="utf-8", + ) + (tmp_path / "tests" / "test_app.py").write_text( + "def test_ok():\n" + " assert True\n", + encoding="utf-8", + ) + + analyzer = CodeAnalyzer({}) + + structure = await analyzer.analyze_structure(tmp_path) + security = await analyzer.analyze_security(tmp_path) + quality = await analyzer.analyze_quality(tmp_path) + dependencies = await analyzer.analyze_dependencies(tmp_path) + + assert structure["primary_language"] == "Python" + assert "src/app.py" in structure["files"]["file_paths"] + assert structure["documentation"]["has_readme"] is True + assert dependencies["direct_dependencies"]["python"][0]["name"] == "fastapi" + + assert security["security_measures"]["input_validation"]["present"] is True + assert security["dependency_security"]["scanner"] == "safety" + + assert quality["documentation_metrics"]["has_readme"] is True + assert quality["overall_score"] > 0 + diff --git a/tests/unit/test_workflow_coordinator.py b/tests/unit/test_workflow_coordinator.py index e1cc4370..5dba4c90 100644 --- a/tests/unit/test_workflow_coordinator.py +++ b/tests/unit/test_workflow_coordinator.py @@ -24,6 +24,15 @@ def __init__(self, total=77.0, academic=70.0, engineering=84.0, momentum=12.0): self.metrics_breakdown = {"academic": {"momentum_score": momentum}} +class _CaptureInfluenceCalculator: + def __init__(self): + self.code_meta = None + + def calculate(self, paper, code_meta): + self.code_meta = code_meta + return _FakeInfluence() + + @pytest.mark.asyncio async def test_workflow_coordinator_publishes_all_four_stage_scores(tmp_path): coordinator = ScholarWorkflowCoordinator({"output_dir": str(tmp_path), "enable_fail_fast": False}) @@ -62,6 +71,90 @@ async def test_workflow_coordinator_publishes_all_four_stage_scores(tmp_path): assert [event.stage for event in score_events] == ["research", "code", "quality", "influence"] +@pytest.mark.asyncio +async def test_workflow_coordinator_uses_reproducibility_score_for_code_stage(tmp_path): + coordinator = ScholarWorkflowCoordinator({"output_dir": str(tmp_path), "enable_fail_fast": False}) + coordinator._research_agent = SimpleNamespace(process=AsyncMock(return_value={"venue_tier": 1})) + coordinator._code_analysis_agent = SimpleNamespace( + process=AsyncMock( + return_value={ + "reproducibility_score": 82.0, + "updated_at": "2026-03-01T00:00:00+00:00", + "last_commit_date": "2026-03-02T00:00:00+00:00", + "has_readme": True, + "stars": 12, + "forks": 3, + } + ) + ) + coordinator._quality_agent = SimpleNamespace( + process=AsyncMock(return_value={"quality_score": 0.71, "quality_scores": {"repo": {"overall_score": 0.71}}}) + ) + coordinator._influence_calculator = SimpleNamespace(calculate=lambda paper, code_meta: _FakeInfluence()) + coordinator._report_writer = None + + paper = SimpleNamespace( + paper_id="paper-repro", + title="Repro Paper", + abstract="Abstract", + citation_count=30, + github_url="https://github.com/example/repo", + has_code=True, + ) + event_log = _FakeEventLog() + + await coordinator.run_paper_pipeline( + paper, + event_log=event_log, + run_id="run-repro", + trace_id="trace-repro", + ) + + score_events = [event for event in event_log.events if event.type == "score_update"] + code_event = next(event for event in score_events if event.stage == "code") + assert code_event.payload["score"]["score"] == 82.0 + + +@pytest.mark.asyncio +async def test_workflow_coordinator_passes_aligned_code_meta_to_influence(tmp_path): + coordinator = ScholarWorkflowCoordinator({"output_dir": str(tmp_path), "enable_fail_fast": False}) + coordinator._research_agent = SimpleNamespace(process=AsyncMock(return_value={"venue_tier": 1})) + coordinator._code_analysis_agent = SimpleNamespace( + process=AsyncMock( + return_value={ + "reproducibility_score": 82.0, + "updated_at": "2026-03-01T00:00:00+00:00", + "last_commit_date": "2026-03-02T00:00:00+00:00", + "has_readme": True, + "stars": 12, + "forks": 3, + } + ) + ) + coordinator._quality_agent = SimpleNamespace( + process=AsyncMock(return_value={"quality_score": 0.71, "quality_scores": {"repo": {"overall_score": 0.71}}}) + ) + influence_calculator = _CaptureInfluenceCalculator() + coordinator._influence_calculator = influence_calculator + coordinator._report_writer = None + + paper = SimpleNamespace( + paper_id="paper-meta", + title="Meta Paper", + abstract="Abstract", + citation_count=30, + github_url="https://github.com/example/repo", + has_code=True, + ) + + await coordinator.run_paper_pipeline(paper) + + assert influence_calculator.code_meta is not None + assert influence_calculator.code_meta.updated_at == "2026-03-01T00:00:00+00:00" + assert influence_calculator.code_meta.last_commit_date == "2026-03-02T00:00:00+00:00" + assert influence_calculator.code_meta.reproducibility_score == 82.0 + + @pytest.mark.asyncio async def test_workflow_coordinator_early_exit_skips_remaining_stages(tmp_path): coordinator = ScholarWorkflowCoordinator( From 2a499bcc26806eaa736d0636f81bac8455ca9c94 Mon Sep 17 00:00:00 2001 From: jerry609 <1772030600@qq.com> Date: Sun, 15 Mar 2026 14:25:06 +0800 Subject: [PATCH 2/4] refactor: harden dependency spec parsing --- src/paperbot/utils/analyzer.py | 12 +++++++----- tests/unit/test_code_analyzer_contracts.py | 19 +++++++++++++++++++ 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/src/paperbot/utils/analyzer.py b/src/paperbot/utils/analyzer.py index f4605551..d11ec66e 100644 --- a/src/paperbot/utils/analyzer.py +++ b/src/paperbot/utils/analyzer.py @@ -778,11 +778,13 @@ def _estimate_test_signal(self, file_stats: Dict[str, Any]) -> float: return round(min(1.0, ratio * 2.0), 2) def _split_dependency_spec(self, spec: str) -> tuple[str, Optional[str]]: - match = re.match(r'^([A-Za-z0-9_.-]+)\s*(?:==|>=|<=|~=|!=|>|<)?\s*(.*)$', spec) - if not match: - return spec, None - version = match.group(2).strip() or None - return match.group(1), version + normalized = spec.strip() + for operator in ('==', '>=', '<=', '~=', '!=', '>', '<'): + if operator not in normalized: + continue + name, version = normalized.split(operator, 1) + return name.strip(), version.strip() or None + return normalized, None def _normalize_dependency_version(self, value: Any) -> Optional[str]: if isinstance(value, str): diff --git a/tests/unit/test_code_analyzer_contracts.py b/tests/unit/test_code_analyzer_contracts.py index d53587bc..9d1fea2d 100644 --- a/tests/unit/test_code_analyzer_contracts.py +++ b/tests/unit/test_code_analyzer_contracts.py @@ -54,3 +54,22 @@ async def test_code_analyzer_returns_stable_contracts(tmp_path): assert quality["documentation_metrics"]["has_readme"] is True assert quality["overall_score"] > 0 + +@pytest.mark.asyncio +async def test_code_analyzer_parses_requirement_operators_without_regex_backtracking(tmp_path): + (tmp_path / "requirements.txt").write_text( + "fastapi==0.109.0\n" + "pydantic>=2.6.0\n" + "uvicorn\n", + encoding="utf-8", + ) + + analyzer = CodeAnalyzer({}) + + dependencies = await analyzer.analyze_dependencies(tmp_path) + + assert dependencies["direct_dependencies"]["python"] == [ + {"name": "fastapi", "version": "0.109.0"}, + {"name": "pydantic", "version": "2.6.0"}, + {"name": "uvicorn", "version": None}, + ] From 045ebe9237d7e30ac960807db9979452b8405c08 Mon Sep 17 00:00:00 2001 From: jerry609 <1772030600@qq.com> Date: Sun, 15 Mar 2026 14:31:05 +0800 Subject: [PATCH 3/4] test: align sonar reliability contracts --- src/paperbot/agents/quality/agent.py | 39 ++++++++++++---------- tests/unit/test_code_analysis_contracts.py | 14 ++++---- tests/unit/test_workflow_coordinator.py | 4 +-- 3 files changed, 30 insertions(+), 27 deletions(-) diff --git a/src/paperbot/agents/quality/agent.py b/src/paperbot/agents/quality/agent.py index fe555ca4..210b66c9 100644 --- a/src/paperbot/agents/quality/agent.py +++ b/src/paperbot/agents/quality/agent.py @@ -234,31 +234,34 @@ async def _analyze_test_coverage(self, analysis: Dict[str, Any]) -> float: if not file_paths: return 0.0 - test_files = [ - path for path in file_paths - if path.startswith('tests/') - or '/tests/' in path - or path.endswith('_test.py') - or path.endswith('.spec.ts') - or path.endswith('.test.ts') - or path.endswith('.test.tsx') - or path.endswith('.spec.js') - or path.endswith('.test.js') - ] - code_files = [ - path for path in file_paths - if path.endswith(('.py', '.ts', '.tsx', '.js', '.jsx')) - and path not in test_files - ] - if not code_files: + test_count = sum(1 for path in file_paths if self._is_test_file(path)) + code_count = sum( + 1 for path in file_paths if self._is_source_file(path) and not self._is_test_file(path) + ) + if not code_count: return 0.0 - ratio = len(test_files) / max(1, len(code_files)) + ratio = test_count / code_count return min(1.0, max(0.0, ratio * 2.0)) except Exception as e: self.log_error(e) return 0.0 + def _is_test_file(self, path: str) -> bool: + return ( + path.startswith('tests/') + or '/tests/' in path + or path.endswith('_test.py') + or path.endswith('.spec.ts') + or path.endswith('.test.ts') + or path.endswith('.test.tsx') + or path.endswith('.spec.js') + or path.endswith('.test.js') + ) + + def _is_source_file(self, path: str) -> bool: + return path.endswith(('.py', '.ts', '.tsx', '.js', '.jsx')) + def _calculate_overall_score(self, scores: Dict[str, float]) -> float: """计算总体质量分数""" weights = { diff --git a/tests/unit/test_code_analysis_contracts.py b/tests/unit/test_code_analysis_contracts.py index 17a86f27..b7b909f7 100644 --- a/tests/unit/test_code_analysis_contracts.py +++ b/tests/unit/test_code_analysis_contracts.py @@ -1,5 +1,7 @@ from __future__ import annotations +from unittest.mock import AsyncMock + import pytest from paperbot.agents.code_analysis.agent import CodeAnalysisAgent @@ -10,14 +12,12 @@ async def test_single_repo_mode_preserves_placeholder_result(): agent = CodeAnalysisAgent({}) placeholder = agent._placeholder("https://github.com/example/repo", "clone_failed") - - async def fake_process_batch(_links): - return { + agent._process_batch = AsyncMock( # type: ignore[method-assign] + return_value={ "repositories_analyzed": 1, "analysis_results": [placeholder], } - - agent._process_batch = fake_process_batch # type: ignore[method-assign] + ) result = await agent.process(repo_url="https://github.com/example/repo") @@ -53,7 +53,7 @@ def test_flatten_result_maps_code_meta_contract_fields(): assert flattened["updated_at"] == "2026-03-01T00:00:00+00:00" assert flattened["last_commit_date"] == "2026-03-02T00:00:00+00:00" - assert flattened["reproducibility_score"] == 82.0 + assert flattened["reproducibility_score"] == pytest.approx(82.0) assert flattened["has_readme"] is True @@ -69,7 +69,7 @@ async def test_quality_agent_accepts_flat_workflow_code_analysis_result(): } ) - assert result["quality_score"] == 0.72 + assert result["quality_score"] == pytest.approx(0.72) assert "README" in " ".join(result["strengths"]) diff --git a/tests/unit/test_workflow_coordinator.py b/tests/unit/test_workflow_coordinator.py index 5dba4c90..a22de15b 100644 --- a/tests/unit/test_workflow_coordinator.py +++ b/tests/unit/test_workflow_coordinator.py @@ -112,7 +112,7 @@ async def test_workflow_coordinator_uses_reproducibility_score_for_code_stage(tm score_events = [event for event in event_log.events if event.type == "score_update"] code_event = next(event for event in score_events if event.stage == "code") - assert code_event.payload["score"]["score"] == 82.0 + assert code_event.payload["score"]["score"] == pytest.approx(82.0) @pytest.mark.asyncio @@ -152,7 +152,7 @@ async def test_workflow_coordinator_passes_aligned_code_meta_to_influence(tmp_pa assert influence_calculator.code_meta is not None assert influence_calculator.code_meta.updated_at == "2026-03-01T00:00:00+00:00" assert influence_calculator.code_meta.last_commit_date == "2026-03-02T00:00:00+00:00" - assert influence_calculator.code_meta.reproducibility_score == 82.0 + assert influence_calculator.code_meta.reproducibility_score == pytest.approx(82.0) @pytest.mark.asyncio From 728e238e76368241e0d1b1cc3aeefdf09f053f09 Mon Sep 17 00:00:00 2001 From: jerry609 <1772030600@qq.com> Date: Sun, 15 Mar 2026 14:34:25 +0800 Subject: [PATCH 4/4] test: dedupe workflow coordinator fixtures --- tests/unit/test_workflow_coordinator.py | 70 +++++++++++-------------- 1 file changed, 32 insertions(+), 38 deletions(-) diff --git a/tests/unit/test_workflow_coordinator.py b/tests/unit/test_workflow_coordinator.py index a22de15b..b132cd14 100644 --- a/tests/unit/test_workflow_coordinator.py +++ b/tests/unit/test_workflow_coordinator.py @@ -33,6 +33,33 @@ def calculate(self, paper, code_meta): return _FakeInfluence() +def _sample_code_analysis_result(): + return { + "reproducibility_score": 82.0, + "updated_at": "2026-03-01T00:00:00+00:00", + "last_commit_date": "2026-03-02T00:00:00+00:00", + "has_readme": True, + "stars": 12, + "forks": 3, + } + + +def _build_coordinator_with_code_result(tmp_path, *, influence_calculator=None): + coordinator = ScholarWorkflowCoordinator({"output_dir": str(tmp_path), "enable_fail_fast": False}) + coordinator._research_agent = SimpleNamespace(process=AsyncMock(return_value={"venue_tier": 1})) + coordinator._code_analysis_agent = SimpleNamespace( + process=AsyncMock(return_value=_sample_code_analysis_result()) + ) + coordinator._quality_agent = SimpleNamespace( + process=AsyncMock(return_value={"quality_score": 0.71, "quality_scores": {"repo": {"overall_score": 0.71}}}) + ) + coordinator._influence_calculator = influence_calculator or SimpleNamespace( + calculate=lambda paper, code_meta: _FakeInfluence() + ) + coordinator._report_writer = None + return coordinator + + @pytest.mark.asyncio async def test_workflow_coordinator_publishes_all_four_stage_scores(tmp_path): coordinator = ScholarWorkflowCoordinator({"output_dir": str(tmp_path), "enable_fail_fast": False}) @@ -73,25 +100,7 @@ async def test_workflow_coordinator_publishes_all_four_stage_scores(tmp_path): @pytest.mark.asyncio async def test_workflow_coordinator_uses_reproducibility_score_for_code_stage(tmp_path): - coordinator = ScholarWorkflowCoordinator({"output_dir": str(tmp_path), "enable_fail_fast": False}) - coordinator._research_agent = SimpleNamespace(process=AsyncMock(return_value={"venue_tier": 1})) - coordinator._code_analysis_agent = SimpleNamespace( - process=AsyncMock( - return_value={ - "reproducibility_score": 82.0, - "updated_at": "2026-03-01T00:00:00+00:00", - "last_commit_date": "2026-03-02T00:00:00+00:00", - "has_readme": True, - "stars": 12, - "forks": 3, - } - ) - ) - coordinator._quality_agent = SimpleNamespace( - process=AsyncMock(return_value={"quality_score": 0.71, "quality_scores": {"repo": {"overall_score": 0.71}}}) - ) - coordinator._influence_calculator = SimpleNamespace(calculate=lambda paper, code_meta: _FakeInfluence()) - coordinator._report_writer = None + coordinator = _build_coordinator_with_code_result(tmp_path) paper = SimpleNamespace( paper_id="paper-repro", @@ -117,26 +126,11 @@ async def test_workflow_coordinator_uses_reproducibility_score_for_code_stage(tm @pytest.mark.asyncio async def test_workflow_coordinator_passes_aligned_code_meta_to_influence(tmp_path): - coordinator = ScholarWorkflowCoordinator({"output_dir": str(tmp_path), "enable_fail_fast": False}) - coordinator._research_agent = SimpleNamespace(process=AsyncMock(return_value={"venue_tier": 1})) - coordinator._code_analysis_agent = SimpleNamespace( - process=AsyncMock( - return_value={ - "reproducibility_score": 82.0, - "updated_at": "2026-03-01T00:00:00+00:00", - "last_commit_date": "2026-03-02T00:00:00+00:00", - "has_readme": True, - "stars": 12, - "forks": 3, - } - ) - ) - coordinator._quality_agent = SimpleNamespace( - process=AsyncMock(return_value={"quality_score": 0.71, "quality_scores": {"repo": {"overall_score": 0.71}}}) - ) influence_calculator = _CaptureInfluenceCalculator() - coordinator._influence_calculator = influence_calculator - coordinator._report_writer = None + coordinator = _build_coordinator_with_code_result( + tmp_path, + influence_calculator=influence_calculator, + ) paper = SimpleNamespace( paper_id="paper-meta",