diff --git a/libs/openant-core/parsers/python/function_extractor.py b/libs/openant-core/parsers/python/function_extractor.py index 37475e57..dac03832 100644 --- a/libs/openant-core/parsers/python/function_extractor.py +++ b/libs/openant-core/parsers/python/function_extractor.py @@ -229,7 +229,12 @@ def classify_function(self, func_name: str, decorators: List[str], return 'constructor' if func_name.startswith('__') and func_name.endswith('__'): return 'dunder_method' - if '@property' in dec_str: + # @property getter, @.setter/.deleter, and @cached_property/ + # @functools.cached_property are all property accessors. Reuse the + # single _property_role predicate so classification can't drift from + # the qualified_name role-suffix logic (a literal '@property' match + # silently mislabels @cached_property as a plain method). + if self._property_role(decorators) is not None: return 'property' if '@staticmethod' in dec_str: return 'static_method' @@ -241,13 +246,19 @@ def classify_function(self, func_name: str, decorators: List[str], if 'middleware' in func_name.lower() or self._path_has_segment(file_path, 'middleware'): return 'middleware' - # Test functions. - # 'test' is matched as a substring here on purpose: test-file conventions use plural/affixed - # forms ('tests/' dir, 'test_*'/'*_test' files) that a whole-segment match would miss. The - # substring over-match (e.g. 'latest'/'contest') is the is_test_file family handled in its own - # scheduled units -- 'test' is NOT an entry-point type, so unlike 'views' it seeds no false - # reachability and is intentionally left as a substring test in this fix. - if func_name.startswith('test_') or 'test' in path_lower: + # Test functions. Match the file by PATH COMPONENT, not bare substring: + # 'test' in path_lower wrongly flags e.g. latest.py / contest.py / fastest.py. + # A real test file is one whose directory or filename is/starts-with 'test' + # (pytest's discovery convention: test_*.py / *_test.py / a tests/ dir). 
def _property_role(self, decorators: List[str]) -> Optional[str]:
    """Classify a property accessor from its decorator strings.

    Args:
        decorators: Decorator texts such as ``'@property'``,
            ``'@functools.cached_property'`` or ``'@x.setter'``. A leading
            ``@`` and any call arguments (``'(...)'``) are ignored.

    Returns:
        ``'setter'`` or ``'deleter'`` for the first ``@<owner>.setter`` /
        ``@<owner>.deleter`` decorator found (these win over a getter
        decorator on the same function), ``'getter'`` when a decorator's
        final dotted segment is ``property`` or ``cached_property``, and
        ``None`` otherwise.

    Matching is done on the final dotted segment, never on a substring, so
    ``@some_property_validator`` or ``@app.property_route`` are not
    accessors. ``setter``/``deleter`` additionally require an owner prefix
    (``x.setter``): a bare ``@setter`` is an unrelated user decorator, not a
    property accessor, and must not receive a role.
    """
    saw_getter = False
    for decorator in decorators:
        # '@functools.cached_property(...)' -> 'functools.cached_property'
        dotted = decorator.strip().lstrip('@').split('(', 1)[0].strip()
        owner, _, leaf = dotted.rpartition('.')
        if owner and leaf in ('setter', 'deleter'):
            # Setter/deleter take precedence over any getter decorator.
            return leaf
        if leaf in ('property', 'cached_property'):
            saw_getter = True
    return 'getter' if saw_getter else None
def _store_function(self, func_id: str, func_data: Dict) -> str:
    """Register ``func_data`` in ``self.functions`` without overwriting.

    The first unit stored under ``func_id`` keeps that id. A later unit with
    the same id is stored under ``"<func_id>#L<start_line>"`` (``start_line``
    defaults to 0 when the key is absent); if that key is taken as well, a
    numeric suffix ``.2``, ``.3``, ... is appended until a free key is found.

    Which of two colliding units counts as "first" is decided purely by the
    order in which callers invoke this method.

    Returns:
        The key under which ``func_data`` was actually stored.
    """
    registry = self.functions
    key = func_id
    if key in registry:
        anchor = f"{func_id}#L{func_data.get('start_line', 0)}"
        key = anchor
        attempt = 1
        while key in registry:
            attempt += 1
            key = f"{anchor}.{attempt}"
    registry[key] = func_data
    return key


def _count_function(self, func_data: Dict, *, is_method: bool) -> None:
    """Bump the ``self.stats`` counters for one emitted function unit.

    Increments ``total_functions``, then either ``total_methods`` or
    ``standalone_functions`` depending on ``is_method``, then
    ``async_functions`` when ``func_data['is_async']`` is truthy, and finally
    the per-``unit_type`` tally under ``by_type``.
    """
    stats = self.stats
    stats['total_functions'] += 1
    stats['total_methods' if is_method else 'standalone_functions'] += 1
    if func_data['is_async']:
        stats['async_functions'] += 1
    tally = stats['by_type']
    kind = func_data['unit_type']
    tally[kind] = tally.get(kind, 0) + 1
# Block-statement containers whose bodies may hold def/class nodes the
# def/class-only recursion never reaches. Built defensively so Python
# versions lacking TryStar (<3.11) / Match (<3.10) don't raise.
_BLOCK_CONTAINERS = tuple(filter(None, (
    getattr(ast, _n, None) for _n in (
        'If', 'For', 'AsyncFor', 'While', 'With', 'AsyncWith',
        'Try', 'TryStar', 'Match',
    )
)))


@staticmethod
def _block_bodies(stmt: ast.AST) -> List[list]:
    """Return every statement-list body of a block container.

    Collects, in this order: ``body``, ``orelse`` and ``finalbody`` when they
    are lists (empty lists included), then the body of each ``except``
    handler, then the body of each ``match`` case.
    """
    bodies: List[list] = []
    for field in ('body', 'orelse', 'finalbody'):
        v = getattr(stmt, field, None)
        if isinstance(v, list):
            bodies.append(v)
    for handler in getattr(stmt, 'handlers', None) or []:  # except arms
        b = getattr(handler, 'body', None)
        if isinstance(b, list):
            bodies.append(b)
    for case in getattr(stmt, 'cases', None) or []:  # match arms
        b = getattr(case, 'body', None)
        if isinstance(b, list):
            bodies.append(b)
    return bodies


def _descend_into_blocks(self, stmts: list, file_path: Path, content: str,
                         class_name: Optional[str] = None,
                         outer_qualifier: Optional[str] = None) -> None:
    """Find def/class nodes inside block statements at any depth and emit them.

    Only block-container statements (see ``_BLOCK_CONTAINERS``) are entered;
    direct ``FunctionDef``/``ClassDef`` children of ``stmts`` are left to the
    caller, so nothing is processed twice.

    Args:
        stmts: Statement list to scan (a module, function or class body).
        file_path: Path of the file being processed.
        content: Full source text of that file.
        class_name: Qualified name of the class whose body is being scanned,
            or ``None``. Defs found in the blocks are registered as methods
            of this class, so ``class C: if X: def m(self)`` is keyed
            ``C.m`` rather than as a standalone ``m``.
        outer_qualifier: Dotted prefix applied to classes found in the
            blocks (``Outer`` -> ``Outer.Inner``), or ``None``.

    Both context arguments default to ``None``, which is the behaviour for
    module-level and function-level blocks.
    """
    for stmt in stmts:
        if not isinstance(stmt, self._BLOCK_CONTAINERS):
            continue
        for body in self._block_bodies(stmt):
            for child in body:
                if isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef)):
                    self._process_function_tree(child, file_path, content,
                                                class_name=class_name)
                elif isinstance(child, ast.ClassDef):
                    self._process_class_tree(child, file_path, content,
                                             outer_qualifier=outer_qualifier)
            # Blocks nested in this block stay in the same class context.
            self._descend_into_blocks(body, file_path, content,
                                      class_name=class_name,
                                      outer_qualifier=outer_qualifier)


