-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathplugin_manager.py
More file actions
155 lines (128 loc) · 5.33 KB
/
Copy pathplugin_manager.py
File metadata and controls
155 lines (128 loc) · 5.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""Plugin architecture for Counterscarp Engine.
Supports community-contributed analyzers and heuristic rules via a
simple plugin discovery mechanism. Plugins are Python modules placed
in configured directories that expose a ``register()`` function.
"""
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
from typing import Any, Dict, List, Protocol, runtime_checkable
try:
from logger import get_logger
logger = get_logger(__name__)
except ImportError:
import logging
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Plugin Protocols
# ---------------------------------------------------------------------------
@runtime_checkable
class AnalyzerPlugin(Protocol):
"""Protocol for community analyzer plugins."""
name: str
version: str
def analyze(
self, target: str, config: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Run analysis on target, return list of finding dicts."""
...
@runtime_checkable
class RulePlugin(Protocol):
"""Protocol for custom heuristic rule plugins."""
def get_rules(self) -> list:
"""Return list of HeuristicRule instances."""
...
# ---------------------------------------------------------------------------
# Plugin Manager
# ---------------------------------------------------------------------------
class PluginManager:
"""Discovers, registers, and manages plugins."""
def __init__(self) -> None:
self._analyzers: List[AnalyzerPlugin] = []
self._rule_plugins: List[RulePlugin] = []
self._loaded_modules: Dict[str, Any] = {}
def discover_plugins(self, plugin_dirs: List[str]) -> int:
"""Scan directories for plugin modules with a register() function.
Returns the number of plugins successfully loaded.
"""
count = 0
for dir_path in plugin_dirs:
p = Path(dir_path).expanduser().resolve()
if not p.is_dir():
logger.debug("Plugin directory does not exist: %s", p)
continue
for py_file in sorted(p.glob("*.py")):
if py_file.name.startswith("_"):
continue
try:
module = self._load_module(py_file)
if hasattr(module, "register"):
module.register(self)
count += 1
logger.info("Loaded plugin: %s", py_file.name)
else:
logger.debug(
"Skipping %s — no register() function",
py_file.name
)
except Exception as exc:
logger.warning(
"Failed to load plugin %s: %s", py_file.name, exc
)
logger.info(
"Plugin discovery complete: %d plugins loaded "
"(%d analyzers, %d rule sets)",
count, len(self._analyzers), len(self._rule_plugins),
)
return count
def _load_module(self, path: Path) -> Any:
"""Dynamically load a Python module from a file path."""
module_name = f"counterscarp_plugin_{path.stem}"
if module_name in self._loaded_modules:
return self._loaded_modules[module_name]
spec = importlib.util.spec_from_file_location(module_name, str(path))
if spec is None or spec.loader is None:
raise ImportError(f"Cannot create module spec for {path}")
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
self._loaded_modules[module_name] = module
return module
def register_analyzer(self, plugin: AnalyzerPlugin) -> None:
"""Register an analyzer plugin."""
if not isinstance(plugin, AnalyzerPlugin):
raise TypeError(
f"Expected AnalyzerPlugin protocol, "
f"got {type(plugin).__name__}"
)
self._analyzers.append(plugin)
logger.info(
"Registered analyzer plugin: %s v%s",
plugin.name, plugin.version
)
def register_rules(self, plugin: RulePlugin) -> None:
"""Register a rule plugin."""
if not isinstance(plugin, RulePlugin):
raise TypeError(
f"Expected RulePlugin protocol, got {type(plugin).__name__}"
)
self._rule_plugins.append(plugin)
logger.info("Registered rule plugin: %s", type(plugin).__name__)
def get_analyzers(self) -> List[AnalyzerPlugin]:
"""Return all registered analyzer plugins."""
return list(self._analyzers)
def get_rules(self) -> list:
"""Return all rules from registered rule plugins."""
all_rules = []
for plugin in self._rule_plugins:
try:
rules = plugin.get_rules()
all_rules.extend(rules)
except Exception as exc:
logger.warning("Failed to get rules from plugin: %s", exc)
return all_rules
def get_analyzer_count(self) -> int:
return len(self._analyzers)
def get_rule_plugin_count(self) -> int:
return len(self._rule_plugins)