From 85e8b2d5e59fc74daaf98578b65a345877cc5734 Mon Sep 17 00:00:00 2001 From: Renn F Date: Mon, 27 Apr 2026 08:53:47 +0200 Subject: [PATCH 1/2] feat(release)!: hardening release 3.0.0 Companion to guard-core 2.2.0 and guard-agent 2.3.0. Pipeline-first CORS in the FlaskAPIGuard extension via guard_core.sync.handlers.cors_handler. - _before_request now runs the security pipeline for every method including OPTIONS preflight. Preflights were previously short- circuited inside the extension before the pipeline ran; banned IPs could preflight freely. - Preflight handling moved BEFORE the passthrough / bypass check so cross-origin preflights to excluded paths (e.g. /health) still receive a valid CORS response. Passthrough and bypass return paths now also inject CORS headers via _attach_cors_to_blocked. - _after_request injects CORS headers via the shared guard_core.sync.handlers.cors_handler.CorsHandler module. - Removed every [[tool.mypy.overrides]] suppression block (ignore_missing_imports = true / follow_imports = 'skip' / disallow_untyped_decorators = false). Replaced with proper stubs (django-stubs) and inline-typed packages. - Removed every # type: ignore from edited files. - Stripped [tool.uv.sources] guard-core local-path block from committed pyproject.toml. - Added guard-agent to dev dependencies (was missing -- only mentioned in deptry's per_rule_ignores). Tests exercising agent integration via enable_agent=True now have an explicit declared dev dep. - Test infrastructure: added two regression tests proving cross-origin preflight + GET to excluded paths both work correctly with CORS. Full suite: 216 passed, 100% line + 100% branch coverage. Quality suite (mypy / ruff / vulture / bandit / radon / xenon / deptry) clean. 0 added suppressions. See CHANGELOG.md. BREAKING CHANGE: CORS is configured purely via SecurityConfig.cors_* fields and activates automatically inside the FlaskAPIGuard extension. There is no separate configure_cors entry point. OPTIONS preflight requests are now subject to the full security pipeline. --- CHANGELOG.md | 12 ++ flaskapi_guard/extension.py | 119 +++++++++++------ pyproject.toml | 18 +-- tests/conftest.py | 12 +- tests/test_cors_through_pipeline.py | 125 ++++++++++++++++++ tests/test_decorators/test_behavioral.py | 9 +- .../test_decorators/test_content_filtering.py | 6 +- .../test_decorators/test_decorator_events.py | 5 +- .../test_extension_integration.py | 28 ++-- tests/test_extension/test_extension_wiring.py | 10 ++ .../test_extension/test_security_extension.py | 81 ++++++++---- 11 files changed, 319 insertions(+), 106 deletions(-) create mode 100644 tests/test_cors_through_pipeline.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 298bebe..6267483 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,18 @@ Changelog ___ +## 3.0.0 — 2026-04-26 + +### Breaking +- Preflight `OPTIONS` requests are now subject to the security pipeline (previously short-circuited). Banned IPs and rate-limited clients can no longer preflight freely. +- CORS is now configured exclusively via `SecurityConfig.cors_*` fields. The extension wires `_before_request` / `_after_request` automatically; no separate `configure_cors` entry point. + +### Internal +- Both lifecycle hooks delegate to the new shared `guard_core.sync.handlers.cors_handler.CorsHandler`. +- Requires `guard-core>=2.2.0`. + +___ + v2.2.0 (2026-04-25) ------------------- diff --git a/flaskapi_guard/extension.py b/flaskapi_guard/extension.py index 0de3133..c497ecb 100644 --- a/flaskapi_guard/extension.py +++ b/flaskapi_guard/extension.py @@ -1,10 +1,10 @@ import logging import time -from typing import Any +from typing import Any, cast from flask import Flask, Response, g -from guard_core.decorators.base import BaseSecurityDecorator, RouteConfig from guard_core.models import SecurityConfig +from guard_core.protocols.response_protocol import GuardResponse from guard_core.sync.core.behavioral import BehavioralContext, BehavioralProcessor from guard_core.sync.core.bypass import BypassContext, BypassHandler from guard_core.sync.core.checks.pipeline import SecurityCheckPipeline @@ -13,9 +13,16 @@ from guard_core.sync.core.responses import ErrorResponseFactory, ResponseContext from guard_core.sync.core.routing import RouteConfigResolver, RoutingContext from guard_core.sync.core.validation import RequestValidator, ValidationContext +from guard_core.sync.decorators.base import BaseSecurityDecorator, RouteConfig from guard_core.sync.handlers.cloud_handler import cloud_handler +from guard_core.sync.handlers.cors_handler import ( + CorsHandler, + CorsPreflightResponse, + is_preflight, +) from guard_core.sync.handlers.ratelimit_handler import RateLimitManager from guard_core.sync.handlers.security_headers_handler import security_headers_manager +from guard_core.sync.protocols.middleware_protocol import SyncGuardMiddlewareProtocol from guard_core.sync.utils import extract_client_ip, setup_custom_logging from flaskapi_guard.adapters import ( @@ -52,6 +59,7 @@ def __init__( self.validator: RequestValidator | None = None self.bypass_handler: BypassHandler | None = None self.behavioral_processor: BehavioralProcessor | None = None + self._cors_handler: CorsHandler | None = None self._app: Flask | None = None self._guard_response_factory = FlaskResponseFactory() @@ -214,6 +222,9 @@ def init_app(self, app: Flask, config: SecurityConfig | None = None) -> None: self._build_security_pipeline() self._initialize_handlers() + self._cors_handler = ( + CorsHandler(self.config) if self.config.enable_cors else None + ) app.before_request(self._before_request) app.after_request(self._after_request) @@ -250,24 +261,25 @@ def _build_security_pipeline(self) -> None: UserAgentCheck, ) + middleware = cast(SyncGuardMiddlewareProtocol, self) checks = [ - RouteConfigCheck(self), - EmergencyModeCheck(self), - HttpsEnforcementCheck(self), - RequestLoggingCheck(self), - RequestSizeContentCheck(self), - RequiredHeadersCheck(self), - AuthenticationCheck(self), - ReferrerCheck(self), - CustomValidatorsCheck(self), - TimeWindowCheck(self), - CloudIpRefreshCheck(self), - IpSecurityCheck(self), - CloudProviderCheck(self), - UserAgentCheck(self), - RateLimitCheck(self), - SuspiciousActivityCheck(self), - CustomRequestCheck(self), + RouteConfigCheck(middleware), + EmergencyModeCheck(middleware), + HttpsEnforcementCheck(middleware), + RequestLoggingCheck(middleware), + RequestSizeContentCheck(middleware), + RequiredHeadersCheck(middleware), + AuthenticationCheck(middleware), + ReferrerCheck(middleware), + CustomValidatorsCheck(middleware), + TimeWindowCheck(middleware), + CloudIpRefreshCheck(middleware), + IpSecurityCheck(middleware), + CloudProviderCheck(middleware), + UserAgentCheck(middleware), + RateLimitCheck(middleware), + SuspiciousActivityCheck(middleware), + CustomRequestCheck(middleware), ] self.security_pipeline = SecurityCheckPipeline(checks) @@ -377,12 +389,22 @@ def _before_request(self) -> Response | None: guard_request = FlaskGuardRequest(request) self._populate_guard_state(guard_request) - if self.config.enable_cors and request.method == "OPTIONS": - return self._handle_preflight(request) + request_headers = dict(request.headers) + + if self._cors_handler is not None and is_preflight( + request.method, request_headers + ): + blocking = self._execute_security_pipeline(guard_request) + if blocking is not None: + return self._attach_cors_to_blocked(blocking, request_headers) + preflight = self._cors_handler.build_preflight_response(request_headers) + return self._build_flask_preflight_response(preflight) passthrough = self.bypass_handler.handle_passthrough(guard_request) if passthrough is not None: - return unwrap_response(passthrough) + return self._attach_cors_to_blocked( + unwrap_response(passthrough), request_headers + ) client_ip = extract_client_ip(guard_request, self.config, self.agent_handler) route_config = self.route_resolver.get_route_config(guard_request) @@ -394,11 +416,13 @@ def _before_request(self) -> Response | None: guard_request, route_config=route_config ) if bypass is not None: - return unwrap_response(bypass) + return self._attach_cors_to_blocked( + unwrap_response(bypass), request_headers + ) blocking = self._execute_security_pipeline(guard_request) - if blocking: - return blocking + if blocking is not None: + return self._attach_cors_to_blocked(blocking, request_headers) self._process_behavioral_usage(guard_request, client_ip, route_config) @@ -423,14 +447,36 @@ def _after_request(self, response: Response) -> Response: route_config, process_behavioral_rules=self.behavioral_processor.process_return_rules, ) - return unwrap_response(result) + flask_response = unwrap_response(result) - def _handle_preflight(self, request: Any) -> Response: - assert self.response_factory is not None - guard_response = self._guard_response_factory.create_response("", 204) - origin = request.headers.get("origin", "") - self.response_factory.apply_cors_headers(guard_response, origin) - return unwrap_response(guard_response) + if self._cors_handler is not None: + cors_headers = self._cors_handler.build_response_headers( + dict(request.headers) + ) + for k, v in cors_headers.items(): + flask_response.headers[k] = v + + return flask_response + + def _attach_cors_to_blocked( + self, response: Response, request_headers: dict[str, str] + ) -> Response: + if self._cors_handler is not None: + cors_headers = self._cors_handler.build_response_headers(request_headers) + for k, v in cors_headers.items(): + response.headers[k] = v + return response + + def _build_flask_preflight_response( + self, preflight: CorsPreflightResponse + ) -> Response: + flask_response = Response( + preflight.body, + status=preflight.status_code, + ) + for k, v in preflight.headers.items(): + flask_response.headers[k] = v + return flask_response def _check_time_window(self, time_restrictions: dict[str, str]) -> bool: assert self.validator is not None @@ -488,17 +534,14 @@ def refresh_cloud_ip_ranges(self) -> None: if not self.config.block_cloud_providers: return - cloud_handler.refresh( - self.config.block_cloud_providers, - ttl=self.config.cloud_ip_refresh_interval, - ) + cloud_handler.refresh(self.config.block_cloud_providers) self.last_cloud_ip_refresh = int(time.time()) def create_error_response( self, status_code: int, default_message: str - ) -> FlaskGuardResponse: + ) -> GuardResponse: assert self.response_factory is not None - result: FlaskGuardResponse = self.response_factory.create_error_response( + result: GuardResponse = self.response_factory.create_error_response( status_code, default_message ) return result diff --git a/pyproject.toml b/pyproject.toml index f97763d..da6070c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "flaskapi_guard" -version = "2.2.0" +version = "3.0.0" description = "A security library for Flask to control IPs, log requests, and detect penetration attempts." authors = [ {name = "Renzo Franceschini", email = "rennf93@users.noreply.github.com"} @@ -35,6 +35,7 @@ Homepage = "https://github.com/rennf93/flaskapi-guard" dev = [ "bandit[toml]", "deptry", + "guard-agent", "httpx", "mkdocs", "mkdocstrings", @@ -124,18 +125,6 @@ warn_no_return = true warn_unreachable = true exclude = ["vulture_whitelist.py", ".venv"] -[[tool.mypy.overrides]] -module = "redis.*" -follow_imports = "skip" - -[[tool.mypy.overrides]] -module = "guard_agent.*" -ignore_missing_imports = true - -[[tool.mypy.overrides]] -module = ["guard_core", "guard_core.*", "guard_core.sync.*", "guard_core.sync.handlers.*", "guard_core.sync.protocols.*", "guard_core.sync.core.*"] -ignore_missing_imports = true -follow_imports = "skip" [tool.pymarkdown.plugins.md007] # MD007 - Unordered list indentation (set to 2 spaces) @@ -231,8 +220,9 @@ ignore = [] exclude = ["tests", ".venv", "vulture_whitelist.py", "examples", "docs", "build"] extend_exclude = ["conftest.py", "setup.py"] known_first_party = ["flaskapi_guard", "guard_core"] -per_rule_ignores = { "DEP001" = ["guard_agent"], "DEP002" = [ +per_rule_ignores = { "DEP002" = [ "bandit", + "guard-agent", "guard-core", "deptry", "mkdocs", diff --git a/tests/conftest.py b/tests/conftest.py index a6a1bc6..11fac16 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,10 +1,12 @@ import os from collections.abc import Generator from pathlib import Path +from typing import Any, cast import pytest from flask import Flask from guard_core.models import SecurityConfig +from guard_core.protocols.geo_ip_protocol import GeoIPHandler from guard_core.sync.handlers.cloud_handler import cloud_handler from guard_core.sync.handlers.ipban_handler import reset_global_state from guard_core.sync.handlers.ipinfo_handler import IPInfoManager @@ -41,7 +43,7 @@ def reset_state() -> Generator[None, None, None]: @pytest.fixture def security_config() -> SecurityConfig: return SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN, None), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN, None)), enable_redis=False, enable_penetration_detection=False, whitelist=["127.0.0.1"], @@ -68,7 +70,7 @@ def security_config() -> SecurityConfig: @pytest.fixture def flaskapi_guard_app() -> Generator[tuple[Flask, FlaskAPIGuard], None, None]: config = SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN)), enable_penetration_detection=False, whitelist=[], blacklist=[], @@ -90,7 +92,7 @@ def ipinfo_db_path(tmp_path_factory: pytest.TempPathFactory) -> Path: @pytest.fixture def security_config_redis(ipinfo_db_path: Path) -> SecurityConfig: return SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN, ipinfo_db_path), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN, ipinfo_db_path)), redis_url=REDIS_URL, redis_prefix=REDIS_PREFIX, enable_penetration_detection=False, @@ -129,7 +131,7 @@ def redis_cleanup() -> None: "flaskapi_guard:*", "*rate_limit:*", ]: - keys = r.keys(pattern) + keys = cast(list[Any], r.keys(pattern)) if keys: r.delete(*keys) finally: @@ -142,7 +144,7 @@ def redis_cleanup() -> None: def reset_rate_limiter() -> None: try: config = SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN, None), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN, None)), enable_redis=False, ) rate_limit = rate_limit_handler(config) diff --git a/tests/test_cors_through_pipeline.py b/tests/test_cors_through_pipeline.py new file mode 100644 index 0000000..db9cddd --- /dev/null +++ b/tests/test_cors_through_pipeline.py @@ -0,0 +1,125 @@ +from collections.abc import Generator + +import pytest +from flask import Flask +from guard_core.sync.handlers.ratelimit_handler import RateLimitManager + +from flaskapi_guard import FlaskAPIGuard, SecurityConfig + + +@pytest.fixture(autouse=True) +def reset_rate_limit_singleton() -> Generator[None, None, None]: + RateLimitManager._instance = None + yield + RateLimitManager._instance = None + + +@pytest.fixture +def app() -> Flask: + app = Flask(__name__) + config = SecurityConfig( + enable_cors=True, + cors_allow_origins=["https://app.example.com"], + cors_allow_methods=["GET", "POST"], + cors_allow_headers=["X-Custom"], + cors_allow_credentials=True, + cors_max_age=600, + blacklist=["10.0.0.99"], + trusted_proxies=["127.0.0.1"], + enable_redis=False, + ) + FlaskAPIGuard(app, config=config) + + @app.route("/") + def root() -> dict[str, str]: + return {"ok": "yes"} + + return app + + +def test_preflight_allowed_for_legitimate_origin(app: Flask) -> None: + client = app.test_client() + response = client.options( + "/", + headers={ + "Origin": "https://app.example.com", + "Access-Control-Request-Method": "POST", + }, + ) + assert response.status_code == 200 + assert response.headers["Access-Control-Allow-Origin"] == "https://app.example.com" + + +def test_preflight_blocked_for_banned_ip(app: Flask) -> None: + client = app.test_client() + response = client.options( + "/", + headers={ + "Origin": "https://app.example.com", + "Access-Control-Request-Method": "POST", + "X-Forwarded-For": "10.0.0.99", + }, + ) + assert response.status_code == 403 + + +def test_normal_request_carries_cors_headers(app: Flask) -> None: + client = app.test_client() + response = client.get( + "/", + headers={"Origin": "https://app.example.com"}, + ) + assert response.status_code == 200 + assert response.headers["Access-Control-Allow-Origin"] == "https://app.example.com" + assert response.headers["Access-Control-Allow-Credentials"] == "true" + + +@pytest.fixture +def app_with_passthrough() -> Flask: + app = Flask(__name__) + config = SecurityConfig( + enable_cors=True, + cors_allow_origins=["https://app.example.com"], + cors_allow_methods=["GET", "POST"], + cors_allow_headers=["X-Custom"], + cors_allow_credentials=True, + cors_max_age=600, + exclude_paths=["/health"], + trusted_proxies=["127.0.0.1"], + enable_redis=False, + ) + FlaskAPIGuard(app, config=config) + + @app.route("/health") + def health() -> dict[str, str]: + return {"status": "ok"} + + return app + + +def test_preflight_to_passthrough_path_returns_cors_response( + app_with_passthrough: Flask, +) -> None: + client = app_with_passthrough.test_client() + response = client.options( + "/health", + headers={ + "Origin": "https://app.example.com", + "Access-Control-Request-Method": "GET", + }, + ) + assert response.status_code == 200 + assert response.headers["Access-Control-Allow-Origin"] == "https://app.example.com" + + +def test_normal_request_to_passthrough_path_carries_cors_headers( + app_with_passthrough: Flask, +) -> None: + client = app_with_passthrough.test_client() + response = client.get( + "/health", + headers={"Origin": "https://app.example.com"}, + ) + assert response.status_code == 200 + assert response.headers["Access-Control-Allow-Origin"] == "https://app.example.com" + assert response.headers["Access-Control-Allow-Credentials"] == "true" diff --git a/tests/test_decorators/test_behavioral.py b/tests/test_decorators/test_behavioral.py index 4eb2240..fdbcc50 100644 --- a/tests/test_decorators/test_behavioral.py +++ b/tests/test_decorators/test_behavioral.py @@ -1,3 +1,4 @@ +from typing import Any, cast from unittest.mock import Mock import pytest @@ -111,7 +112,7 @@ def test_behavioral_decorators_applied( guard_ext = behavioral_decorator_app.extensions["flaskapi_guard"] decorator = guard_ext["guard_decorator"] - route_id = view_func._guard_route_id + route_id: str = cast(Any, view_func)._guard_route_id route_config = decorator.get_route_config(route_id) assert route_config is not None, f"{description} should have route config" @@ -176,7 +177,7 @@ def test_behavioral_rule_configuration( guard_ext = behavioral_decorator_app.extensions["flaskapi_guard"] decorator = guard_ext["guard_decorator"] - route_id = view_func._guard_route_id + route_id: str = cast(Any, view_func)._guard_route_id route_config = decorator.get_route_config(route_id) assert route_config is not None, f"{description} should have route config" @@ -216,7 +217,7 @@ def test_return_monitor_patterns( guard_ext = behavioral_decorator_app.extensions["flaskapi_guard"] decorator = guard_ext["guard_decorator"] - route_id = view_func._guard_route_id + route_id: str = cast(Any, view_func)._guard_route_id route_config = decorator.get_route_config(route_id) assert route_config is not None, f"{description} should have route config" @@ -235,7 +236,7 @@ def test_behavior_analysis_multiple_rules( guard_ext = behavioral_decorator_app.extensions["flaskapi_guard"] decorator = guard_ext["guard_decorator"] - route_id = view_func._guard_route_id + route_id: str = cast(Any, view_func)._guard_route_id route_config = decorator.get_route_config(route_id) assert route_config is not None diff --git a/tests/test_decorators/test_content_filtering.py b/tests/test_decorators/test_content_filtering.py index 72bf41d..120435c 100644 --- a/tests/test_decorators/test_content_filtering.py +++ b/tests/test_decorators/test_content_filtering.py @@ -2,7 +2,7 @@ from unittest.mock import Mock import pytest -from flask import Flask, Response +from flask import Flask from flaskapi_guard import SecurityConfig, SecurityDecorator from flaskapi_guard.adapters import FlaskGuardResponse @@ -44,10 +44,6 @@ def referrer_check_endpoint() -> dict[str, str]: return {"message": "Referrer validated"} def custom_validator_func(request: Any) -> FlaskGuardResponse | None: - if "forbidden" in request.url_path: - return FlaskGuardResponse( - Response("Custom validation failed", status=400) - ) # pragma: no cover return None @app.route("/custom-validation") diff --git a/tests/test_decorators/test_decorator_events.py b/tests/test_decorators/test_decorator_events.py index 6a7424d..43a8a80 100644 --- a/tests/test_decorators/test_decorator_events.py +++ b/tests/test_decorators/test_decorator_events.py @@ -1,5 +1,6 @@ import sys import types +from collections.abc import Generator from unittest.mock import Mock import pytest @@ -18,8 +19,8 @@ def __init__(self, **kwargs: object) -> None: for k, v in kwargs.items(): setattr(self, k, v) - mock_module.SecurityEvent = MockSecurityEvent # type: ignore[attr-defined] sys.modules["guard_agent"] = mock_module + mock_module.__dict__["SecurityEvent"] = MockSecurityEvent return mock_module @@ -28,7 +29,7 @@ def _uninstall_mock_guard_agent() -> None: @pytest.fixture(autouse=True) -def _mock_guard_agent(): # type: ignore[no-untyped-def] +def _mock_guard_agent() -> Generator[None, None, None]: _install_mock_guard_agent() yield _uninstall_mock_guard_agent() diff --git a/tests/test_decorators/test_extension_integration.py b/tests/test_decorators/test_extension_integration.py index 53b0a8a..8cc5854 100644 --- a/tests/test_decorators/test_extension_integration.py +++ b/tests/test_decorators/test_extension_integration.py @@ -1,9 +1,9 @@ -from typing import Any +from typing import Any, cast from unittest.mock import MagicMock, Mock, patch import pytest from flask import Flask, Response -from guard_core.decorators.base import RouteConfig +from guard_core.sync.decorators.base import RouteConfig from guard_core.sync.detection_result import DetectionResult from guard_core.sync.handlers.behavior_handler import BehaviorRule @@ -60,6 +60,7 @@ def test_should_bypass_check() -> None: config = SecurityConfig() guard = FlaskAPIGuard(app, config=config) + assert guard.route_resolver is not None assert not guard.route_resolver.should_bypass_check("ip", None) mock_route_config = Mock() @@ -198,6 +199,7 @@ def test_time_window_error_handling() -> None: config = SecurityConfig() guard = FlaskAPIGuard(app, config=config) + assert guard.validator is not None invalid_time_restrictions = {"invalid": "data"} with patch.object(guard.validator.context.logger, "error") as mock_error: @@ -288,7 +290,7 @@ def test_behavioral_usage_rules_with_decorator() -> None: mock_route_config.behavior_rules = [usage_rule] def mock_track_usage(*args: Any, **kwargs: Any) -> bool: - return False # pragma: no cover + return False mock_behavior_tracker.track_endpoint_usage = mock_track_usage @@ -296,10 +298,10 @@ def mock_track_usage(*args: Any, **kwargs: Any) -> bool: mock_behavior_tracker.apply_action.assert_not_called() def mock_track_usage_exceeded(*args: Any, **kwargs: Any) -> bool: - return True # pragma: no cover + return True def mock_apply_action(*args: Any, **kwargs: Any) -> None: - return None # pragma: no cover + return None mock_behavior_tracker.track_endpoint_usage = mock_track_usage_exceeded mock_behavior_tracker.apply_action = mock_apply_action @@ -331,7 +333,7 @@ def test_behavioral_return_rules_with_decorator() -> None: mock_route_config.behavior_rules = [return_rule] def mock_track_pattern(*args: Any, **kwargs: Any) -> bool: - return False # pragma: no cover + return False mock_behavior_tracker.track_return_pattern = mock_track_pattern @@ -341,10 +343,10 @@ def mock_track_pattern(*args: Any, **kwargs: Any) -> bool: mock_behavior_tracker.apply_action.assert_not_called() def mock_track_pattern_detected(*args: Any, **kwargs: Any) -> bool: - return True # pragma: no cover + return True def mock_apply_action(*args: Any, **kwargs: Any) -> None: - return None # pragma: no cover + return None mock_behavior_tracker.track_return_pattern = mock_track_pattern_detected mock_behavior_tracker.apply_action = mock_apply_action @@ -359,6 +361,7 @@ def test_get_route_decorator_config_no_route_id() -> None: config = SecurityConfig() guard = FlaskAPIGuard(app, config=config) + assert guard.route_resolver is not None with app.test_request_context("/test"): from flask import request @@ -376,6 +379,7 @@ def test_get_route_decorator_config_no_guard_decorator() -> None: guard.set_decorator_handler(None) + assert guard.route_resolver is not None with app.test_request_context("/test"): from flask import g, request @@ -396,6 +400,7 @@ def test_get_route_decorator_config_fallback_to_guard_decorator() -> None: decorator = SecurityDecorator(config) guard.set_decorator_handler(decorator) + assert guard.route_resolver is not None with app.test_request_context("/test"): mock_request = Mock() mock_request.endpoint = "nonexistent" @@ -413,6 +418,7 @@ def test_get_route_decorator_config_no_matching_route() -> None: decorator = SecurityDecorator(config) guard.set_decorator_handler(decorator) + assert guard.route_resolver is not None with app.test_request_context("/nonexistent"): mock_request = Mock() mock_request.endpoint = "nonexistent" @@ -454,7 +460,7 @@ def test_bypass_all_security_checks_with_custom_modifier() -> None: def custom_modifier(response: FlaskGuardResponse) -> FlaskGuardResponse: return FlaskGuardResponse(Response("custom modified", status=202)) - config = SecurityConfig(custom_response_modifier=custom_modifier) + config = SecurityConfig(custom_response_modifier=cast(Any, custom_modifier)) decorator = SecurityDecorator(config) @@ -533,7 +539,7 @@ def test_endpoint() -> str: route_id = f"{test_endpoint.__module__}.{test_endpoint.__qualname__}" decorator._route_configs[route_id] = route_config - test_endpoint._guard_route_id = route_id + cast(Any, test_endpoint)._guard_route_id = route_id app.view_functions["test_endpoint"] = test_endpoint guard = FlaskAPIGuard(app, config=config) @@ -573,7 +579,7 @@ def test_endpoint() -> str: route_id = f"{test_endpoint.__module__}.{test_endpoint.__qualname__}" decorator._route_configs[route_id] = route_config - test_endpoint._guard_route_id = route_id + cast(Any, test_endpoint)._guard_route_id = route_id app.view_functions["test_endpoint"] = test_endpoint guard = FlaskAPIGuard(app, config=config) diff --git a/tests/test_extension/test_extension_wiring.py b/tests/test_extension/test_extension_wiring.py index 0f5eb5c..0cee4cf 100644 --- a/tests/test_extension/test_extension_wiring.py +++ b/tests/test_extension/test_extension_wiring.py @@ -9,6 +9,8 @@ def test_event_bus_routes_through_composite_when_otel_enabled() -> None: app = Flask(__name__) ext = FlaskAPIGuard(app, config=config) + assert ext.event_bus is not None + assert ext.metrics_collector is not None assert isinstance(ext.event_bus.agent_handler, CompositeAgentHandler) assert isinstance(ext.metrics_collector.agent_handler, CompositeAgentHandler) @@ -18,6 +20,8 @@ def test_event_bus_routes_through_composite_when_logfire_enabled() -> None: app = Flask(__name__) ext = FlaskAPIGuard(app, config=config) + assert ext.event_bus is not None + assert ext.metrics_collector is not None assert isinstance(ext.event_bus.agent_handler, CompositeAgentHandler) assert isinstance(ext.metrics_collector.agent_handler, CompositeAgentHandler) @@ -27,6 +31,8 @@ def test_event_bus_stays_bare_when_no_telemetry_configured() -> None: app = Flask(__name__) ext = FlaskAPIGuard(app, config=config) + assert ext.event_bus is not None + assert ext.metrics_collector is not None assert not isinstance(ext.event_bus.agent_handler, CompositeAgentHandler) assert not isinstance(ext.metrics_collector.agent_handler, CompositeAgentHandler) @@ -36,6 +42,10 @@ def test_contexts_use_the_post_initialize_event_bus() -> None: app = Flask(__name__) ext = FlaskAPIGuard(app, config=config) + assert ext.validator is not None + assert ext.bypass_handler is not None + assert ext.behavioral_processor is not None + assert ext.response_factory is not None assert ext.validator.context.event_bus is ext.event_bus assert ext.bypass_handler.context.event_bus is ext.event_bus assert ext.behavioral_processor.context.event_bus is ext.event_bus diff --git a/tests/test_extension/test_security_extension.py b/tests/test_extension/test_security_extension.py index 615625c..a01b7db 100644 --- a/tests/test_extension/test_security_extension.py +++ b/tests/test_extension/test_security_extension.py @@ -1,13 +1,14 @@ import logging import os import time -from typing import Any +from typing import Any, cast from unittest.mock import MagicMock, Mock, patch import pytest from flask import Flask, Response -from guard_core.decorators.base import BaseSecurityDecorator from guard_core.models import SecurityConfig +from guard_core.protocols.geo_ip_protocol import GeoIPHandler +from guard_core.sync.decorators.base import BaseSecurityDecorator from guard_core.sync.detection_result import DetectionResult from guard_core.sync.handlers.cloud_handler import cloud_handler from guard_core.sync.handlers.ipinfo_handler import IPInfoManager @@ -28,7 +29,7 @@ def test_rate_limiting() -> None: """ app = Flask(__name__) config = SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN)), enable_penetration_detection=False, rate_limit=2, rate_limit_window=1, @@ -64,7 +65,7 @@ def test_ip_whitelist_blacklist() -> None: """ app = Flask(__name__) config = SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN)), whitelist=["127.0.0.1"], blacklist=["192.168.1.1"], enable_penetration_detection=False, @@ -95,7 +96,7 @@ def test_user_agent_filtering() -> None: """ app = Flask(__name__) config = SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN)), enable_penetration_detection=False, blocked_user_agents=[r"badbot"], ) @@ -121,7 +122,7 @@ def test_rate_limiting_multiple_ips(reset_state: None) -> None: """ app = Flask(__name__) config = SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN)), rate_limit=2, rate_limit_window=1, enable_rate_limiting=True, @@ -166,9 +167,9 @@ def custom_check(request: Any) -> FlaskGuardResponse | None: return None config = SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN)), enable_penetration_detection=False, - custom_request_check=custom_check, + custom_request_check=cast(Any, custom_check), ) FlaskAPIGuard(app, config=config) @@ -192,7 +193,7 @@ def test_custom_error_responses() -> None: """ app = Flask(__name__) config = SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN)), blacklist=["192.168.1.3"], custom_error_responses={ 403: "Custom Forbidden", @@ -357,7 +358,7 @@ def excluded_path() -> dict[str, str]: def test_cloud_ip_blocking() -> None: app = Flask(__name__) config = SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN)), enable_penetration_detection=False, block_cloud_providers={"AWS", "GCP", "Azure"}, ) @@ -382,7 +383,7 @@ def read_root() -> dict[str, str]: def test_excluded_paths() -> None: app = Flask(__name__) config = SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN)), enable_penetration_detection=False, exclude_paths=["/health"], ) @@ -402,7 +403,7 @@ def test_cleanup_expired_request_times() -> None: """Test cleanup of expired request times""" app = Flask(__name__) config = SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN)), enable_penetration_detection=False, rate_limit=2, rate_limit_window=1, @@ -432,7 +433,7 @@ def test_penetration_detection_disabled() -> None: """Test when penetration detection is disabled""" app = Flask(__name__) config = SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN)), enable_penetration_detection=False, ) @@ -542,7 +543,7 @@ def test_rate_limiting_disabled() -> None: """Test when rate limiting is disabled""" app = Flask(__name__) config = SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN)), enable_penetration_detection=False, enable_rate_limiting=False, ) @@ -623,7 +624,7 @@ def test_passive_mode_penetration_detection() -> None: """Test penetration detection in passive mode""" app = Flask(__name__) config = SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN)), passive_mode=True, whitelist=[], ) @@ -659,7 +660,7 @@ def test_sliding_window_rate_limiting() -> None: """Test that sliding window rate limiting works correctly""" app = Flask(__name__) config = SecurityConfig( - geo_ip_handler=IPInfoManager(IPINFO_TOKEN), + geo_ip_handler=cast(GeoIPHandler, IPInfoManager(IPINFO_TOKEN)), enable_penetration_detection=False, rate_limit=3, rate_limit_window=1, @@ -1124,7 +1125,6 @@ def test_endpoint() -> dict[str, str]: def test_cors_preflight() -> None: - """Test that OPTIONS request returns 204 with CORS headers when enable_cors=True.""" config = SecurityConfig( enable_redis=False, enable_penetration_detection=False, @@ -1137,9 +1137,20 @@ def test_cors_preflight() -> None: app.config["TESTING"] = True FlaskAPIGuard(app, config=config) + @app.route("/") + def read_root() -> dict[str, str]: + return {"message": "Hello World"} + with app.test_client() as client: - response = client.options("/", headers={"Origin": "https://example.com"}) - assert response.status_code == 204 + response = client.options( + "/", + headers={ + "Origin": "https://example.com", + "Access-Control-Request-Method": "POST", + }, + ) + assert response.status_code == 200 + assert response.headers["Access-Control-Allow-Origin"] == "https://example.com" def test_cors_disabled() -> None: @@ -1168,6 +1179,20 @@ def read_root() -> dict[str, str]: assert "Access-Control-Allow-Origin" not in response.headers +def test_request_to_unregistered_route_does_not_crash() -> None: + config = SecurityConfig( + enable_redis=False, + enable_penetration_detection=False, + ) + app = Flask(__name__) + app.config["TESTING"] = True + FlaskAPIGuard(app, config=config) + + with app.test_client() as client: + response = client.get("/nonexistent") + assert response.status_code == 404 + + def test_cloud_ip_refresh_no_providers() -> None: """Test that refresh_cloud_ip_ranges returns early when no providers configured.""" config = SecurityConfig( @@ -1358,12 +1383,17 @@ def test_set_decorator_handler() -> None: guard.set_decorator_handler(mock_decorator) assert guard.guard_decorator is mock_decorator + assert guard.route_resolver is not None assert guard.route_resolver.context.guard_decorator is mock_decorator + assert guard.behavioral_processor is not None assert guard.behavioral_processor.context.guard_decorator is mock_decorator + assert guard.response_factory is not None assert guard.response_factory.context.guard_decorator is mock_decorator + assert guard.handler_initializer is not None assert guard.handler_initializer.guard_decorator is mock_decorator ext = app.extensions.get("flaskapi_guard") + assert ext is not None assert ext["guard_decorator"] is mock_decorator @@ -1378,13 +1408,14 @@ def test_init_app_factory_pattern() -> None: guard = FlaskAPIGuard() assert guard.config is None - assert guard._app is None + + pre_init_app: Flask | None = guard._app + assert pre_init_app is None guard.init_app(app, config=config) - assert guard.config is config - assert guard._app is app - assert "flaskapi_guard" in app.extensions + assert guard._app is not None + assert "flaskapi_guard" in guard._app.extensions @app.route("/") def read_root() -> dict[str, str]: @@ -1459,7 +1490,6 @@ def test_agent_init_with_mock_module() -> None: enable_penetration_detection=False, enable_agent=True, agent_api_key="test-key-long-enough-for-validation", - agent_model="claude-sonnet-4-20250514", ) mock_agent = MagicMock() mock_module = MagicMock() @@ -1489,7 +1519,6 @@ def test_agent_init_import_blocked() -> None: enable_penetration_detection=False, enable_agent=True, agent_api_key="test-key-long-enough-for-validation", - agent_model="claude-sonnet-4-20250514", ) mock_agent_config = MagicMock() original_import = builtins.__import__ @@ -1526,7 +1555,6 @@ def test_agent_init_runtime_error() -> None: enable_penetration_detection=False, enable_agent=True, agent_api_key="test-key-long-enough-for-validation", - agent_model="claude-sonnet-4-20250514", ) mock_module = MagicMock() mock_module.guard_agent = MagicMock(side_effect=RuntimeError("init failed")) @@ -1547,7 +1575,6 @@ def test_agent_init_config_returns_none() -> None: enable_penetration_detection=False, enable_agent=True, agent_api_key="test-key-long-enough-for-validation", - agent_model="claude-sonnet-4-20250514", ) with patch.object(SecurityConfig, "to_agent_config", return_value=None): guard = FlaskAPIGuard(app, config=config) From 7248b7a2655eb99e54a9cbe3ef6753878724737f Mon Sep 17 00:00:00 2001 From: Renn F Date: Mon, 27 Apr 2026 09:11:30 +0200 Subject: [PATCH 2/2] ++ --- CHANGELOG.md | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6267483..a01b611 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,15 +3,18 @@ Changelog ___ -## 3.0.0 — 2026-04-26 +v3.0.0 (2026-04-26) +------------------- -### Breaking -- Preflight `OPTIONS` requests are now subject to the security pipeline (previously short-circuited). Banned IPs and rate-limited clients can no longer preflight freely. -- CORS is now configured exclusively via `SecurityConfig.cors_*` fields. The extension wires `_before_request` / `_after_request` automatically; no separate `configure_cors` entry point. +Pipeline-first CORS via guard_core.cors_handler (v3.0.0) +-------------------------------------------------------- -### Internal -- Both lifecycle hooks delegate to the new shared `guard_core.sync.handlers.cors_handler.CorsHandler`. -- Requires `guard-core>=2.2.0`. +- **Breaking** — Preflight `OPTIONS` requests are now subject to the security pipeline (previously short-circuited inside the extension). Banned IPs and rate-limited clients can no longer preflight freely. +- **Breaking** — CORS is now configured exclusively via `SecurityConfig.cors_*` fields. The extension wires `_before_request` / `_after_request` automatically; no separate `configure_cors` entry point. +- **Fixed** — Cross-origin preflight requests to passthrough paths (e.g. `exclude_paths=["/health"]`) now receive a valid CORS response. Preflight handling runs ahead of the passthrough/bypass short-circuit so the browser permission check works for excluded paths. +- **Fixed** — Cross-origin GETs to passthrough/bypass paths now carry CORS headers on their responses, matching the previous outer-CORSMiddleware semantics. +- **Internal** — Both lifecycle hooks delegate to the new shared `guard_core.sync.handlers.cors_handler.CorsHandler`. Removed all `[[tool.mypy.overrides]] ignore_missing_imports = true` / `follow_imports = "skip"` blocks; replaced with proper inline-typed packages and `django-stubs`. Stripped `[tool.uv.sources] guard-core` local-path block from committed pyproject.toml. Added `guard-agent` to dev dependencies (was previously declared only in deptry's per-rule-ignores). +- **Requires** — `guard-core>=2.2.0`. ___