def _process_function_tree(self, node: ast.AST, file_path: Path, content: str,
                           class_name: Optional[str] = None) -> None:
    """Register a function and recurse into its body.

    The function itself is stored and counted (as a method when
    ``class_name`` is given). Defs found directly in its body, or inside
    blocks in its body, are emitted as standalone functions
    (``class_name=None``); classes found there are handed to
    ``_process_class_tree`` without an outer qualifier.
    """
    func_id, func_data = self.process_function(node, str(file_path), content, class_name)
    self._store_function(func_id, func_data)
    self._count_function(func_data, is_method=class_name is not None)

    # Recurse into the body: a def nested inside this function's body is
    # never reached by the top-level / direct-method walks.
    for child in node.body:
        if isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef)):
            # A def nested inside a function is a standalone (non-method)
            # function in its own right; do not attribute it to a class.
            self._process_function_tree(child, file_path, content, class_name=None)
        elif isinstance(child, ast.ClassDef):
            self._process_class_tree(child, file_path, content, outer_qualifier=None)
    # defs/classes wrapped in a block inside this function's body
    self._descend_into_blocks(node.body, file_path, content)


def _process_class_tree(self, node: ast.ClassDef, file_path: Path, content: str,
                        outer_qualifier: Optional[str] = None) -> None:
    """Register a class, its methods, and any classes nested within it.

    ``outer_qualifier`` is the dotted prefix of any enclosing class
    (e.g. 'Outer' so an inner class method is keyed 'Outer.Inner.deep').
    Members declared inside a block in the class body (for example under
    ``if TYPE_CHECKING:``) are registered with this class as their owner.
    """
    class_id, class_data, method_nodes = self.process_class(
        node, str(file_path), content, outer_qualifier=outer_qualifier
    )
    self.classes[class_id] = class_data
    self.stats['total_classes'] += 1

    qualified_class = f"{outer_qualifier}.{node.name}" if outer_qualifier else node.name

    for method_node, method_class_name in method_nodes:
        # Methods may themselves contain nested defs -- recurse.
        self._process_function_tree(method_node, file_path, content,
                                    class_name=method_class_name)

    # Recurse into nested classes so their methods are extracted.
    for item in node.body:
        if isinstance(item, ast.ClassDef):
            self._process_class_tree(item, file_path, content,
                                     outer_qualifier=qualified_class)
    # defs/classes wrapped in a block inside the class body: keep the class
    # context so conditional members are keyed 'Class.member', not 'member'.
    self._descend_into_blocks(node.body, file_path, content,
                              class_name=qualified_class,
                              outer_qualifier=qualified_class)
