diff --git a/.github/workflows/update-data.yml b/.github/workflows/update-data.yml index cfa1a6250..24df0262a 100644 --- a/.github/workflows/update-data.yml +++ b/.github/workflows/update-data.yml @@ -16,6 +16,7 @@ on: - main paths: - 'scraper.py' + - 'validate_api.py' - 'requirements.txt' - '.github/workflows/update-data.yml' @@ -49,6 +50,10 @@ jobs: run: | python scraper.py + - name: Validate generated API artifacts + run: | + python validate_api.py --require-current-schema --forbid-firecrawl-run-source + - name: Check for changes id: git-check run: | diff --git a/.github/workflows/validate.yml b/.github/workflows/validate.yml new file mode 100644 index 000000000..7b3c02896 --- /dev/null +++ b/.github/workflows/validate.yml @@ -0,0 +1,47 @@ +name: Validate + +on: + pull_request: + push: + branches: + - main + - 'codex/**' + +permissions: + contents: read + +jobs: + test: + runs-on: ubuntu-latest + timeout-minutes: 20 + + steps: + - name: Checkout repository + uses: actions/checkout@v6 + + - name: Set up Python + uses: actions/setup-python@v6 + with: + python-version: '3.12' + cache: 'pip' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Compile Python files + run: | + python -m py_compile scraper.py test_scraper.py validate_api.py + + - name: Run scraper tests + run: | + python test_scraper.py + + - name: Validate checked-in API artifacts + run: | + python validate_api.py + + - name: Check whitespace errors + run: | + git diff --check diff --git a/README.md b/README.md index 676bdbf12..f9029d847 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ Static JSON API for NIST Cryptographic Module Validation Program data. 
Auto-upda - **Historical Modules**: Expired/revoked modules for historical reference - **Modules In Process**: Modules currently in validation - **Algorithm Extraction**: Approved algorithms extracted from Security Policy PDFs with Crawl4AI, with a local PDF parser fallback +- **Extraction Provenance**: Per-certificate `algorithm_extraction` metadata records cache/fallback status, source URL, and extracted row counts - **Security Policy Links**: Direct URLs to Security Policy PDF documents - **Certificate Detail Records**: Per-certificate JSON with vendor, related files, validation history, and security level exceptions @@ -17,6 +18,7 @@ Static JSON API for NIST Cryptographic Module Validation Program data. Auto-upda - [`llms-full.txt`](https://hackidle.github.io/nist-cmvp-api/llms-full.txt) - complete single-file reference - [`api/docs.md`](https://hackidle.github.io/nist-cmvp-api/api/docs.md) - Markdown endpoint reference with examples - [`openapi.json`](https://hackidle.github.io/nist-cmvp-api/openapi.json) - OpenAPI 3.0.3 schema +- [`api/schemas/index.schema.json`](https://hackidle.github.io/nist-cmvp-api/api/schemas/index.schema.json) - JSON Schema index for API responses ## Endpoints @@ -30,6 +32,7 @@ Base URL: `https://hackidle.github.io/nist-cmvp-api/api/` | `algorithms.json` | Algorithm summary with usage statistics across all certificates | | `metadata.json` | Dataset info (last update, counts, feature flags) | | `index.json` | API index with all endpoints and feature information | +| `schemas/*.schema.json` | JSON Schemas for response validation | | `certificates/{certificate}.json` | Structured detail record for one CMVP certificate | ## Data Structure @@ -54,7 +57,17 @@ Base URL: `https://hackidle.github.io/nist-cmvp-api/api/` "embodiment": "Multi-Chip Stand Alone", "description": "A software library that contains cryptographic functionality...", "lab": "DEKRA Cybersecurity Certification Laboratory", - "algorithms": ["AES", "SHA-256", "RSA", "ECDSA", 
"HMAC", "DRBG"] + "algorithms": ["AES", "SHA-256", "RSA", "ECDSA", "HMAC", "DRBG"], + "algorithm_extraction": { + "status": "parsed", + "configured_source": "crawl4ai", + "source": "crawl4ai", + "source_url": "https://csrc.nist.gov/CSRC/media/projects/.../140sp5104.pdf", + "cached": false, + "fallback_used": false, + "algorithm_count": 6, + "detailed_algorithm_count": 42 + } } ``` @@ -121,7 +134,17 @@ Base URL: `https://hackidle.github.io/nist-cmvp-api/api/` "lab": "Lightship Security, Inc." } ], - "algorithms": ["AES", "HMAC"] + "algorithms": ["AES", "HMAC"], + "algorithm_extraction": { + "status": "parsed", + "configured_source": "crawl4ai", + "source": "security_policy_pdf", + "source_url": "https://csrc.nist.gov/CSRC/media/projects/cryptographic-module-validation-program/documents/security-policies/140sp5203.pdf", + "cached": false, + "fallback_used": true, + "algorithm_count": 2, + "detailed_algorithm_count": 18 + } } } ``` @@ -147,8 +170,14 @@ curl -s https://hackidle.github.io/nist-cmvp-api/api/algorithms.json | \ # Get the full detail page payload for one certificate curl -s https://hackidle.github.io/nist-cmvp-api/api/certificates/5203.json | jq '.certificate' -# Check last update -curl -s https://hackidle.github.io/nist-cmvp-api/api/metadata.json | jq '.generated_at' +# Check last update and extraction metrics +curl -s https://hackidle.github.io/nist-cmvp-api/api/metadata.json | \ + jq '{generated_at, extraction_metrics: .extraction_metrics.combined}' + +# Validate a response with a published JSON Schema (requires: pip install jsonschema) +curl -s https://hackidle.github.io/nist-cmvp-api/api/schemas/modules.schema.json > modules.schema.json +curl -s https://hackidle.github.io/nist-cmvp-api/api/modules.json > modules.json +python -m jsonschema modules.schema.json -i modules.json ``` ## Local Development @@ -165,6 +194,9 @@ ALGORITHM_SOURCE=security_policy_pdf python scraper.py # Run quick scraper (skip algorithm extraction entirely) SKIP_ALGORITHMS=1 
python scraper.py + +# Validate generated artifacts before publishing +python validate_api.py --require-current-schema --forbid-firecrawl-run-source ``` ## Environment Variables diff --git a/api/schemas/algorithms.schema.json b/api/schemas/algorithms.schema.json new file mode 100644 index 000000000..72bcdd158 --- /dev/null +++ b/api/schemas/algorithms.schema.json @@ -0,0 +1,50 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://hackidle.github.io/nist-cmvp-api/api/schemas/algorithms.schema.json", + "title": "NIST CMVP Algorithms Summary Response", + "type": "object", + "additionalProperties": false, + "required": [ + "total_unique_algorithms", + "total_certificate_algorithm_pairs", + "algorithms", + "metadata" + ], + "properties": { + "total_unique_algorithms": { + "type": "integer", + "minimum": 0 + }, + "total_certificate_algorithm_pairs": { + "type": "integer", + "minimum": 0 + }, + "algorithms": { + "type": "object", + "additionalProperties": { + "type": "object", + "additionalProperties": false, + "required": [ + "count", + "certificates" + ], + "properties": { + "count": { + "type": "integer", + "minimum": 0 + }, + "certificates": { + "type": "array", + "items": { + "type": "integer" + } + } + } + } + }, + "metadata": { + "type": "object", + "additionalProperties": true + } + } +} diff --git a/api/schemas/certificate-detail.schema.json b/api/schemas/certificate-detail.schema.json new file mode 100644 index 000000000..027cf8274 --- /dev/null +++ b/api/schemas/certificate-detail.schema.json @@ -0,0 +1,195 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://hackidle.github.io/nist-cmvp-api/api/schemas/certificate-detail.schema.json", + "title": "NIST CMVP Certificate Detail Response", + "type": "object", + "additionalProperties": false, + "required": [ + "metadata", + "certificate" + ], + "properties": { + "metadata": { + "type": "object", + "additionalProperties": true, + "required": [ + 
"generated_at", + "dataset", + "source" + ] + }, + "certificate": { + "type": "object", + "additionalProperties": true, + "required": [ + "certificate_number", + "dataset", + "generated_at", + "nist_page_url", + "certificate_detail_url", + "security_policy_url", + "vendor_name", + "module_name", + "standard", + "status", + "related_files", + "validation_history", + "vendor" + ], + "properties": { + "certificate_number": { + "type": "string", + "pattern": "^[0-9]+$" + }, + "dataset": { + "type": "string", + "enum": [ + "active", + "historical" + ] + }, + "generated_at": { + "type": "string", + "format": "date-time" + }, + "nist_page_url": { + "type": "string", + "format": "uri" + }, + "certificate_detail_url": { + "type": "string", + "format": "uri" + }, + "security_policy_url": { + "type": [ + "string", + "null" + ], + "format": "uri" + }, + "vendor_name": { + "type": [ + "string", + "null" + ] + }, + "module_name": { + "type": [ + "string", + "null" + ] + }, + "standard": { + "type": [ + "string", + "null" + ] + }, + "status": { + "type": [ + "string", + "null" + ] + }, + "related_files": { + "type": "array", + "items": { + "type": "object", + "additionalProperties": true + } + }, + "validation_history": { + "type": "array", + "items": { + "type": "object", + "additionalProperties": true + } + }, + "vendor": { + "type": "object", + "additionalProperties": true + }, + "algorithms": { + "type": "array", + "items": { + "type": "string" + } + }, + "algorithms_detailed": { + "type": "array", + "items": { + "type": "string" + } + }, + "algorithm_extraction": { + "type": "object", + "additionalProperties": true, + "required": [ + "schema_version", + "status", + "configured_source", + "source", + "cached", + "fallback_used", + "cache_version", + "algorithm_count", + "detailed_algorithm_count" + ], + "properties": { + "schema_version": { + "type": "string" + }, + "status": { + "type": "string", + "enum": [ + "parsed", + "cached", + "miss", + "skipped" + ] + }, + 
"configured_source": { + "type": "string" + }, + "source": { + "type": "string" + }, + "source_url": { + "type": [ + "string", + "null" + ], + "format": "uri" + }, + "cached": { + "type": "boolean" + }, + "fallback_used": { + "type": "boolean" + }, + "cache_version": { + "type": "string" + }, + "algorithm_count": { + "type": "integer", + "minimum": 0 + }, + "detailed_algorithm_count": { + "type": "integer", + "minimum": 0 + }, + "attempts": { + "type": "array", + "items": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + } + } + } + } + } + } +} diff --git a/api/schemas/historical-modules.schema.json b/api/schemas/historical-modules.schema.json new file mode 100644 index 000000000..91c0b8aa4 --- /dev/null +++ b/api/schemas/historical-modules.schema.json @@ -0,0 +1,22 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://hackidle.github.io/nist-cmvp-api/api/schemas/historical-modules.schema.json", + "title": "NIST CMVP Historical Modules Response", + "type": "object", + "additionalProperties": false, + "required": [ + "metadata", + "modules" + ], + "properties": { + "metadata": { + "$ref": "/api/schemas/metadata.schema.json" + }, + "modules": { + "type": "array", + "items": { + "$ref": "/api/schemas/module.schema.json" + } + } + } +} diff --git a/api/schemas/index.schema.json b/api/schemas/index.schema.json new file mode 100644 index 000000000..f89101f53 --- /dev/null +++ b/api/schemas/index.schema.json @@ -0,0 +1,49 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://hackidle.github.io/nist-cmvp-api/api/schemas/index.schema.json", + "title": "NIST CMVP API JSON Schema Index", + "type": "object", + "additionalProperties": false, + "required": [ + "name", + "schema_version", + "base_url", + "schemas" + ], + "properties": { + "name": { + "type": "string" + }, + "schema_version": { + "type": "string" + }, + "base_url": { + "type": "string", + "format": "uri" + }, + "schemas": { 
+ "type": "object", + "additionalProperties": { + "type": "string" + } + } + }, + "examples": [ + { + "name": "NIST CMVP API JSON Schemas", + "schema_version": "1.0", + "base_url": "https://hackidle.github.io/nist-cmvp-api", + "schemas": { + "index": "/api/schemas/index.schema.json", + "metadata": "/api/schemas/metadata.schema.json", + "module": "/api/schemas/module.schema.json", + "module_in_process": "/api/schemas/module-in-process.schema.json", + "modules": "/api/schemas/modules.schema.json", + "historical_modules": "/api/schemas/historical-modules.schema.json", + "modules_in_process": "/api/schemas/modules-in-process.schema.json", + "certificate_detail": "/api/schemas/certificate-detail.schema.json", + "algorithms": "/api/schemas/algorithms.schema.json" + } + } + ] +} diff --git a/api/schemas/metadata.schema.json b/api/schemas/metadata.schema.json new file mode 100644 index 000000000..967470dde --- /dev/null +++ b/api/schemas/metadata.schema.json @@ -0,0 +1,70 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://hackidle.github.io/nist-cmvp-api/api/schemas/metadata.schema.json", + "title": "NIST CMVP API Metadata", + "type": "object", + "additionalProperties": true, + "required": [ + "generated_at", + "total_modules", + "total_historical_modules", + "total_modules_in_process", + "total_certificates_with_algorithms", + "total_certificate_details", + "source", + "modules_in_process_source", + "algorithm_source", + "algorithm_cache_version", + "version" + ], + "properties": { + "generated_at": { + "type": "string", + "format": "date-time" + }, + "total_modules": { + "type": "integer", + "minimum": 0 + }, + "total_historical_modules": { + "type": "integer", + "minimum": 0 + }, + "total_modules_in_process": { + "type": "integer", + "minimum": 0 + }, + "total_certificates_with_algorithms": { + "type": "integer", + "minimum": 0 + }, + "total_certificate_details": { + "type": "integer", + "minimum": 0 + }, + "source": { + "type": 
"string", + "format": "uri" + }, + "modules_in_process_source": { + "type": "string", + "format": "uri" + }, + "algorithm_source": { + "type": "string" + }, + "algorithm_cache_version": { + "type": "string" + }, + "algorithm_extraction_schema_version": { + "type": "string" + }, + "extraction_metrics": { + "type": "object", + "additionalProperties": true + }, + "version": { + "type": "string" + } + } +} diff --git a/api/schemas/module-in-process.schema.json b/api/schemas/module-in-process.schema.json new file mode 100644 index 000000000..10f0694c4 --- /dev/null +++ b/api/schemas/module-in-process.schema.json @@ -0,0 +1,30 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://hackidle.github.io/nist-cmvp-api/api/schemas/module-in-process.schema.json", + "title": "NIST CMVP Module In Process Row", + "type": "object", + "additionalProperties": true, + "required": [ + "Module Name", + "Vendor Name", + "Standard", + "Status" + ], + "properties": { + "Module Name": { + "type": "string" + }, + "Vendor Name": { + "type": "string" + }, + "Vendor Name_url": { + "type": "string" + }, + "Standard": { + "type": "string" + }, + "Status": { + "type": "string" + } + } +} diff --git a/api/schemas/module.schema.json b/api/schemas/module.schema.json new file mode 100644 index 000000000..81cea4a5c --- /dev/null +++ b/api/schemas/module.schema.json @@ -0,0 +1,155 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://hackidle.github.io/nist-cmvp-api/api/schemas/module.schema.json", + "title": "NIST CMVP Module Row", + "type": "object", + "additionalProperties": true, + "required": [ + "Certificate Number", + "Vendor Name", + "Module Name", + "security_policy_url", + "certificate_detail_url", + "detail_available" + ], + "properties": { + "Certificate Number": { + "type": "string", + "pattern": "^[0-9]+$" + }, + "Certificate Number_url": { + "type": "string" + }, + "Vendor Name": { + "type": "string" + }, + "Module Name": { + 
"type": "string" + }, + "Module Type": { + "type": "string" + }, + "Validation Date": { + "type": "string" + }, + "Status": { + "type": "string" + }, + "security_policy_url": { + "type": "string", + "format": "uri" + }, + "certificate_detail_url": { + "type": "string", + "format": "uri" + }, + "standard": { + "type": [ + "string", + "null" + ] + }, + "status": { + "type": [ + "string", + "null" + ] + }, + "overall_level": { + "type": [ + "integer", + "string", + "null" + ] + }, + "sunset_date": { + "type": [ + "string", + "null" + ] + }, + "detail_available": { + "type": "boolean" + }, + "algorithms": { + "type": "array", + "items": { + "type": "string" + } + }, + "algorithms_detailed": { + "type": "array", + "items": { + "type": "string" + } + }, + "algorithm_extraction": { + "type": "object", + "additionalProperties": true, + "required": [ + "schema_version", + "status", + "configured_source", + "source", + "cached", + "fallback_used", + "cache_version", + "algorithm_count", + "detailed_algorithm_count" + ], + "properties": { + "schema_version": { + "type": "string" + }, + "status": { + "type": "string", + "enum": [ + "parsed", + "cached", + "miss", + "skipped" + ] + }, + "configured_source": { + "type": "string" + }, + "source": { + "type": "string" + }, + "source_url": { + "type": [ + "string", + "null" + ], + "format": "uri" + }, + "cached": { + "type": "boolean" + }, + "fallback_used": { + "type": "boolean" + }, + "cache_version": { + "type": "string" + }, + "algorithm_count": { + "type": "integer", + "minimum": 0 + }, + "detailed_algorithm_count": { + "type": "integer", + "minimum": 0 + }, + "attempts": { + "type": "array", + "items": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + } + } + } + } +} diff --git a/api/schemas/modules-in-process.schema.json b/api/schemas/modules-in-process.schema.json new file mode 100644 index 000000000..08c87f6af --- /dev/null +++ b/api/schemas/modules-in-process.schema.json @@ -0,0 +1,22 @@ +{ + 
"$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://hackidle.github.io/nist-cmvp-api/api/schemas/modules-in-process.schema.json", + "title": "NIST CMVP Modules In Process Response", + "type": "object", + "additionalProperties": false, + "required": [ + "metadata", + "modules_in_process" + ], + "properties": { + "metadata": { + "$ref": "/api/schemas/metadata.schema.json" + }, + "modules_in_process": { + "type": "array", + "items": { + "$ref": "/api/schemas/module-in-process.schema.json" + } + } + } +} diff --git a/api/schemas/modules.schema.json b/api/schemas/modules.schema.json new file mode 100644 index 000000000..0595e05c2 --- /dev/null +++ b/api/schemas/modules.schema.json @@ -0,0 +1,22 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "https://hackidle.github.io/nist-cmvp-api/api/schemas/modules.schema.json", + "title": "NIST CMVP Active Modules Response", + "type": "object", + "additionalProperties": false, + "required": [ + "metadata", + "modules" + ], + "properties": { + "metadata": { + "$ref": "/api/schemas/metadata.schema.json" + }, + "modules": { + "type": "array", + "items": { + "$ref": "/api/schemas/module.schema.json" + } + } + } +} diff --git a/scraper.py b/scraper.py index 9877f78f6..07fe2cdab 100644 --- a/scraper.py +++ b/scraper.py @@ -32,6 +32,7 @@ import sqlite3 import sys import time +from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path from typing import Dict, List, Optional, Set, Tuple @@ -75,6 +76,7 @@ CRAWL4AI_ALGORITHM_SOURCE = "crawl4ai" SECURITY_POLICY_ALGORITHM_SOURCE = "security_policy_pdf" ALGORITHM_CACHE_VERSION = "2026-04-15-legacy-v1" +ALGORITHM_EXTRACTION_SCHEMA_VERSION = "1.0" CACHEABLE_ALGORITHM_SOURCES = { CRAWL4AI_ALGORITHM_SOURCE, SECURITY_POLICY_ALGORITHM_SOURCE, @@ -236,7 +238,7 @@ ("RSA", re.compile(r"\bRSA\b", re.IGNORECASE)), ("ECDSA", re.compile(r"\bECDSA\b", re.IGNORECASE)), ("ECDH", re.compile(r"\bECDH\b", 
re.IGNORECASE)), - ("DRBG", re.compile(r"\bDRBG\b", re.IGNORECASE)), + ("DRBG", re.compile(r"(?:\b|_)DRBG\b", re.IGNORECASE)), ("KDF", re.compile(r"\b(KDF|KDA|KBKDF|HKDF|PBKDF)\b", re.IGNORECASE)), ("KAS", re.compile(r"\bKAS\b", re.IGNORECASE)), ("KTS", re.compile(r"\bKTS\b", re.IGNORECASE)), @@ -249,6 +251,38 @@ ("CVL", re.compile(r"\bCVL\b", re.IGNORECASE)), ] +PROCESSING_STAT_KEYS = ( + "html_reused", + "html_refreshed", + "html_failed", + "pdf_reused", + "pdf_refreshed", + "pdf_failed", + "pdf_cache_hits", + "algorithm_misses", + "algorithm_cache_hits", + "algorithm_successes", + "algorithm_fallbacks", + "algorithm_source_crawl4ai", + "algorithm_source_security_policy_pdf", + "algorithm_source_database", + "algorithm_source_none", +) + + +@dataclass +class AlgorithmExtractionResult: + """Result of attempting to extract algorithms for one Security Policy.""" + + detailed: List[str] + categories: List[str] + parsed: bool + source: str + source_url: Optional[str] = None + fallback_used: bool = False + pdf_cache_hits: int = 0 + attempts: List[Dict[str, str]] = field(default_factory=list) + def fetch_page(url: str, timeout: int = 30, retries: int = 3) -> Optional[str]: """ @@ -321,6 +355,8 @@ def normalize_string_list(values: Optional[List[str]]) -> List[str]: normalized: List[str] = [] seen: Set[str] = set() for value in values or []: + if value is None: + continue text = normalize_whitespace(str(value)) if not text or text in seen: continue @@ -329,6 +365,97 @@ def normalize_string_list(values: Optional[List[str]]) -> List[str]: return normalized +def new_processing_stats() -> Dict[str, int]: + """Return zeroed scrape/extraction counters for one dataset or certificate.""" + return {key: 0 for key in PROCESSING_STAT_KEYS} + + +def add_processing_stats(target: Dict[str, int], increment: Dict[str, int]) -> None: + """Add processing counters from one stats dictionary into another.""" + for key in PROCESSING_STAT_KEYS: + target[key] = target.get(key, 0) + 
increment.get(key, 0) + + +def combine_processing_stats(*stats_dicts: Dict[str, int]) -> Dict[str, int]: + """Combine multiple processing stats dictionaries into one.""" + combined = new_processing_stats() + for stats in stats_dicts: + add_processing_stats(combined, stats) + return combined + + +def build_extraction_metrics(active_stats: Dict[str, int], historical_stats: Dict[str, int]) -> Dict[str, object]: + """Build metadata-safe scrape and algorithm extraction metrics.""" + return { + "active": dict(active_stats), + "historical": dict(historical_stats), + "combined": combine_processing_stats(active_stats, historical_stats), + "concurrency": { + "certificate_fetch": CERT_FETCH_CONCURRENCY, + "security_policy_fetch": PDF_FETCH_CONCURRENCY, + }, + } + + +def build_algorithm_extraction_provenance( + configured_source: str, + status: str, + source: str, + source_url: Optional[str], + categories: Optional[List[str]], + detailed: Optional[List[str]], + cached: bool = False, + fallback_used: bool = False, + attempts: Optional[List[Dict[str, str]]] = None, +) -> Dict[str, object]: + """Build the per-certificate provenance object for algorithm extraction.""" + provenance = { + "schema_version": ALGORITHM_EXTRACTION_SCHEMA_VERSION, + "status": status, + "configured_source": configured_source, + "source": source, + "source_url": source_url, + "cached": cached, + "fallback_used": fallback_used, + "cache_version": ALGORITHM_CACHE_VERSION, + "algorithm_count": len(normalize_string_list(categories or [])), + "detailed_algorithm_count": len(normalize_string_list(detailed or [])), + } + if attempts is not None: + provenance["attempts"] = attempts + return provenance + + +def apply_algorithm_extraction_provenance( + record: Optional[Dict], + provenance: Dict[str, object], + include_attempts: bool = False, +) -> None: + """Attach algorithm extraction provenance to a module or detail payload.""" + if record is None: + return + payload = dict(provenance) + if not include_attempts: + 
payload.pop("attempts", None) + record["algorithm_extraction"] = payload + + +def cached_algorithm_extraction_source( + previous_module: Optional[Dict], + previous_detail: Optional[Dict], + previous_metadata: Dict, +) -> Tuple[str, Optional[str]]: + """Return the best available extraction source metadata for cached algorithms.""" + previous_extraction = ( + (previous_detail or {}).get("algorithm_extraction") + or (previous_module or {}).get("algorithm_extraction") + or {} + ) + source = previous_extraction.get("source") or previous_metadata.get("algorithm_source") or "cache" + source_url = previous_extraction.get("source_url") + return str(source), source_url if isinstance(source_url, str) else None + + def parse_certificate_number(record: Optional[Dict]) -> Optional[int]: """Extract an integer certificate number from a module row or detail payload.""" if not record: @@ -1199,6 +1326,24 @@ async def fetch_with_retry( return None +async def fetch_policy_pdf_bytes( + client: httpx.AsyncClient, + url: str, + pdf_cache: Dict[str, asyncio.Task], + pdf_cache_lock: asyncio.Lock, +) -> Tuple[Optional[bytes], bool]: + """Fetch Security Policy PDF bytes through an in-run task cache.""" + async with pdf_cache_lock: + task = pdf_cache.get(url) + cache_hit = task is not None + if task is None: + task = asyncio.create_task(fetch_with_retry(client, url, response_type="bytes")) + pdf_cache[url] = task + + result = await task + return result if isinstance(result, bytes) else None, cache_hit + + async def fetch_crawl4ai_policy_text( url: str, retries: int = 1, @@ -1280,38 +1425,110 @@ async def fetch_certificate_algorithms( fallback_url: Optional[str], pdf_semaphore: asyncio.Semaphore, algorithm_source: str, -) -> Tuple[List[str], List[str], bool]: + pdf_cache: Dict[str, asyncio.Task], + pdf_cache_lock: asyncio.Lock, +) -> AlgorithmExtractionResult: """Fetch and parse a certificate's Security Policy using the configured source.""" + attempts: List[Dict[str, str]] = [] + 
pdf_cache_hits = 0 + for candidate in normalize_string_list([security_policy_url, fallback_url]): if algorithm_source == CRAWL4AI_ALGORITHM_SOURCE and CRAWL4AI_AVAILABLE: + attempt = { + "source": CRAWL4AI_ALGORITHM_SOURCE, + "url": candidate, + "status": "started", + } async with pdf_semaphore: policy_text = await fetch_crawl4ai_policy_text(candidate) if policy_text: try: detailed, categories = parse_algorithms_from_policy_text(policy_text) if detailed or categories: - return detailed, categories, True + attempt["status"] = "parsed" + attempts.append(attempt) + return AlgorithmExtractionResult( + detailed=detailed, + categories=categories, + parsed=True, + source=CRAWL4AI_ALGORITHM_SOURCE, + source_url=candidate, + attempts=attempts, + ) + attempt["status"] = "no_algorithms" + attempts.append(attempt) print( f"Warning: Crawl4AI returned policy text for {candidate} but no algorithm rows were found; " "falling back to local PDF parsing.", file=sys.stderr, ) except Exception as exc: + attempt["status"] = "parse_error" + attempt["error"] = str(exc)[:200] + attempts.append(attempt) print(f"Warning: Failed to parse Crawl4AI policy text for {candidate}: {exc}", file=sys.stderr) + else: + attempt["status"] = "no_text" + attempts.append(attempt) + local_attempt = { + "source": SECURITY_POLICY_ALGORITHM_SOURCE, + "url": candidate, + "status": "started", + } async with pdf_semaphore: - pdf_bytes = await fetch_with_retry(client, candidate, response_type="bytes") + pdf_bytes, cache_hit = await fetch_policy_pdf_bytes( + client, + candidate, + pdf_cache, + pdf_cache_lock, + ) + if cache_hit: + pdf_cache_hits += 1 + local_attempt["cache_hit"] = "true" if not pdf_bytes: + local_attempt["status"] = "fetch_failed" + attempts.append(local_attempt) continue try: detailed, categories = parse_algorithms_from_policy_pdf_bytes(pdf_bytes) if detailed or categories: - return detailed, categories, True + local_attempt["status"] = "parsed" + attempts.append(local_attempt) + return 
AlgorithmExtractionResult( + detailed=detailed, + categories=categories, + parsed=True, + source=SECURITY_POLICY_ALGORITHM_SOURCE, + source_url=candidate, + fallback_used=any( + attempt.get("source") == CRAWL4AI_ALGORITHM_SOURCE + for attempt in attempts + ), + pdf_cache_hits=pdf_cache_hits, + attempts=attempts, + ) + local_attempt["status"] = "no_algorithms" + attempts.append(local_attempt) except Exception as exc: + local_attempt["status"] = "parse_error" + local_attempt["error"] = str(exc)[:200] + attempts.append(local_attempt) print(f"Warning: Failed to parse Security Policy PDF {candidate}: {exc}", file=sys.stderr) - return [], [], False + return AlgorithmExtractionResult( + detailed=[], + categories=[], + parsed=False, + source="none", + fallback_used=any( + attempt.get("source") == CRAWL4AI_ALGORITHM_SOURCE + for attempt in attempts + ), + pdf_cache_hits=pdf_cache_hits, + attempts=attempts, + ) async def process_certificate_record( @@ -1325,18 +1542,12 @@ async def process_certificate_record( client: httpx.AsyncClient, cert_semaphore: asyncio.Semaphore, pdf_semaphore: asyncio.Semaphore, + pdf_cache: Dict[str, asyncio.Task], + pdf_cache_lock: asyncio.Lock, database_algorithms_map: Dict[int, List[str]], ) -> Tuple[Dict, Optional[Dict], List[str], Dict[str, int]]: """Process one module row into an enriched module row and optional detail payload.""" - stats = { - "html_reused": 0, - "html_refreshed": 0, - "html_failed": 0, - "pdf_reused": 0, - "pdf_refreshed": 0, - "pdf_failed": 0, - "algorithm_misses": 0, - } + stats = new_processing_stats() cert_number = parse_certificate_number(module) module_out = dict(previous_module or {}) @@ -1344,6 +1555,17 @@ async def process_certificate_record( if cert_number is None: strip_algorithm_fields(module_out) + apply_algorithm_extraction_provenance( + module_out, + build_algorithm_extraction_provenance( + algorithm_source, + "skipped", + "none", + None, + [], + [], + ), + ) module_out["detail_available"] = False return 
module_out, None, [], stats @@ -1407,38 +1629,106 @@ async def process_certificate_record( if algorithm_source == "database": categories = normalize_string_list(database_algorithms_map.get(cert_number, [])) detailed: List[str] = [] + extraction_status = "parsed" if categories else "miss" + extraction_provenance = build_algorithm_extraction_provenance( + algorithm_source, + extraction_status, + "database", + None, + categories, + detailed, + ) + stats["algorithm_source_database"] += 1 + if categories: + stats["algorithm_successes"] += 1 + else: + stats["algorithm_misses"] += 1 if detail_payload: apply_algorithm_fields(detail_payload, categories, detailed) + apply_algorithm_extraction_provenance(detail_payload, extraction_provenance, include_attempts=True) apply_algorithm_fields(module_out, categories, detailed) + apply_algorithm_extraction_provenance(module_out, extraction_provenance) elif algorithm_source in CACHEABLE_ALGORITHM_SOURCES: detailed, categories = ([], []) if trusted_algorithm_reuse: categories, detailed = cached_algorithm_fields(previous_module, previous_detail) stats["pdf_reused"] += 1 + stats["algorithm_cache_hits"] += 1 + cached_source, cached_source_url = cached_algorithm_extraction_source( + previous_module, + previous_detail, + previous_metadata, + ) + extraction_provenance = build_algorithm_extraction_provenance( + algorithm_source, + "cached", + cached_source, + cached_source_url, + categories, + detailed, + cached=True, + ) + if categories or detailed: + stats["algorithm_successes"] += 1 else: if detail_payload: strip_algorithm_fields(detail_payload) strip_algorithm_fields(module_out) - detailed, categories, parsed = await fetch_certificate_algorithms( + extraction_result = await fetch_certificate_algorithms( client, (detail_payload or {}).get("security_policy_url") or module.get("security_policy_url"), get_security_policy_url(cert_number), pdf_semaphore, algorithm_source, + pdf_cache, + pdf_cache_lock, ) - if parsed: + detailed = 
extraction_result.detailed + categories = extraction_result.categories + stats["pdf_cache_hits"] += extraction_result.pdf_cache_hits + extraction_provenance = build_algorithm_extraction_provenance( + algorithm_source, + "parsed" if extraction_result.parsed else "miss", + extraction_result.source, + extraction_result.source_url, + categories, + detailed, + fallback_used=extraction_result.fallback_used, + attempts=extraction_result.attempts, + ) + if extraction_result.parsed: stats["pdf_refreshed"] += 1 + stats["algorithm_successes"] += 1 + if extraction_result.source == CRAWL4AI_ALGORITHM_SOURCE: + stats["algorithm_source_crawl4ai"] += 1 + elif extraction_result.source == SECURITY_POLICY_ALGORITHM_SOURCE: + stats["algorithm_source_security_policy_pdf"] += 1 + if extraction_result.fallback_used: + stats["algorithm_fallbacks"] += 1 else: stats["pdf_failed"] += 1 stats["algorithm_misses"] += 1 if detail_payload: apply_algorithm_fields(detail_payload, categories, detailed) + apply_algorithm_extraction_provenance(detail_payload, extraction_provenance, include_attempts=True) apply_algorithm_fields(module_out, categories, detailed) + apply_algorithm_extraction_provenance(module_out, extraction_provenance) else: + extraction_provenance = build_algorithm_extraction_provenance( + algorithm_source, + "skipped", + "none", + None, + [], + [], + ) + stats["algorithm_source_none"] += 1 if detail_payload: strip_algorithm_fields(detail_payload) + apply_algorithm_extraction_provenance(detail_payload, extraction_provenance, include_attempts=True) strip_algorithm_fields(module_out) + apply_algorithm_extraction_provenance(module_out, extraction_provenance) module_out["detail_available"] = detail_payload is not None module_categories = normalize_string_list(module_out.get("algorithms", [])) @@ -1462,19 +1752,13 @@ async def build_certificate_artifacts( results: List[Optional[Dict]] = [None] * len(modules) payloads: Dict[int, Dict] = {} algorithms_map: Dict[int, List[str]] = {} - stats = 
{ - "html_reused": 0, - "html_refreshed": 0, - "html_failed": 0, - "pdf_reused": 0, - "pdf_refreshed": 0, - "pdf_failed": 0, - "algorithm_misses": 0, - } + stats = new_processing_stats() timeout = httpx.Timeout(30.0) cert_semaphore = asyncio.Semaphore(CERT_FETCH_CONCURRENCY) pdf_semaphore = asyncio.Semaphore(PDF_FETCH_CONCURRENCY) + pdf_cache: Dict[str, asyncio.Task] = {} + pdf_cache_lock = asyncio.Lock() async with httpx.AsyncClient( headers={"User-Agent": USER_AGENT}, @@ -1497,6 +1781,8 @@ async def build_certificate_artifacts( client, cert_semaphore, pdf_semaphore, + pdf_cache, + pdf_cache_lock, database_algorithms_map, ) ) @@ -1513,8 +1799,7 @@ async def build_certificate_artifacts( payloads[cert_number] = detail_payload if cert_number is not None and categories: algorithms_map[cert_number] = categories - for key, value in task_stats.items(): - stats[key] += value + add_processing_stats(stats, task_stats) if completed % 100 == 0 or completed == total: print( f" Progress: {completed}/{total} " @@ -1859,9 +2144,27 @@ def documentation_paths() -> Dict[str, str]: "llms_full_txt": "/llms-full.txt", "api_docs": "/api/docs.md", "openapi": "/openapi.json", + "json_schemas": "/api/schemas/index.schema.json", } +def schema_paths(algorithms_summary: Optional[Dict] = None) -> Dict[str, str]: + """Return published JSON Schema paths.""" + paths = { + "index": "/api/schemas/index.schema.json", + "metadata": "/api/schemas/metadata.schema.json", + "module": "/api/schemas/module.schema.json", + "module_in_process": "/api/schemas/module-in-process.schema.json", + "modules": "/api/schemas/modules.schema.json", + "historical_modules": "/api/schemas/historical-modules.schema.json", + "modules_in_process": "/api/schemas/modules-in-process.schema.json", + "certificate_detail": "/api/schemas/certificate-detail.schema.json", + } + if algorithms_summary: + paths["algorithms"] = "/api/schemas/algorithms.schema.json" + return paths + + def sample_module_example(module: Optional[Dict]) -> 
Dict: """Build a compact module example for generated docs.""" if not module: @@ -1881,6 +2184,7 @@ def sample_module_example(module: Optional[Dict]) -> Dict: "security_policy_url", "certificate_detail_url", "detail_available", + "algorithm_extraction", ] example = {} for key in keys: @@ -1889,6 +2193,9 @@ def sample_module_example(module: Optional[Dict]) -> Dict: value = module[key] if key in {"Module Name"}: value = truncate_text(value, 100) + if key == "algorithm_extraction" and isinstance(value, dict): + value = dict(value) + value.pop("attempts", None) example[key] = value if "description" in module: example["description"] = truncate_text(module["description"]) @@ -1925,6 +2232,10 @@ def sample_certificate_example(detail: Optional[Dict]) -> Dict: "validation_history": (detail.get("validation_history") or [])[:2], "algorithms": (detail.get("algorithms") or [])[:5], } + if isinstance(detail.get("algorithm_extraction"), dict): + algorithm_extraction = dict(detail["algorithm_extraction"]) + algorithm_extraction.pop("attempts", None) + example["algorithm_extraction"] = algorithm_extraction return {key: value for key, value in example.items() if value not in (None, [], {})} @@ -1982,7 +2293,10 @@ def build_api_reference_body( "`GET api/index.json` — API discovery endpoint with resource paths, documentation links, feature flags, and current counts.", "", "### Metadata", - "`GET api/metadata.json` — Generation timestamp, source URLs, dataset counts, and algorithm extraction status.", + "`GET api/metadata.json` — Generation timestamp, source URLs, dataset counts, extraction metrics, and algorithm extraction status.", + "", + "### JSON Schemas", + "`GET api/schemas/index.schema.json` — JSON Schema discovery document for the static API response files.", "", "### Active Modules", f"`GET api/modules.json` — All {format_count(total_modules)} active validated modules.", @@ -1999,7 +2313,7 @@ def build_api_reference_body( } ), "", - "Each active module includes certificate 
identifiers, vendor/module names, validation metadata, direct Security Policy links, NIST detail URLs, and detail availability flags.", + "Each active module includes certificate identifiers, vendor/module names, validation metadata, direct Security Policy links, NIST detail URLs, detail availability flags, and algorithm extraction provenance when algorithms were evaluated.", "", "### Historical Modules", f"`GET api/historical-modules.json` — All {format_count(total_historical)} expired or revoked modules for historical lookups.", @@ -2015,6 +2329,8 @@ def build_api_reference_body( "### Algorithms", f"`GET api/algorithms.json` — Algorithm usage summary across {format_count(total_algorithms)} certificates in the current build.", "", + "`algorithm_extraction` records the configured source, actual source, cache/fallback status, source URL, and extracted row counts for each evaluated certificate.", + "", "Example response (truncated):", "", render_json_block(sample_algorithms_example(algorithms_summary)), @@ -2050,7 +2366,7 @@ def build_api_reference_body( "### Discover the API surface", "```", "GET api/index.json → endpoints, docs links, feature flags, counts", - "GET api/metadata.json → freshness and scrape provenance", + "GET api/metadata.json → freshness, scrape provenance, and extraction metrics", "```", "", "### Find a module and pull the full certificate record", @@ -2073,7 +2389,7 @@ def build_api_reference_body( "### Explore algorithm coverage", "```", "GET api/algorithms.json → counts and certificate lists per algorithm", - "GET api/modules.json → filter module rows by algorithms[] entries", + "GET api/modules.json → filter module rows by algorithms[] entries and inspect algorithm_extraction", "```", "", ] @@ -2092,7 +2408,7 @@ def build_api_reference_body( if algorithms_summary: lines.append( - f"- **Algorithms coverage:** `api/algorithms.json` summarizes {format_count(total_algorithms)} certificates that had algorithm data in this build." 
+ f"- **Algorithms coverage:** `api/algorithms.json` summarizes {format_count(total_algorithms)} certificates that had algorithm data in this build. `api/metadata.json` reports extraction cache hits, refreshes, failures, misses, and fallback counts." ) else: lines.append( @@ -2109,7 +2425,7 @@ def build_llms_txt(metadata: Dict, algorithms_summary: Optional[Dict]) -> str: f"- `api/modules.json` — {format_count(metadata.get('total_modules', 0))} active validated modules.", f"- `api/historical-modules.json` — {format_count(metadata.get('total_historical_modules', 0))} historical modules.", f"- `api/modules-in-process.json` — {format_count(metadata.get('total_modules_in_process', 0))} modules currently in process.", - "- `api/metadata.json` — generation timestamp, counts, and source URLs.", + "- `api/metadata.json` — generation timestamp, counts, source URLs, and extraction metrics.", f"- `api/certificates/{{certificate}}.json` — full detail record for a single CMVP certificate.", ] if algorithms_summary: @@ -2139,6 +2455,7 @@ def build_llms_txt(metadata: Dict, algorithms_summary: Optional[Dict]) -> str: "- [API Reference](api/docs.md): endpoint reference with examples and workflows.", "- [Complete Documentation](llms-full.txt): fuller single-file agent reference.", "- [OpenAPI](openapi.json): OpenAPI 3.0.3 schema for the JSON endpoints.", + "- [JSON Schemas](api/schemas/index.schema.json): JSON Schema index for static API responses.", "", "## Caveats", "", @@ -2242,6 +2559,7 @@ def build_index_html(metadata: Dict, algorithms_summary: Optional[Dict]) -> str: '
  • llms-full.txt
  • ', '
  • api/docs.md
  • ', '
  • openapi.json
  • ', + '
  • JSON Schemas
  • ', ] endpoint_links = [ @@ -2333,6 +2651,310 @@ def generate_text_artifacts( } +def json_schema_document(title: str, schema_id: str, schema: Dict) -> Dict: + """Wrap a JSON Schema body with common metadata.""" + document = { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": f"{PUBLIC_BASE_URL}{schema_id}", + "title": title, + } + document.update(schema) + return document + + +def algorithm_extraction_schema() -> Dict: + """Return the shared algorithm extraction provenance schema.""" + return { + "type": "object", + "additionalProperties": True, + "required": [ + "schema_version", + "status", + "configured_source", + "source", + "cached", + "fallback_used", + "cache_version", + "algorithm_count", + "detailed_algorithm_count", + ], + "properties": { + "schema_version": {"type": "string"}, + "status": {"type": "string", "enum": ["parsed", "cached", "miss", "skipped"]}, + "configured_source": {"type": "string"}, + "source": {"type": "string"}, + "source_url": {"type": ["string", "null"], "format": "uri"}, + "cached": {"type": "boolean"}, + "fallback_used": {"type": "boolean"}, + "cache_version": {"type": "string"}, + "algorithm_count": {"type": "integer", "minimum": 0}, + "detailed_algorithm_count": {"type": "integer", "minimum": 0}, + "attempts": { + "type": "array", + "items": {"type": "object", "additionalProperties": {"type": "string"}}, + }, + }, + } + + +def module_schema() -> Dict: + """Return a backwards-compatible schema for active and historical module rows.""" + return { + "type": "object", + "additionalProperties": True, + "required": [ + "Certificate Number", + "Vendor Name", + "Module Name", + "security_policy_url", + "certificate_detail_url", + "detail_available", + ], + "properties": { + "Certificate Number": {"type": "string", "pattern": "^[0-9]+$"}, + "Certificate Number_url": {"type": "string"}, + "Vendor Name": {"type": "string"}, + "Module Name": {"type": "string"}, + "Module Type": {"type": "string"}, + "Validation Date": 
{"type": "string"}, + "Status": {"type": "string"}, + "security_policy_url": {"type": "string", "format": "uri"}, + "certificate_detail_url": {"type": "string", "format": "uri"}, + "standard": {"type": ["string", "null"]}, + "status": {"type": ["string", "null"]}, + "overall_level": {"type": ["integer", "string", "null"]}, + "sunset_date": {"type": ["string", "null"]}, + "detail_available": {"type": "boolean"}, + "algorithms": {"type": "array", "items": {"type": "string"}}, + "algorithms_detailed": {"type": "array", "items": {"type": "string"}}, + "algorithm_extraction": algorithm_extraction_schema(), + }, + } + + +def module_in_process_schema() -> Dict: + """Return the schema for CMVP modules in process rows.""" + return { + "type": "object", + "additionalProperties": True, + "required": ["Module Name", "Vendor Name", "Standard", "Status"], + "properties": { + "Module Name": {"type": "string"}, + "Vendor Name": {"type": "string"}, + "Vendor Name_url": {"type": "string"}, + "Standard": {"type": "string"}, + "Status": {"type": "string"}, + }, + } + + +def metadata_schema() -> Dict: + """Return the dataset metadata schema.""" + return { + "type": "object", + "additionalProperties": True, + "required": [ + "generated_at", + "total_modules", + "total_historical_modules", + "total_modules_in_process", + "total_certificates_with_algorithms", + "total_certificate_details", + "source", + "modules_in_process_source", + "algorithm_source", + "algorithm_cache_version", + "version", + ], + "properties": { + "generated_at": {"type": "string", "format": "date-time"}, + "total_modules": {"type": "integer", "minimum": 0}, + "total_historical_modules": {"type": "integer", "minimum": 0}, + "total_modules_in_process": {"type": "integer", "minimum": 0}, + "total_certificates_with_algorithms": {"type": "integer", "minimum": 0}, + "total_certificate_details": {"type": "integer", "minimum": 0}, + "source": {"type": "string", "format": "uri"}, + "modules_in_process_source": {"type": 
"string", "format": "uri"}, + "algorithm_source": {"type": "string"}, + "algorithm_cache_version": {"type": "string"}, + "algorithm_extraction_schema_version": {"type": "string"}, + "extraction_metrics": {"type": "object", "additionalProperties": True}, + "version": {"type": "string"}, + }, + } + + +def response_schema(metadata_ref: str, array_name: str, item_ref: str) -> Dict: + """Return a two-field metadata/list response schema.""" + return { + "type": "object", + "additionalProperties": False, + "required": ["metadata", array_name], + "properties": { + "metadata": {"$ref": metadata_ref}, + array_name: {"type": "array", "items": {"$ref": item_ref}}, + }, + } + + +def certificate_detail_schema() -> Dict: + """Return the per-certificate detail response schema.""" + certificate_schema = { + "type": "object", + "additionalProperties": True, + "required": [ + "certificate_number", + "dataset", + "generated_at", + "nist_page_url", + "certificate_detail_url", + "security_policy_url", + "vendor_name", + "module_name", + "standard", + "status", + "related_files", + "validation_history", + "vendor", + ], + "properties": { + "certificate_number": {"type": "string", "pattern": "^[0-9]+$"}, + "dataset": {"type": "string", "enum": ["active", "historical"]}, + "generated_at": {"type": "string", "format": "date-time"}, + "nist_page_url": {"type": "string", "format": "uri"}, + "certificate_detail_url": {"type": "string", "format": "uri"}, + "security_policy_url": {"type": ["string", "null"], "format": "uri"}, + "vendor_name": {"type": ["string", "null"]}, + "module_name": {"type": ["string", "null"]}, + "standard": {"type": ["string", "null"]}, + "status": {"type": ["string", "null"]}, + "related_files": {"type": "array", "items": {"type": "object", "additionalProperties": True}}, + "validation_history": {"type": "array", "items": {"type": "object", "additionalProperties": True}}, + "vendor": {"type": "object", "additionalProperties": True}, + "algorithms": {"type": "array", 
"items": {"type": "string"}}, + "algorithms_detailed": {"type": "array", "items": {"type": "string"}}, + "algorithm_extraction": algorithm_extraction_schema(), + }, + } + return { + "type": "object", + "additionalProperties": False, + "required": ["metadata", "certificate"], + "properties": { + "metadata": { + "type": "object", + "additionalProperties": True, + "required": ["generated_at", "dataset", "source"], + }, + "certificate": certificate_schema, + }, + } + + +def algorithms_schema() -> Dict: + """Return the algorithms summary response schema.""" + return { + "type": "object", + "additionalProperties": False, + "required": ["total_unique_algorithms", "total_certificate_algorithm_pairs", "algorithms", "metadata"], + "properties": { + "total_unique_algorithms": {"type": "integer", "minimum": 0}, + "total_certificate_algorithm_pairs": {"type": "integer", "minimum": 0}, + "algorithms": { + "type": "object", + "additionalProperties": { + "type": "object", + "additionalProperties": False, + "required": ["count", "certificates"], + "properties": { + "count": {"type": "integer", "minimum": 0}, + "certificates": {"type": "array", "items": {"type": "integer"}}, + }, + }, + }, + "metadata": {"type": "object", "additionalProperties": True}, + }, + } + + +def build_schema_index_payload(algorithms_summary: Optional[Dict]) -> Dict: + """Build the JSON Schema discovery document.""" + return { + "name": "NIST CMVP API JSON Schemas", + "schema_version": "1.0", + "base_url": PUBLIC_BASE_URL, + "schemas": schema_paths(algorithms_summary), + } + + +def generate_json_schema_artifacts(algorithms_summary: Optional[Dict]) -> Dict[str, Dict]: + """Generate tracked JSON Schema artifacts for API response files.""" + metadata_path = "/api/schemas/metadata.schema.json" + module_path = "/api/schemas/module.schema.json" + module_in_process_path = "/api/schemas/module-in-process.schema.json" + paths = schema_paths(algorithms_summary) + artifacts = { + "api/schemas/index.schema.json": 
json_schema_document( + "NIST CMVP API JSON Schema Index", + paths["index"], + { + "type": "object", + "additionalProperties": False, + "required": ["name", "schema_version", "base_url", "schemas"], + "properties": { + "name": {"type": "string"}, + "schema_version": {"type": "string"}, + "base_url": {"type": "string", "format": "uri"}, + "schemas": {"type": "object", "additionalProperties": {"type": "string"}}, + }, + "examples": [build_schema_index_payload(algorithms_summary)], + }, + ), + "api/schemas/metadata.schema.json": json_schema_document( + "NIST CMVP API Metadata", + paths["metadata"], + metadata_schema(), + ), + "api/schemas/module.schema.json": json_schema_document( + "NIST CMVP Module Row", + module_path, + module_schema(), + ), + "api/schemas/module-in-process.schema.json": json_schema_document( + "NIST CMVP Module In Process Row", + module_in_process_path, + module_in_process_schema(), + ), + "api/schemas/modules.schema.json": json_schema_document( + "NIST CMVP Active Modules Response", + paths["modules"], + response_schema(metadata_path, "modules", module_path), + ), + "api/schemas/historical-modules.schema.json": json_schema_document( + "NIST CMVP Historical Modules Response", + paths["historical_modules"], + response_schema(metadata_path, "modules", module_path), + ), + "api/schemas/modules-in-process.schema.json": json_schema_document( + "NIST CMVP Modules In Process Response", + paths["modules_in_process"], + response_schema(metadata_path, "modules_in_process", module_in_process_path), + ), + "api/schemas/certificate-detail.schema.json": json_schema_document( + "NIST CMVP Certificate Detail Response", + paths["certificate_detail"], + certificate_detail_schema(), + ), + } + if algorithms_summary: + artifacts["api/schemas/algorithms.schema.json"] = json_schema_document( + "NIST CMVP Algorithms Summary Response", + paths["algorithms"], + algorithms_schema(), + ) + return artifacts + + def build_index_payload(metadata: Dict, algorithms_summary: 
Optional[Dict]) -> Dict: """Build the API index payload published at api/index.json.""" endpoints = { @@ -2352,6 +2974,7 @@ def build_index_payload(metadata: Dict, algorithms_summary: Optional[Dict]) -> D "base_url": PUBLIC_BASE_URL, "endpoints": endpoints, "documentation": documentation_paths(), + "schemas": schema_paths(algorithms_summary), "last_updated": metadata.get("generated_at"), "total_modules": metadata.get("total_modules", 0), "total_historical_modules": metadata.get("total_historical_modules", 0), @@ -2362,11 +2985,14 @@ def build_index_payload(metadata: Dict, algorithms_summary: Optional[Dict]) -> D "security_policy_urls": True, "certificate_detail_urls": True, "algorithm_extraction": bool(algorithms_summary), + "algorithm_extraction_provenance": True, + "extraction_metrics": True, "certificate_detail_records": True, "llms_txt": True, "llms_full_txt": True, "markdown_api_docs": True, "openapi_spec": True, + "json_schemas": True, }, } @@ -2609,8 +3235,11 @@ def generate_openapi_spec( "total_certificates_with_algorithms": {"type": "integer", "example": metadata.get("total_certificates_with_algorithms", 0)}, "total_certificate_details": {"type": "integer", "example": metadata.get("total_certificate_details", 0)}, "source": {"type": "string", "example": metadata.get("source", "")}, + "modules_in_process_source": {"type": "string", "example": metadata.get("modules_in_process_source", "")}, "algorithm_source": {"type": "string", "example": metadata.get("algorithm_source", "")}, "algorithm_cache_version": {"type": "string", "example": metadata.get("algorithm_cache_version", "")}, + "algorithm_extraction_schema_version": {"type": "string", "example": metadata.get("algorithm_extraction_schema_version", "")}, + "extraction_metrics": {"type": "object", "additionalProperties": True}, "version": {"type": "string", "example": metadata.get("version", "")} } }, @@ -2656,7 +3285,8 @@ def generate_openapi_spec( } } } - } + }, + "metadata": {"type": "object", 
"additionalProperties": True} } }, "CertificateDetail": { @@ -2843,6 +3473,8 @@ def main(): certificate_detail_payloads.update(historical_payloads) algorithms_map.update(historical_algorithms) + extraction_metrics = build_extraction_metrics(active_stats, historical_stats) + # Prepare output directory output_dir = "api" @@ -2858,6 +3490,8 @@ def main(): "modules_in_process_source": MODULES_IN_PROCESS_URL, "algorithm_source": algorithm_source, "algorithm_cache_version": ALGORITHM_CACHE_VERSION, + "algorithm_extraction_schema_version": ALGORITHM_EXTRACTION_SCHEMA_VERSION, + "extraction_metrics": extraction_metrics, "version": "3.0" } @@ -2910,9 +3544,18 @@ def main(): algorithms_summary["metadata"] = { "generated_at": metadata["generated_at"], "total_certificates_processed": len(algorithms_map), - "source": algorithm_source + "source": algorithm_source, + "algorithm_source": algorithm_source, + "algorithm_cache_version": ALGORITHM_CACHE_VERSION, + "algorithm_extraction_schema_version": ALGORITHM_EXTRACTION_SCHEMA_VERSION, + "extraction_metrics": extraction_metrics["combined"], } save_json(algorithms_summary, f"{output_dir}/algorithms.json") + else: + algorithms_path = Path(output_dir) / "algorithms.json" + if algorithms_path.exists(): + algorithms_path.unlink() + print(f"Removed stale: {algorithms_path}") # Save metadata separately for quick access save_json(metadata, f"{output_dir}/metadata.json") @@ -2942,6 +3585,18 @@ def main(): ).items(): save_text(content, path) + print("Generating JSON Schema artifacts...") + schema_artifacts = generate_json_schema_artifacts(algorithms_summary) + for path, schema in schema_artifacts.items(): + save_json(schema, path) + schema_dir = Path(output_dir) / "schemas" + if schema_dir.exists(): + expected_schema_paths = {Path(path) for path in schema_artifacts} + for stale_schema in schema_dir.glob("*.schema.json"): + if stale_schema not in expected_schema_paths: + stale_schema.unlink() + print(f"Removed stale: {stale_schema}") + 
print("\n" + "=" * 60) print("Scraping completed successfully!") print("=" * 60) @@ -2967,12 +3622,14 @@ def main(): print( " - Active algorithm reuse: " f"{active_stats['pdf_reused']} reused, {active_stats['pdf_refreshed']} refreshed, " - f"{active_stats['pdf_failed']} failed, {active_stats['algorithm_misses']} misses" + f"{active_stats['pdf_failed']} failed, {active_stats['pdf_cache_hits']} PDF cache hits, " + f"{active_stats['algorithm_misses']} misses" ) print( " - Historical algorithm reuse: " f"{historical_stats['pdf_reused']} reused, {historical_stats['pdf_refreshed']} refreshed, " - f"{historical_stats['pdf_failed']} failed, {historical_stats['algorithm_misses']} misses" + f"{historical_stats['pdf_failed']} failed, {historical_stats['pdf_cache_hits']} PDF cache hits, " + f"{historical_stats['algorithm_misses']} misses" ) print(f" - OpenAPI spec: openapi.json") print(f"\nOutput files saved to: {output_dir}/") diff --git a/test_scraper.py b/test_scraper.py index db8fc9089..71593e048 100644 --- a/test_scraper.py +++ b/test_scraper.py @@ -4,6 +4,7 @@ Tests the parsing logic with sample HTML. 
""" +import asyncio import json import sys import tempfile @@ -11,22 +12,37 @@ from types import SimpleNamespace from scraper import ( ALGORITHM_CACHE_VERSION, + ALGORITHM_EXTRACTION_SCHEMA_VERSION, + build_algorithm_extraction_provenance, build_certificate_fingerprint, + build_extraction_metrics, build_index_payload, extract_legacy_algorithm_section, extract_text_from_crawl4ai_process_result, extract_text_from_crawl4ai_html, + fetch_policy_pdf_bytes, + generate_json_schema_artifacts, generate_openapi_spec, generate_text_artifacts, parse_algorithms_from_policy_markdown, parse_algorithms_from_policy_text, parse_certificate_detail_page, parse_modules_table, + process_certificate_record, prune_orphan_certificate_details, select_algorithm_source, should_reuse_certificate_detail, should_reuse_cached_algorithms, ) +from validate_api import validate_api + + +FIXTURE_DIR = Path(__file__).parent / "tests" / "fixtures" / "nist_security_policies" + + +def load_policy_fixture(name: str) -> str: + """Load a checked-in Security Policy text fixture.""" + return (FIXTURE_DIR / name).read_text(encoding="utf-8") def test_parse_simple_table(): @@ -487,6 +503,44 @@ def test_extract_legacy_algorithm_section_prefers_body_over_toc(): print("✓ Legacy algorithm section TOC preference test passed") +def test_parse_real_world_fips_140_3_policy_fixture(): + """Regression-test a representative FIPS 140-3 NIST Security Policy text fixture.""" + policy_text = load_policy_fixture("5260_fips_140_3_algorithms.txt") + + detailed, categories = parse_algorithms_from_policy_text(policy_text) + + assert any("AES-CBC" in entry for entry in detailed), "Expected AES-CBC from FIPS 140-3 fixture" + assert any("HMAC SHA2-256" in entry for entry in detailed), "Expected HMAC from FIPS 140-3 fixture" + assert any("CTR_DRBG" in entry for entry in detailed), "Expected DRBG from FIPS 140-3 fixture" + assert categories == ["AES", "DRBG", "HMAC", "SHA"], "Expected normalized FIPS 140-3 categories" + + print("✓ 
Real-world FIPS 140-3 fixture parsing test passed") + + +def test_parse_real_world_fips_140_2_policy_fixture(): + """Regression-test a representative FIPS 140-2 NIST Security Policy text fixture.""" + policy_text = load_policy_fixture("5152_fips_140_2_algorithms.txt") + + detailed, categories = parse_algorithms_from_policy_text(policy_text) + + assert detailed == [], "Legacy FIPS 140-2 fixture should use coarse categories" + assert categories == [ + "AES", + "DRBG", + "ECDSA", + "HMAC", + "KAS", + "KDF", + "RSA", + "SHS", + "SSH", + "TLS", + ], "Expected normalized FIPS 140-2 categories" + assert "DES" not in categories, "Allowed/non-approved section must not leak into approved categories" + + print("✓ Real-world FIPS 140-2 fixture parsing test passed") + + def test_parse_algorithms_from_policy_markdown(): """Test parsing algorithm tables from policy markdown output.""" markdown = """ @@ -664,6 +718,150 @@ def test_should_reuse_cached_algorithms(): print("✓ Algorithm cache reuse test passed") +def test_algorithm_extraction_provenance_and_metrics(): + """Algorithm extraction provenance should expose source, cache, fallback, and counts.""" + provenance = build_algorithm_extraction_provenance( + "crawl4ai", + "parsed", + "security_policy_pdf", + "https://csrc.nist.gov/example.pdf", + ["AES", "HMAC"], + ["AES-CBC A1", "HMAC SHA2-256 A1"], + cached=False, + fallback_used=True, + attempts=[ + {"source": "crawl4ai", "url": "https://csrc.nist.gov/example.pdf", "status": "no_algorithms"}, + {"source": "security_policy_pdf", "url": "https://csrc.nist.gov/example.pdf", "status": "parsed"}, + ], + ) + + assert provenance["schema_version"] == ALGORITHM_EXTRACTION_SCHEMA_VERSION, "Provenance schema version mismatch" + assert provenance["configured_source"] == "crawl4ai", "Configured source should be recorded" + assert provenance["source"] == "security_policy_pdf", "Actual extraction source should be recorded" + assert provenance["fallback_used"] is True, "Fallback usage should 
be recorded" + assert provenance["algorithm_count"] == 2, "Algorithm category count mismatch" + assert provenance["detailed_algorithm_count"] == 2, "Detailed algorithm count mismatch" + assert len(provenance["attempts"]) == 2, "Attempt provenance should be retained for detail records" + + active_stats = {"html_reused": 3, "algorithm_successes": 2, "algorithm_fallbacks": 1} + historical_stats = {"html_refreshed": 4, "algorithm_misses": 1} + metrics = build_extraction_metrics(active_stats, historical_stats) + assert metrics["combined"]["html_reused"] == 3, "Combined metrics should include active counters" + assert metrics["combined"]["html_refreshed"] == 4, "Combined metrics should include historical counters" + assert metrics["combined"]["algorithm_successes"] == 2, "Combined metrics should include successes" + assert metrics["combined"]["algorithm_misses"] == 1, "Combined metrics should include misses" + assert "concurrency" in metrics, "Extraction metrics should record concurrency settings" + + print("✓ Algorithm provenance and metrics test passed") + + +def test_fetch_policy_pdf_bytes_reuses_in_run_cache(): + """Local Security Policy PDF fetches should be reused within one scrape run.""" + class FakeResponse: + status_code = 200 + headers = {} + text = "" + content = b"%PDF-1.7 fixture" + + def raise_for_status(self): + return None + + class FakeClient: + def __init__(self): + self.calls = 0 + + async def get(self, url): + self.calls += 1 + await asyncio.sleep(0) + return FakeResponse() + + async def scenario(): + client = FakeClient() + pdf_cache = {} + pdf_cache_lock = asyncio.Lock() + first_bytes, first_hit = await fetch_policy_pdf_bytes( + client, + "https://csrc.nist.gov/example.pdf", + pdf_cache, + pdf_cache_lock, + ) + second_bytes, second_hit = await fetch_policy_pdf_bytes( + client, + "https://csrc.nist.gov/example.pdf", + pdf_cache, + pdf_cache_lock, + ) + return client.calls, first_bytes, first_hit, second_bytes, second_hit + + calls, first_bytes, 
first_hit, second_bytes, second_hit = asyncio.run(scenario()) + + assert calls == 1, "Expected one network fetch for repeated policy URL" + assert first_bytes == b"%PDF-1.7 fixture", "First PDF fetch returned unexpected bytes" + assert second_bytes == first_bytes, "Second PDF fetch should reuse cached bytes" + assert first_hit is False, "First PDF fetch should not be a cache hit" + assert second_hit is True, "Second PDF fetch should be a cache hit" + + print("✓ Policy PDF cache reuse test passed") + + +def test_process_certificate_record_applies_cached_algorithm_provenance(): + """Cached algorithm reuse should still attach explicit provenance to outputs.""" + module = { + "Certificate Number": "5238", + "Vendor Name": "SUSE LLC", + "Module Name": "SUSE Linux Enterprise OpenSSL 1 Cryptographic Module", + "Module Type": "Software", + "Validation Date": "04/10/2026", + "security_policy_url": "https://csrc.nist.gov/CSRC/media/projects/cryptographic-module-validation-program/documents/security-policies/140sp5238.pdf", + "certificate_detail_url": "https://csrc.nist.gov/projects/cryptographic-module-validation-program/certificate/5238", + } + previous_detail = { + "certificate_number": "5238", + "software_versions": "3.0.9", + "hardware_versions": None, + "firmware_versions": None, + "security_policy_url": module["security_policy_url"], + "algorithms": ["AES", "HMAC"], + "algorithms_detailed": ["AES-CBC A1", "HMAC SHA2-256 A1"], + "algorithm_extraction": { + "source": "crawl4ai", + "source_url": module["security_policy_url"], + }, + } + previous_metadata = { + "algorithm_source": "crawl4ai", + "algorithm_cache_version": ALGORITHM_CACHE_VERSION, + } + + module_out, detail_payload, categories, stats = asyncio.run( + process_certificate_record( + module, + "active", + "2026-04-12T03:10:00.961597Z", + "crawl4ai", + module, + previous_detail, + previous_metadata, + None, + asyncio.Semaphore(1), + asyncio.Semaphore(1), + {}, + asyncio.Lock(), + {}, + ) + ) + + assert categories 
== ["AES", "HMAC"], "Cached categories should be reused" + assert module_out["algorithm_extraction"]["status"] == "cached", "Module should record cached extraction status" + assert module_out["algorithm_extraction"]["source"] == "crawl4ai", "Cached source should be preserved" + assert detail_payload["algorithm_extraction"]["cached"] is True, "Detail should record cache provenance" + assert detail_payload["algorithm_extraction"]["algorithm_count"] == 2, "Detail algorithm count mismatch" + assert stats["pdf_reused"] == 1, "Cached algorithm reuse should increment pdf_reused" + assert stats["algorithm_cache_hits"] == 1, "Cached algorithm reuse should increment cache hits" + + print("✓ Cached algorithm provenance application test passed") + + def test_prune_orphan_certificate_details(): """Test that stale certificate detail files are removed only for missing certs.""" with tempfile.TemporaryDirectory() as temp_dir: @@ -682,6 +880,15 @@ def test_prune_orphan_certificate_details(): print("✓ Orphan certificate cleanup test passed") +def test_validate_generated_api_artifacts(): + """Current checked-in generated API artifacts should be internally consistent.""" + errors = validate_api(Path(".")) + + assert errors == [], "Generated API artifact validation failed:\n" + "\n".join(errors[:20]) + + print("✓ Generated API artifact validation test passed") + + def test_generate_agent_docs(): """Test the generated agent-friendly documentation artifacts.""" metadata = { @@ -694,6 +901,12 @@ def test_generate_agent_docs(): "source": "https://csrc.nist.gov/projects/cryptographic-module-validation-program/validated-modules/search", "modules_in_process_source": "https://csrc.nist.gov/Projects/cryptographic-module-validation-program/modules-in-process/modules-in-process-list", "algorithm_source": "crawl4ai", + "algorithm_cache_version": ALGORITHM_CACHE_VERSION, + "algorithm_extraction_schema_version": ALGORITHM_EXTRACTION_SCHEMA_VERSION, + "extraction_metrics": build_extraction_metrics( + 
{"html_reused": 1, "pdf_reused": 1, "algorithm_cache_hits": 1}, + {"html_refreshed": 1, "pdf_refreshed": 1, "algorithm_successes": 1}, + ), "version": "3.0", } sample_module = { @@ -711,6 +924,18 @@ def test_generate_agent_docs(): "certificate_detail_url": "https://csrc.nist.gov/projects/cryptographic-module-validation-program/certificate/5238", "detail_available": True, "description": "OpenSSL is an open-source library of various cryptographic algorithms written mainly in C.", + "algorithm_extraction": { + "schema_version": ALGORITHM_EXTRACTION_SCHEMA_VERSION, + "status": "cached", + "configured_source": "crawl4ai", + "source": "crawl4ai", + "source_url": "https://csrc.nist.gov/CSRC/media/projects/cryptographic-module-validation-program/documents/security-policies/140sp5238.pdf", + "cached": True, + "fallback_used": False, + "cache_version": ALGORITHM_CACHE_VERSION, + "algorithm_count": 3, + "detailed_algorithm_count": 0, + }, } sample_detail = { "certificate_number": "5238", @@ -744,6 +969,25 @@ def test_generate_agent_docs(): {"date": "4/10/2026", "type": "Initial", "lab": "Example Lab"} ], "algorithms": ["AES", "HMAC", "RSA"], + "algorithm_extraction": { + "schema_version": ALGORITHM_EXTRACTION_SCHEMA_VERSION, + "status": "parsed", + "configured_source": "crawl4ai", + "source": "crawl4ai", + "source_url": "https://csrc.nist.gov/CSRC/media/projects/cryptographic-module-validation-program/documents/security-policies/140sp5238.pdf", + "cached": False, + "fallback_used": False, + "cache_version": ALGORITHM_CACHE_VERSION, + "algorithm_count": 3, + "detailed_algorithm_count": 12, + "attempts": [ + { + "source": "crawl4ai", + "url": "https://csrc.nist.gov/CSRC/media/projects/cryptographic-module-validation-program/documents/security-policies/140sp5238.pdf", + "status": "parsed", + } + ], + }, } algorithms_summary = { "total_unique_algorithms": 45, @@ -766,11 +1010,28 @@ def test_generate_agent_docs(): assert "api/docs.md" in artifacts, "Missing Markdown API docs 
artifact" assert "api/algorithms.json" in artifacts["llms.txt"], "llms.txt should reference algorithms endpoint when available" assert 'href="api/docs.md"' in artifacts["index.html"], "Homepage should link to api/docs.md" + assert 'href="api/schemas/index.schema.json"' in artifacts["index.html"], "Homepage should link to JSON schemas" assert "GET api/certificates/{certificate}.json" in artifacts["api/docs.md"], "API docs should include certificate detail endpoint" + assert "GET api/schemas/index.schema.json" in artifacts["api/docs.md"], "API docs should include JSON schema endpoint" + assert "algorithm_extraction" in artifacts["api/docs.md"], "API docs should describe extraction provenance" index_payload = build_index_payload(metadata, algorithms_summary) assert index_payload["documentation"]["llms_full_txt"] == "/llms-full.txt", "Index payload should advertise llms-full.txt" + assert index_payload["documentation"]["json_schemas"] == "/api/schemas/index.schema.json", "Index payload should advertise JSON schemas" + assert index_payload["schemas"]["certificate_detail"] == "/api/schemas/certificate-detail.schema.json", "Index payload should advertise certificate detail schema" assert index_payload["features"]["markdown_api_docs"] is True, "Index payload should advertise Markdown docs support" + assert index_payload["features"]["algorithm_extraction_provenance"] is True, "Index payload should advertise extraction provenance" + assert index_payload["features"]["extraction_metrics"] is True, "Index payload should advertise extraction metrics" + assert index_payload["features"]["json_schemas"] is True, "Index payload should advertise JSON schema support" + + schema_artifacts = generate_json_schema_artifacts(algorithms_summary) + assert "api/schemas/modules.schema.json" in schema_artifacts, "Missing modules JSON schema" + assert "api/schemas/module-in-process.schema.json" in schema_artifacts, "Missing module-in-process JSON schema" + assert 
"api/schemas/certificate-detail.schema.json" in schema_artifacts, "Missing certificate detail JSON schema" + assert "api/schemas/algorithms.schema.json" in schema_artifacts, "Missing algorithms JSON schema" + assert schema_artifacts["api/schemas/modules-in-process.schema.json"]["properties"]["modules_in_process"]["items"]["$ref"] == "/api/schemas/module-in-process.schema.json", "Modules-in-process response should use its own row schema" + assert schema_artifacts["api/schemas/module.schema.json"]["properties"]["algorithm_extraction"]["type"] == "object", "Module schema should include extraction provenance" + assert schema_artifacts["api/schemas/certificate-detail.schema.json"]["properties"]["certificate"]["properties"]["algorithm_extraction"]["type"] == "object", "Certificate detail schema should include extraction provenance" openapi = generate_openapi_spec( [sample_module], @@ -782,10 +1043,14 @@ def test_generate_agent_docs(): assert openapi["components"]["schemas"]["Module"]["properties"]["detail_available"]["type"] == "boolean", "detail_available should be typed as boolean" module_properties = openapi["components"]["schemas"]["Module"]["properties"] certificate_properties = openapi["components"]["schemas"]["CertificateDetail"]["properties"] + metadata_properties = openapi["components"]["schemas"]["Metadata"]["properties"] for key in ("software_versions", "hardware_versions", "firmware_versions"): assert key in module_properties, f"OpenAPI module schema should include {key}" assert key in certificate_properties, f"OpenAPI certificate detail schema should include {key}" assert module_properties[key]["nullable"] is True, f"OpenAPI module schema should mark {key} nullable" + assert "algorithm_extraction" in module_properties, "OpenAPI module schema should include algorithm_extraction" + assert "algorithm_extraction" in certificate_properties, "OpenAPI certificate schema should include algorithm_extraction" + assert "extraction_metrics" in metadata_properties, 
"OpenAPI metadata schema should include extraction metrics" print("✓ Agent-friendly docs generation test passed") @@ -808,13 +1073,19 @@ def main(): test_parse_algorithms_from_policy_text() test_parse_algorithms_from_legacy_policy_text() test_extract_legacy_algorithm_section_prefers_body_over_toc() + test_parse_real_world_fips_140_3_policy_fixture() + test_parse_real_world_fips_140_2_policy_fixture() test_parse_algorithms_from_policy_markdown() test_extract_text_from_crawl4ai_html() test_extract_text_from_crawl4ai_process_result() test_select_algorithm_source() test_build_certificate_fingerprint() test_should_reuse_cached_algorithms() + test_algorithm_extraction_provenance_and_metrics() + test_fetch_policy_pdf_bytes_reuses_in_run_cache() + test_process_certificate_record_applies_cached_algorithm_provenance() test_prune_orphan_certificate_details() + test_validate_generated_api_artifacts() test_generate_agent_docs() print() diff --git a/tests/fixtures/nist_security_policies/5152_fips_140_2_algorithms.txt b/tests/fixtures/nist_security_policies/5152_fips_140_2_algorithms.txt new file mode 100644 index 000000000..e56801a1e --- /dev/null +++ b/tests/fixtures/nist_security_policies/5152_fips_140_2_algorithms.txt @@ -0,0 +1,21 @@ +Source: https://csrc.nist.gov/CSRC/media/projects/cryptographic-module-validation-program/documents/security-policies/140sp5152.pdf +Certificate: 5152 +Standard: FIPS 140-2 + +3.4 Algorithms +Table 10 lists the FIPS Approved cryptographic algorithms used by the module. +Algorithm +AES Cert. #A3424 +DRBG Cert. #A3424 +ECDSA Cert. #A3424 +HMAC Cert. #A3424 +KAS Cert. #A3424 +KDF TLS +RSA Cert. #A3424 +SHS Cert. #A3424 +SSH KDF + +3.5 Allowed Algorithms +Table 11 describes the non-approved but allowed algorithms in FIPS mode. 
+Algorithm +Triple-DES diff --git a/tests/fixtures/nist_security_policies/5260_fips_140_3_algorithms.txt b/tests/fixtures/nist_security_policies/5260_fips_140_3_algorithms.txt new file mode 100644 index 000000000..4aaa824ce --- /dev/null +++ b/tests/fixtures/nist_security_policies/5260_fips_140_3_algorithms.txt @@ -0,0 +1,17 @@ +Source: https://csrc.nist.gov/CSRC/media/projects/cryptographic-module-validation-program/documents/security-policies/140sp5260.pdf +Certificate: 5260 +Standard: FIPS 140-3 + +2.5 Algorithms +Approved Algorithms: +Cipher +Algorithm CAVP Cert Properties Reference +AES-CBC A4593 Direction - Decrypt, Encrypt Key Length - 128, 192, 256 SP 800-38A +AES-GCM A4593 Direction - Decrypt, Encrypt Key Length - 128, 192, 256 IV Generation - Internal SP 800-38D +Message Authentication +HMAC SHA2-256 A4593 Message Authentication FIPS 198-1 +Message Digest +SHA2-256 A4593 Message Digest FIPS 180-4 +Random Bit Generation +CTR_DRBG A4593 Deterministic Random Bit Generation SP 800-90A +2.6 Security Function Implementations diff --git a/validate_api.py b/validate_api.py new file mode 100644 index 000000000..489dda664 --- /dev/null +++ b/validate_api.py @@ -0,0 +1,508 @@ +#!/usr/bin/env python3 +"""Validate generated static API artifacts for internal consistency.""" + +import argparse +import json +from pathlib import Path +from typing import Dict, Iterable, List, Optional, Sequence, Set, Tuple + + +REQUIRED_TOP_LEVEL_FILES = ( + "api/modules.json", + "api/historical-modules.json", + "api/modules-in-process.json", + "api/metadata.json", + "api/index.json", + "openapi.json", + "llms.txt", + "llms-full.txt", + "api/docs.md", + "index.html", +) + +DETAIL_REQUIRED_FIELDS = ( + "certificate_number", + "dataset", + "generated_at", + "nist_page_url", + "certificate_detail_url", + "security_policy_url", + "vendor_name", + "module_name", + "standard", + "status", + "related_files", + "validation_history", + "vendor", +) + +CURRENT_SCHEMA_DETAIL_FIELDS = ( + 
# Extra per-certificate fields that are only demanded when the caller asks for
# the current scraper schema (see validate_certificate_details).
CURRENT_SCHEMA_DETAIL_FIELDS = (
    "software_versions",
    "hardware_versions",
    "firmware_versions",
    "algorithm_extraction",
)

# Keys every algorithm_extraction object must carry.
ALGORITHM_EXTRACTION_REQUIRED_FIELDS = (
    "schema_version",
    "status",
    "configured_source",
    "source",
    "source_url",
    "cached",
    "fallback_used",
    "cache_version",
    "algorithm_count",
    "detailed_algorithm_count",
)

# Accepted values of algorithm_extraction.status.
ALGORITHM_EXTRACTION_STATUSES = {"parsed", "cached", "miss", "skipped"}

# Schema files that must exist when the current schema is required; the
# algorithms schema is added separately because it is conditional.
JSON_SCHEMA_FILES = (
    "api/schemas/index.schema.json",
    "api/schemas/metadata.schema.json",
    "api/schemas/module.schema.json",
    "api/schemas/module-in-process.schema.json",
    "api/schemas/modules.schema.json",
    "api/schemas/historical-modules.schema.json",
    "api/schemas/modules-in-process.schema.json",
    "api/schemas/certificate-detail.schema.json",
)


def load_json(path: Path, errors: List[str]) -> Optional[Dict]:
    """Load a JSON object from *path*, recording a validation error on failure.

    Args:
        path: File to read (decoded as UTF-8).
        errors: Error list that receives a message when loading fails or the
            top-level JSON value is not an object.

    Returns:
        The parsed object, or None when an error was recorded. Note that an
        empty object ``{}`` is a successful (falsy) result, so callers must
        test the return value with ``is None`` rather than truthiness.
    """
    try:
        with path.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
    except (OSError, ValueError, RecursionError) as exc:
        # OSError: missing/unreadable file. ValueError: malformed JSON
        # (JSONDecodeError) or invalid UTF-8 (UnicodeDecodeError).
        # RecursionError: pathologically nested documents.
        errors.append(f"{path}: failed to load JSON: {exc}")
        return None

    if not isinstance(payload, dict):
        errors.append(f"{path}: top-level JSON value must be an object")
        return None
    return payload


def parse_certificate_number(record: Dict) -> Optional[int]:
    """Return a numeric certificate number from a module or detail record.

    Looks at the ``"Certificate Number"`` key first and ``"certificate_number"``
    second. Returns None when *record* is not a dict or neither key holds a
    decimal number.
    """
    if not isinstance(record, dict):
        return None
    for key in ("Certificate Number", "certificate_number"):
        value = str(record.get(key, "")).strip()
        # isdecimal(), not isdigit(): isdigit() also accepts characters such
        # as superscript digits, which int() rejects with ValueError.
        if value.isdecimal():
            return int(value)
    return None


def add_error(errors: List[str], condition: bool, message: str) -> None:
    """Append *message* to *errors* when *condition* is false."""
    if not condition:
        errors.append(message)


def _sized_length(value: object) -> Optional[int]:
    """Return ``len(value)``, or None when *value* has no length."""
    try:
        return len(value)  # type: ignore[arg-type]
    except TypeError:
        return None


def count_certificate_algorithm_pairs(cert_algorithms: Dict[int, List[str]]) -> int:
    """Count certificate/algorithm pairs from module rows."""
    return sum(len(algorithms) for algorithms in cert_algorithms.values())


def build_expected_algorithm_index(cert_algorithms: Dict[int, List[str]]) -> Dict[str, Set[int]]:
    """Build an algorithm -> certificate-number set mapping from module rows."""
    expected: Dict[str, Set[int]] = {}
    for cert_number, algorithms in cert_algorithms.items():
        for algorithm in algorithms:
            expected.setdefault(algorithm, set()).add(cert_number)
    return expected


def validate_algorithm_extraction(
    record: Dict,
    label: str,
    require_current_schema: bool,
    errors: List[str],
) -> None:
    """Validate an optional per-certificate ``algorithm_extraction`` object.

    Args:
        record: Module row or certificate detail record.
        label: Prefix used in every error message.
        require_current_schema: When true, a missing object is an error.
        errors: Error list to append to.
    """
    extraction = record.get("algorithm_extraction")
    if extraction is None:
        if require_current_schema:
            errors.append(f"{label}: missing algorithm_extraction")
        return

    if not isinstance(extraction, dict):
        errors.append(f"{label}: algorithm_extraction must be an object")
        return

    for field in ALGORITHM_EXTRACTION_REQUIRED_FIELDS:
        add_error(errors, field in extraction, f"{label}: algorithm_extraction missing {field}")

    status = extraction.get("status")
    add_error(
        errors,
        # The isinstance guard keeps an unhashable status (e.g. a list) from
        # raising TypeError in the set membership test.
        isinstance(status, str) and status in ALGORITHM_EXTRACTION_STATUSES,
        f"{label}: invalid algorithm_extraction.status {status!r}",
    )

    algorithms = record.get("algorithms") or []
    detailed = record.get("algorithms_detailed") or []
    # Counts are only compared when they are integers; _sized_length yields
    # None for unsized values so a malformed field is reported, not raised.
    if isinstance(extraction.get("algorithm_count"), int):
        add_error(
            errors,
            extraction["algorithm_count"] == _sized_length(algorithms),
            f"{label}: algorithm_extraction.algorithm_count does not match algorithms length",
        )
    if isinstance(extraction.get("detailed_algorithm_count"), int):
        add_error(
            errors,
            extraction["detailed_algorithm_count"] == _sized_length(detailed),
            f"{label}: algorithm_extraction.detailed_algorithm_count does not match algorithms_detailed length",
        )


def validate_module_rows(
    rows: Iterable[Dict],
    dataset: str,
    errors: List[str],
    require_current_schema: bool,
) -> Tuple[Dict[int, str], Dict[int, List[str]]]:
    """Validate active or historical module rows and return cert metadata.

    Args:
        rows: Module rows from ``modules.json`` or ``historical-modules.json``.
        dataset: Dataset name recorded for each certificate and used in labels.
        errors: Error list to append to.
        require_current_schema: Forwarded to validate_algorithm_extraction.

    Returns:
        A pair ``(cert_datasets, cert_algorithms)``: certificate number ->
        dataset name, and certificate number -> algorithm list. Only rows with
        a non-empty *list* of algorithms appear in the second mapping.
    """
    cert_datasets: Dict[int, str] = {}
    cert_algorithms: Dict[int, List[str]] = {}

    for index, row in enumerate(rows):
        label = f"{dataset} modules[{index}]"
        # parse_certificate_number returns None for non-dict rows, so they are
        # reported here instead of crashing on row.get below.
        cert_number = parse_certificate_number(row)
        if cert_number is None:
            errors.append(f"{label}: missing numeric Certificate Number")
            continue

        if cert_number in cert_datasets:
            errors.append(f"{label}: duplicate certificate {cert_number}")
        cert_datasets[cert_number] = dataset

        for field in ("Vendor Name", "Module Name"):
            add_error(errors, field in row, f"{label}: missing {field}")
        for field in ("security_policy_url", "certificate_detail_url"):
            add_error(errors, bool(row.get(field)), f"{label}: missing {field}")
        add_error(errors, row.get("detail_available") is True, f"{label}: detail_available is not true")

        algorithms = row.get("algorithms") or []
        if algorithms:
            if isinstance(algorithms, list):
                cert_algorithms[cert_number] = algorithms
            else:
                # Do not record malformed values: downstream code iterates the
                # entries and would treat e.g. a string as a list of characters.
                errors.append(f"{label}: algorithms must be a list")
        validate_algorithm_extraction(row, label, require_current_schema, errors)

    return cert_datasets, cert_algorithms


def validate_certificate_details(
    detail_dir: Path,
    expected_datasets: Dict[int, str],
    expected_algorithms: Dict[int, List[str]],
    errors: List[str],
    require_current_schema: bool,
) -> None:
    """Validate per-certificate detail files against the module lists.

    Args:
        detail_dir: Directory holding ``<certificate>.json`` detail files.
        expected_datasets: Certificate number -> dataset from the module rows.
        expected_algorithms: Certificate number -> algorithms from module rows.
        errors: Error list to append to.
        require_current_schema: When true, CURRENT_SCHEMA_DETAIL_FIELDS and the
            extraction provenance object are mandatory.
    """
    found_certificates: Set[int] = set()

    for filepath in sorted(detail_dir.glob("*.json")):
        label = str(filepath)
        # isdecimal() guarantees int() below cannot raise (isdigit() would
        # let through characters such as superscript digits).
        if not filepath.stem.isdecimal():
            errors.append(f"{label}: certificate detail filename must be numeric")
            continue

        file_cert_number = int(filepath.stem)
        payload = load_json(filepath, errors)
        if payload is None:
            continue

        metadata = payload.get("metadata")
        certificate = payload.get("certificate")
        add_error(errors, isinstance(metadata, dict), f"{label}: metadata must be an object")
        add_error(errors, isinstance(certificate, dict), f"{label}: certificate must be an object")
        if not isinstance(certificate, dict):
            continue

        cert_number = parse_certificate_number(certificate)
        add_error(errors, cert_number == file_cert_number, f"{label}: certificate_number does not match filename")
        if cert_number is None:
            continue

        found_certificates.add(cert_number)
        expected_dataset = expected_datasets.get(cert_number)
        add_error(errors, expected_dataset is not None, f"{label}: certificate is not in active or historical modules")
        add_error(errors, certificate.get("dataset") == expected_dataset, f"{label}: dataset does not match module list")

        for field in DETAIL_REQUIRED_FIELDS:
            add_error(errors, field in certificate, f"{label}: certificate missing {field}")
        if require_current_schema:
            for field in CURRENT_SCHEMA_DETAIL_FIELDS:
                add_error(errors, field in certificate, f"{label}: certificate missing current schema field {field}")

        add_error(errors, isinstance(certificate.get("related_files"), list), f"{label}: related_files must be a list")
        add_error(errors, isinstance(certificate.get("validation_history"), list), f"{label}: validation_history must be a list")
        add_error(errors, isinstance(certificate.get("vendor"), dict), f"{label}: vendor must be an object")

        expected_detail_algorithms = expected_algorithms.get(cert_number, [])
        actual_detail_algorithms = certificate.get("algorithms") or []
        add_error(
            errors,
            actual_detail_algorithms == expected_detail_algorithms,
            f"{label}: detail algorithms do not match module row algorithms",
        )
        validate_algorithm_extraction(certificate, label, require_current_schema, errors)

    missing_details = sorted(set(expected_datasets) - found_certificates)
    orphan_details = sorted(found_certificates - set(expected_datasets))
    if missing_details:
        errors.append(f"api/certificates: missing detail files for {len(missing_details)} certificates; first={missing_details[:5]}")
    if orphan_details:
        errors.append(f"api/certificates: found {len(orphan_details)} orphan detail files; first={orphan_details[:5]}")


def validate_algorithms_summary(
    root: Path,
    metadata: Dict,
    expected_cert_algorithms: Dict[int, List[str]],
    errors: List[str],
) -> None:
    """Validate ``api/algorithms.json`` against module row algorithm fields.

    Args:
        root: Repository root containing the ``api`` directory.
        metadata: Parsed ``api/metadata.json``.
        expected_cert_algorithms: Certificate number -> algorithms, as
            collected from the module rows.
        errors: Error list to append to.
    """
    algorithms_path = root / "api" / "algorithms.json"
    expected_total = metadata.get("total_certificates_with_algorithms", 0)

    # Checked before any early return so a metadata/row disagreement is
    # reported even when the summary file is absent or unreadable.
    add_error(errors, expected_total == len(expected_cert_algorithms), "metadata: total_certificates_with_algorithms mismatch")

    if expected_total == 0:
        add_error(errors, not algorithms_path.exists(), "api/algorithms.json exists despite zero algorithm coverage")
        return

    summary = load_json(algorithms_path, errors)
    if summary is None:
        return

    algorithms = summary.get("algorithms")
    if not isinstance(algorithms, dict):
        errors.append("api/algorithms.json: algorithms must be an object")
        return

    expected_index = build_expected_algorithm_index(expected_cert_algorithms)
    add_error(errors, summary.get("total_unique_algorithms") == len(expected_index), "api/algorithms.json: total_unique_algorithms mismatch")
    add_error(
        errors,
        summary.get("total_certificate_algorithm_pairs") == count_certificate_algorithm_pairs(expected_cert_algorithms),
        "api/algorithms.json: total_certificate_algorithm_pairs mismatch",
    )

    for algorithm, expected_certs in expected_index.items():
        entry = algorithms.get(algorithm)
        if not isinstance(entry, dict):
            errors.append(f"api/algorithms.json: missing algorithm {algorithm}")
            continue
        certs = entry.get("certificates")
        if not isinstance(certs, list):
            errors.append(f"api/algorithms.json: {algorithm}.certificates must be a list")
            continue
        add_error(errors, entry.get("count") == len(certs), f"api/algorithms.json: {algorithm}.count mismatch")
        add_error(errors, len(certs) == len(set(certs)), f"api/algorithms.json: {algorithm}.certificates has duplicates")
        add_error(errors, set(certs) == expected_certs, f"api/algorithms.json: {algorithm}.certificates mismatch")

    extra_algorithms = sorted(set(algorithms) - set(expected_index))
    if extra_algorithms:
        errors.append(f"api/algorithms.json: unexpected algorithms present: {extra_algorithms[:5]}")


def _validate_index(
    root: Path,
    metadata: Dict,
    has_algorithms: bool,
    errors: List[str],
    require_current_schema: bool,
) -> None:
    """Check api/index.json totals, endpoints and (optionally) feature flags."""
    index = load_json(root / "api" / "index.json", errors)
    if index is None:  # an empty object is still validated
        return

    for key in (
        "total_modules",
        "total_historical_modules",
        "total_modules_in_process",
        "total_certificates_with_algorithms",
        "total_certificate_details",
    ):
        add_error(errors, index.get(key) == metadata.get(key), f"api/index.json: {key} mismatch")

    endpoints = index.get("endpoints")
    if endpoints is None:
        endpoints = {}
    if isinstance(endpoints, dict):
        add_error(errors, ("algorithms" in endpoints) == has_algorithms, "api/index.json: algorithms endpoint presence mismatch")
    else:
        errors.append("api/index.json: endpoints must be an object")

    if not require_current_schema:
        return

    features = index.get("features")
    if not isinstance(features, dict):
        if features is not None:
            errors.append("api/index.json: features must be an object")
        # Fall through with no flags so each missing feature is reported.
        features = {}
    add_error(errors, features.get("algorithm_extraction_provenance") is True, "api/index.json: missing algorithm_extraction_provenance feature")
    add_error(errors, features.get("extraction_metrics") is True, "api/index.json: missing extraction_metrics feature")
    add_error(errors, features.get("json_schemas") is True, "api/index.json: missing json_schemas feature")
    add_error(errors, isinstance(index.get("schemas"), dict), "api/index.json: schemas must be an object")


def _validate_openapi(root: Path, has_algorithms: bool, errors: List[str]) -> None:
    """Check that openapi.json declares the expected endpoint paths."""
    openapi = load_json(root / "openapi.json", errors)
    if openapi is None:
        return

    paths = openapi.get("paths")
    if not isinstance(paths, dict):
        # Treated as empty so every expected path is reported as missing.
        paths = {}
    for path in (
        "/api/index.json",
        "/api/metadata.json",
        "/api/modules.json",
        "/api/historical-modules.json",
        "/api/modules-in-process.json",
        "/api/certificates/{certificate}.json",
    ):
        add_error(errors, path in paths, f"openapi.json: missing path {path}")
    add_error(errors, ("/api/algorithms.json" in paths) == has_algorithms, "openapi.json: algorithms path presence mismatch")


def _validate_doc_files(root: Path, errors: List[str], require_current_schema: bool) -> None:
    """Check that each documentation file exists and mentions its key text."""
    for doc_path, required_text in (
        ("README.md", "certificates/{certificate}.json"),
        ("llms.txt", "api/metadata.json"),
        ("llms-full.txt", "GET api/certificates/{certificate}.json"),
        ("api/docs.md", "GET api/certificates/{certificate}.json"),
        ("index.html", "api/metadata.json"),
    ):
        try:
            content = (root / doc_path).read_text(encoding="utf-8")
        except (OSError, ValueError) as exc:  # ValueError covers bad UTF-8
            errors.append(f"{doc_path}: failed to read: {exc}")
            continue
        add_error(errors, bool(content.strip()), f"{doc_path}: empty documentation file")
        add_error(errors, required_text in content, f"{doc_path}: missing expected text {required_text!r}")
        if require_current_schema and doc_path in {"llms.txt", "api/docs.md", "index.html"}:
            add_error(errors, "api/schemas/index.schema.json" in content, f"{doc_path}: missing JSON Schema link")


def _validate_schema_files(root: Path, has_algorithms: bool, errors: List[str]) -> None:
    """Check the generated JSON Schema files for the required header keys."""
    expected_schema_files = list(JSON_SCHEMA_FILES)
    if has_algorithms:
        expected_schema_files.append("api/schemas/algorithms.schema.json")
    for relative_path in expected_schema_files:
        schema = load_json(root / relative_path, errors)
        if schema is None:
            continue
        add_error(errors, schema.get("$schema") == "https://json-schema.org/draft/2020-12/schema", f"{relative_path}: missing JSON Schema draft marker")
        add_error(errors, bool(schema.get("$id")), f"{relative_path}: missing $id")
        add_error(errors, bool(schema.get("title")), f"{relative_path}: missing title")
    if not has_algorithms:
        add_error(errors, not (root / "api/schemas/algorithms.schema.json").exists(), "api/schemas/algorithms.schema.json exists despite zero algorithm coverage")


def validate_docs_and_index(
    root: Path,
    metadata: Dict,
    has_algorithms: bool,
    errors: List[str],
    require_current_schema: bool,
) -> None:
    """Validate API index, OpenAPI, and docs artifacts at a structural level.

    Args:
        root: Repository root containing the generated artifacts.
        metadata: Parsed ``api/metadata.json``.
        has_algorithms: Whether any module row carries algorithms; controls
            whether the algorithms endpoint/schema must or must not exist.
        errors: Error list to append to.
        require_current_schema: When true, feature flags, schema links and the
            JSON Schema files are checked as well.
    """
    _validate_index(root, metadata, has_algorithms, errors, require_current_schema)
    _validate_openapi(root, has_algorithms, errors)
    _validate_doc_files(root, errors, require_current_schema)
    if require_current_schema:
        _validate_schema_files(root, has_algorithms, errors)


def validate_api(
    root: Path = Path("."),
    require_current_schema: bool = False,
    forbid_firecrawl_run_source: bool = False,
) -> List[str]:
    """Return a list of validation errors for generated API artifacts.

    Args:
        root: Repository root containing the generated artifacts.
        require_current_schema: Require fields produced by the current scraper
            schema (extraction provenance, metrics, JSON Schemas).
        forbid_firecrawl_run_source: Report an error when metadata names
            ``firecrawl`` as the algorithm source.

    Returns:
        Human-readable error messages; an empty list means validation passed.
    """
    errors: List[str] = []
    root = root.resolve()

    for relative_path in REQUIRED_TOP_LEVEL_FILES:
        add_error(errors, (root / relative_path).exists(), f"{relative_path}: missing required artifact")

    metadata = load_json(root / "api" / "metadata.json", errors)
    modules_payload = load_json(root / "api" / "modules.json", errors)
    historical_payload = load_json(root / "api" / "historical-modules.json", errors)
    in_process_payload = load_json(root / "api" / "modules-in-process.json", errors)
    # Nothing else can be cross-checked without all four core payloads.
    if (
        metadata is None
        or modules_payload is None
        or historical_payload is None
        or in_process_payload is None
    ):
        return errors

    for label, payload in (
        ("api/modules.json", modules_payload),
        ("api/historical-modules.json", historical_payload),
        ("api/modules-in-process.json", in_process_payload),
    ):
        add_error(errors, payload.get("metadata") == metadata, f"{label}: embedded metadata does not match api/metadata.json")

    modules = modules_payload.get("modules")
    historical_modules = historical_payload.get("modules")
    modules_in_process = in_process_payload.get("modules_in_process")
    add_error(errors, isinstance(modules, list), "api/modules.json: modules must be a list")
    add_error(errors, isinstance(historical_modules, list), "api/historical-modules.json: modules must be a list")
    add_error(errors, isinstance(modules_in_process, list), "api/modules-in-process.json: modules_in_process must be a list")
    if not isinstance(modules, list) or not isinstance(historical_modules, list) or not isinstance(modules_in_process, list):
        return errors

    add_error(errors, metadata.get("total_modules") == len(modules), "metadata: total_modules mismatch")
    add_error(errors, metadata.get("total_historical_modules") == len(historical_modules), "metadata: total_historical_modules mismatch")
    add_error(errors, metadata.get("total_modules_in_process") == len(modules_in_process), "metadata: total_modules_in_process mismatch")

    active_datasets, active_algorithms = validate_module_rows(modules, "active", errors, require_current_schema)
    historical_datasets, historical_algorithms = validate_module_rows(historical_modules, "historical", errors, require_current_schema)
    overlapping_certs = sorted(set(active_datasets) & set(historical_datasets))
    if overlapping_certs:
        errors.append(f"active/historical modules: duplicate certificate numbers across datasets: {overlapping_certs[:5]}")

    expected_datasets = {**active_datasets, **historical_datasets}
    expected_algorithms = {**active_algorithms, **historical_algorithms}
    add_error(errors, metadata.get("total_certificate_details") == len(expected_datasets), "metadata: total_certificate_details mismatch")

    if require_current_schema:
        add_error(errors, "algorithm_extraction_schema_version" in metadata, "metadata: missing algorithm_extraction_schema_version")
        add_error(errors, "extraction_metrics" in metadata, "metadata: missing extraction_metrics")

    if forbid_firecrawl_run_source:
        add_error(errors, metadata.get("algorithm_source") != "firecrawl", "metadata: algorithm_source must not be firecrawl")

    validate_certificate_details(
        root / "api" / "certificates",
        expected_datasets,
        expected_algorithms,
        errors,
        require_current_schema,
    )
    validate_algorithms_summary(root, metadata, expected_algorithms, errors)
    validate_docs_and_index(
        root,
        metadata,
        bool(expected_algorithms),
        errors,
        require_current_schema,
    )

    if forbid_firecrawl_run_source and metadata.get("total_certificates_with_algorithms", 0):
        algorithms_metadata = load_json(root / "api" / "algorithms.json", errors)
        if algorithms_metadata is not None:
            nested_metadata = algorithms_metadata.get("metadata")
            if not isinstance(nested_metadata, dict):
                nested_metadata = {}
            add_error(errors, nested_metadata.get("source") != "firecrawl", "api/algorithms.json: metadata.source must not be firecrawl")
            add_error(errors, nested_metadata.get("algorithm_source") != "firecrawl", "api/algorithms.json: metadata.algorithm_source must not be firecrawl")

    return errors


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse CLI arguments (``sys.argv`` is used when *argv* is None)."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--root", default=".", help="Repository root containing generated API artifacts")
    parser.add_argument(
        "--require-current-schema",
        action="store_true",
        help="Require fields generated by the current scraper schema, including extraction provenance",
    )
    parser.add_argument(
        "--forbid-firecrawl-run-source",
        action="store_true",
        help="Fail if the current run metadata says algorithm extraction used Firecrawl",
    )
    return parser.parse_args(argv)


def main(argv: Optional[Sequence[str]] = None) -> int:
    """CLI entry point; returns 1 when validation errors were found, else 0."""
    args = parse_args(argv)
    errors = validate_api(
        Path(args.root),
        require_current_schema=args.require_current_schema,
        forbid_firecrawl_run_source=args.forbid_firecrawl_run_source,
    )
    if errors:
        print("API artifact validation failed:")
        for error in errors:
            print(f"- {error}")
        return 1

    print("API artifact validation passed.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())