diff --git a/.env.example b/.env.example index 155c3d7..18c0c4b 100644 --- a/.env.example +++ b/.env.example @@ -24,3 +24,13 @@ REDIS_URL= # Environment ENVIRONMENT=development LOG_LEVEL=INFO + +# IAB Diligence Platform Integration (via SafeGuard Privacy) +# optional; inert when SGP_API_KEY is empty +SGP_API_KEY= +# Staging Environment: https://api.safeguardprivacy-demo.com +SGP_BASE_URL=https://api.safeguardprivacy.com +SGP_ENFORCE=false +# block | warn | allow +SGP_UNKNOWN_VENDOR_POLICY=block +SGP_CACHE_TTL_SECONDS=900 diff --git a/README.md b/README.md index 885a081..ac70461 100644 --- a/README.md +++ b/README.md @@ -118,6 +118,12 @@ Reveal buyer identity progressively to unlock better pricing from sellers: → [Authentication Guide](https://iabtechlab.github.io/buyer-agent/api/authentication/) +### Vendor Approval Gating (optional) + +Plug in an [IAB Diligence Platform](https://safeguardprivacy.com) tenant to keep unapproved sellers out of the buyer-agent workflow. Consults the `iabBuyerAgentApproval` flag via SGP's integration API. When `SGP_ENFORCE=true`, `DiscoverInventoryTool` filters NOT APPROVED vendors out of search results before the agent ever sees them, and `RequestDealTool` enforces the same check as a safety net at Deal ID time. With enforcement off, products are annotated APPROVED / NOT APPROVED / UNKNOWN but never filtered. Off by default — inert when `SGP_API_KEY` is empty. + +→ [IAB Diligence Platform Approval](https://iabtechlab.github.io/buyer-agent/integration/iab-diligence-platform/) + ## Quick Start ### Install diff --git a/docs/guides/configuration.md b/docs/guides/configuration.md index 2f908bb..5f927a4 100644 --- a/docs/guides/configuration.md +++ b/docs/guides/configuration.md @@ -175,6 +175,22 @@ CORS_ALLOWED_ORIGINS=https://dashboard.example.com,https://app.example.com --- +### IAB Diligence Platform Approval + +Optional integration that gates deal requests against the buyer's [IAB Diligence Platform](https://safeguardprivacy.com/iab-diligence-platform/) vendor portfolio. Inert when `SGP_API_KEY` is empty. + +| Variable | Type | Default | Description | +|----------|------|---------|-------------| +| `SGP_API_KEY` | `str` | `""` | API key with the `iab:buyerAgent` scope. Empty = integration disabled. | +| `SGP_BASE_URL` | `str` | `https://api.safeguardprivacy.com` | SGP base URL. Staging: `https://api.safeguardprivacy-demo.com`. | +| `SGP_ENFORCE` | `bool` | `False` | When `True`, NOT APPROVED vendors are filtered out at discovery and Deal ID generation is blocked for them. SGP transport errors halt the flow. | +| `SGP_UNKNOWN_VENDOR_POLICY` | `str` | `block` | Behavior when the vendor is not in the buyer's SGP portfolio (HTTP 404). One of `block`, `warn`, `allow`. | +| `SGP_CACHE_TTL_SECONDS` | `int` | `900` | Per-domain cache lifetime for approval lookups. | + +See the [IAB Diligence Platform Approval](../integration/iab-diligence-platform.md) integration guide for endpoint contract, behavior matrix, and troubleshooting. + +--- + ### Environment | Variable | Type | Default | Description | diff --git a/docs/integration/iab-diligence-platform.md b/docs/integration/iab-diligence-platform.md new file mode 100644 index 0000000..7fd3d48 --- /dev/null +++ b/docs/integration/iab-diligence-platform.md @@ -0,0 +1,171 @@ +# IAB Diligence Platform + +The buyer agent can verify, before issuing a Deal ID, that the buyer has explicitly approved a seller's vendor record for IAB buyer-agent purchases. Approvals are stored in the buyer's [IAB Diligence Platform](https://safeguardprivacy.com/iab-diligence-platform/) tenant; the buyer agent consults them through SGP's integration API. + +This integration is **optional and off by default**. When `SGP_API_KEY` is empty the feature is fully inert — the buyer agent behaves exactly as it did before this page existed. Once configured, it acts as a privacy rail in front of the existing deal workflow. + +## Who should enable this + +IAB Diligence Platform customers who treat vendor onboarding and approval as a compliance prerequisite for programmatic buying. If your team already maintains a vendor inventory in SGP with IAB buyer-agent approval flags, this integration enforces that workflow inside the buyer agent itself. + +## Endpoint contract + +The client calls a single endpoint on the IAB Diligence Platform (SafeGuard Privacy API): + +``` +GET /api/v1/integrations/iab/buyer-agent-approval?domain=a.com,b.com +``` + +| Property | Value | +|--------------|---------------------------------------------------------| +| Auth | `api-key` header | +| Domain | `domain` query parameter - Up to 10 domains per request | +| Tenant scope | Results are scoped to the caller's SGP tenant | + +The response contains one `IabBuyerAgentResource` per matched vendor: + +```json +{ + "status": "success", + "code": 200, + "data": [ +{ + "vendorId": 123, + "vendorCompanyId": 456, + "companyName": "Example Publisher", + "domain": "example.com", + "iabBuyerAgentApproval": true, + "iabBuyerAgentApprovedAt": "2026-03-14T12:00:00Z" + } + ] +} +``` + +Three response states matter to the buyer agent: + +| State | Meaning | How the gate treats it | +|-------|---------|------------------------| +| `iabBuyerAgentApproval: true` | Buyer has approved this vendor | ✅ Deal proceeds | +| `iabBuyerAgentApproval: false` | Vendor exists but is not approved | ❌ Deal blocked | +| HTTP 404 | Vendor is not in the buyer's SGP portfolio | Governed by `SGP_UNKNOWN_VENDOR_POLICY` | + +## Configuration + +| Variable | Type | Default | Description | +|----------|------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `SGP_API_KEY` | `str` | `""` | API key from the SGP api. Empty = integration disabled. | +| `SGP_BASE_URL` | `str` | `https://api.safeguardprivacy.com` | Production endpoint. The staging environment is `https://api.safeguardprivacy-demo.com`. | +| `SGP_ENFORCE` | `bool` | `False` | When `True`, NOT APPROVED vendors are filtered out at discovery, the deal-request gate blocks Deal ID generation, and SGP transport errors halt the flow. | +| `SGP_UNKNOWN_VENDOR_POLICY` | `str` | `"block"` | Behavior for domains not in the SGP portfolio (HTTP 404). One of `block`, `warn`, `allow`. Applies at both discovery and deal-request stages when enforcement is on. | +| `SGP_CACHE_TTL_SECONDS` | `int` | `900` | Per-domain cache lifetime. Discovery→pricing→booking reuse a single SGP call within the TTL. | + +!!! warning "Enforcement without a key is a no-op" + If `SGP_ENFORCE=true` but `SGP_API_KEY` is empty, the gate cannot be evaluated and is silently bypassed. The buyer agent logs a warning at flow construction time so this misconfiguration is visible. + +## Where the gate runs + +The integration plugs into two existing buyer-agent tools. Behavior at each stage is governed by the same `SGP_ENFORCE` flag. + +### Inventory discovery + +`DiscoverInventoryTool` accepts an optional `SGPClient`. When provided, it extracts the seller domain from each returned product (checking `seller_url`, `publisher_domain`, then `publisherId`/`publisher` if they contain a `.`), batches distinct domains into groups of 10, and annotates each product row in the formatted output: + +``` +1. Premium CTV - Sports + Product ID: ctv-premium-sports + Publisher: premium-pub-001 + CPM: $28.26 (was $35.00) + SGP Approval: ✓ APPROVED — seller.example.com +``` + +Behavior depends on `SGP_ENFORCE`: + +| `SGP_ENFORCE` | NOT APPROVED rows | Unknown vendors | Missing seller domain | SGP transport error | +|---|---|---|---|---| +| `false` (annotate only) | kept + annotated | kept + annotated | kept (no annotation) | logged, no annotations | +| `true` (filter) | **filtered out** | governed by `SGP_UNKNOWN_VENDOR_POLICY` | filtered out | flow halts (fails closed) | + +When enforcement removes any products, a tail line is appended so the action is auditable: + +``` +-------------------------------------------------- +Total products found: 4 +SGP enforcement filtered 2 product(s): 1 not approved, 1 unknown to SGP +``` + +### Deal-request gate + +`RequestDealTool` checks the seller's vendor approval after fetching product details and before generating a Deal ID. The gate acts as a safety net behind discovery filtering — it runs only when an `SGPClient` is wired in and `sgp_enforce=True`: + +```python +# Injected automatically by BuyerDealFlow from settings +RequestDealTool( + client=unified_client, + buyer_context=ctx, + sgp_client=sgp_client, + sgp_enforce=settings.sgp_enforce, + sgp_unknown_policy=settings.sgp_unknown_vendor_policy, +) +``` + +A successful gate prepends a banner to the Deal ID response: + +``` +SGP: ✓ Example Publisher approved for IAB buyer-agent purchases (since 2026-03-14T12:00:00Z). + +============================================================ +DEAL CREATED SUCCESSFULLY +============================================================ +... +``` + +A failed gate returns a blocking message and does **not** generate a Deal ID. + +## Behavior matrix + +With enforcement on (`SGP_ENFORCE=true`, `SGP_API_KEY` set), behavior is consistent across stages: + +| SGP response | `block` policy | `warn` policy | `allow` policy | +|---|---|---|---| +| `iabBuyerAgentApproval: true` | ✅ kept + approved banner | same | same | +| `iabBuyerAgentApproval: false` | ❌ filtered at discovery; blocked at request | ❌ | ❌ | +| 404 (not onboarded in SGP) | ❌ filtered at discovery; blocked at request | ✅ kept + warning annotation/banner | ✅ kept silently | +| Transport error | ❌ flow halts | ❌ flow halts | ❌ flow halts | +| Product has no seller domain field | ❌ filtered at discovery; blocked at request | ❌ | ❌ | + +The `iabBuyerAgentApproval: false` row is intentionally the same across all three unknown-vendor policies — an explicit non-approval is always fatal. The policies only govern the "unknown to SGP" case. + +## Agent tool + +For CrewAI agents that want to consult approvals outside the automatic gate, a tool is provided: + +```python +from ad_buyer.clients import SGPClient +from ad_buyer.tools.research import SGPVendorApprovalTool + +sgp = SGPClient(api_key=settings.sgp_api_key, base_url=settings.sgp_base_url) +tool = SGPVendorApprovalTool(client=sgp) + +# Agent calls it with a list of domains (any number; client chunks to 10) +# Returns a formatted APPROVED / NOT APPROVED / UNKNOWN summary. +``` + +`BuyerDealFlow` injects this tool into the Buyer Deal Specialist automatically when an SGP client is configured, so the agent can consult approval status during product selection (before commitment), not only at Deal ID generation time. + +The class is prefixed `SGP` so future vendor-approval integrations can coexist under their own class names and CrewAI `name` attributes without colliding. + +## Troubleshooting + +| Symptom | Likely cause | +|---------|------------------------------------------------------------------------------------------------------------------------------------------------| +| `IAB Diligence Platform rejected the api-key` (401) | The key is missing, revoked, or lacks the proper scope. Request a new key from SGP. | +| `Deal blocked: is not in your IAB Diligence Platform portfolio` | The vendor is not onboarded in SGP. Add and approve the vendor in SGP, or switch `SGP_UNKNOWN_VENDOR_POLICY` to `warn` for soft-fail behavior. | +| `Deal blocked: does not carry the IAB buyer-agent approval flag` | The vendor is onboarded but not marked approved for IAB buyer-agent purchases. Toggle the approval in SGP. | +| `Deal blocked: IAB Diligence Platform lookup failed` | SGP was unreachable or returned a transient error. Enforcement fails closed; retry once the service is reachable. | +| Gate seems to do nothing | Either `SGP_API_KEY` is empty or `SGP_ENFORCE=false`. Check startup logs for the bypass warning. | + +## Related + +- [Configuration reference](../guides/configuration.md) — all env vars including SGP +- [Buyer Deal Flow](../architecture/buyer-deal-flow.md) — the flow the gate plugs into +- [Seller Agent Integration](seller-agent.md) — the seller side of the deal request diff --git a/mkdocs.yml b/mkdocs.yml index e34225b..b4d6cc7 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -104,6 +104,7 @@ nav: - Integration: - Seller Agent Guide: integration/seller-agent.md - OpenDirect Protocol: integration/opendirect.md + - IAB Diligence Platform Approval: integration/iab-diligence-platform.md - AI Assistant Setup: - Claude (Desktop & Web): claude-desktop-setup.md - ChatGPT, Codex & AI IDEs: multi-client-setup.md diff --git a/src/ad_buyer/clients/__init__.py b/src/ad_buyer/clients/__init__.py index 1c79355..e25a7c2 100644 --- a/src/ad_buyer/clients/__init__.py +++ b/src/ad_buyer/clients/__init__.py @@ -7,6 +7,7 @@ from .deals_client import DealsClient, DealsClientError from .mcp_client import IABMCPClient, MCPClientError, MCPToolResult from .opendirect_client import OpenDirectClient +from .sgp_client import SGPAuthError, SGPClient, SGPClientError from .ucp_client import UCPClient, UCPExchangeResult from .unified_client import Protocol, UnifiedClient, UnifiedResult @@ -31,4 +32,8 @@ # IAB Deals API v1.0 client (quote-then-book flow) "DealsClient", "DealsClientError", + # IAB Diligence Platform (SGP) approval gate + "SGPClient", + "SGPClientError", + "SGPAuthError", ] diff --git a/src/ad_buyer/clients/sgp_client.py b/src/ad_buyer/clients/sgp_client.py new file mode 100644 index 0000000..6c988f8 --- /dev/null +++ b/src/ad_buyer/clients/sgp_client.py @@ -0,0 +1,239 @@ +# Author: SafeGuard Privacy +# Donated to IAB Tech Lab + +"""IAB Diligence Platform (SGP) platform client. + +Async HTTP client for the IAB Diligence Platform integration API. Currently +exposes a single capability: checking whether a vendor has the IAB +buyer-agent approval flag set on the buyer's SGP tenant. + +Endpoint: + GET /api/v1/integrations/iab/buyer-agent-approval?domain=a.com,b.com + +Auth: api-key header, scope `iab:buyerAgent`. +Limit: up to 10 domains per call (SGP-enforced). +""" + +from __future__ import annotations + +import logging +import time +from urllib.parse import urlparse + +import httpx + +from ..models.sgp import ApprovalRecord + +logger = logging.getLogger(__name__) + +_DEFAULT_TIMEOUT = 15.0 +_MAX_BATCH = 10 +_RETRYABLE_STATUS_CODES = {502, 503, 504} +_ENDPOINT = "/api/v1/integrations/iab/buyer-agent-approval" + +# Ordered list of product-dict keys to probe when deriving a seller domain +# for an SGP approval lookup. Explicit domain fields come first; publisher +# identifiers are used only when they look like a hostname. +_DOMAIN_KEYS = ("seller_url", "sellerUrl", "publisher_domain", "publisherDomain") +_PUBLISHER_KEYS = ("publisherId", "publisher") + + +def extract_product_domain(product: dict) -> str | None: + """Best-guess seller domain from a product dict for an SGP lookup. + + Checks explicit domain/URL fields first, then falls back to + ``publisherId`` / ``publisher`` when those values contain a ``.`` + (i.e. look like a hostname rather than an opaque ID). Returns the + raw value; ``SGPClient.normalize_domain`` handles cleanup. + """ + for key in _DOMAIN_KEYS: + value = product.get(key) + if isinstance(value, str) and value: + return value + for key in _PUBLISHER_KEYS: + value = product.get(key) + if isinstance(value, str) and "." in value: + return value + return None + + +class SGPClientError(Exception): + """Error raised by SGPClient for API or transport failures.""" + + def __init__(self, message: str, status_code: int = 0) -> None: + super().__init__(message) + self.status_code = status_code + + +class SGPAuthError(SGPClientError): + """Raised on 401 — api-key missing, invalid, or lacks required scope.""" + + +class SGPClient: + """Async client for IAB Diligence Platform buyer-agent approval checks. + + Normalizes domains (strips scheme, www, port, lowercases), dedupes, + chunks into groups of 10, and caches per-domain results for + ``cache_ttl_seconds``. Returns a dict keyed by normalized domain; a + value of ``None`` means the vendor is unknown to SGP (HTTP 404 or + absent from the batch response). + + Args: + api_key: SGP API key with ``iab:buyerAgent`` scope. + base_url: SGP base URL. Defaults to production + (``https://api.safeguardprivacy.com``). The demo environment + is at ``https://api.safeguardprivacy-demo.com``. + timeout: Request timeout in seconds. + cache_ttl_seconds: How long to cache per-domain results. + """ + + def __init__( + self, + api_key: str, + base_url: str = "https://api.safeguardprivacy.com", + timeout: float = _DEFAULT_TIMEOUT, + cache_ttl_seconds: int = 900, + ) -> None: + self._api_key = api_key + self._base_url = base_url.rstrip("/") + self._timeout = timeout + self._cache_ttl = cache_ttl_seconds + self._cache: dict[str, tuple[float, ApprovalRecord | None]] = {} + self._http = httpx.AsyncClient( + base_url=self._base_url, + headers={"api-key": api_key}, + timeout=timeout, + ) + + async def aclose(self) -> None: + """Close the underlying httpx client.""" + await self._http.aclose() + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + @staticmethod + def normalize_domain(value: str) -> str: + """Reduce a seller URL or raw domain to the form SGP accepts. + + Strips scheme, ``www.``, path, query, and port; lowercases. + Returns an empty string for inputs that yield no host. + """ + if not value: + return "" + raw = value.strip() + # urlparse needs a scheme to extract netloc reliably + if "://" not in raw: + raw = "http://" + raw + host = urlparse(raw).hostname or "" + host = host.lower() + if host.startswith("www."): + host = host[4:] + return host + + async def check_approvals(self, domains: list[str]) -> dict[str, ApprovalRecord | None]: + """Look up IAB buyer-agent approval for a list of domains. + + Args: + domains: Raw seller URLs or domains. Duplicates and invalid + entries are silently dropped. + + Returns: + Dict keyed by normalized domain. ``None`` value means the + vendor is unknown to SGP (not onboarded on the buyer's tenant). + """ + normalized = [self.normalize_domain(d) for d in domains] + normalized = [d for d in normalized if d] + if not normalized: + return {} + + now = time.monotonic() + result: dict[str, ApprovalRecord | None] = {} + to_fetch: list[str] = [] + + seen: set[str] = set() + for d in normalized: + if d in seen: + continue + seen.add(d) + cached = self._cache.get(d) + if cached and (now - cached[0]) < self._cache_ttl: + result[d] = cached[1] + else: + to_fetch.append(d) + + for i in range(0, len(to_fetch), _MAX_BATCH): + chunk = to_fetch[i : i + _MAX_BATCH] + chunk_result = await self._fetch_chunk(chunk) + stamp = time.monotonic() + for d in chunk: + record = chunk_result.get(d) + self._cache[d] = (stamp, record) + result[d] = record + + return result + + # ------------------------------------------------------------------ + # HTTP + # ------------------------------------------------------------------ + + async def _fetch_chunk(self, domains: list[str]) -> dict[str, ApprovalRecord | None]: + """Fetch approvals for up to 10 domains in a single HTTP call.""" + params = {"domain": ",".join(domains)} + try: + resp = await self._http.get(_ENDPOINT, params=params) + except httpx.RequestError as exc: + # Connection refused, timeout, DNS, read errors, etc. — surface + # as SGPClientError so callers catch it on a single type and + # the deal-request gate can fail closed. + raise SGPClientError( + f"IAB Diligence Platform request failed: {exc.__class__.__name__}: {exc}" + ) from exc + + if resp.status_code == 404: + # Entire batch unknown to SGP. + return {d: None for d in domains} + + if resp.status_code == 401: + raise SGPAuthError( + "IAB Diligence Platform rejected the api-key " + "(missing or lacks iab:buyerAgent scope)", + status_code=401, + ) + + if resp.status_code == 400: + raise SGPClientError( + f"IAB Diligence Platform rejected the request as malformed: {resp.text}", + status_code=400, + ) + + if resp.status_code in _RETRYABLE_STATUS_CODES or resp.status_code >= 500: + raise SGPClientError( + f"IAB Diligence Platform returned {resp.status_code}: {resp.text}", + status_code=resp.status_code, + ) + + if resp.status_code != 200: + raise SGPClientError( + f"Unexpected IAB Diligence Platform response {resp.status_code}: {resp.text}", + status_code=resp.status_code, + ) + + try: + payload = resp.json() + except ValueError as exc: + raise SGPClientError(f"SGP response was not JSON: {exc}") from None + + raw_records = payload.get("data") or [] + by_domain: dict[str, ApprovalRecord | None] = {d: None for d in domains} + for raw in raw_records: + try: + record = ApprovalRecord.model_validate(raw) + except (ValueError, TypeError): + logger.warning("Skipping malformed SGP record: %r", raw) + continue + domain_key = self.normalize_domain(record.domain) or record.domain.lower() + by_domain[domain_key] = record + + return by_domain diff --git a/src/ad_buyer/config/settings.py b/src/ad_buyer/config/settings.py index a349485..1092a4c 100644 --- a/src/ad_buyer/config/settings.py +++ b/src/ad_buyer/config/settings.py @@ -36,6 +36,21 @@ class Settings(BaseSettings): opendirect_token: str | None = None opendirect_api_key: str | None = None + # IAB Diligence Platform — vendor approval gate. + # The integration is inert when ``sgp_api_key`` is empty; enforcement + # only activates once an SGP API key is supplied AND ``sgp_enforce`` + # is true. When enforcing, NOT APPROVED vendors are filtered out at + # discovery and the request-stage gate acts as a safety net. + sgp_api_key: str = "" + # Production endpoint. For testing, use the demo environment: + # https://api.safeguardprivacy-demo.com + sgp_base_url: str = "https://api.safeguardprivacy.com" + sgp_enforce: bool = False + # Behavior when IAB Diligence Platform returns 404 for a seller domain (vendor + # not in the buyer's SGP portfolio). One of: "block", "warn", "allow". + sgp_unknown_vendor_policy: str = "block" + sgp_cache_ttl_seconds: int = 900 + def get_seller_endpoints(self) -> list[str]: """Parse seller endpoints from comma-separated string. diff --git a/src/ad_buyer/flows/buyer_deal_flow.py b/src/ad_buyer/flows/buyer_deal_flow.py index c5906b5..1952aad 100644 --- a/src/ad_buyer/flows/buyer_deal_flow.py +++ b/src/ad_buyer/flows/buyer_deal_flow.py @@ -22,7 +22,9 @@ from pydantic import BaseModel, Field from ..agents.level2.buyer_deal_specialist_agent import create_buyer_deal_specialist_agent +from ..clients.sgp_client import SGPClient from ..clients.unified_client import UnifiedClient +from ..config.settings import settings from ..models.audience_plan import AudiencePlan from ..time_utils import utc_now from ..models.buyer_identity import ( @@ -43,6 +45,7 @@ ) from ..storage.deal_store import DealStore from ..tools.buyer_deals import DiscoverInventoryTool, GetPricingTool, RequestDealTool +from ..tools.research import SGPVendorApprovalTool logger = logging.getLogger(__name__) @@ -171,6 +174,7 @@ def __init__( buyer_context: BuyerContext, store: Optional[DealStore] = None, brief: Optional[CampaignBrief] = None, + sgp_client: SGPClient | None = None, ): """Initialize the flow with client, buyer context, and optional persistence. @@ -184,6 +188,8 @@ def __init__( ``AudiencePlan`` is threaded onto seller-bound calls (proposal §5.3 / bead ar-ts30 §18). When None, the flow stays audience-blind for backward compatibility. + sgp_client: Optional IAB Diligence Platform client. When omitted, + one is built from settings if ``SGP_API_KEY`` is set. """ super().__init__() self._client = client @@ -198,10 +204,27 @@ def __init__( # CampaignPipeline.get_audience_planner_result on Path A). self._audience_planner_result: Optional[AudiencePlannerResult] = None + if sgp_client is None and settings.sgp_api_key: + sgp_client = SGPClient( + api_key=settings.sgp_api_key, + base_url=settings.sgp_base_url, + cache_ttl_seconds=settings.sgp_cache_ttl_seconds, + ) + if sgp_client is None and settings.sgp_enforce: + logger.warning( + "SGP_ENFORCE is true but SGP_API_KEY is empty; " + "the IAB Diligence Platform approval gate will be bypassed. " + "Set SGP_API_KEY to enable vendor approval enforcement." + ) + self._sgp_client = sgp_client + # Create tools self._discover_tool = DiscoverInventoryTool( client=client, buyer_context=buyer_context, + sgp_client=sgp_client, + sgp_enforce=settings.sgp_enforce, + sgp_unknown_policy=settings.sgp_unknown_vendor_policy, ) self._pricing_tool = GetPricingTool( client=client, @@ -210,6 +233,13 @@ def __init__( self._deal_tool = RequestDealTool( client=client, buyer_context=buyer_context, + sgp_client=sgp_client, + sgp_enforce=settings.sgp_enforce, + sgp_unknown_policy=settings.sgp_unknown_vendor_policy, + ) + # Agent-callable vendor approval tool — only useful with an SGP client. + self._vendor_approval_tool: SGPVendorApprovalTool | None = ( + SGPVendorApprovalTool(client=sgp_client) if sgp_client is not None else None ) # ------------------------------------------------------------------ @@ -399,10 +429,14 @@ def evaluate_and_select(self, discovery_result: dict[str, Any]) -> dict[str, Any try: self.state.status = BuyerDealFlowStatus.EVALUATING_PRICING - # Create crew for intelligent selection - deal_agent = create_buyer_deal_specialist_agent( - tools=[self._discover_tool, self._pricing_tool], - ) + # Create crew for intelligent selection. Include the vendor + # approval tool so the agent can check IAB buyer-agent approval + # status for candidate sellers during selection, not just at + # Deal ID generation. + agent_tools: list[Any] = [self._discover_tool, self._pricing_tool] + if self._vendor_approval_tool is not None: + agent_tools.append(self._vendor_approval_tool) + deal_agent = create_buyer_deal_specialist_agent(tools=agent_tools) selection_task = Task( description=f"""Analyze the discovery results and select the best product diff --git a/src/ad_buyer/models/sgp.py b/src/ad_buyer/models/sgp.py new file mode 100644 index 0000000..f46f771 --- /dev/null +++ b/src/ad_buyer/models/sgp.py @@ -0,0 +1,30 @@ +# Author: SafeGuard Privacy +# Donated to IAB Tech Lab + +"""IAB Diligence Platform (SGP) integration models. + +Mirrors the IabBuyerAgentResource returned by + GET /api/v1/integrations/iab/buyer-agent-approval +on the IAB Diligence Platform platform. +""" + +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel, ConfigDict, Field + + +class ApprovalRecord(BaseModel): + """A single vendor's IAB buyer-agent approval status from IAB Diligence Platform.""" + + model_config = ConfigDict(populate_by_name=True) + + vendor_id: int = Field(alias="vendorId") + vendor_company_id: int = Field(alias="vendorCompanyId") + company_name: str = Field(alias="companyName", default="") + domain: str = "" + iab_buyer_agent_approval: bool = Field(alias="iabBuyerAgentApproval", default=False) + iab_buyer_agent_approved_at: datetime | None = Field( + alias="iabBuyerAgentApprovedAt", default=None + ) diff --git a/src/ad_buyer/tools/buyer_deals/discover_inventory.py b/src/ad_buyer/tools/buyer_deals/discover_inventory.py index 17537ca..2f79247 100644 --- a/src/ad_buyer/tools/buyer_deals/discover_inventory.py +++ b/src/ad_buyer/tools/buyer_deals/discover_inventory.py @@ -3,6 +3,7 @@ """Inventory discovery tool for buyer deal workflows.""" +import logging from typing import Any from crewai.tools import BaseTool @@ -10,8 +11,12 @@ from ...async_utils import run_async from ...booking.pricing import PricingCalculator +from ...clients.sgp_client import SGPClient, SGPClientError, extract_product_domain from ...clients.unified_client import UnifiedClient from ...models.buyer_identity import BuyerContext +from ...models.sgp import ApprovalRecord + +logger = logging.getLogger(__name__) class DiscoverInventoryInput(BaseModel): @@ -76,11 +81,17 @@ class DiscoverInventoryTool(BaseTool): args_schema: type[BaseModel] = DiscoverInventoryInput _client: UnifiedClient _buyer_context: BuyerContext + _sgp_client: SGPClient | None + _sgp_enforce: bool + _sgp_unknown_policy: str def __init__( self, client: UnifiedClient, buyer_context: BuyerContext, + sgp_client: SGPClient | None = None, + sgp_enforce: bool = False, + sgp_unknown_policy: str = "block", **kwargs: Any, ): """Initialize with unified client and buyer context. @@ -88,10 +99,24 @@ def __init__( Args: client: UnifiedClient for seller communication buyer_context: BuyerContext with identity for tiered access + sgp_client: Optional IAB Diligence Platform client. When provided, + each returned product is annotated with the seller's + IAB buyer-agent approval status. + sgp_enforce: When True and ``sgp_client`` is provided, NOT + APPROVED vendors are removed from the result before the + agent sees them, and an SGP transport failure halts the + flow instead of falling back to unannotated results. + sgp_unknown_policy: How to treat vendors absent from the + buyer's SGP portfolio when enforcing. One of ``block`` + (filter out), ``warn`` (keep with warning annotation), + or ``allow`` (keep silently). """ super().__init__(**kwargs) self._client = client self._buyer_context = buyer_context + self._sgp_client = sgp_client + self._sgp_enforce = sgp_enforce + self._sgp_unknown_policy = sgp_unknown_policy def _run( self, @@ -154,19 +179,139 @@ async def _arun( if not result.success: return f"Error discovering inventory: {result.error}" - return self._format_results(result.data, identity_context) + approvals = await self._fetch_approvals(result.data) + filtered, filter_summary = self._apply_enforcement(result.data, approvals) + return self._format_results( + filtered, identity_context, approvals, filter_summary + ) + except SGPClientError as e: + # Reached only when enforcement is on; _fetch_approvals swallows + # transport errors otherwise. Halts the flow via the caller's + # broad except clause. + raise SGPClientError( + f"Inventory discovery halted: IAB Diligence Platform unreachable " + f"while SGP_ENFORCE=true ({e})." + ) from e except (OSError, ValueError, RuntimeError) as e: return f"Error discovering inventory: {e}" + def _approval_line( + self, + product: dict, + approvals: dict[str, ApprovalRecord | None] | None, + ) -> str | None: + """Render the SGP approval annotation for a single product row.""" + if not approvals or self._sgp_client is None: + return None + raw_domain = extract_product_domain(product) + if not raw_domain: + return " SGP Approval: ? UNKNOWN (no seller domain on product)" + normalized = self._sgp_client.normalize_domain(raw_domain) + record = approvals.get(normalized) + if record is None: + # When enforcing under "allow" policy, unknowns pass silently. + if self._sgp_enforce and self._sgp_unknown_policy == "allow": + return None + if self._sgp_enforce and self._sgp_unknown_policy == "warn": + return ( + f" SGP WARNING: {normalized} not in SGP portfolio — " + f"onboard and approve to suppress this warning." + ) + return f" SGP Approval: ? UNKNOWN — {normalized} not in SGP portfolio" + if record.iab_buyer_agent_approval: + return f" SGP Approval: ✓ APPROVED — {normalized}" + return f" SGP Approval: ✗ NOT APPROVED — {normalized}" + + async def _fetch_approvals( + self, products: Any + ) -> dict[str, ApprovalRecord | None]: + """Batch-check SGP approvals for the distinct seller domains in the result. + + Returns a dict keyed by normalized domain. Empty dict when no + SGP client is configured or no products carry a seller domain. + When enforcement is off and a transport error occurs, logs and + returns an empty dict so discovery still produces (unannotated) + results. When enforcement is on, the transport error propagates + so the caller can halt the flow. + """ + if self._sgp_client is None: + return {} + product_list = products if isinstance(products, list) else [products] + raw_domains: list[str] = [] + for product in product_list: + if not isinstance(product, dict): + continue + domain = extract_product_domain(product) + if domain: + raw_domains.append(domain) + if not raw_domains: + return {} + try: + return await self._sgp_client.check_approvals(raw_domains) + except SGPClientError: + if self._sgp_enforce: + raise + logger.warning( + "SGP approval lookup failed during discovery; " + "continuing without annotations", + exc_info=True, + ) + return {} + + def _apply_enforcement( + self, + products: Any, + approvals: dict[str, ApprovalRecord | None], + ) -> tuple[list, dict[str, int]]: + """Filter products by SGP approval status when enforcement is on. + + Returns ``(filtered_products, summary_counts)``. When enforcement + is off, the product list is returned unchanged and counts are + empty. ``summary_counts`` keys: ``not_approved``, + ``unknown_blocked``, ``no_domain_blocked``. + """ + product_list = products if isinstance(products, list) else [products] + if not self._sgp_enforce or self._sgp_client is None: + return product_list, {} + + filtered: list = [] + counts = {"not_approved": 0, "unknown_blocked": 0, "no_domain_blocked": 0} + + for product in product_list: + if not isinstance(product, dict): + filtered.append(product) + continue + raw_domain = extract_product_domain(product) + if not raw_domain: + counts["no_domain_blocked"] += 1 + continue + normalized = self._sgp_client.normalize_domain(raw_domain) + record = approvals.get(normalized) + if record is None: + if self._sgp_unknown_policy == "block": + counts["unknown_blocked"] += 1 + continue + filtered.append(product) + elif not record.iab_buyer_agent_approval: + counts["not_approved"] += 1 + else: + filtered.append(product) + + return filtered, counts + def _format_results( self, products: Any, identity_context: dict, + approvals: dict[str, ApprovalRecord | None] | None = None, + filter_summary: dict[str, int] | None = None, ) -> str: """Format discovery results with tier information.""" if not products: - return "No inventory found matching your criteria." + base = "No inventory found matching your criteria." + tail = self._filter_summary_line(filter_summary) + return f"{base}\n{tail}" if tail else base tier = identity_context.get("access_tier", "public") discount = self._buyer_context.identity.get_discount_percentage() @@ -210,6 +355,8 @@ def _format_results( else str(base_price) ) + approval_line = self._approval_line(product, approvals) + output_lines.extend( [ f"{i}. {name}", @@ -221,15 +368,20 @@ def _format_results( if isinstance(impressions, int) else f" Available: {impressions}", f" Targeting: {', '.join(targeting) if targeting else 'Standard'}", - "", ] ) + if approval_line: + output_lines.append(approval_line) + output_lines.append("") else: output_lines.append(f"{i}. {product}") output_lines.append("") output_lines.append("-" * 50) output_lines.append(f"Total products found: {len(product_list)}") + summary_line = self._filter_summary_line(filter_summary) + if summary_line: + output_lines.append(summary_line) if self._buyer_context.can_access_premium_inventory(): output_lines.append("Premium inventory access: ENABLED") @@ -237,3 +389,20 @@ def _format_results( output_lines.append("Price negotiation: AVAILABLE") return "\n".join(output_lines) + + @staticmethod + def _filter_summary_line(summary: dict[str, int] | None) -> str | None: + """One-line description of products removed by SGP enforcement.""" + if not summary: + return None + total = sum(summary.values()) + if total == 0: + return None + parts: list[str] = [] + if summary.get("not_approved"): + parts.append(f"{summary['not_approved']} not approved") + if summary.get("unknown_blocked"): + parts.append(f"{summary['unknown_blocked']} unknown to SGP") + if summary.get("no_domain_blocked"): + parts.append(f"{summary['no_domain_blocked']} missing seller domain") + return f"SGP enforcement filtered {total} product(s): " + ", ".join(parts) diff --git a/src/ad_buyer/tools/buyer_deals/request_deal.py b/src/ad_buyer/tools/buyer_deals/request_deal.py index eb4ab86..bdf1879 100644 --- a/src/ad_buyer/tools/buyer_deals/request_deal.py +++ b/src/ad_buyer/tools/buyer_deals/request_deal.py @@ -3,6 +3,7 @@ """Deal ID request tool for buyer deal workflows.""" +import logging from datetime import datetime, timedelta, timezone from typing import Any @@ -12,6 +13,7 @@ from ...async_utils import run_async from ...booking.deal_id import generate_deal_id from ...booking.pricing import PricingCalculator +from ...clients.sgp_client import SGPClient, SGPClientError, extract_product_domain from ...clients.unified_client import UnifiedClient from ...models.audience_plan import AudiencePlan from ...models.buyer_identity import ( @@ -21,6 +23,11 @@ DealResponse, DealType, ) +from ...models.sgp import ApprovalRecord + +logger = logging.getLogger(__name__) + +_VALID_UNKNOWN_POLICIES = {"block", "warn", "allow"} class RequestDealInput(BaseModel): @@ -94,11 +101,17 @@ class RequestDealTool(BaseTool): args_schema: type[BaseModel] = RequestDealInput _client: UnifiedClient _buyer_context: BuyerContext + _sgp_client: SGPClient | None + _sgp_enforce: bool + _sgp_unknown_policy: str def __init__( self, client: UnifiedClient, buyer_context: BuyerContext, + sgp_client: SGPClient | None = None, + sgp_enforce: bool = False, + sgp_unknown_policy: str = "block", **kwargs: Any, ): """Initialize with unified client and buyer context. @@ -106,10 +119,26 @@ def __init__( Args: client: UnifiedClient for seller communication buyer_context: BuyerContext with identity for tiered access + sgp_client: Optional IAB Diligence Platform client. When provided + and ``sgp_enforce`` is True, the seller's IAB buyer-agent + approval is verified before a Deal ID is generated. + sgp_enforce: When True, block the deal request unless SGP + returns ``iabBuyerAgentApproval=true`` for the seller. + sgp_unknown_policy: How to treat vendors absent from the + buyer's SGP portfolio (HTTP 404). One of ``block``, + ``warn``, ``allow``. """ super().__init__(**kwargs) self._client = client self._buyer_context = buyer_context + self._sgp_client = sgp_client + self._sgp_enforce = sgp_enforce + if sgp_unknown_policy not in _VALID_UNKNOWN_POLICIES: + raise ValueError( + f"Invalid sgp_unknown_policy '{sgp_unknown_policy}'. " + f"Must be one of: {', '.join(sorted(_VALID_UNKNOWN_POLICIES))}" + ) + self._sgp_unknown_policy = sgp_unknown_policy def _run( self, @@ -171,7 +200,7 @@ async def _arun( product = product_result.data if not product: return f"Product {product_id} not found." - + # Build the seller-bound DealRequest payload so the plan # rides on the wire (proposal §5.2 / §5.3 / bead ar-ts30 §18). # We construct the payload even when audience_plan is None so @@ -193,6 +222,11 @@ async def _arun( self._last_deal_request = deal_request_payload self._last_audience_plan = audience_plan + # IAB Diligence Platform approval gate — must pass before a Deal ID is issued. + gate_error, approval_banner = await self._check_sgp_approval(product) + if gate_error: + return gate_error + # Calculate pricing deal_response = self._create_deal_response( product=product, @@ -204,11 +238,93 @@ async def _arun( audience_plan=audience_plan, ) - return self._format_deal_response(deal_response, audience_plan) + formatted = self._format_deal_response(deal_response, audience_plan) + if approval_banner: + formatted = f"{approval_banner}\n{formatted}" + return formatted except (OSError, ValueError, RuntimeError) as e: return f"Error requesting deal: {e}" + async def _check_sgp_approval( + self, product: dict + ) -> tuple[str | None, str | None]: + """Gate a deal request against IAB Diligence Platform approval. + + Returns ``(error_message, banner)``: + * ``error_message`` is non-None when the deal must be refused. + * ``banner`` is a one-line note prepended to a successful deal + response (e.g. "warn" policy, unknown vendor proceeding). + + When ``sgp_client`` is None or ``sgp_enforce`` is False, the gate + is skipped entirely. + """ + if self._sgp_client is None or not self._sgp_enforce: + return None, None + + raw_domain = extract_product_domain(product) + if not raw_domain: + return ( + "Deal blocked: cannot determine seller domain for IAB " + "Diligence Platform approval check. Add a seller_url / " + "publisher_domain field to the product, or disable SGP_ENFORCE.", + None, + ) + + domain = self._sgp_client.normalize_domain(raw_domain) or raw_domain + + try: + approvals = await self._sgp_client.check_approvals([raw_domain]) + except SGPClientError as exc: + logger.warning( + "IAB Diligence Platform lookup failed for %s during deal request", domain, + exc_info=True, + ) + # Fail closed — enforcement is on, so we must not issue a Deal ID + # when the privacy gate cannot be evaluated. + return ( + f"Deal blocked: IAB Diligence Platform lookup failed for {domain} " + f"({exc}). Retry once the SGP service is reachable.", + None, + ) + + record: ApprovalRecord | None = approvals.get(domain) + + if record is None: + if self._sgp_unknown_policy == "allow": + return None, f"SGP: {domain} not in SGP portfolio — allowed by policy." + if self._sgp_unknown_policy == "warn": + return None, ( + f"SGP WARNING: {domain} is not in your SGP portfolio. " + f"Onboard and approve this vendor in IAB Diligence Platform " + f"to suppress this warning." + ) + return ( + f"Deal blocked: {domain} is not in your IAB Diligence Platform " + f"portfolio. Onboard and approve the vendor in SGP before " + f"requesting a Deal ID.", + None, + ) + + if not record.iab_buyer_agent_approval: + return ( + f"Deal blocked: {record.company_name or domain} does not carry " + f"the IAB buyer-agent approval flag in IAB Diligence Platform. " + f"Update the vendor's approval in SGP and retry.", + None, + ) + + approved_at = ( + record.iab_buyer_agent_approved_at.isoformat() + if record.iab_buyer_agent_approved_at + else "date unknown" + ) + banner = ( + f"SGP: ✓ {record.company_name or domain} approved for IAB " + f"buyer-agent purchases (since {approved_at})." + ) + return None, banner + def _create_deal_response( self, product: dict, diff --git a/src/ad_buyer/tools/research/__init__.py b/src/ad_buyer/tools/research/__init__.py index 5e7c982..b596c0b 100644 --- a/src/ad_buyer/tools/research/__init__.py +++ b/src/ad_buyer/tools/research/__init__.py @@ -5,5 +5,6 @@ from .avails_check import AvailsCheckTool from .product_search import ProductSearchTool +from .sgp_vendor_approval import SGPVendorApprovalTool -__all__ = ["ProductSearchTool", "AvailsCheckTool"] +__all__ = ["ProductSearchTool", "AvailsCheckTool", "SGPVendorApprovalTool"] diff --git a/src/ad_buyer/tools/research/sgp_vendor_approval.py b/src/ad_buyer/tools/research/sgp_vendor_approval.py new file mode 100644 index 0000000..adb2ca2 --- /dev/null +++ b/src/ad_buyer/tools/research/sgp_vendor_approval.py @@ -0,0 +1,96 @@ +# Author: SafeGuard Privacy +# Donated to IAB Tech Lab + +"""CrewAI tool: check IAB buyer-agent approval via the IAB Diligence Platform. + +The class is intentionally prefixed ``SGP`` so that future vendor-approval +integrations (e.g. OneTrust, an IAB Tech Lab registry) can coexist under +distinct class names and distinct CrewAI tool ``name`` attributes. +""" + +from __future__ import annotations + +from typing import Any + +from crewai.tools import BaseTool +from pydantic import BaseModel, Field + +from ...async_utils import run_async +from ...clients.sgp_client import SGPClient, SGPClientError + + +class SGPVendorApprovalInput(BaseModel): + """Input schema for the IAB Diligence Platform vendor approval check tool.""" + + domains: list[str] = Field( + ..., + description=( + "Seller domains or full seller URLs to check. Scheme, www, and " + "port are stripped automatically. Up to 10 are checked per call; " + "larger lists are batched." + ), + ) + + +class SGPVendorApprovalTool(BaseTool): + """Check whether seller vendors carry the IAB buyer-agent approval flag. + + Consults the IAB Diligence Platform `iab/buyer-agent-approval` endpoint, + which returns the ``iabBuyerAgentApproval`` boolean per vendor on the + buyer's SGP tenant. Vendors absent from the tenant come back as + ``UNKNOWN``. + """ + + name: str = "check_sgp_vendor_approval" + description: str = ( + "Check IAB buyer-agent approval for seller domains via the IAB " + "Diligence Platform. Returns APPROVED / NOT APPROVED / UNKNOWN per domain, " + "along with the approval date when available. Use before " + "requesting a Deal ID from a seller." + ) + args_schema: type[BaseModel] = SGPVendorApprovalInput + _client: SGPClient + + def __init__(self, client: SGPClient, **kwargs: Any) -> None: + super().__init__(**kwargs) + self._client = client + + def _run(self, domains: list[str]) -> str: + return run_async(self._arun(domains=domains)) + + async def _arun(self, domains: list[str]) -> str: + try: + results = await self._client.check_approvals(domains) + except SGPClientError as exc: + return f"IAB Diligence Platform lookup failed: {exc}" + + if not results: + return "No valid domains were provided." + + lines = [ + "IAB Diligence Platform Approval", + "-" * 50, + ] + for domain in sorted(results): + record = results[domain] + if record is None: + lines.append(f"? {domain}: UNKNOWN (not in SGP portfolio)") + continue + if record.iab_buyer_agent_approval: + approved_at = ( + record.iab_buyer_agent_approved_at.isoformat() + if record.iab_buyer_agent_approved_at + else "date unknown" + ) + lines.append( + f"✓ {domain}: APPROVED " + f"({record.company_name or 'company name unavailable'}, " + f"since {approved_at})" + ) + else: + lines.append( + f"✗ {domain}: NOT APPROVED " + f"({record.company_name or 'company name unavailable'})" + ) + + return "\n".join(lines) diff --git a/tests/unit/test_sgp_client.py b/tests/unit/test_sgp_client.py new file mode 100644 index 0000000..c4f9b23 --- /dev/null +++ b/tests/unit/test_sgp_client.py @@ -0,0 +1,269 @@ +# Author: SafeGuard Privacy +# Donated to IAB Tech Lab + +"""Tests for the IAB Diligence Platform (SGP) client. + +Covers domain normalization, batch chunking to 10, HTTP status handling +(200 / 400 / 401 / 404 / 5xx), response parsing, TTL cache, and the +api-key header. +""" + +from __future__ import annotations + +import httpx +import pytest + +from ad_buyer.clients.sgp_client import ( + SGPAuthError, + SGPClient, + SGPClientError, +) + +BASE_URL = "https://sgp.test" + + +def _make_client(handler, *, cache_ttl_seconds: int = 900) -> SGPClient: + """Build an SGPClient whose internal httpx client uses MockTransport.""" + c = SGPClient( + api_key="test-key", + base_url=BASE_URL, + cache_ttl_seconds=cache_ttl_seconds, + timeout=5.0, + ) + transport = httpx.MockTransport(handler) + c._http = httpx.AsyncClient( + transport=transport, + base_url=BASE_URL, + headers=dict(c._http.headers), + timeout=5.0, + ) + return c + + +def _success_body(records: list[dict]) -> dict: + return { + "status": "success", + "code": 200, + "message": "", + "data": records, + "pagination": {}, + } + + +def _record(domain: str, approved: bool, approved_at: str | None = "2026-03-14T12:00:00Z") -> dict: + return { + "vendorId": hash(domain) & 0xFFFF, + "vendorCompanyId": (hash(domain) + 1) & 0xFFFF, + "companyName": domain.split(".")[0].title() + " Inc.", + "domain": domain, + "iabBuyerAgentApproval": approved, + "iabBuyerAgentApprovedAt": approved_at, + } + + +# --------------------------------------------------------------------------- +# Domain normalization +# --------------------------------------------------------------------------- + + +class TestNormalizeDomain: + @pytest.mark.parametrize( + "raw, expected", + [ + ("example.com", "example.com"), + ("Example.COM", "example.com"), + ("www.example.com", "example.com"), + ("http://example.com", "example.com"), + ("https://www.example.com/path?q=1", "example.com"), + ("http://seller.example.com:8001", "seller.example.com"), + ("", ""), + (" ", ""), + ], + ) + def test_normalizes(self, raw: str, expected: str) -> None: + assert SGPClient.normalize_domain(raw) == expected + + +# --------------------------------------------------------------------------- +# Successful lookups +# --------------------------------------------------------------------------- + + +class TestCheckApprovalsSuccess: + @pytest.mark.asyncio + async def test_single_approved_vendor(self) -> None: + def handler(request: httpx.Request) -> httpx.Response: + assert request.url.path == "/api/v1/integrations/iab/buyer-agent-approval" + assert request.url.params["domain"] == "example.com" + assert request.headers["api-key"] == "test-key" + return httpx.Response(200, json=_success_body([_record("example.com", True)])) + + client = _make_client(handler) + results = await client.check_approvals(["https://example.com/foo"]) + assert set(results) == {"example.com"} + record = results["example.com"] + assert record is not None + assert record.iab_buyer_agent_approval is True + assert record.iab_buyer_agent_approved_at is not None + + @pytest.mark.asyncio + async def test_multiple_domains_single_call(self) -> None: + seen_params: list[str] = [] + + def handler(request: httpx.Request) -> httpx.Response: + seen_params.append(request.url.params["domain"]) + return httpx.Response( + 200, + json=_success_body( + [ + _record("a.com", True), + _record("b.com", False), + ] + ), + ) + + client = _make_client(handler) + results = await client.check_approvals(["a.com", "b.com"]) + assert seen_params == ["a.com,b.com"] + assert results["a.com"].iab_buyer_agent_approval is True + assert results["b.com"].iab_buyer_agent_approval is False + + @pytest.mark.asyncio + async def test_batches_more_than_ten_domains(self) -> None: + captured: list[list[str]] = [] + + def handler(request: httpx.Request) -> httpx.Response: + domains = request.url.params["domain"].split(",") + captured.append(domains) + records = [_record(d, True) for d in domains] + return httpx.Response(200, json=_success_body(records)) + + client = _make_client(handler) + domains = [f"d{i}.com" for i in range(25)] + results = await client.check_approvals(domains) + + assert [len(c) for c in captured] == [10, 10, 5] + assert len(results) == 25 + assert all(r is not None and r.iab_buyer_agent_approval for r in results.values()) + + @pytest.mark.asyncio + async def test_dedupes_input(self) -> None: + captured_domains: list[str] = [] + + def handler(request: httpx.Request) -> httpx.Response: + captured_domains.extend(request.url.params["domain"].split(",")) + return httpx.Response(200, json=_success_body([_record("example.com", True)])) + + client = _make_client(handler) + await client.check_approvals(["example.com", "www.example.com", "EXAMPLE.COM"]) + assert captured_domains == ["example.com"] + + +# --------------------------------------------------------------------------- +# Not-found / unknown vendor +# --------------------------------------------------------------------------- + + +class TestUnknownVendor: + @pytest.mark.asyncio + async def test_404_marks_all_batch_domains_unknown(self) -> None: + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(404, json={"status": "error", "code": 404, "data": None}) + + client = _make_client(handler) + results = await client.check_approvals(["unknown1.com", "unknown2.com"]) + assert results == {"unknown1.com": None, "unknown2.com": None} + + @pytest.mark.asyncio + async def test_partial_batch_response_marks_missing_as_unknown(self) -> None: + def handler(request: httpx.Request) -> httpx.Response: + # SGP only returns records for domains it actually knows; the + # unknown ones are simply absent from the data array. + return httpx.Response(200, json=_success_body([_record("known.com", True)])) + + client = _make_client(handler) + results = await client.check_approvals(["known.com", "mystery.com"]) + assert results["known.com"] is not None + assert results["mystery.com"] is None + + +# --------------------------------------------------------------------------- +# Error paths +# --------------------------------------------------------------------------- + + +class TestErrorHandling: + @pytest.mark.asyncio + async def test_401_raises_auth_error(self) -> None: + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(401, text="unauthorized") + + client = _make_client(handler) + with pytest.raises(SGPAuthError): + await client.check_approvals(["example.com"]) + + @pytest.mark.asyncio + async def test_400_raises_client_error(self) -> None: + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(400, text="bad domain") + + client = _make_client(handler) + with pytest.raises(SGPClientError) as exc_info: + await client.check_approvals(["example.com"]) + assert exc_info.value.status_code == 400 + + @pytest.mark.asyncio + async def test_5xx_raises_client_error(self) -> None: + def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(503, text="maintenance") + + client = _make_client(handler) + with pytest.raises(SGPClientError) as exc_info: + await client.check_approvals(["example.com"]) + assert exc_info.value.status_code == 503 + + @pytest.mark.asyncio + async def test_transport_error_wrapped_as_client_error(self) -> None: + """Real httpx transport failures (connect/timeout/DNS) surface as SGPClientError.""" + + def handler(request: httpx.Request) -> httpx.Response: + raise httpx.ConnectError("connection refused") + + client = _make_client(handler) + with pytest.raises(SGPClientError) as exc_info: + await client.check_approvals(["example.com"]) + assert "ConnectError" in str(exc_info.value) + + +# --------------------------------------------------------------------------- +# Caching +# --------------------------------------------------------------------------- + + +class TestCache: + @pytest.mark.asyncio + async def test_cache_hit_avoids_second_request(self) -> None: + calls = {"n": 0} + + def handler(request: httpx.Request) -> httpx.Response: + calls["n"] += 1 + return httpx.Response(200, json=_success_body([_record("cached.com", True)])) + + client = _make_client(handler) + first = await client.check_approvals(["cached.com"]) + second = await client.check_approvals(["cached.com"]) + assert calls["n"] == 1 + assert first["cached.com"].vendor_id == second["cached.com"].vendor_id + + @pytest.mark.asyncio + async def test_cache_stores_unknown_result(self) -> None: + calls = {"n": 0} + + def handler(request: httpx.Request) -> httpx.Response: + calls["n"] += 1 + return httpx.Response(404, json={"status": "error", "code": 404}) + + client = _make_client(handler) + await client.check_approvals(["mystery.com"]) + await client.check_approvals(["mystery.com"]) + assert calls["n"] == 1 diff --git a/tests/unit/test_sgp_gate.py b/tests/unit/test_sgp_gate.py new file mode 100644 index 0000000..3e431f3 --- /dev/null +++ b/tests/unit/test_sgp_gate.py @@ -0,0 +1,506 @@ +# Author: SafeGuard Privacy +# Donated to IAB Tech Lab + +"""Tests for the IAB Diligence Platform deal-request gate in RequestDealTool.""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from ad_buyer.clients.sgp_client import SGPClientError +from ad_buyer.models.buyer_identity import BuyerContext, BuyerIdentity +from ad_buyer.models.sgp import ApprovalRecord +from ad_buyer.tools.buyer_deals import RequestDealTool + + +@pytest.fixture +def agency_context() -> BuyerContext: + identity = BuyerIdentity( + seat_id="ttd-seat-123", + agency_id="omnicom-456", + agency_name="OMD", + ) + return BuyerContext(identity=identity, is_authenticated=True) + + +@pytest.fixture +def mock_client() -> MagicMock: + """UnifiedClient mock that returns a product with a seller_url.""" + client = MagicMock() + client.get_product = AsyncMock( + return_value=MagicMock( + success=True, + data={ + "id": "prod_1", + "name": "Premium CTV", + "basePrice": 20.00, + "seller_url": "http://seller.example.com:8001", + }, + ) + ) + return client + + +def _approved(domain: str) -> ApprovalRecord: + return ApprovalRecord.model_validate( + { + "vendorId": 1, + "vendorCompanyId": 10, + "companyName": "Example Seller", + "domain": domain, + "iabBuyerAgentApproval": True, + "iabBuyerAgentApprovedAt": "2026-03-01T00:00:00Z", + } + ) + + +def _denied(domain: str) -> ApprovalRecord: + return ApprovalRecord.model_validate( + { + "vendorId": 2, + "vendorCompanyId": 20, + "companyName": "Shady Seller", + "domain": domain, + "iabBuyerAgentApproval": False, + "iabBuyerAgentApprovedAt": None, + } + ) + + +# --------------------------------------------------------------------------- +# Gate off +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_no_sgp_client_bypasses_gate(mock_client, agency_context): + """When no SGP client is wired in, the tool operates as before.""" + tool = RequestDealTool( + client=mock_client, + buyer_context=agency_context, + sgp_client=None, + ) + result = await tool._arun(product_id="prod_1", impressions=100) + assert "DEAL CREATED SUCCESSFULLY" in result + + +@pytest.mark.asyncio +async def test_enforce_false_bypasses_gate(mock_client, agency_context): + """When enforcement is off, the gate does not block.""" + sgp = MagicMock() + sgp.normalize_domain = MagicMock(return_value="seller.example.com") + sgp.check_approvals = AsyncMock( + return_value={"seller.example.com": _denied("seller.example.com")} + ) + tool = RequestDealTool( + client=mock_client, + buyer_context=agency_context, + sgp_client=sgp, + sgp_enforce=False, + ) + result = await tool._arun(product_id="prod_1", impressions=100) + assert "DEAL CREATED SUCCESSFULLY" in result + sgp.check_approvals.assert_not_called() + + +# --------------------------------------------------------------------------- +# Approved +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_approved_vendor_allows_deal(mock_client, agency_context): + sgp = MagicMock() + sgp.normalize_domain = MagicMock(return_value="seller.example.com") + sgp.check_approvals = AsyncMock( + return_value={"seller.example.com": _approved("seller.example.com")} + ) + tool = RequestDealTool( + client=mock_client, + buyer_context=agency_context, + sgp_client=sgp, + sgp_enforce=True, + ) + result = await tool._arun(product_id="prod_1", impressions=100) + assert "DEAL CREATED SUCCESSFULLY" in result + assert "SGP: ✓" in result + assert "approved" in result.lower() + + +# --------------------------------------------------------------------------- +# Denied +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_denied_vendor_blocks_deal(mock_client, agency_context): + sgp = MagicMock() + sgp.normalize_domain = MagicMock(return_value="seller.example.com") + sgp.check_approvals = AsyncMock( + return_value={"seller.example.com": _denied("seller.example.com")} + ) + tool = RequestDealTool( + client=mock_client, + buyer_context=agency_context, + sgp_client=sgp, + sgp_enforce=True, + ) + result = await tool._arun(product_id="prod_1", impressions=100) + assert "Deal blocked" in result + assert "IAB buyer-agent approval" in result + assert "DEAL CREATED SUCCESSFULLY" not in result + + +# --------------------------------------------------------------------------- +# Unknown vendor policies +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_unknown_vendor_blocks_by_default(mock_client, agency_context): + sgp = MagicMock() + sgp.normalize_domain = MagicMock(return_value="seller.example.com") + sgp.check_approvals = AsyncMock(return_value={"seller.example.com": None}) + tool = RequestDealTool( + client=mock_client, + buyer_context=agency_context, + sgp_client=sgp, + sgp_enforce=True, + sgp_unknown_policy="block", + ) + result = await tool._arun(product_id="prod_1", impressions=100) + assert "Deal blocked" in result + assert "not in your IAB Diligence Platform" in result + assert "DEAL CREATED SUCCESSFULLY" not in result + + +@pytest.mark.asyncio +async def test_unknown_vendor_warn_allows_with_banner(mock_client, agency_context): + sgp = MagicMock() + sgp.normalize_domain = MagicMock(return_value="seller.example.com") + sgp.check_approvals = AsyncMock(return_value={"seller.example.com": None}) + tool = RequestDealTool( + client=mock_client, + buyer_context=agency_context, + sgp_client=sgp, + sgp_enforce=True, + sgp_unknown_policy="warn", + ) + result = await tool._arun(product_id="prod_1", impressions=100) + assert "SGP WARNING" in result + assert "DEAL CREATED SUCCESSFULLY" in result + + +@pytest.mark.asyncio +async def test_unknown_vendor_allow_proceeds_silently(mock_client, agency_context): + sgp = MagicMock() + sgp.normalize_domain = MagicMock(return_value="seller.example.com") + sgp.check_approvals = AsyncMock(return_value={"seller.example.com": None}) + tool = RequestDealTool( + client=mock_client, + buyer_context=agency_context, + sgp_client=sgp, + sgp_enforce=True, + sgp_unknown_policy="allow", + ) + result = await tool._arun(product_id="prod_1", impressions=100) + assert "DEAL CREATED SUCCESSFULLY" in result + + +# --------------------------------------------------------------------------- +# Failure modes +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_transport_error_fails_closed_when_enforcing(mock_client, agency_context): + """When SGP is unreachable and enforcement is on, deal must not be issued.""" + sgp = MagicMock() + sgp.normalize_domain = MagicMock(return_value="seller.example.com") + sgp.check_approvals = AsyncMock(side_effect=SGPClientError("upstream 503")) + tool = RequestDealTool( + client=mock_client, + buyer_context=agency_context, + sgp_client=sgp, + sgp_enforce=True, + ) + result = await tool._arun(product_id="prod_1", impressions=100) + assert "Deal blocked" in result + assert "IAB Diligence Platform lookup failed" in result + assert "DEAL CREATED SUCCESSFULLY" not in result + + +@pytest.mark.asyncio +async def test_product_without_domain_blocks_when_enforcing(agency_context): + """A product missing any seller domain field cannot be evaluated, so block.""" + mock_client = MagicMock() + mock_client.get_product = AsyncMock( + return_value=MagicMock( + success=True, + data={"id": "prod_1", "name": "Test", "basePrice": 20.00}, + ) + ) + sgp = MagicMock() + sgp.normalize_domain = MagicMock(return_value="") + sgp.check_approvals = AsyncMock() + tool = RequestDealTool( + client=mock_client, + buyer_context=agency_context, + sgp_client=sgp, + sgp_enforce=True, + ) + result = await tool._arun(product_id="prod_1", impressions=100) + assert "Deal blocked" in result + assert "seller domain" in result + sgp.check_approvals.assert_not_called() + + +def test_invalid_unknown_policy_rejected(mock_client, agency_context): + with pytest.raises(ValueError, match="sgp_unknown_policy"): + RequestDealTool( + client=mock_client, + buyer_context=agency_context, + sgp_unknown_policy="maybe", + ) + + +# --------------------------------------------------------------------------- +# Flow-level wiring of SGPVendorApprovalTool +# --------------------------------------------------------------------------- + + +def test_flow_wires_vendor_approval_tool_when_sgp_configured(agency_context): + """BuyerDealFlow exposes the vendor approval tool to the deal agent.""" + from ad_buyer.clients.sgp_client import SGPClient + from ad_buyer.flows.buyer_deal_flow import BuyerDealFlow + from ad_buyer.tools.research import SGPVendorApprovalTool + + sgp = SGPClient(api_key="k", base_url="https://sgp.test") + flow = BuyerDealFlow( + client=MagicMock(), + buyer_context=agency_context, + sgp_client=sgp, + ) + assert isinstance(flow._vendor_approval_tool, SGPVendorApprovalTool) + + +def test_flow_omits_vendor_approval_tool_without_sgp(agency_context, monkeypatch): + """Without an SGP client (and no SGP_API_KEY env), the tool is not built.""" + from ad_buyer.config.settings import settings + from ad_buyer.flows.buyer_deal_flow import BuyerDealFlow + + monkeypatch.setattr(settings, "sgp_api_key", "") + flow = BuyerDealFlow( + client=MagicMock(), + buyer_context=agency_context, + sgp_client=None, + ) + assert flow._vendor_approval_tool is None + + +# --------------------------------------------------------------------------- +# DiscoverInventoryTool enforcement (filters before the agent sees products) +# --------------------------------------------------------------------------- + + +def _product(product_id: str, domain: str, price: float = 20.0) -> dict: + return { + "id": product_id, + "name": f"Product {product_id}", + "publisherId": "pub-1", + "channel": "ctv", + "basePrice": price, + "availableImpressions": 1_000_000, + "seller_url": f"http://{domain}", + } + + +@pytest.fixture +def discovery_client() -> MagicMock: + """UnifiedClient mock returning a mixed list of seller domains.""" + client = MagicMock() + products = [ + _product("p1", "approved.example.com"), + _product("p2", "denied.example.com"), + _product("p3", "unknown.example.com"), + ] + client.search_products = AsyncMock( + return_value=MagicMock(success=True, data=products) + ) + client.list_products = AsyncMock( + return_value=MagicMock(success=True, data=products) + ) + return client + + +def _strip_scheme(d: str) -> str: + """Tiny stand-in for SGPClient.normalize_domain for test mocks.""" + return d.replace("http://", "").replace("https://", "").split(":")[0] + + +def _discovery_sgp_mock() -> MagicMock: + """SGP mock with approved/denied/unknown for the three discovery_client domains.""" + sgp = MagicMock() + sgp.normalize_domain = MagicMock(side_effect=_strip_scheme) + sgp.check_approvals = AsyncMock( + return_value={ + "approved.example.com": _approved("approved.example.com"), + "denied.example.com": _denied("denied.example.com"), + "unknown.example.com": None, + } + ) + return sgp + + +@pytest.mark.asyncio +async def test_discovery_enforce_filters_not_approved( + discovery_client, agency_context +): + """When enforcing, NOT APPROVED rows are dropped before formatting.""" + from ad_buyer.tools.buyer_deals import DiscoverInventoryTool + + sgp = _discovery_sgp_mock() + tool = DiscoverInventoryTool( + client=discovery_client, + buyer_context=agency_context, + sgp_client=sgp, + sgp_enforce=True, + sgp_unknown_policy="block", + ) + result = await tool._arun(query="ctv inventory") + assert "approved.example.com" in result + assert "denied.example.com" not in result + assert "unknown.example.com" not in result + assert "filtered" in result.lower() + + +@pytest.mark.asyncio +async def test_discovery_enforce_warn_keeps_unknowns( + discovery_client, agency_context +): + """warn policy keeps unknowns in the result and emits a warning line.""" + from ad_buyer.tools.buyer_deals import DiscoverInventoryTool + + sgp = _discovery_sgp_mock() + tool = DiscoverInventoryTool( + client=discovery_client, + buyer_context=agency_context, + sgp_client=sgp, + sgp_enforce=True, + sgp_unknown_policy="warn", + ) + result = await tool._arun(query="ctv inventory") + assert "approved.example.com" in result + assert "denied.example.com" not in result + assert "unknown.example.com" in result + assert "SGP WARNING" in result + + +@pytest.mark.asyncio +async def test_discovery_enforce_allow_keeps_unknowns_silently( + discovery_client, agency_context +): + """allow policy keeps unknowns and suppresses the per-row annotation.""" + from ad_buyer.tools.buyer_deals import DiscoverInventoryTool + + sgp = _discovery_sgp_mock() + tool = DiscoverInventoryTool( + client=discovery_client, + buyer_context=agency_context, + sgp_client=sgp, + sgp_enforce=True, + sgp_unknown_policy="allow", + ) + result = await tool._arun(query="ctv inventory") + # p3 is the unknown vendor — kept silently under "allow" policy + assert "Product p3" in result + assert "SGP WARNING" not in result + # NOT APPROVED (p2) is still filtered regardless of unknown policy + assert "Product p2" not in result + + +@pytest.mark.asyncio +async def test_discovery_no_enforce_annotates_only(discovery_client, agency_context): + """Without enforcement, all products pass through with annotations.""" + from ad_buyer.tools.buyer_deals import DiscoverInventoryTool + + sgp = _discovery_sgp_mock() + tool = DiscoverInventoryTool( + client=discovery_client, + buyer_context=agency_context, + sgp_client=sgp, + sgp_enforce=False, + ) + result = await tool._arun(query="ctv inventory") + assert "approved.example.com" in result + assert "denied.example.com" in result # not filtered + assert "unknown.example.com" in result + assert "NOT APPROVED" in result + assert "filtered" not in result.lower() + + +@pytest.mark.asyncio +async def test_discovery_fails_closed_when_sgp_unreachable_and_enforcing( + discovery_client, agency_context +): + """SGP transport error propagates so the flow can mark FAILED.""" + from ad_buyer.tools.buyer_deals import DiscoverInventoryTool + + sgp = MagicMock() + sgp.normalize_domain = MagicMock(side_effect=lambda d: d) + sgp.check_approvals = AsyncMock(side_effect=SGPClientError("upstream 503")) + tool = DiscoverInventoryTool( + client=discovery_client, + buyer_context=agency_context, + sgp_client=sgp, + sgp_enforce=True, + ) + with pytest.raises(SGPClientError, match="Inventory discovery halted"): + await tool._arun(query="ctv inventory") + + +@pytest.mark.asyncio +async def test_discovery_no_enforce_swallows_sgp_error( + discovery_client, agency_context, caplog +): + """Without enforcement, transport error returns unannotated results.""" + from ad_buyer.tools.buyer_deals import DiscoverInventoryTool + + sgp = MagicMock() + sgp.normalize_domain = MagicMock(side_effect=lambda d: d) + sgp.check_approvals = AsyncMock(side_effect=SGPClientError("upstream 503")) + tool = DiscoverInventoryTool( + client=discovery_client, + buyer_context=agency_context, + sgp_client=sgp, + sgp_enforce=False, + ) + result = await tool._arun(query="ctv inventory") + assert "Product p1" in result + assert "Product p2" in result # not filtered (annotate-only mode) + assert "Product p3" in result + assert "SGP Approval" not in result # no annotations + assert "Total products found: 3" in result + + +@pytest.mark.asyncio +async def test_discovery_no_sgp_client_pass_through( + discovery_client, agency_context +): + """Without an SGP client, discovery behaves as before — no annotations, no filter.""" + from ad_buyer.tools.buyer_deals import DiscoverInventoryTool + + tool = DiscoverInventoryTool( + client=discovery_client, + buyer_context=agency_context, + sgp_client=None, + sgp_enforce=True, # no-op without a client + ) + result = await tool._arun(query="ctv inventory") + assert "Product p1" in result + assert "Product p2" in result + assert "Product p3" in result + assert "SGP Approval" not in result + assert "Total products found: 3" in result