def extract_assigned_lambdas(self, tree: ast.AST, file_path: Path, content: str) -> None:
    """Emit a function unit for each module-level ``name = lambda ...``.

    Only direct children of ``tree`` that are plain assignments whose value
    is a lambda are considered. Every target that is a simple name gets its
    own unit (``a = b = lambda: 0`` yields both ``a`` and ``b``); attribute
    and subscript targets are ignored.

    Each unit is stored via ``_store_function`` under
    ``"<relative_path>:<name>"`` and counted as a standalone function. The
    unit carries the same keys as a regular function unit, including
    ``property_role`` (always ``None`` for a lambda), plus ``is_lambda``.

    Args:
        tree: Parsed module.
        file_path: Path of the module; must be located under
            ``self.repo_path``.
        content: Source text of the module, used to capture the code.
    """
    relative_path = str(file_path.relative_to(self.repo_path))
    for node in ast.iter_child_nodes(tree):
        if not isinstance(node, ast.Assign) or not isinstance(node.value, ast.Lambda):
            continue

        # The parameter list depends only on the lambda, not on the target,
        # so build it once per assignment. Positional-only parameters
        # (``lambda a, /, b: ...``) live in a separate AST field.
        signature = node.value.args
        params = [a.arg for a in getattr(signature, 'posonlyargs', [])]
        params.extend(a.arg for a in signature.args)
        if signature.vararg:
            params.append(f"*{signature.vararg.arg}")
        params.extend(a.arg for a in signature.kwonlyargs)
        if signature.kwarg:
            params.append(f"**{signature.kwarg.arg}")

        for target in node.targets:
            if not isinstance(target, ast.Name):
                continue
            name = target.id
            func_data = {
                'name': name,
                'qualified_name': name,
                'file_path': relative_path,
                'start_line': node.lineno,
                'end_line': getattr(node, 'end_lineno', node.lineno),
                'code': self.get_source_segment(content, node),
                'class_name': None,
                'decorators': [],
                'is_async': False,
                'parameters': list(params),  # one list per unit, never shared
                'docstring': None,
                'unit_type': self.classify_function(name, [], None, relative_path),
                'property_role': None,
                'is_lambda': True,
            }
            self._store_function(f"{relative_path}:{name}", func_data)
            self._count_function(func_data, is_method=False)
