diff --git a/README.md b/README.md index 4c5b77c..0780a0e 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ sources: host: localhost port: 5432 database: postgis - username: username + user: postgres password: password # Outputs: Define where notifications are sent @@ -67,7 +67,9 @@ outputs: broker_port: 1883 ``` -See [examples/config.yaml](./examples/config.yaml) for a complete configuration example with all available options. +Both outputs publish [OGC PubSub CloudEvents-JSON](https://github.com/opengeospatial/pubsub) messages (types such as `org.ogc.api.collection.item.create`). The pgSTAC source can enrich events with item geometry (`include_geometry: true`, default). Optional top-level `filters` restrict forwarding by collection, operation (`create`/`replace`/`delete`), and bbox. + +See [examples/config.yaml](./examples/config.yaml) for configuration and [examples/output-create.json](./examples/output-create.json) / [examples/output-delete.json](./examples/output-delete.json) for sample MQTT and CloudEvents payloads. ## Kubernetes Deployment @@ -87,7 +89,8 @@ See [Helm Chart README](helm-chart/eoapi-notifier/README.md) for configuration o - `pgstac`: Monitor PostgreSQL/pgSTAC database changes #### Outputs -- `mqtt`: Publish events to MQTT broker +- `mqtt`: Publish OGC CloudEvents-JSON to an MQTT broker +- `cloudevents`: POST OGC CloudEvents-JSON to an HTTP endpoint (or Knative `K_SINK`) ## Development diff --git a/docs/development.md b/docs/development.md index fb737a6..5863b0b 100644 --- a/docs/development.md +++ b/docs/development.md @@ -126,6 +126,7 @@ class MyOutput(BaseOutput): async def send_event(self, event: NotificationEvent) -> bool: try: + # Built-in mqtt/cloudevents outputs use eoapi_notifier.core.ogc.build_cloudevent(). payload = { "id": event.id, "source": event.source, diff --git a/eoapi_notifier/core/event.py b/eoapi_notifier/core/event.py index bf3043a..c57bd4f 100644 --- a/eoapi_notifier/core/event.py +++ b/eoapi_notifier/core/event.py @@ -23,6 +23,8 @@ class NotificationEvent(BaseModel): collection: STAC collection ID item_id: STAC item ID timestamp: Event timestamp + geometry: GeoJSON geometry + bbox: Bounding box [minx, miny, maxx, maxy] data: Additional event data """ @@ -33,4 +35,6 @@ class NotificationEvent(BaseModel): collection: str item_id: str | None = None timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC)) + geometry: dict[str, Any] | None = None + bbox: list[float] | None = None data: dict[str, Any] = Field(default_factory=dict) diff --git a/eoapi_notifier/core/filter.py b/eoapi_notifier/core/filter.py new file mode 100644 index 0000000..f194dda --- /dev/null +++ b/eoapi_notifier/core/filter.py @@ -0,0 +1,75 @@ +"""Event filtering by collection, operation, and bbox.""" + +from typing import Any + +from pydantic import BaseModel, ConfigDict, Field + +from .event import NotificationEvent +from .ogc import canonical_operation + + +class FilterConfig(BaseModel): + """Single filter block; unset fields are ignored.""" + + model_config = ConfigDict(extra="forbid") + + collections: list[str] | None = None + operations: list[str] | None = None + bbox: list[float] | None = Field(default=None, min_length=4, max_length=4) + + +def parse_filters(raw: list[dict[str, Any]]) -> list[FilterConfig]: + """Parse filter blocks from config.""" + return [FilterConfig.model_validate(block) for block in raw] + + +def _event_bbox(event: NotificationEvent) -> list[float] | None: + if event.bbox and len(event.bbox) >= 4: + return event.bbox + if not event.geometry: + return None + coords = event.geometry.get("coordinates") + if not coords: + return None + # Flatten coordinate pairs for a simple bbox estimate + flat: list[float] = [] + + def walk(node: Any) -> None: + if isinstance(node, int | float): + flat.append(float(node)) + elif isinstance(node, list): + for item in node: + walk(item) + + walk(coords) + if len(flat) < 2: + return None + xs = flat[0::2] + ys = flat[1::2] + return [min(xs), min(ys), max(xs), max(ys)] + + +def _bbox_overlaps(a: list[float], b: list[float]) -> bool: + return not (a[2] < b[0] or a[0] > b[2] or a[3] < b[1] or a[1] > b[3]) + + +def _matches_block(event: NotificationEvent, block: FilterConfig) -> bool: + if block.collections is not None and event.collection not in block.collections: + return False + if block.operations is not None: + event_op = canonical_operation(event.operation) or event.operation.lower() + allowed = {canonical_operation(op) or op.lower() for op in block.operations} + if event_op not in allowed: + return False + if block.bbox is not None: + event_bbox = _event_bbox(event) + if event_bbox is None or not _bbox_overlaps(event_bbox, block.bbox): + return False + return True + + +def matches(event: NotificationEvent, filters: list[FilterConfig]) -> bool: + """Return True if event passes filters (OR across blocks, AND within block).""" + if not filters: + return True + return any(_matches_block(event, block) for block in filters) diff --git a/eoapi_notifier/core/main.py b/eoapi_notifier/core/main.py index 0fe126d..ed51039 100644 --- a/eoapi_notifier/core/main.py +++ b/eoapi_notifier/core/main.py @@ -9,6 +9,7 @@ import yaml from loguru import logger +from .filter import FilterConfig, matches, parse_filters from .registry import create_output, create_source @@ -19,6 +20,7 @@ def __init__(self) -> None: """Initialize the application.""" self.sources: list[Any] = [] self.outputs: list[Any] = [] + self.filters: list[FilterConfig] = [] self._shutdown_event = asyncio.Event() self._running = False @@ -239,6 +241,10 @@ async def _process_source_events(self, source: Any) -> None: f"Received event #{event_count} from {source_name}: {event}" ) + if not matches(event, self.filters): + logger.debug(f"Event {event.id} filtered out") + continue + # Send event to all outputs for output in self.outputs: output_name = output.__class__.__name__ @@ -314,6 +320,7 @@ async def run(self, config_path: Path) -> None: # Create plugins logger.debug("Creating plugins...") self.create_plugins(config) + self.filters = parse_filters(config.get("filters", [])) if not self.sources and not self.outputs: logger.error("No plugins configured") diff --git a/eoapi_notifier/core/ogc.py b/eoapi_notifier/core/ogc.py new file mode 100644 index 0000000..fe9adc9 --- /dev/null +++ b/eoapi_notifier/core/ogc.py @@ -0,0 +1,62 @@ +"""OGC PubSub CloudEvents-JSON message builder.""" + +from typing import Any + +from cloudevents.http import CloudEvent + +from .event import NotificationEvent + +OGC_ITEM_TYPE_PREFIX = "org.ogc.api.collection.item" + +# Raw pgSTAC and correlated semantic operations mapped to OGC verbs. +_CANONICAL_OPERATIONS = { + "insert": "create", + "item_created": "create", + "create": "create", + "update": "replace", + "item_updated": "replace", + "replace": "replace", + "delete": "delete", + "item_deleted": "delete", +} + + +def canonical_operation(operation: str) -> str | None: + """Map a raw or semantic operation to an OGC verb (create/replace/delete).""" + return _CANONICAL_OPERATIONS.get(operation.lower()) + + +def build_cloudevent( + event: NotificationEvent, source: str = "/eoapi/stac" +) -> CloudEvent: + """Build an OGC PubSub CloudEvent from a notification event.""" + verb = canonical_operation(event.operation) + + attributes: dict[str, Any] = { + "specversion": "1.0", + "id": event.id, + "source": source, + "type": f"{OGC_ITEM_TYPE_PREFIX}.{verb or event.operation.lower()}", + "time": event.timestamp.isoformat(), + } + + if event.item_id: + attributes["subject"] = event.item_id + if event.collection: + attributes["collection"] = event.collection + + if verb == "delete": + attributes["datacontenttype"] = "text/plain" + data: Any = event.item_id or "" + else: + attributes["datacontenttype"] = "application/json" + data = { + "type": "Feature", + "id": event.item_id, + "collection": event.collection, + "geometry": event.geometry, + "bbox": event.bbox, + "properties": event.data, + } + + return CloudEvent(attributes, data) diff --git a/eoapi_notifier/outputs/cloudevents.py b/eoapi_notifier/outputs/cloudevents.py index c129dc7..cd320a4 100644 --- a/eoapi_notifier/outputs/cloudevents.py +++ b/eoapi_notifier/outputs/cloudevents.py @@ -8,14 +8,13 @@ import json import os from typing import Any -from uuid import uuid4 import httpx -from cloudevents.conversion import to_binary -from cloudevents.http import CloudEvent +from cloudevents.conversion import to_structured from pydantic import field_validator, model_validator from ..core.event import NotificationEvent +from ..core.ogc import build_cloudevent from ..core.plugin import BaseOutput, BasePluginConfig, PluginMetadata @@ -25,11 +24,9 @@ class CloudEventsConfig(BasePluginConfig): endpoint: str | None = None source: str = "/eoapi/stac" - event_type: str = "org.eoapi.stac" timeout: float = 30.0 max_retries: int = 3 retry_backoff: float = 1.0 - max_header_length: int = 4096 overrides: dict[str, str] = {} @field_validator("endpoint") @@ -56,11 +53,9 @@ def get_sample_config(cls) -> dict[str, Any]: return { "endpoint": None, # Uses K_SINK env var if not set "source": "/eoapi/stac", - "event_type": "org.eoapi.stac", "timeout": 30.0, "max_retries": 3, "retry_backoff": 1.0, - "max_header_length": 4096, } @classmethod @@ -81,7 +76,6 @@ def get_status_info(self) -> dict[str, Any]: return { "Endpoint": self.endpoint or "K_SINK env var", "Source": self.source, - "Event Type": self.event_type, "Timeout": f"{self.timeout}s", "Max Retries": self.max_retries, } @@ -173,17 +167,20 @@ async def send_event(self, event: NotificationEvent) -> bool: # Convert to CloudEvent self.logger.debug(f"Converting event {event.id} to CloudEvent format...") - cloud_event = self._convert_to_cloudevent(event) + cloud_event = build_cloudevent(event, source=self.config.source) + for key, value in self._ce_extensions.items(): + cloud_event[key] = str(value) self.logger.debug( f"CloudEvent created: id={cloud_event['id']}, " f"type={cloud_event['type']}" ) - # Convert to binary format - self.logger.debug("Converting CloudEvent to binary format...") - headers, data = to_binary(cloud_event) + # Convert to CloudEvents-JSON + self.logger.debug("Converting CloudEvent to structured format...") + structured_headers, structured_body = to_structured(cloud_event) + headers = dict(structured_headers) self.logger.debug( - f"Binary conversion complete, headers: {list(headers.keys())}" + f"Structured conversion complete, headers: {list(headers.keys())}" ) # Send HTTP POST @@ -191,7 +188,9 @@ async def send_event(self, event: NotificationEvent) -> bool: f"Sending CloudEvent {cloud_event['id']} to {endpoint} " f"(timeout: {self.config.timeout}s)" ) - response = await self._client.post(endpoint, headers=headers, data=data) + response = await self._client.post( + endpoint, headers=headers, content=structured_body + ) response.raise_for_status() self.logger.debug( @@ -219,67 +218,6 @@ async def send_event(self, event: NotificationEvent) -> bool: ) return False - def _truncate_header(self, value: str | None) -> str | None: - """Truncate header value to max_header_length if needed.""" - if not value: - return value - if len(value.encode("utf-8")) <= self.config.max_header_length: - return value - # Truncate to byte limit, ensuring valid UTF-8 - truncated = value.encode("utf-8")[: self.config.max_header_length] - return truncated.decode("utf-8", errors="ignore") - - def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent: - """Convert NotificationEvent to CloudEvent.""" - # Use config values which now include environment overrides - source = self.config.source - event_type_base = self.config.event_type - - # Use pre-parsed KNative CE overrides - ce_extensions = self._ce_extensions - - # Map operation to event type suffix - operation_map = {"INSERT": "created", "UPDATE": "updated", "DELETE": "deleted"} - operation = operation_map.get(event.operation.upper(), event.operation.lower()) - - attributes = { - "id": str(uuid4()), - "source": source, - "type": f"{event_type_base}.{operation}", - "time": event.timestamp.isoformat(), - "datacontenttype": "application/json", - } - - # Add subject if item_id exists - if event.item_id: - truncated_subject = self._truncate_header(event.item_id) - if truncated_subject: - attributes["subject"] = truncated_subject - - # Add collection attribute - if event.collection: - truncated_collection = self._truncate_header(event.collection) - if truncated_collection: - attributes["collection"] = truncated_collection - - # Apply KNative CE extension overrides - for key, value in ce_extensions.items(): - attributes[key] = str(value) - - # Event data payload - data = { - "id": event.id, - "source": event.source, - "type": event.type, - "operation": event.operation, - "collection": event.collection, - "item_id": event.item_id, - "timestamp": event.timestamp.isoformat(), - **event.data, - } - - return CloudEvent(attributes, data) - async def health_check(self) -> bool: """Check if the adapter is healthy.""" return self._running and self._client is not None diff --git a/eoapi_notifier/outputs/mqtt.py b/eoapi_notifier/outputs/mqtt.py index e0833ae..5a664f4 100644 --- a/eoapi_notifier/outputs/mqtt.py +++ b/eoapi_notifier/outputs/mqtt.py @@ -6,14 +6,15 @@ """ import asyncio -import json import ssl from typing import Any import paho.mqtt.client as mqtt +from cloudevents.conversion import to_structured from pydantic import field_validator from ..core.event import NotificationEvent +from ..core.ogc import build_cloudevent from ..core.plugin import BaseOutput, BasePluginConfig, PluginMetadata @@ -337,17 +338,9 @@ async def send_event(self, event: NotificationEvent) -> bool: return False try: - # Convert event to JSON payload - payload = { - "id": event.id, - "source": event.source, - "type": event.type, - "operation": event.operation, - "collection": event.collection, - "item_id": event.item_id, - "timestamp": event.timestamp.isoformat(), - "data": event.data, - } + cloud_event = build_cloudevent(event) + _, structured_body = to_structured(cloud_event) + message = structured_body.decode("utf-8") # Determine topic (could be collection-specific) topic = ( @@ -357,12 +350,10 @@ async def send_event(self, event: NotificationEvent) -> bool: ) # Publish to MQTT - message = json.dumps(payload, default=str) self.logger.debug( f"Publishing event {event.id} to topic '{topic}' " f"(QoS {self.config.qos})" ) - self.logger.debug(f"Message payload: {message}") msg_info = self._client.publish( topic=topic, payload=message.encode("utf-8"), diff --git a/eoapi_notifier/sources/pgstac.py b/eoapi_notifier/sources/pgstac.py index 46954fa..5451c79 100644 --- a/eoapi_notifier/sources/pgstac.py +++ b/eoapi_notifier/sources/pgstac.py @@ -382,6 +382,11 @@ class PgSTACSourceConfig(BasePluginConfig): le=10000, ) + include_geometry: bool = Field( + default=True, + description="Query pgSTAC for item geometry and bbox on non-DELETE events", + ) + @field_validator("port") @classmethod def validate_port(cls, v: int) -> int: @@ -405,6 +410,7 @@ def get_sample_config(cls) -> dict[str, Any]: "correlation_window": DEFAULT_CORRELATION_WINDOW, "cleanup_interval": DEFAULT_CLEANUP_INTERVAL, "event_queue_size": DEFAULT_EVENT_QUEUE_SIZE, + "include_geometry": True, } @classmethod @@ -670,6 +676,12 @@ async def _process_notifications(self) -> None: event = self._create_notification_event(notification) if event: + if ( + self.config.include_geometry + and event.operation.upper() != "DELETE" + and event.item_id + ): + await self._enrich_geometry(event) await self._event_queue.put(event) self.logger.debug( "✓ Event #%d queued: %s", notification_count, event.id @@ -789,6 +801,38 @@ def _create_notification_event(self, payload: str) -> NotificationEvent | None: ) return None + async def _enrich_geometry(self, event: NotificationEvent) -> None: + """Fetch geometry and bbox from pgSTAC for the changed item.""" + if not self._connection or not self._connected: + return + try: + row = await self._connection.fetchrow( + """ + SELECT ST_AsGeoJSON(geometry)::json AS geometry, + content->'bbox' AS bbox + FROM pgstac.items + WHERE id = $1 AND collection = $2 + """, + event.item_id, + event.collection, + ) + if not row: + return + geom = row["geometry"] + if geom: + event.geometry = geom if isinstance(geom, dict) else json.loads(geom) + bbox = row["bbox"] + if bbox is not None: + bbox = bbox if isinstance(bbox, list) else json.loads(bbox) + event.bbox = [float(x) for x in bbox] + except Exception as e: + self.logger.warning( + "Failed to fetch geometry for %s/%s: %s", + event.collection, + event.item_id, + e, + ) + async def _raw_event_stream(self) -> AsyncIterator[NotificationEvent]: """Generate raw event stream from the event queue.""" self.logger.debug("Starting raw event stream from queue...") diff --git a/examples/config.yaml b/examples/config.yaml index dbb6b61..4323841 100644 --- a/examples/config.yaml +++ b/examples/config.yaml @@ -27,6 +27,13 @@ sources: # enable_correlation: true # PGSTAC_ENABLE_CORRELATION # correlation_window: 5.0 # PGSTAC_CORRELATION_WINDOW # cleanup_interval: 1.0 # PGSTAC_CLEANUP_INTERVAL + # include_geometry: true # PGSTAC_INCLUDE_GEOMETRY + +# Optional: filter events before forwarding to outputs +# filters: +# - collections: ["sentinel-2-l2a"] +# operations: ["create", "replace"] +# bbox: [-180, -90, 180, 90] # Outputs: Define where notifications are sent outputs: @@ -46,22 +53,17 @@ outputs: # topic: "eoapi/" # MQTT_TOPIC # qos: 1 # MQTT_QOS - # CloudEvents HTTP output for sending events as CloudEvents - # - # Besides the regular overwrite, this plugin also supports K_SINK - # https://knative.dev/docs/eventing/custom-event-source/sinkbinding/ + # CloudEvents HTTP output (OGC PubSub CloudEvents-JSON; type derived from operation) - type: cloudevents config: endpoint: https://example.com/webhook # CLOUDEVENTS_ENDPOINT or K_SINK - # Optional: CloudEventattributes + # Optional: CloudEvent attributes # source: "/eoapi/stac" # CLOUDEVENTS_SOURCE - # event_type: "org.eoapi.stac" # CLOUDEVENTS_EVENT_TYPE # Optional: HTTP settings # timeout: 30.0 # CLOUDEVENTS_TIMEOUT # max_retries: 3 # CLOUDEVENTS_MAX_RETRIES - # max_header_length: 4096 # CLOUDEVENTS_MAX_HEADER_LENGTH # Example with multiple sources and outputs # sources: diff --git a/examples/output-create.json b/examples/output-create.json new file mode 100644 index 0000000..77be55e --- /dev/null +++ b/examples/output-create.json @@ -0,0 +1,30 @@ +{ + "specversion": "1.0", + "id": "pgstac-item-1", + "source": "/eoapi/stac", + "type": "org.ogc.api.collection.item.create", + "datacontenttype": "application/json", + "subject": "item-1", + "time": "2024-01-01T00:00:00+00:00", + "collection": "sentinel-2-l2a", + "data": { + "type": "Feature", + "id": "item-1", + "collection": "sentinel-2-l2a", + "geometry": { + "type": "Polygon", + "coordinates": [ + [ + [0, 0], + [1, 0], + [1, 1], + [0, 0] + ] + ] + }, + "bbox": [0, 0, 1, 1], + "properties": { + "datetime": "2024-01-01T00:00:00Z" + } + } +} diff --git a/examples/output-delete.json b/examples/output-delete.json new file mode 100644 index 0000000..aeaf836 --- /dev/null +++ b/examples/output-delete.json @@ -0,0 +1,11 @@ +{ + "specversion": "1.0", + "id": "pgstac-item-99", + "source": "/eoapi/stac", + "type": "org.ogc.api.collection.item.delete", + "datacontenttype": "text/plain", + "subject": "item-99", + "time": "2024-01-02T00:00:00+00:00", + "collection": "sentinel-2-l2a", + "data": "item-99" +} diff --git a/helm-chart/eoapi-notifier/README.md b/helm-chart/eoapi-notifier/README.md index 8806a45..3fa8f6b 100644 --- a/helm-chart/eoapi-notifier/README.md +++ b/helm-chart/eoapi-notifier/README.md @@ -1,6 +1,6 @@ # eoAPI Notifier Helm Chart -Helm chart for eoAPI Notifier - message handler for PostgreSQL/pgSTAC to MQTT. +Helm chart for eoAPI Notifier — forwards PostgreSQL/pgSTAC change notifications to MQTT and/or CloudEvents outputs. ## Installation @@ -41,7 +41,6 @@ config: - type: cloudevents config: source: /eoapi/pgstac - event_type: org.eoapi.stac.item destination: ref: apiVersion: messaging.knative.dev/v1 @@ -49,6 +48,12 @@ config: name: my-channel-1 namespace: serverless + # Optional: filter events before forwarding (OR across blocks) + # filters: + # - collections: ["sentinel-2-l2a"] + # operations: ["create", "replace"] + # bbox: [-180, -90, 180, 90] + # Connection credentials should be provided via existing Kubernetes secrets # Referenced in sources[].config.connection.existingSecret @@ -94,7 +99,6 @@ outputs: - type: cloudevents config: source: /eoapi/pgstac - event_type: org.eoapi.stac.item destination: ref: apiVersion: messaging.knative.dev/v1 @@ -102,3 +106,5 @@ outputs: name: my-broker namespace: default # optional ``` + +CloudEvent `type` values follow OGC PubSub (`org.ogc.api.collection.item.create`, etc.); they are derived from the pgSTAC operation, not from output config. diff --git a/helm-chart/eoapi-notifier/templates/configmap.yaml b/helm-chart/eoapi-notifier/templates/configmap.yaml index 9eeacda..dc3a0b2 100644 --- a/helm-chart/eoapi-notifier/templates/configmap.yaml +++ b/helm-chart/eoapi-notifier/templates/configmap.yaml @@ -31,3 +31,8 @@ data: {{- toYaml .config | nindent 10}} {{- end}} {{- end}} + + {{- with .Values.config.filters}} + filters: + {{- toYaml . | nindent 4}} + {{- end}} diff --git a/helm-chart/eoapi-notifier/values.yaml b/helm-chart/eoapi-notifier/values.yaml index ed8a259..b6a5540 100644 --- a/helm-chart/eoapi-notifier/values.yaml +++ b/helm-chart/eoapi-notifier/values.yaml @@ -76,6 +76,13 @@ config: # min_connections: 1 # max_connections: 10 # enable_correlation: true + # include_geometry: true + + # Optional: filter events before forwarding to outputs + # filters: + # - collections: ["sentinel-2-l2a"] + # operations: ["create", "replace"] + # bbox: [-180, -90, 180, 90] # Outputs: Define where notifications are sent outputs: @@ -88,7 +95,6 @@ config: - type: cloudevents config: source: /eoapi/pgstac - event_type: org.eoapi.stac.item # For KNative SinkBinding: destination: ref: diff --git a/tests/test_cloudevents_output.py b/tests/test_cloudevents_output.py index b7f467d..224f8e9 100644 --- a/tests/test_cloudevents_output.py +++ b/tests/test_cloudevents_output.py @@ -6,9 +6,9 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest -from cloudevents.http import CloudEvent from eoapi_notifier.core.event import NotificationEvent +from eoapi_notifier.core.ogc import build_cloudevent from eoapi_notifier.core.plugin import PluginMetadata from eoapi_notifier.outputs.cloudevents import CloudEventsAdapter, CloudEventsConfig @@ -31,10 +31,8 @@ def test_default_configuration(self) -> None: assert config.endpoint is None assert config.source == "/eoapi/stac" - assert config.event_type == "org.eoapi.stac" assert config.timeout == 30.0 assert config.max_retries == 3 - assert config.max_header_length == 4096 def test_endpoint_validation_error(self) -> None: """Test endpoint validation.""" @@ -149,8 +147,10 @@ async def test_send_event_success( adapter._client = mock_client adapter._running = True - with patch("eoapi_notifier.outputs.cloudevents.to_binary") as mock_to_binary: - mock_to_binary.return_value = ({"ce-id": "test"}, b"data") + with patch( + "eoapi_notifier.outputs.cloudevents.to_structured" + ) as mock_structured: + mock_structured.return_value = ({"ce-id": "test"}, b'{"data":{}}') result = await adapter.send_event(sample_event) @@ -158,6 +158,36 @@ async def test_send_event_success( mock_client.post.assert_called_once() mock_response.raise_for_status.assert_called_once() + def test_build_cloudevent_ogc_format( + self, adapter: CloudEventsAdapter, sample_event: NotificationEvent + ) -> None: + """Test NotificationEvent to OGC CloudEvent conversion.""" + cloud_event = build_cloudevent(sample_event, source=adapter.config.source) + + assert cloud_event["source"] == "/eoapi/stac" + assert cloud_event["type"] == "org.ogc.api.collection.item.create" + assert cloud_event["subject"] == "test-item" + assert cloud_event["collection"] == "test-collection" + + def test_operation_mapping(self, adapter: CloudEventsAdapter) -> None: + """Test operation to OGC event type mapping.""" + test_cases = [ + ("INSERT", "org.ogc.api.collection.item.create"), + ("UPDATE", "org.ogc.api.collection.item.replace"), + ("DELETE", "org.ogc.api.collection.item.delete"), + ] + + for operation, expected in test_cases: + event = NotificationEvent( + source="/test", + type="test", + operation=operation, + collection="test", + item_id="item-1", + ) + cloud_event = build_cloudevent(event, source=adapter.config.source) + assert cloud_event["type"] == expected + async def test_send_event_no_client( self, adapter: CloudEventsAdapter, sample_event: NotificationEvent ) -> None: @@ -210,92 +240,33 @@ async def test_send_event_http_error( result = await adapter.send_event(sample_event) assert result is False - def test_convert_to_cloudevent( - self, adapter: CloudEventsAdapter, sample_event: NotificationEvent + @patch.dict( + os.environ, + { + "K_CE_OVERRIDES": ( + '{"extensions": {"extra": "test-value", "priority": "high"}}' + ) + }, + ) + async def test_send_event_applies_knative_overrides( + self, config: CloudEventsConfig, sample_event: NotificationEvent ) -> None: - """Test NotificationEvent to CloudEvent conversion.""" - cloud_event = adapter._convert_to_cloudevent(sample_event) - - assert isinstance(cloud_event, CloudEvent) - assert cloud_event["source"] == "/eoapi/stac" - assert cloud_event["type"] == "org.eoapi.stac.created" - assert cloud_event["subject"] == "test-item" - assert cloud_event["collection"] == "test-collection" - - def test_truncate_header(self, adapter: CloudEventsAdapter) -> None: - """Test header value truncation.""" - # Short string should not be truncated - short = "short-string" - assert adapter._truncate_header(short) == short - - # None should remain None - assert adapter._truncate_header(None) is None - - # Long string should be truncated to max_header_length bytes - long_string = "a" * 3000 - truncated = adapter._truncate_header(long_string) - assert truncated is not None - assert len(truncated.encode("utf-8")) <= adapter.config.max_header_length - assert len(truncated) <= adapter.config.max_header_length - - # UTF-8 multi-byte characters should be handled correctly - unicode_string = "测试" * 1000 # Chinese characters (3 bytes each) - truncated_unicode = adapter._truncate_header(unicode_string) - assert truncated_unicode is not None - assert ( - len(truncated_unicode.encode("utf-8")) <= adapter.config.max_header_length - ) - # Should not break in the middle of a character - assert truncated_unicode.encode("utf-8").decode("utf-8") == truncated_unicode + """Test send_event applies K_CE_OVERRIDES extensions.""" + import json - def test_convert_to_cloudevent_with_long_headers( - self, config: CloudEventsConfig - ) -> None: - """Test CloudEvent conversion with long header values.""" - config.max_header_length = 50 # Small limit for testing adapter = CloudEventsAdapter(config) + mock_client = AsyncMock() + mock_response = MagicMock() + mock_client.post.return_value = mock_response + adapter._client = mock_client + adapter._running = True - # Create event with long item_id and collection - event = NotificationEvent( - source="/test/source", - type="test.type", - operation="INSERT", - collection="a-very-long-collection-name-that-exceeds-the-limit", - item_id="a-very-long-item-id-that-also-exceeds-the-configured-limit", - ) - - cloud_event = adapter._convert_to_cloudevent(event) - - # Check that long values are truncated in headers - assert "subject" in cloud_event - assert "collection" in cloud_event - assert len(cloud_event["subject"].encode("utf-8")) <= config.max_header_length - assert ( - len(cloud_event["collection"].encode("utf-8")) <= config.max_header_length - ) - - # Original values should still be in data payload - assert cloud_event.data["item_id"] == event.item_id - assert cloud_event.data["collection"] == event.collection - - def test_operation_mapping(self, adapter: CloudEventsAdapter) -> None: - """Test operation to event type mapping.""" - test_cases = [ - ("INSERT", "created"), - ("UPDATE", "updated"), - ("DELETE", "deleted"), - ("UNKNOWN", "unknown"), - ] + await adapter.send_event(sample_event) - for operation, expected in test_cases: - event = NotificationEvent( - source="/test", - type="test", - operation=operation, - collection="test", - ) - cloud_event = adapter._convert_to_cloudevent(event) - assert cloud_event["type"].endswith(f".{expected}") + posted_body = mock_client.post.call_args.kwargs["content"] + payload = json.loads(posted_body) + assert payload.get("extra") == "test-value" + assert payload.get("priority") == "high" async def test_health_check(self, adapter: CloudEventsAdapter) -> None: """Test health check.""" @@ -309,60 +280,3 @@ async def test_health_check(self, adapter: CloudEventsAdapter) -> None: # Running with client adapter._client = MagicMock() assert await adapter.health_check() is True - - @patch.dict( - os.environ, - { - "K_CE_OVERRIDES": ( - '{"extensions": {"extra": "test-value", "priority": "high"}}' - ) - }, - ) - def test_convert_to_cloudevent_with_overrides( - self, config: CloudEventsConfig, sample_event: NotificationEvent - ) -> None: - """Test CloudEvent conversion with K_CE_OVERRIDES.""" - # Create adapter after environment variable is set - adapter = CloudEventsAdapter(config) - cloud_event = adapter._convert_to_cloudevent(sample_event) - - assert isinstance(cloud_event, CloudEvent) - assert cloud_event["extra"] == "test-value" - assert cloud_event["priority"] == "high" - - @patch.dict(os.environ, {"K_CE_OVERRIDES": '{"extensions": {"number": 123}}'}) - def test_convert_to_cloudevent_with_number_override( - self, config: CloudEventsConfig, sample_event: NotificationEvent - ) -> None: - """Test CloudEvent conversion with number in K_CE_OVERRIDES.""" - # Create adapter after environment variable is set - adapter = CloudEventsAdapter(config) - cloud_event = adapter._convert_to_cloudevent(sample_event) - - assert cloud_event["number"] == "123" # Should be converted to string - - @patch.dict(os.environ, {"K_CE_OVERRIDES": "invalid-json"}) - def test_convert_to_cloudevent_invalid_overrides( - self, config: CloudEventsConfig, sample_event: NotificationEvent - ) -> None: - """Test CloudEvent conversion with invalid K_CE_OVERRIDES JSON.""" - # Create adapter after environment variable is set - adapter = CloudEventsAdapter(config) - cloud_event = adapter._convert_to_cloudevent(sample_event) - - # Should work normally without overrides - assert isinstance(cloud_event, CloudEvent) - assert cloud_event["source"] == "/eoapi/stac" - - @patch.dict(os.environ, {"K_CE_OVERRIDES": '{"other": "field"}'}) - def test_convert_to_cloudevent_no_extensions( - self, config: CloudEventsConfig, sample_event: NotificationEvent - ) -> None: - """Test CloudEvent conversion with K_CE_OVERRIDES but no extensions field.""" - # Create adapter after environment variable is set - adapter = CloudEventsAdapter(config) - cloud_event = adapter._convert_to_cloudevent(sample_event) - - # Should work normally without extensions - assert isinstance(cloud_event, CloudEvent) - assert cloud_event["source"] == "/eoapi/stac" diff --git a/tests/test_main.py b/tests/test_main.py index 4316e0a..4e415fa 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -11,6 +11,8 @@ import pytest import yaml +from eoapi_notifier.core.event import NotificationEvent +from eoapi_notifier.core.filter import FilterConfig from eoapi_notifier.core.main import NotifierApp, setup_logging @@ -319,6 +321,49 @@ async def trigger_shutdown() -> None: mock_output.send_event.assert_called_with(mock_event) + async def test_process_events_applies_filters(self) -> None: + """Test filtered events are not forwarded to outputs.""" + allowed = NotificationEvent( + source="/test", + type="test", + operation="INSERT", + collection="allowed", + item_id="item-1", + ) + blocked = NotificationEvent( + source="/test", + type="test", + operation="INSERT", + collection="blocked", + item_id="item-2", + ) + + mock_source = Mock() + mock_source.__class__.__name__ = "MockSource" + + async def mock_listen() -> AsyncIterator[Any]: + yield allowed + yield blocked + while not self.app._shutdown_event.is_set(): + await asyncio.sleep(0.01) + + mock_source.listen = mock_listen + mock_output = AsyncMock() + mock_output.send_event.return_value = True + mock_output.__class__.__name__ = "MockOutput" + + self.app.sources = [mock_source] + self.app.outputs = [mock_output] + self.app.filters = [FilterConfig(collections=["allowed"])] + + async def trigger_shutdown() -> None: + await asyncio.sleep(0.1) + self.app._shutdown_event.set() + + await asyncio.gather(self.app.process_events(), trigger_shutdown()) + + mock_output.send_event.assert_called_once_with(allowed) + async def test_process_events_output_error(self) -> None: """Test event processing with output error.""" mock_event = Mock() diff --git a/tests/test_mqtt_output.py b/tests/test_mqtt_output.py index 614b530..44940dd 100644 --- a/tests/test_mqtt_output.py +++ b/tests/test_mqtt_output.py @@ -403,12 +403,9 @@ async def test_send_event_success_qos0( payload_str = payload_bytes.decode("utf-8") payload_data = json.loads(payload_str) - assert payload_data["id"] == sample_event.id - assert payload_data["source"] == "/test/source" - assert payload_data["type"] == "test.event" - assert payload_data["operation"] == "INSERT" - assert payload_data["collection"] == "test-collection" - assert payload_data["item_id"] == "test-item-123" + assert payload_data["type"] == "org.ogc.api.collection.item.create" + assert payload_data["data"]["collection"] == "test-collection" + assert payload_data["data"]["id"] == "test-item-123" @patch("eoapi_notifier.outputs.mqtt.mqtt") async def test_send_event_success_qos1( diff --git a/tests/test_ogc_and_filter.py b/tests/test_ogc_and_filter.py new file mode 100644 index 0000000..fcd94ad --- /dev/null +++ b/tests/test_ogc_and_filter.py @@ -0,0 +1,110 @@ +"""Tests for OGC PubSub CloudEvents builder and event filters.""" + +import json + +import pytest +from cloudevents.conversion import to_structured + +from eoapi_notifier.core.event import NotificationEvent +from eoapi_notifier.core.filter import FilterConfig, matches, parse_filters +from eoapi_notifier.core.ogc import build_cloudevent + + +class TestBuildCloudevent: + """Test OGC CloudEvent construction.""" + + @pytest.fixture + def item_event(self) -> NotificationEvent: + return NotificationEvent( + source="/eoapi/stac/pgstac", + type="org.eoapi.stac.item", + operation="item_created", + collection="sentinel-2-l2a", + item_id="item-1", + geometry={ + "type": "Polygon", + "coordinates": [[[0, 0], [1, 0], [1, 1], [0, 0]]], + }, + bbox=[0, 0, 1, 1], + data={"datetime": "2024-01-01T00:00:00Z"}, + ) + + def test_create_event_type_and_body(self, item_event: NotificationEvent) -> None: + cloud_event = build_cloudevent(item_event) + assert cloud_event["type"] == "org.ogc.api.collection.item.create" + assert cloud_event["datacontenttype"] == "application/json" + assert "dataschema" not in cloud_event + assert cloud_event.data["geometry"] == item_event.geometry + assert cloud_event.data["bbox"] == item_event.bbox + + def test_delete_event_is_text_plain(self) -> None: + event = NotificationEvent( + source="/test", + type="test", + operation="item_deleted", + collection="c", + item_id="item-99", + ) + cloud_event = build_cloudevent(event) + assert cloud_event["type"] == "org.ogc.api.collection.item.delete" + assert cloud_event["datacontenttype"] == "text/plain" + assert "dataschema" not in cloud_event + assert cloud_event.data == "item-99" + + def test_structured_json_roundtrip(self, item_event: NotificationEvent) -> None: + _, body = to_structured(build_cloudevent(item_event)) + payload = json.loads(body) + assert payload["type"] == "org.ogc.api.collection.item.create" + assert payload["data"]["geometry"]["type"] == "Polygon" + + +class TestEventFilter: + """Test event filtering.""" + + def test_empty_filters_pass_all(self) -> None: + event = NotificationEvent( + source="/test", type="t", operation="INSERT", collection="c" + ) + assert matches(event, []) is True + + def test_collection_filter(self) -> None: + event = NotificationEvent( + source="/test", type="t", operation="INSERT", collection="a" + ) + filters = [FilterConfig(collections=["a"])] + assert matches(event, filters) is True + assert matches(event, [FilterConfig(collections=["b"])]) is False + + def test_operation_filter(self) -> None: + event = NotificationEvent( + source="/test", type="t", operation="item_updated", collection="c" + ) + assert matches(event, [FilterConfig(operations=["replace"])]) is True + assert matches(event, [FilterConfig(operations=["delete"])]) is False + + def test_bbox_filter(self) -> None: + event = NotificationEvent( + source="/test", + type="t", + operation="INSERT", + collection="c", + bbox=[0, 0, 1, 1], + ) + assert matches(event, [FilterConfig(bbox=[0, 0, 2, 2])]) is True + assert matches(event, [FilterConfig(bbox=[2, 2, 3, 3])]) is False + + def test_bbox_filter_from_geometry(self) -> None: + event = NotificationEvent( + source="/test", + type="t", + operation="INSERT", + collection="c", + geometry={"type": "Point", "coordinates": [1, 1]}, + ) + assert matches(event, [FilterConfig(bbox=[0, 0, 2, 2])]) is True + assert matches(event, [FilterConfig(bbox=[5, 5, 6, 6])]) is False + + def test_parse_filters_from_config(self) -> None: + filters = parse_filters([{"collections": ["c1"], "operations": ["create"]}]) + assert len(filters) == 1 + assert filters[0].collections == ["c1"] diff --git a/tests/test_pgstac_source.py b/tests/test_pgstac_source.py index 605e5d7..71cae58 100644 --- a/tests/test_pgstac_source.py +++ b/tests/test_pgstac_source.py @@ -243,6 +243,30 @@ def test_invalid_json_returns_none(self, basic_config: PgSTACSourceConfig) -> No source = PgSTACSource(basic_config) assert source._create_notification_event("invalid json") is None + async def test_enrich_geometry(self, basic_config: PgSTACSourceConfig) -> None: + """Test geometry enrichment from pgSTAC query.""" + source = PgSTACSource(basic_config) + mock_connection = AsyncMock() + mock_connection.fetchrow.return_value = { + "geometry": '{"type": "Point", "coordinates": [1, 2]}', + "bbox": "[1, 2, 3, 4]", + } + source._connection = mock_connection + source._connected = True + + event = NotificationEvent( + source="/test", + type="test", + operation="INSERT", + collection="test", + item_id="item-123", + ) + await source._enrich_geometry(event) + + assert event.geometry == {"type": "Point", "coordinates": [1, 2]} + assert event.bbox == [1.0, 2.0, 3.0, 4.0] + mock_connection.fetchrow.assert_awaited_once() + def test_status_without_correlation(self, basic_config: PgSTACSourceConfig) -> None: """Test status reporting without correlation.""" source = PgSTACSource(basic_config)