Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ sources:
host: localhost
port: 5432
database: postgis
username: username
user: postgres
password: password

# Outputs: Define where notifications are sent
Expand All @@ -67,7 +67,9 @@ outputs:
broker_port: 1883
```

See [examples/config.yaml](./examples/config.yaml) for a complete configuration example with all available options.
Both outputs publish [OGC PubSub CloudEvents-JSON](https://github.com/opengeospatial/pubsub) messages (types such as `org.ogc.api.collection.item.create`). The pgSTAC source can enrich events with item geometry (`include_geometry: true`, default). Optional top-level `filters` restrict forwarding by collection, operation (`create`/`replace`/`delete`), and bbox.

See [examples/config.yaml](./examples/config.yaml) for configuration and [examples/output-create.json](./examples/output-create.json) / [examples/output-delete.json](./examples/output-delete.json) for sample MQTT and CloudEvents payloads.

## Kubernetes Deployment

Expand All @@ -87,7 +89,8 @@ See [Helm Chart README](helm-chart/eoapi-notifier/README.md) for configuration o
- `pgstac`: Monitor PostgreSQL/pgSTAC database changes

#### Outputs
- `mqtt`: Publish events to MQTT broker
- `mqtt`: Publish OGC CloudEvents-JSON to an MQTT broker
- `cloudevents`: POST OGC CloudEvents-JSON to an HTTP endpoint (or Knative `K_SINK`)

## Development

Expand Down
1 change: 1 addition & 0 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class MyOutput(BaseOutput):

async def send_event(self, event: NotificationEvent) -> bool:
try:
# Built-in mqtt/cloudevents outputs use eoapi_notifier.core.ogc.build_cloudevent().
payload = {
"id": event.id,
"source": event.source,
Expand Down
4 changes: 4 additions & 0 deletions eoapi_notifier/core/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class NotificationEvent(BaseModel):
collection: STAC collection ID
item_id: STAC item ID
timestamp: Event timestamp
geometry: GeoJSON geometry
bbox: Bounding box [minx, miny, maxx, maxy]
data: Additional event data
"""

Expand All @@ -33,4 +35,6 @@ class NotificationEvent(BaseModel):
collection: str
item_id: str | None = None
timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC))
geometry: dict[str, Any] | None = None
bbox: list[float] | None = None
data: dict[str, Any] = Field(default_factory=dict)
75 changes: 75 additions & 0 deletions eoapi_notifier/core/filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Event filtering by collection, operation, and bbox."""

from typing import Any

from pydantic import BaseModel, ConfigDict, Field

from .event import NotificationEvent
from .ogc import canonical_operation


class FilterConfig(BaseModel):
"""Single filter block; unset fields are ignored."""

model_config = ConfigDict(extra="forbid")

collections: list[str] | None = None
operations: list[str] | None = None
bbox: list[float] | None = Field(default=None, min_length=4, max_length=4)


def parse_filters(raw: list[dict[str, Any]]) -> list[FilterConfig]:
"""Parse filter blocks from config."""
return [FilterConfig.model_validate(block) for block in raw]


def _event_bbox(event: NotificationEvent) -> list[float] | None:
if event.bbox and len(event.bbox) >= 4:
return event.bbox
if not event.geometry:
return None
coords = event.geometry.get("coordinates")
if not coords:
return None
# Flatten coordinate pairs for a simple bbox estimate
flat: list[float] = []

def walk(node: Any) -> None:
if isinstance(node, int | float):
flat.append(float(node))
elif isinstance(node, list):
for item in node:
walk(item)

walk(coords)
if len(flat) < 2:
return None
xs = flat[0::2]
ys = flat[1::2]
return [min(xs), min(ys), max(xs), max(ys)]


def _bbox_overlaps(a: list[float], b: list[float]) -> bool:
return not (a[2] < b[0] or a[0] > b[2] or a[3] < b[1] or a[1] > b[3])


def _matches_block(event: NotificationEvent, block: FilterConfig) -> bool:
if block.collections is not None and event.collection not in block.collections:
return False
if block.operations is not None:
event_op = canonical_operation(event.operation) or event.operation.lower()
allowed = {canonical_operation(op) or op.lower() for op in block.operations}
if event_op not in allowed:
return False
if block.bbox is not None:
event_bbox = _event_bbox(event)
if event_bbox is None or not _bbox_overlaps(event_bbox, block.bbox):
return False
return True


def matches(event: NotificationEvent, filters: list[FilterConfig]) -> bool:
"""Return True if event passes filters (OR across blocks, AND within block)."""
if not filters:
return True
return any(_matches_block(event, block) for block in filters)
7 changes: 7 additions & 0 deletions eoapi_notifier/core/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import yaml
from loguru import logger

from .filter import FilterConfig, matches, parse_filters
from .registry import create_output, create_source


Expand All @@ -19,6 +20,7 @@ def __init__(self) -> None:
"""Initialize the application."""
self.sources: list[Any] = []
self.outputs: list[Any] = []
self.filters: list[FilterConfig] = []
self._shutdown_event = asyncio.Event()
self._running = False

Expand Down Expand Up @@ -239,6 +241,10 @@ async def _process_source_events(self, source: Any) -> None:
f"Received event #{event_count} from {source_name}: {event}"
)

if not matches(event, self.filters):
logger.debug(f"Event {event.id} filtered out")
continue

# Send event to all outputs
for output in self.outputs:
output_name = output.__class__.__name__
Expand Down Expand Up @@ -314,6 +320,7 @@ async def run(self, config_path: Path) -> None:
# Create plugins
logger.debug("Creating plugins...")
self.create_plugins(config)
self.filters = parse_filters(config.get("filters", []))

if not self.sources and not self.outputs:
logger.error("No plugins configured")
Expand Down
62 changes: 62 additions & 0 deletions eoapi_notifier/core/ogc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""OGC PubSub CloudEvents-JSON message builder."""

from typing import Any

from cloudevents.http import CloudEvent

from .event import NotificationEvent

OGC_ITEM_TYPE_PREFIX = "org.ogc.api.collection.item"

# Raw pgSTAC and correlated semantic operations mapped to OGC verbs.
_CANONICAL_OPERATIONS = {
"insert": "create",
"item_created": "create",
"create": "create",
"update": "replace",
"item_updated": "replace",
"replace": "replace",
"delete": "delete",
"item_deleted": "delete",
}


def canonical_operation(operation: str) -> str | None:
"""Map a raw or semantic operation to an OGC verb (create/replace/delete)."""
return _CANONICAL_OPERATIONS.get(operation.lower())


def build_cloudevent(
event: NotificationEvent, source: str = "/eoapi/stac"
) -> CloudEvent:
"""Build an OGC PubSub CloudEvent from a notification event."""
verb = canonical_operation(event.operation)

attributes: dict[str, Any] = {
"specversion": "1.0",
"id": event.id,
"source": source,
"type": f"{OGC_ITEM_TYPE_PREFIX}.{verb or event.operation.lower()}",
"time": event.timestamp.isoformat(),
}

if event.item_id:
attributes["subject"] = event.item_id
if event.collection:
attributes["collection"] = event.collection

if verb == "delete":
attributes["datacontenttype"] = "text/plain"
data: Any = event.item_id or ""
else:
attributes["datacontenttype"] = "application/json"
data = {
"type": "Feature",
"id": event.item_id,
"collection": event.collection,
"geometry": event.geometry,
"bbox": event.bbox,
"properties": event.data,
}

return CloudEvent(attributes, data)
88 changes: 13 additions & 75 deletions eoapi_notifier/outputs/cloudevents.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
import json
import os
from typing import Any
from uuid import uuid4

import httpx
from cloudevents.conversion import to_binary
from cloudevents.http import CloudEvent
from cloudevents.conversion import to_structured
from pydantic import field_validator, model_validator

from ..core.event import NotificationEvent
from ..core.ogc import build_cloudevent
from ..core.plugin import BaseOutput, BasePluginConfig, PluginMetadata


Expand All @@ -25,11 +24,9 @@ class CloudEventsConfig(BasePluginConfig):

endpoint: str | None = None
source: str = "/eoapi/stac"
event_type: str = "org.eoapi.stac"
timeout: float = 30.0
max_retries: int = 3
retry_backoff: float = 1.0
max_header_length: int = 4096
overrides: dict[str, str] = {}

@field_validator("endpoint")
Expand All @@ -56,11 +53,9 @@ def get_sample_config(cls) -> dict[str, Any]:
return {
"endpoint": None, # Uses K_SINK env var if not set
"source": "/eoapi/stac",
"event_type": "org.eoapi.stac",
"timeout": 30.0,
"max_retries": 3,
"retry_backoff": 1.0,
"max_header_length": 4096,
}

@classmethod
Expand All @@ -81,7 +76,6 @@ def get_status_info(self) -> dict[str, Any]:
return {
"Endpoint": self.endpoint or "K_SINK env var",
"Source": self.source,
"Event Type": self.event_type,
"Timeout": f"{self.timeout}s",
"Max Retries": self.max_retries,
}
Expand Down Expand Up @@ -173,25 +167,30 @@ async def send_event(self, event: NotificationEvent) -> bool:

# Convert to CloudEvent
self.logger.debug(f"Converting event {event.id} to CloudEvent format...")
cloud_event = self._convert_to_cloudevent(event)
cloud_event = build_cloudevent(event, source=self.config.source)
for key, value in self._ce_extensions.items():
cloud_event[key] = str(value)
self.logger.debug(
f"CloudEvent created: id={cloud_event['id']}, "
f"type={cloud_event['type']}"
)

# Convert to binary format
self.logger.debug("Converting CloudEvent to binary format...")
headers, data = to_binary(cloud_event)
# Convert to CloudEvents-JSON
self.logger.debug("Converting CloudEvent to structured format...")
structured_headers, structured_body = to_structured(cloud_event)
headers = dict(structured_headers)
self.logger.debug(
f"Binary conversion complete, headers: {list(headers.keys())}"
f"Structured conversion complete, headers: {list(headers.keys())}"
)

# Send HTTP POST
self.logger.debug(
f"Sending CloudEvent {cloud_event['id']} to {endpoint} "
f"(timeout: {self.config.timeout}s)"
)
response = await self._client.post(endpoint, headers=headers, data=data)
response = await self._client.post(
endpoint, headers=headers, content=structured_body
)
response.raise_for_status()

self.logger.debug(
Expand Down Expand Up @@ -219,67 +218,6 @@ async def send_event(self, event: NotificationEvent) -> bool:
)
return False

def _truncate_header(self, value: str | None) -> str | None:
"""Truncate header value to max_header_length if needed."""
if not value:
return value
if len(value.encode("utf-8")) <= self.config.max_header_length:
return value
# Truncate to byte limit, ensuring valid UTF-8
truncated = value.encode("utf-8")[: self.config.max_header_length]
return truncated.decode("utf-8", errors="ignore")

def _convert_to_cloudevent(self, event: NotificationEvent) -> CloudEvent:
"""Convert NotificationEvent to CloudEvent."""
# Use config values which now include environment overrides
source = self.config.source
event_type_base = self.config.event_type

# Use pre-parsed KNative CE overrides
ce_extensions = self._ce_extensions

# Map operation to event type suffix
operation_map = {"INSERT": "created", "UPDATE": "updated", "DELETE": "deleted"}
operation = operation_map.get(event.operation.upper(), event.operation.lower())

attributes = {
"id": str(uuid4()),
"source": source,
"type": f"{event_type_base}.{operation}",
"time": event.timestamp.isoformat(),
"datacontenttype": "application/json",
}

# Add subject if item_id exists
if event.item_id:
truncated_subject = self._truncate_header(event.item_id)
if truncated_subject:
attributes["subject"] = truncated_subject

# Add collection attribute
if event.collection:
truncated_collection = self._truncate_header(event.collection)
if truncated_collection:
attributes["collection"] = truncated_collection

# Apply KNative CE extension overrides
for key, value in ce_extensions.items():
attributes[key] = str(value)

# Event data payload
data = {
"id": event.id,
"source": event.source,
"type": event.type,
"operation": event.operation,
"collection": event.collection,
"item_id": event.item_id,
"timestamp": event.timestamp.isoformat(),
**event.data,
}

return CloudEvent(attributes, data)

async def health_check(self) -> bool:
"""Check if the adapter is healthy."""
return self._running and self._client is not None
Loading
Loading