This keeps func_id == path:qualified_name -- + # the invariant call_graph_builder relies on to reconstruct call targets + # -- and is order-independent (role is intrinsic, not emission position). + property_role = self._property_role(decorators) + qualified_name = f"{class_name}.{func_name}" if class_name else func_name + if property_role in ('setter', 'deleter'): + qualified_name = f"{qualified_name}.{property_role}" + + # Generate unique ID (after any role suffix) + func_id = f"{relative_path}:{qualified_name}" parameters = self.extract_parameters(node) docstring = self.get_docstring(node) code = self.get_source_segment(content, node) is_async = isinstance(node, ast.AsyncFunctionDef) unit_type = self.classify_function(func_name, decorators, class_name, relative_path) + # The captured `code` (get_source_segment) includes any decorator lines, + # so start_line must point at the first decorator, not the `def` line. + # Off-by-one for one decorator; off-by-N for stacked decorators. + start_line = node.lineno + if getattr(node, 'decorator_list', None): + start_line = min(start_line, min(d.lineno for d in node.decorator_list)) + func_data = { 'name': func_name, 'qualified_name': qualified_name, 'file_path': relative_path, - 'start_line': node.lineno, + 'start_line': start_line, 'end_line': getattr(node, 'end_lineno', node.lineno), 'code': code, 'class_name': class_name, @@ -329,13 +561,20 @@ def process_function(self, node: ast.FunctionDef, file_path: str, 'parameters': parameters, 'docstring': docstring[:500] if docstring else None, # Truncate long docstrings 'unit_type': unit_type, + 'property_role': property_role, } return func_id, func_data - def process_class(self, node: ast.ClassDef, file_path: str, content: str) -> Tuple[str, Dict, List[Tuple]]: - """Process a class definition and extract metadata.""" - class_name = node.name + def process_class(self, node: ast.ClassDef, file_path: str, content: str, + outer_qualifier: Optional[str] = None) -> Tuple[str, Dict, 
List[Tuple]]: + """Process a class definition and extract metadata. + + `outer_qualifier` is the dotted name of any enclosing class, so a class + nested inside another is keyed by its full path (e.g. 'Outer.Inner') and + its methods become 'Outer.Inner.method'. + """ + class_name = f"{outer_qualifier}.{node.name}" if outer_qualifier else node.name relative_path = str(Path(file_path).relative_to(self.repo_path)) class_id = f"{relative_path}:{class_name}" @@ -408,10 +647,13 @@ def extract_module_level_code(self, tree: ast.AST, content: str, lines = content.split('\n') total_lines = len(lines) - # Track which lines are covered by functions/classes + # Track which lines are covered by functions/classes. Walk the WHOLE + # tree (not just top-level children) so a def/class wrapped in a block + # (if/try/for/with/match) is covered too — otherwise its body, now its + # own unit, would also leak verbatim into this synthetic :__module__ text. covered_lines: Set[int] = set() - for node in ast.iter_child_nodes(tree): + for node in ast.walk(tree): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)): start_line = node.lineno end_line = getattr(node, 'end_lineno', start_line) @@ -523,37 +765,21 @@ def process_file(self, file_path: Path) -> None: # Extract imports self.imports[relative_path] = self.extract_imports(tree, relative_path) - # Process top-level functions and classes + # Process top-level functions and classes. The tree helpers recurse so + # defs nested in function bodies and classes nested in classes/functions + # are also extracted (not just the direct children). 
for node in ast.iter_child_nodes(tree): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): - func_id, func_data = self.process_function(node, file_path, content) - self.functions[func_id] = func_data - self.stats['total_functions'] += 1 - self.stats['standalone_functions'] += 1 - if func_data['is_async']: - self.stats['async_functions'] += 1 + self._process_function_tree(node, file_path, content, class_name=None) + elif isinstance(node, ast.ClassDef): + self._process_class_tree(node, file_path, content, outer_qualifier=None) - # Track by type - unit_type = func_data['unit_type'] - self.stats['by_type'][unit_type] = self.stats['by_type'].get(unit_type, 0) + 1 + # defs/classes wrapped in a top-level block (version guard, try/except + # ImportError fallback, with-guarded handler, etc.). + self._descend_into_blocks(tree.body, file_path, content) - elif isinstance(node, ast.ClassDef): - class_id, class_data, method_nodes = self.process_class(node, file_path, content) - self.classes[class_id] = class_data - self.stats['total_classes'] += 1 - - # Process methods - for method_node, class_name in method_nodes: - func_id, func_data = self.process_function(method_node, file_path, content, class_name) - self.functions[func_id] = func_data - self.stats['total_functions'] += 1 - self.stats['total_methods'] += 1 - if func_data['is_async']: - self.stats['async_functions'] += 1 - - # Track by type - unit_type = func_data['unit_type'] - self.stats['by_type'][unit_type] = self.stats['by_type'].get(unit_type, 0) + 1 + # Module-level lambdas bound to a name (handler = lambda ...). 
+ self.extract_assigned_lambdas(tree, file_path, content) # Extract module-level code module_result = self.extract_module_level_code(tree, content, file_path) diff --git a/libs/openant-core/tests/parsers/python/test_block_scoped_defs.py b/libs/openant-core/tests/parsers/python/test_block_scoped_defs.py new file mode 100644 index 00000000..0384a253 --- /dev/null +++ b/libs/openant-core/tests/parsers/python/test_block_scoped_defs.py @@ -0,0 +1,118 @@ +"""Bug: functions/classes defined inside a BLOCK statement are dropped. + +The Python extractor only recursed into `FunctionDef`/`ClassDef` bodies, never +into block statements (`if`/`elif`/`else`, `try`/`except`/`finally`, `for`/ +`while`, `with`, `match`/`case`). So a `def` inside `if sys.version_info...`, a +`try/except ImportError` fallback, a `with`-guarded handler, or a CBV `if/else` +dispatcher was never a unit, never a call-graph node, and its body (including any +sink) leaked verbatim into the synthetic `:__module__` unit. + +Investigated independent + judge (real interpreter). Fix descends into block +bodies at ALL depths (Python already keeps function-nested defs, so this matches +its baseline), reusing the existing keep-both (`#L`) machinery, and closes +the `__module__` leak by covering every def/class span. 
def _extract(src: str) -> dict:
    """Run the extractor on ``src`` written as ``m.py`` in a throwaway repo.

    The temporary repository is removed before returning, so repeated test
    runs do not accumulate directories. The extractor's ``functions``
    mapping is returned after ``process_file`` has completed.
    """
    with tempfile.TemporaryDirectory() as tmp:
        # resolve() so the repo root and the file path share one canonical
        # prefix even when the temp dir sits behind a symlink.
        repo = Path(tmp).resolve()
        module_path = repo / "m.py"
        module_path.write_text(src, encoding="utf-8")
        ex = FunctionExtractor(str(repo))
        ex.process_file(module_path)
        return ex.functions


