This repository implements a reproducible edge-to-cloud architecture for multimodal IoT ingestion, schema governance, stream/batch processing, high-availability transactional storage, and operational observability. The stack targets real-time livestock and precision-agriculture workloads, but the design is generic enough for other industrial IoT domains.
The platform combines six logical planes:

- Edge and ingestion
  - MQTT ingestion through a 3-node VerneMQ cluster.
  - An MQTT-to-Kafka bridge that forwards telemetry, GPS messages, and media metadata into Kafka topics.
- Event backbone and schema governance
  - A 3-node Apache Kafka KRaft cluster.
  - Apicurio Registry for JSON schema governance and artifact lifecycle.
- Object storage
  - SeaweedFS S3 as object storage for raw and derived media objects.
  - Bucket notifications that emit `media.object.events` to Kafka.
- Stream and batch processing
  - Apache Flink for stateful stream processing with checkpointing and savepoints on SeaweedFS S3.
  - Apache Beam job server and Python harness for portable pipelines.
  - Apache Airflow 3 for scheduled orchestration and replay/backfill workflows.
- Operational and analytical storage
  - A 3-node Patroni/etcd-backed TimescaleDB HA cluster.
  - HAProxy for read-write and read-only routing.
  - PgBouncer for client pooling.
- Observability and administration
  - Prometheus, Grafana, cAdvisor, statsd-exporter, postgres-exporter.
  - pgAdmin for database inspection.

flowchart LR
subgraph Edge
D[IoT devices]
M[Media producers]
end
subgraph Ingestion
V[VerneMQ cluster]
B[MQTT-Kafka bridge]
end
subgraph Backbone
K[Kafka KRaft cluster]
R[Apicurio Registry]
O[SeaweedFS S3]
end
subgraph Processing
F[Flink]
J[Beam job server]
A[Airflow 3]
end
subgraph Storage
H[HAProxy]
P[(Patroni + TimescaleDB HA)]
G[PgBouncer]
end
subgraph Observability
PR[Prometheus]
GF[Grafana]
end
D --> V --> B --> K
M --> O --> K
K --> R
K --> F
K --> J
A --> O
A --> K
F --> O
F --> H --> P
G --> H
PR --> GF
airflow/ Airflow DAGs, logs, plugins
apicurio/bootstrap/ Registry bootstrap artifacts
beam/ Beam job server and Python harness images
deploy/ Docker Swarm and Kubernetes production targets
flink/ Custom PyFlink image and jobs
grafana/ Dashboards and provisioning
haproxy/ Read-write / read-only routing
kafka-connect/ Debezium connector definitions
mqtt-kafka-bridge/ MQTT to Kafka bridge
management-console/ Internal architecture management console
orchestration/ Airflow image build
patroni/ Patroni templates
pgadmin/ pgAdmin preconfigured servers
pgbouncer/ Explicit pooling configuration
pipelines/ Replay/backfill utilities
prometheus/ Metrics scraping and alert rules
scripts/ Bootstrap and entrypoint helpers
secrets/ Docker secrets (local dev only)
This finalized version addresses the following problems found during the audit:

- The Airflow DAG passed `--window-start` and `--window-end`, but `pipelines/media_backfill.py` only accepted `--since-minutes`.
- The Airflow containers did not receive Kafka and S3/SeaweedFS credentials, so the backfill workflow could not read objects or publish replayed events.
- Kafka Connect internal topics (`__connect-configs`, `__connect-offsets`, `__connect-status`) were missing while Kafka auto-topic creation was disabled.
- The `raw.gps` topic existed but no matching schema artifact was bootstrapped into Apicurio Registry.
- The MQTT bridge emitted a generic record that did not match the intended telemetry contract.
- PgBouncer shipped dedicated config files and userlists that were not mounted, which made the repository harder to reason about.
- The environment-specific compose overlays were empty.
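
The first finding is a CLI contract mismatch between the DAG and the script. A minimal sketch of an argument parser that accepts both calling styles is shown here; the helper names `parse_window` and `_utc` are hypothetical, and the actual `pipelines/media_backfill.py` may resolve its window differently.

```python
import argparse
from datetime import datetime, timedelta, timezone


def _utc(value):
    """Parse an ISO 8601 timestamp; naive values are assumed to be UTC."""
    parsed = datetime.fromisoformat(value)
    return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)


def parse_window(argv, now=None):
    """Resolve a (start, end) backfill window from CLI arguments (sketch)."""
    parser = argparse.ArgumentParser(description="Media backfill window (sketch)")
    parser.add_argument("--window-start", type=_utc)
    parser.add_argument("--window-end", type=_utc)
    parser.add_argument("--since-minutes", type=int)
    args = parser.parse_args(argv)

    now = now or datetime.now(timezone.utc)
    if args.window_start and args.window_end:
        # Explicit window, as passed by the Airflow DAG.
        start, end = args.window_start, args.window_end
    elif args.since_minutes is not None:
        # Relative window, as accepted by the original script.
        start, end = now - timedelta(minutes=args.since_minutes), now
    else:
        parser.error("pass --window-start/--window-end or --since-minutes")
    if start >= end:
        parser.error("window start must be before window end")
    return start, end
```

Accepting both forms keeps ad hoc runs (`--since-minutes 30`) working while letting the scheduler pass an exact, repeatable interval.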
The Airflow DAG now calls a backfill script that:
- Reads objects from SeaweedFS S3 in a bounded window.
- Maps the bucket and media kind to one of:
  - `raw.image2d.meta`
  - `raw.image3d.meta`
  - `raw.video2d.meta`
  - `raw.video3d.meta`
- Publishes synthetic metadata events to Kafka so downstream consumers can reprocess missed media arrivals.
This makes the Airflow workflow operational instead of only printing objects to stdout.
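
The mapping step can be sketched as a small lookup. This is an illustration only: the `(kind, dims)` key, the `topic_for` and `synthetic_event` helpers, and the event field names are assumptions, not the repository's actual contract.

```python
# Hypothetical lookup: (media kind, dimensionality) -> Kafka metadata topic.
TOPIC_BY_MEDIA = {
    ("image", "2d"): "raw.image2d.meta",
    ("image", "3d"): "raw.image3d.meta",
    ("video", "2d"): "raw.video2d.meta",
    ("video", "3d"): "raw.video3d.meta",
}


def topic_for(kind, dims):
    """Return the metadata topic for a media object, or raise ValueError."""
    try:
        return TOPIC_BY_MEDIA[(kind.lower(), dims.lower())]
    except KeyError:
        raise ValueError(f"unsupported media type: {kind}/{dims}") from None


def synthetic_event(bucket, key, kind, dims, replayed_at):
    """Build a (topic, value) pair describing one replayed object (assumed fields)."""
    value = {
        "bucket": bucket,
        "key": key,
        "replayed": True,
        "ingested_at": replayed_at,
    }
    return topic_for(kind, dims), value
```

Raising on an unknown combination, rather than guessing a topic, keeps a misnamed bucket from silently publishing to the wrong stream.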
| Topic | Purpose | Partitions | Replication |
|---|---|---|---|
| `raw.gps` | GPS events | 24 | 3 |
| `raw.sensor` | Telemetry events | 24 | 3 |
| `raw.image2d.meta` | 2D image metadata | 12 | 3 |
| `raw.image3d.meta` | 3D image metadata | 12 | 3 |
| `raw.video2d.meta` | 2D video metadata | 12 | 3 |
| `raw.video3d.meta` | 3D video metadata | 12 | 3 |
| `media.object.events` | S3 object notifications | 12 | 3 |
| `features.events` | Derived features | 24 | 3 |
| `alerts.events` | Alerts | 12 | 3 |
| `state.latest` | Compacted latest state | 12 | 3 |
| `dlq.events` | Dead-letter events | 12 | 3 |
| `governance.data.products` | DGA data product catalogue | 3 | 3 |
| `governance.access.requests` | DGA access requests and decisions | 6 | 3 |
| `governance.permission.events` | DGA consent/permission lifecycle | 6 | 3 |
| `governance.intermediation.log` | DGA intermediation activity log | 12 | 3 |
| `governance.transfer.notices` | DGA unauthorised access/transfer/use notices | 6 | 3 |
| `governance.research.projects` | Research project register and ethics status | 3 | 3 |
| `governance.research.outputs` | Research outputs and disclosure review evidence | 6 | 3 |
| `governance.dataset.catalog` | Dataset catalogue, FAIR metadata and access policy | 3 | 3 |
| `governance.data_management_plans` | Data Management Plans for research and releases | 3 | 3 |
| `governance.repository.exports` | Repository, Zenodo and OpenAIRE export evidence | 6 | 3 |
| `dataact.product.catalog` | Data Act connected-product and related-service catalogue | 3 | 3 |
| `dataact.user.access.requests` | Data Act user access requests and decisions | 6 | 3 |
| `dataact.third_party.sharing` | User-authorized third-party sharing evidence | 6 | 3 |
| `dataact.user.exports` | User export and delivery evidence | 6 | 3 |
| `dataact.safeguards` | Data Act security and trade-secret safeguards | 3 | 3 |
| `dataact.legal_basis.checks` | Data Act and personal-data release checks | 6 | 3 |
| `security.asset.inventory` | Security asset inventory and ownership | 3 | 3 |
| `security.incident.events` | NIS2/DORA/CRA incident evidence | 6 | 3 |
| `security.vulnerability.findings` | Vulnerability and remediation register | 6 | 3 |
| `security.sbom.attestations` | SBOM, provenance and signature evidence | 3 | 3 |
| `security.patch.events` | Security update and rollback evidence | 6 | 3 |
| `resilience.backup.tests` | Restore test, RPO and RTO evidence | 3 | 3 |
| `resilience.operational.risk` | ICT and operational risk register | 3 | 3 |
| `resilience.third_party.risk` | ICT supplier and exit-plan register | 3 | 3 |
| `compliance.scope.decisions` | Regulatory scope decision register | 3 | 3 |
| `compliance.control.assessments` | Compliance control assessment evidence | 6 | 3 |
| `compliance.reporting.channels` | Regulatory reporting channel register | 3 | 3 |
| `compliance.legal.dossier` | Legal dossier artefacts and release gates | 3 | 3 |
| `cra.product.lifecycle` | CRA product support and update lifecycle | 3 | 3 |
| `kafkasql-journal-v3` | Apicurio KafkaSQL journal | 1 | 3 |
| `kafkasql-snapshots-v3` | Apicurio KafkaSQL snapshots | 1 | 3 |
| `registry-events-v3` | Apicurio registry events | 1 | 3 |
| `__connect-configs` | Kafka Connect internal config topic | 1 | 3 |
| `__connect-offsets` | Kafka Connect internal offsets topic | 12 | 3 |
| `__connect-status` | Kafka Connect internal status topic | 6 | 3 |

- `raw.sensor`
- `raw.gps`
- `raw.image2d.meta`
- `raw.image3d.meta`
- `raw.video2d.meta`
- `raw.video3d.meta`
- `media.object.events`
- `dlq.events`
- `governance.data.products`
- `governance.access.requests`
- `governance.permission.events`
- `governance.intermediation.log`
- `governance.transfer.notices`
- `governance.research.projects`
- `governance.research.outputs`
- `governance.dataset.catalog`
- `governance.data_management_plans`
- `governance.repository.exports`
- `dataact.product.catalog`
- `dataact.user.access.requests`
- `dataact.third_party.sharing`
- `dataact.user.exports`
- `dataact.safeguards`
- `dataact.legal_basis.checks`
- `security.asset.inventory`
- `security.incident.events`
- `security.vulnerability.findings`
- `security.sbom.attestations`
- `security.patch.events`
- `resilience.backup.tests`
- `resilience.operational.risk`
- `resilience.third_party.risk`
- `compliance.scope.decisions`
- `compliance.control.assessments`
- `compliance.reporting.channels`
- `compliance.legal.dossier`
- `cra.product.lifecycle`

The MQTT bridge subscribes to both generic devices and WildFi telemetry:
$share/ingestors/devices/#
$share/ingestors/wildfi/#
Decoded WildFi GPS/GNSS/raw GPS payloads are routed to raw.gps. Decoded IMU, environment,
proximity, movement, and metadata payloads are routed to raw.sensor. See
docs/runbooks/wildfi-ingestion.md for the expected topic and payload contract.
Native WildFi .bin logs are decoded with the packaged wildfi-decoder image, built from
https://github.com/wildlab/WildFiDecoder, not inside the MQTT bridge.
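
The routing rule for decoded WildFi payloads can be sketched as follows. The payload-kind labels and the `wildfi_topic` helper are hypothetical; the authoritative contract is the one in docs/runbooks/wildfi-ingestion.md.

```python
# Hypothetical payload-kind labels; the real bridge may name these differently.
GPS_KINDS = frozenset({"gps", "gnss", "raw_gps"})
SENSOR_KINDS = frozenset({"imu", "environment", "proximity", "movement", "metadata"})


def wildfi_topic(kind: str) -> str:
    """Return the Kafka topic for a decoded WildFi payload kind."""
    normalized = kind.strip().lower()
    if normalized in GPS_KINDS:
        return "raw.gps"
    if normalized in SENSOR_KINDS:
        return "raw.sensor"
    # Unknown kinds are rejected here; how the bridge handles them is not assumed.
    raise ValueError(f"unknown WildFi payload kind: {kind!r}")
```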
- `ingest_net`: internal ingestion plane for MQTT and SeaweedFS producers.
- `kafka_net`: internal event backbone.
- `patroni_backend`: internal HA database control plane.
- `patroni_frontend`: client-facing database access plane.
- `orchestration_net`: Airflow, Grafana, Apicurio UI, and other control-plane services.

This segmentation limits accidental coupling between components and makes troubleshooting easier.
This repository is configured for GitHub Actions-based SonarQube analysis:

- Workflow: `.github/workflows/sonarqube.yml`
- Project scanner config: `sonar-project.properties`
- Required repository secrets:
  - `SONAR_TOKEN`
  - `SONAR_HOST_URL` (optional; defaults to `https://sonarcloud.io` when unset; set it for self-hosted SonarQube)
  - `CODACY_PROJECT_TOKEN` (optional; when set, coverage is also uploaded to Codacy)

The workflow runs on pushes to main/master, on pull requests, and manually via workflow_dispatch.
The stack uses two complementary configuration channels:

- `.env` / `.env.example`
  - Non-file environment variables.
  - Public URLs.
  - Build pins.
  - SeaweedFS S3 credentials used by Flink/Airflow/Beam.
- `./secrets/*.txt`
  - Database passwords.
  - SeaweedFS S3 credentials.
  - pgAdmin password.
  - PgBouncer userlists.

cp .env.example .env

Then set at least:

- `AIRFLOW_FERNET_KEY`
- `AIRFLOW_API_SECRET_KEY`
- `AIRFLOW_JWT_SECRET`
- `AIRFLOW_ADMIN_USER`
- `AIRFLOW_ADMIN_PASSWORD`
- `AIRFLOW_DB_PASSWORD`
- `VERNEMQ_DISTRIBUTED_COOKIE`
- `VERNEMQ_ADMIN_PASSWORD`
- `SEAWEEDFS_S3_ACCESS_KEY`
- `SEAWEEDFS_S3_SECRET_KEY`
- `APICURIO_PUBLIC_URL`
- `GRAFANA_PUBLIC_URL`

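
A quick preflight check for these variables could look like the sketch below. The `missing_vars` helper is hypothetical and not part of the repository; it only reports names that are unset or blank in a given environment mapping.

```python
import os

# Variable names taken from the list above.
REQUIRED_VARS = (
    "AIRFLOW_FERNET_KEY",
    "AIRFLOW_API_SECRET_KEY",
    "AIRFLOW_JWT_SECRET",
    "AIRFLOW_ADMIN_USER",
    "AIRFLOW_ADMIN_PASSWORD",
    "AIRFLOW_DB_PASSWORD",
    "VERNEMQ_DISTRIBUTED_COOKIE",
    "VERNEMQ_ADMIN_PASSWORD",
    "SEAWEEDFS_S3_ACCESS_KEY",
    "SEAWEEDFS_S3_SECRET_KEY",
    "APICURIO_PUBLIC_URL",
    "GRAFANA_PUBLIC_URL",
)


def missing_vars(env=None):
    """Return the required variable names that are unset or blank."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name, "").strip()]
```

Note that this inspects the process environment, so the values from `.env` must already be exported (or passed in as a mapping) for the result to be meaningful.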
secrets/app_user_password.txt
secrets/debezium_password.txt
secrets/seaweedfs_s3_secret_key.txt
secrets/seaweedfs_s3_access_key.txt
secrets/patroni_repl_password.txt
secrets/patroni_rewind_password.txt
secrets/pgadmin_default_password.txt
secrets/pgbouncer_ro_userlist.txt
secrets/pgbouncer_rw_userlist.txt
secrets/postgres_exporter_password.txt
secrets/postgres_superuser_password.txt

## 8. Quality checks and bug-fix workflow

Before committing changes, run the local automated checks:

```bash
pytest -q
```

Recent reliability hardening:

- The MQTT→Kafka bridge now logs asynchronous Kafka producer failures with the actual exception details, which makes transient broker/network issues diagnosable during operations.

### Generate new secrets locally
```bash
python - <<'PY'
import secrets
from pathlib import Path
target = Path("secrets")
target.mkdir(exist_ok=True)
files = {
    "app_user_password.txt": secrets.token_urlsafe(24),
    "debezium_password.txt": secrets.token_urlsafe(24),
    "seaweedfs_s3_access_key.txt": secrets.token_urlsafe(24),
    "seaweedfs_s3_secret_key.txt": secrets.token_urlsafe(32),
    "patroni_repl_password.txt": secrets.token_urlsafe(24),
    "patroni_rewind_password.txt": secrets.token_urlsafe(24),
    "pgadmin_default_password.txt": secrets.token_urlsafe(24),
    "postgres_exporter_password.txt": secrets.token_urlsafe(24),
    "postgres_superuser_password.txt": secrets.token_urlsafe(24),
}
# Note: the pgbouncer_*_userlist.txt files are not generated by this script.
for name, value in files.items():
    (target / name).write_text(value + "\n", encoding="utf-8")
PY
```
Use the base compose file plus an environment-specific overlay. The base file describes the
internal topology and does not publish host ports. Development ports live in
docker-compose.dev.yml; production-like local validation exposes only the explicit edge service.

docker compose -f docker-compose.yml -f docker-compose.dev.yml up -d --build
# or
./install_dealiot.sh dev up -d --build

docker compose -f docker-compose.yml -f docker-compose.staging.yml up -d --build
# or
./install_dealiot.sh staging up -d --build

docker compose -f docker-compose.yml -f docker-compose.prod.yml up -d --build
# or
./install_dealiot.sh prod up -d --build

The Swarm stack lives in deploy/swarm/dealiot-stack.yml. It deploys the application/runtime
plane and expects production-grade stateful dependencies to be provisioned outside the stack:
Kafka, MQTT, S3-compatible storage, Airflow PostgreSQL, and Airflow Redis.

docker stack deploy -c deploy/swarm/dealiot-stack.yml dealiot

The CI-only stack deploy/swarm/dealiot-smoke-stack.yml verifies Swarm deployment mechanics with
the locally built bridge image.

The Kubernetes target is a Kustomize base under deploy/kubernetes/base.

kubectl apply -k deploy/kubernetes/base

Create environment-specific overlays for real clusters to patch image tags, external endpoints,
ingress, storage classes, and secret references. The CI overlay
deploy/kubernetes/overlays/ci-smoke is intentionally minimal and validates rollout wiring on
kind.

These endpoints are available when the development overlay is active.
| Service | URL / Port |
|---|---|
| Airflow API/UI | http://localhost:8088 |
| Flink UI | http://localhost:8081 |
| Kafka Connect | http://localhost:8083 |
| Apicurio Registry API | http://localhost:8082/apis/registry/v3 |
| Apicurio Registry UI | http://localhost:8888 |
| SeaweedFS S3 API | http://localhost:8333 |
| SeaweedFS Filer UI | http://localhost:8889 |
| pgAdmin | http://localhost:5050 |
| Grafana | http://localhost:3000 |
| Management Console | http://localhost:8090 |
| Prometheus | http://localhost:9090 |
| HAProxy stats | http://localhost:7000 |
| PostgreSQL RW | localhost:5432 |
| PostgreSQL RO | localhost:5433 |
| PgBouncer RW | localhost:6432 |
| PgBouncer RO | localhost:6433 |
After rendering Compose and before accepting a platform change, run:

bash scripts/smoke-e2e.sh

The smoke test starts the event-flow services, submits the minimal Flink job, publishes MQTT
fixtures, and verifies raw.sensor, dlq.events, features.events, state.latest, and Apicurio
artifacts.

After startup, validate in this order:

docker compose exec kafka1 /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka1:9092 --list

docker compose exec pg1 curl -s http://127.0.0.1:8008/patroni
docker compose exec haproxy sh -lc "nc -zv 127.0.0.1 5432"

docker compose exec seaweedfs-init sh -lc 'echo "SeaweedFS bootstrap completed"'

Open the registry UI and verify the expected groups/artifacts are present.

Open the Airflow UI and verify that media_backfill is visible and import-error free.

Python producers validate critical event contracts before publishing to Kafka:

- valid records go to their intended `raw.*` topic
- invalid records go to `dlq.events`
- `timestamp` is the event time; `ingested_at` is when the platform received or replayed it

This local validation is a guardrail. Apicurio remains the source of schema documentation and consumer compatibility governance.
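
A minimal sketch of such a guardrail is shown below. The required field names, the DLQ envelope layout, and the `route_event` helper are assumptions for illustration; the repository's shared contract module defines the real rules.

```python
from datetime import datetime, timezone

# Hypothetical minimal contract: fields every raw.* event must carry.
REQUIRED_FIELDS = ("device_id", "timestamp")


def route_event(topic, event, now=None):
    """Return (target_topic, payload): the intended topic if valid, else dlq.events."""
    now = now or datetime.now(timezone.utc)
    errors = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in event]
    if "timestamp" in event:
        try:
            # Event time must be parseable as an ISO 8601 timestamp.
            datetime.fromisoformat(str(event["timestamp"]))
        except ValueError:
            errors.append("timestamp is not ISO 8601")

    ingested_at = now.isoformat()
    if errors:
        # Invalid records are wrapped with their errors and sent to the DLQ.
        envelope = {
            "source_topic": topic,
            "errors": errors,
            "payload": event,
            "ingested_at": ingested_at,
        }
        return "dlq.events", envelope
    # Valid records keep their event time and gain the platform receive time.
    return topic, {**event, "ingested_at": ingested_at}
```

Keeping the original payload inside the DLQ envelope means a rejected record can be inspected and replayed once the producer or contract is fixed.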
The stack uses the Airflow 3 split model:

- API/UI server (`airflow-apiserver`)
- scheduler
- worker
- triggerer
- standalone DAG processor

This separation is intentional and aligns with the Airflow 3 deployment model.
Flink stores checkpoints and savepoints in SeaweedFS S3:

- `s3://flink-checkpoints/streaming`
- `s3://flink-savepoints/streaming`

The timescaledb-source connector captures changes from appdb via a named publication:

- publication: `dbz_publication`
- slot: `debezium_appdb`

- RW traffic: HAProxy `5432`
- RO traffic: HAProxy `5433`
- pooled RW: PgBouncer `6432`
- pooled RO: PgBouncer `6433`
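
A client can pick the right endpoint from this port layout with a small helper. This is a sketch under assumptions: the `dsn` function, the `localhost` host (development overlay), and the `app_user` role name are illustrative; only the ports and the `appdb` database name come from this document.

```python
# Ports as listed above; the keys are (mode, pooled).
PORTS = {
    ("rw", False): 5432,  # HAProxy read-write
    ("ro", False): 5433,  # HAProxy read-only
    ("rw", True): 6432,   # PgBouncer pooled read-write
    ("ro", True): 6433,   # PgBouncer pooled read-only
}


def dsn(mode, pooled=False, host="localhost", dbname="appdb", user="app_user"):
    """Build a PostgreSQL connection URI for the requested routing mode."""
    try:
        port = PORTS[(mode, pooled)]
    except KeyError:
        raise ValueError(f"mode must be 'rw' or 'ro', got {mode!r}") from None
    return f"postgresql://{user}@{host}:{port}/{dbname}"
```

For example, a reporting job that tolerates replica lag would use `dsn("ro", pooled=True)`, while a writer would use `dsn("rw")`.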
- Operations
- Backup and restore
- Security hardening
- Security resilience compliance
- Legal applicability
- Legal compliance dossier
- Legal finalization report
- Data Governance Act
- Data Act
- Dataset catalogue and Data Management Plan
- Zenodo dataset export
- OpenAIRE metadata export
- Legal readiness review
- Add TLS and authentication for Kafka, MQTT, object storage, and public UIs before any non-local deployment.
- Add backup and restore runbooks for Kafka metadata, TimescaleDB, SeaweedFS, Grafana, and Airflow metadata.
- Finalize the production stateful layer with managed services or dedicated operators for Kafka, PostgreSQL, Redis, MQTT, and object storage before go-live.
- Add downstream consumers for `features.events`, `alerts.events`, and `state.latest`.
- Add one or more domain-specific Flink jobs under `flink/jobs/`.

This finalized package contains:

- a patched `docker-compose.yml`
- functional environment overlays for dev/staging/prod
- a shared lightweight event contract module used by Python producers
- a `dlq.events` schema and DLQ routing for invalid producer events
- an E2E smoke script and runbooks for operations, backup/restore, and security hardening
- a new `README.md`
- a new `.env.example`
- a corrected MQTT bridge
- a management console for architecture health, data catalogue, runbooks, and compliance controls
- a corrected media backfill utility
- a new `raw.gps` Apicurio schema
- Docker Swarm and Kubernetes production deployment targets with CI smoke workflows