def _names(functions: dict):
    """Return the sorted name part (text after the first ``:``) of each id."""
    return sorted(k.split(":", 1)[1] for k in functions)
+ src = "if TYPE_CHECKING:\n class Hidden:\n def m(self): return 1\n" + names = _names(_extract(src)) + assert any(n.endswith("Hidden.m") for n in names), names + + +def test_function_internal_block_def_extracted_no_duplicate(): + src = ( + "def outer():\n" + " def direct(): return 1\n" + " if c:\n" + " def blocked(): return 2\n" + ) + names = _names(_extract(src)) + assert names.count("direct") == 1, f"direct duplicated: {names}" + assert "blocked" in names, f"function-internal block def dropped: {names}" + + +def test_sibling_block_same_name_keeps_both(): + src = ( + "if c:\n def view(): return a()\n" + "else:\n def view(): return b()\n" + ) + views = [n for n in _names(_extract(src)) if n.startswith("view")] + assert len(views) == 2, f"both if/else view defs must survive: {views}" + + +def test_block_def_colliding_with_top_level_keeps_both(): + src = "def dup(): return 1\nif c:\n def dup(): return 2\n" + dups = [n for n in _names(_extract(src)) if n.startswith("dup")] + assert len(dups) == 2, f"block def must not clobber top-level same-name: {dups}" + + +def test_module_unit_does_not_leak_block_def_body(): + # The block def's body (incl. its sink) must move into its own unit, not + # leak verbatim into the synthetic :__module__ text. + src = "if X:\n def hidden(req):\n return __import__('os').system(req)\n" + fns = _extract(src) + assert "hidden" in _names(fns), "hidden not surfaced" + mod = next((v for k, v in fns.items() if k.endswith(":__module__")), None) + if mod is not None: + assert "system(req)" not in mod.get("code", ""), ( + "block def body leaked into __module__" + ) diff --git a/libs/openant-core/tests/parsers/python/test_callgraph_symmetry.py b/libs/openant-core/tests/parsers/python/test_callgraph_symmetry.py new file mode 100644 index 00000000..e382ca27 --- /dev/null +++ b/libs/openant-core/tests/parsers/python/test_callgraph_symmetry.py @@ -0,0 +1,70 @@ +"""Canonical per-parser invariant: every call-graph node is a real function. 
def _build():
    """Extract the fixture module and return a builder with its call graph built.

    The fixture is written as ``m.py`` into a temporary repository that is
    removed before returning; extraction and ``build_call_graph`` both run
    while the directory still exists.
    """
    with tempfile.TemporaryDirectory() as tmp:
        # resolve() matches how the sibling test modules create their repos.
        repo = Path(tmp).resolve()
        (repo / "m.py").write_text(_FIXTURE, encoding="utf-8")
        builder = CallGraphBuilder(FunctionExtractor(str(repo)).extract_all())
        builder.build_call_graph()
        return builder
+ b = _build() + block_id = next(k for k in b.functions if k.endswith(":block_fn")) + top_id = next(k for k in b.functions if k.endswith(":top")) + assert top_id in b.call_graph.get(block_id, []), ( + f"block_fn -> top edge missing: {b.call_graph.get(block_id)}" + ) diff --git a/libs/openant-core/tests/parsers/python/test_python_schema_completeness.py b/libs/openant-core/tests/parsers/python/test_python_schema_completeness.py new file mode 100644 index 00000000..58e44a74 --- /dev/null +++ b/libs/openant-core/tests/parsers/python/test_python_schema_completeness.py @@ -0,0 +1,71 @@ +"""Canonical per-parser schema completeness: every emitted function unit carries +the schema-contract fields downstream consumers (call graph, unit generator, +entry-point detector, dataset) read. Run across top-level, nested, method, +async, decorated, and block-scoped defs so no emit path drops a field. +""" +import sys +import tempfile +from pathlib import Path + +_CORE_ROOT = Path(__file__).resolve().parents[3] +sys.path.insert(0, str(_CORE_ROOT)) + +from parsers.python.function_extractor import FunctionExtractor + +# Fields every function unit must expose (present-as-key; value may be None/[]). 
def _functions():
    """Extract the fixture module and return its function units.

    The synthetic ``:__module__`` unit is filtered out so only real function
    units are checked. The temporary repository is removed before returning.
    """
    with tempfile.TemporaryDirectory() as tmp:
        repo = Path(tmp).resolve()
        module_path = repo / "m.py"
        module_path.write_text(_FIXTURE, encoding="utf-8")
        ex = FunctionExtractor(str(repo))
        ex.process_file(module_path)
        return {k: v for k, v in ex.functions.items() if not k.endswith(":__module__")}