From 8816735820807220abcf90769928fd08c8a14be7 Mon Sep 17 00:00:00 2001 From: srmadscience Date: Fri, 12 Jun 2026 14:47:34 +0100 Subject: [PATCH 1/2] rtia: reshape iot_data to Telegraf line-protocol form Restructure rtia.iot_data from flat columns to the Telegraf outputs.cratedb shape (hash_id / timestamp / name / tags / fields), so the same table can be loaded by COPY FROM or by Telegraf's crate/postgres plugin. That plugin can't write a GEO_POINT, so the geo coords ride along as doubles in fields (geo_lon/geo_lat) and geo_location is GENERATED from them; day is GENERATED from timestamp. - sql/rtia_schema_create.sql: new iot_data DDL (tags/fields OBJECTs, generated day + geo_location, PK (hash_id, timestamp, day), partitioned by day). - sql/rtia_first_queries.sql, sql/rtia_advanced_queries.sql: rewrite iot_data column refs to tags['...'] / fields['...']; plants/devices/maintenance_log columns unchanged. - grafana/rtia.json: remap 22 panel queries to the bracket-notation access pattern. - README.md: document the line-protocol shape and dual ingestion path. The 500k-row iot_demo_dataset.json (240 MB) is not committed; the canonical copy lives on S3 where the COPY FROM statements read it. Verified end-to-end against a CrateDB 6.3.2 cluster: drop + reload of all 6 rtia tables, and all rtia dashboard/SQL queries execute cleanly. 
Co-Authored-By: Claude Opus 4.8 --- README.md | 2 + grafana/rtia.json | 44 ++++++------- sql/rtia_advanced_queries.sql | 112 ++++++++++++++++++---------------- sql/rtia_first_queries.sql | 112 ++++++++++++++++++---------------- sql/rtia_schema_create.sql | 43 ++++++------- 5 files changed, 163 insertions(+), 150 deletions(-) diff --git a/README.md b/README.md index a80bd84..f250ef4 100644 --- a/README.md +++ b/README.md @@ -145,6 +145,8 @@ Unlike the weather demo, RTIA has **no load generator** — it is delivered enti The maintenance-note vectors are 384-dimension `FLOAT_VECTOR` embeddings (sentence-transformers `all-MiniLM-L6-v2`), queried with `KNN_MATCH` for semantic search and combined with `MATCH` for hybrid relevance. +`iot_data` is stored in the Telegraf line-protocol shape — `hash_id`, `timestamp`, `name`, a `tags` `OBJECT` for the string dimensions (`tags['device_id']`, `tags['status']`, `tags['plant_id']`, the flattened `tags['metadata_*']`, …) and a `fields` `OBJECT` for the numeric measurements (`fields['metric_value']`, `fields['quality_score']`). The geo coordinates ride along as plain doubles (`fields['geo_lon']` / `fields['geo_lat']`) and `geo_location` is a `GEO_POINT` `GENERATED` from them. That shape lets the same table be populated either by the `COPY FROM` in the schema script or by Telegraf's `outputs.cratedb` (postgres/crate) plugin, which can't write a `GEO_POINT` directly — so the query scripts and dashboard reach into the objects with `tags['…']` / `fields['…']` bracket notation. + ### Dashboard [`grafana/rtia.json`](grafana/rtia.json) is the **"Real Time Industrial Analytics Dashboard"** — summary KPIs, critical-event tracking, device-level detail, maintenance cost by plant, OEE, KNN searches, full-text search, and geospatial panels over the `rtia` schema. Import it the same way as the weather dashboard: add a PostgreSQL datasource pointing at your CrateDB cluster, then **Dashboards > Import**. 
diff --git a/grafana/rtia.json b/grafana/rtia.json index 70a894e..7377e9a 100644 --- a/grafana/rtia.json +++ b/grafana/rtia.json @@ -53,7 +53,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n status,\n ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS pct\nFROM rtia.iot_data\nWHERE $__timeFilter(\"timestamp\") \nGROUP BY status\nORDER BY status DESC;", + "rawSql": "SELECT\n tags['status'] AS status,\n ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS pct\nFROM rtia.iot_data\nWHERE $__timeFilter(\"timestamp\") \nGROUP BY tags['status']\nORDER BY tags['status'] DESC;", "sql": { "columns": [ { @@ -168,7 +168,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n EXTRACT(DOW FROM \"timestamp\") AS day_of_week_number,\n date_format('%a', \"timestamp\") AS day_of_week,\n COUNT(*) FILTER (WHERE status = 'critical') AS \"Critical Error Count\"\nFROM rtia.iot_data\nWHERE $__timeFilter(\"timestamp\") \nGROUP BY day_of_week,day_of_week_number\nORDER BY day_of_week_number;", + "rawSql": "SELECT\n EXTRACT(DOW FROM \"timestamp\") AS day_of_week_number,\n date_format('%a', \"timestamp\") AS day_of_week,\n COUNT(*) FILTER (WHERE tags['status'] = 'critical') AS \"Critical Error Count\"\nFROM rtia.iot_data\nWHERE $__timeFilter(\"timestamp\") \nGROUP BY day_of_week,day_of_week_number\nORDER BY day_of_week_number;", "sql": { "columns": [ { @@ -311,7 +311,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n d.device_id AS \"Device ID\",\n d.manufacturer AS \"Manufacturer\",\n d.device_type AS \"Device Type\",\n d.warranty_expiry AS \"Warranty Expiry\",\n d.asset_value_eur AS \"Asset Value €\",\n COUNT(*) AS \"Critical Readings\",\n MAX(i.\"timestamp\") AS \"Last Critical at\"\nFROM rtia.iot_data i\nJOIN rtia.devices d ON i.device_id = d.device_id\nWHERE i.status = 'critical'\n AND d.warranty_expiry < TIMESTAMP '2025-09-01'\nGROUP BY d.device_id, d.manufacturer, d.device_type, 
d.warranty_expiry, d.asset_value_eur\nORDER BY \"Critical Readings\" DESC\nLIMIT 15;", + "rawSql": "SELECT\n d.device_id AS \"Device ID\",\n d.manufacturer AS \"Manufacturer\",\n d.device_type AS \"Device Type\",\n d.warranty_expiry AS \"Warranty Expiry\",\n d.asset_value_eur AS \"Asset Value €\",\n COUNT(*) AS \"Critical Readings\",\n MAX(i.\"timestamp\") AS \"Last Critical at\"\nFROM rtia.iot_data i\nJOIN rtia.devices d ON i.tags['device_id'] = d.device_id\nWHERE i.tags['status'] = 'critical'\n AND d.warranty_expiry < TIMESTAMP '2025-09-01'\nGROUP BY d.device_id, d.manufacturer, d.device_type, d.warranty_expiry, d.asset_value_eur\nORDER BY \"Critical Readings\" DESC\nLIMIT 15;", "sql": { "columns": [ { @@ -416,7 +416,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n d.device_id AS \"Device ID\",\n d.device_type AS \"Device Type\",\n initcap(replace(d.plant_id,'PLANT_','')) AS \"Plant ID\",\n d.responsible_technician AS \"Responsible Technician\",\n d.next_maintenance_due AS \"Next Maintenance Due\",\n COUNT(*) FILTER (WHERE i.status IN ('warning', 'critical')) AS \"Fault Readings\"\nFROM rtia.iot_data i\nJOIN rtia.devices d ON i.device_id = d.device_id\nWHERE d.next_maintenance_due < TIMESTAMP '2025-09-01'\nGROUP BY d.device_id, d.device_type, d.plant_id, d.responsible_technician, d.next_maintenance_due\nORDER BY \"Fault Readings\" DESC\nLIMIT 15;", + "rawSql": "SELECT\n d.device_id AS \"Device ID\",\n d.device_type AS \"Device Type\",\n initcap(replace(d.plant_id,'PLANT_','')) AS \"Plant ID\",\n d.responsible_technician AS \"Responsible Technician\",\n d.next_maintenance_due AS \"Next Maintenance Due\",\n COUNT(*) FILTER (WHERE i.tags['status'] IN ('warning', 'critical')) AS \"Fault Readings\"\nFROM rtia.iot_data i\nJOIN rtia.devices d ON i.tags['device_id'] = d.device_id\nWHERE d.next_maintenance_due < TIMESTAMP '2025-09-01'\nGROUP BY d.device_id, d.device_type, d.plant_id, d.responsible_technician, 
d.next_maintenance_due\nORDER BY \"Fault Readings\" DESC\nLIMIT 15;", "sql": { "columns": [ { @@ -515,7 +515,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n i.device_id AS \"Device ID\",\n d.manufacturer AS \"Manufacturer\",\n d.device_type AS \"Device Type\",\n m.maintenance_type AS \"Maintenance Type\",\n m.completed_date AS \"Completed Date\",\n m.cost_eur AS \"Cost €\",\n COUNT(*) AS \"Fault Readings After Service\"\nFROM rtia.iot_data i\nJOIN rtia.devices d ON i.device_id = d.device_id\nJOIN rtia.maintenance_log m ON i.device_id = m.device_id\nWHERE i.status IN ('warning', 'critical')\n AND m.status = 'completed'\n AND i.\"timestamp\" > m.completed_date::TIMESTAMP\nGROUP BY i.device_id, d.manufacturer, d.device_type,\n m.maintenance_type, m.completed_date, m.cost_eur\nORDER BY \"Fault Readings After Service\" DESC\nLIMIT 10;", + "rawSql": "SELECT\n i.tags['device_id'] AS \"Device ID\",\n d.manufacturer AS \"Manufacturer\",\n d.device_type AS \"Device Type\",\n m.maintenance_type AS \"Maintenance Type\",\n m.completed_date AS \"Completed Date\",\n m.cost_eur AS \"Cost €\",\n COUNT(*) AS \"Fault Readings After Service\"\nFROM rtia.iot_data i\nJOIN rtia.devices d ON i.tags['device_id'] = d.device_id\nJOIN rtia.maintenance_log m ON i.tags['device_id'] = m.device_id\nWHERE i.tags['status'] IN ('warning', 'critical')\n AND m.status = 'completed'\n AND i.\"timestamp\" > m.completed_date::TIMESTAMP\nGROUP BY i.tags['device_id'], d.manufacturer, d.device_type,\n m.maintenance_type, m.completed_date, m.cost_eur\nORDER BY \"Fault Readings After Service\" DESC\nLIMIT 10;", "sql": { "columns": [ { @@ -614,7 +614,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n metadata['firmware_version'] AS \"Firmware Version\",\n metadata['model'] AS Model,\n COUNT(*) AS \"Total Readings\",\n COUNT(*) FILTER (WHERE status = 'critical') AS \"Critical Count\",\n COUNT(*) FILTER (WHERE status = 'warning') AS 
\"Warning Count\",\n ROUND(AVG(quality_score), 1) AS \"Avg Quality\"\nFROM rtia.iot_data\nGROUP BY metadata['firmware_version'], metadata['model']\nORDER BY \"Critical Count\" DESC\nLIMIT 20;", + "rawSql": "SELECT\n tags['metadata_firmware_version'] AS \"Firmware Version\",\n tags['metadata_model'] AS Model,\n COUNT(*) AS \"Total Readings\",\n COUNT(*) FILTER (WHERE tags['status'] = 'critical') AS \"Critical Count\",\n COUNT(*) FILTER (WHERE tags['status'] = 'warning') AS \"Warning Count\",\n ROUND(AVG(fields['quality_score']), 1) AS \"Avg Quality\"\nFROM rtia.iot_data\nGROUP BY tags['metadata_firmware_version'], tags['metadata_model']\nORDER BY \"Critical Count\" DESC\nLIMIT 20;", "sql": { "columns": [ { @@ -1271,7 +1271,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n p.plant_name AS \"Plant Name\",\n p.industry_segment AS \"Industry Segment\",\n i.device_type AS \"Device Type\",\n COUNT(*) AS \"Total Readings\",\n ROUND(\n COUNT(*) FILTER (WHERE i.status != 'offline') * 100.0\n / NULLIF(COUNT(*), 0), 1\n ) AS \"Availability %\",\n ROUND(\n COUNT(*) FILTER (WHERE i.status = 'normal') * 100.0\n / NULLIF(COUNT(*) FILTER (WHERE i.status != 'offline'), 0), 1\n ) AS \"Performance %\",\n ROUND(\n AVG(i.quality_score) FILTER (WHERE i.status != 'offline'), 1\n ) AS \"Quality Score Avg\",\n ROUND(\n (COUNT(*) FILTER (WHERE i.status != 'offline') * 1.0 / NULLIF(COUNT(*), 0))\n * (COUNT(*) FILTER (WHERE i.status = 'normal') * 1.0 / NULLIF(COUNT(*) FILTER (WHERE i.status != 'offline'), 0))\n * (AVG(i.quality_score) FILTER (WHERE i.status != 'offline') / 100.0)\n * 100, 1\n ) AS \"Oee Approx %\"\nFROM rtia.iot_data i\nJOIN rtia.plants p ON i.plant_id = p.plant_id\nWHERE $__timeFilter(i.\"timestamp\") \nGROUP BY i.plant_id, p.plant_name, p.industry_segment, i.device_type\nORDER BY \"Oee Approx %\" ASC;", + "rawSql": "SELECT\n p.plant_name AS \"Plant Name\",\n p.industry_segment AS \"Industry Segment\",\n i.tags['device_type'] AS \"Device 
Type\",\n COUNT(*) AS \"Total Readings\",\n ROUND(\n COUNT(*) FILTER (WHERE i.tags['status'] != 'offline') * 100.0\n / NULLIF(COUNT(*), 0), 1\n ) AS \"Availability %\",\n ROUND(\n COUNT(*) FILTER (WHERE i.tags['status'] = 'normal') * 100.0\n / NULLIF(COUNT(*) FILTER (WHERE i.tags['status'] != 'offline'), 0), 1\n ) AS \"Performance %\",\n ROUND(\n AVG(i.fields['quality_score']) FILTER (WHERE i.tags['status'] != 'offline'), 1\n ) AS \"Quality Score Avg\",\n ROUND(\n (COUNT(*) FILTER (WHERE i.tags['status'] != 'offline') * 1.0 / NULLIF(COUNT(*), 0))\n * (COUNT(*) FILTER (WHERE i.tags['status'] = 'normal') * 1.0 / NULLIF(COUNT(*) FILTER (WHERE i.tags['status'] != 'offline'), 0))\n * (AVG(i.fields['quality_score']) FILTER (WHERE i.tags['status'] != 'offline') / 100.0)\n * 100, 1\n ) AS \"Oee Approx %\"\nFROM rtia.iot_data i\nJOIN rtia.plants p ON i.tags['plant_id'] = p.plant_id\nWHERE $__timeFilter(i.\"timestamp\") \nGROUP BY i.tags['plant_id'], p.plant_name, p.industry_segment, i.tags['device_type']\nORDER BY \"Oee Approx %\" ASC;", "sql": { "columns": [ { @@ -1370,7 +1370,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n initcap(replace(device_type, '_', ' ')) AS device_type,\n COUNT(*) AS total\nFROM rtia.iot_data\nWHERE status = 'warning'\nAND $__timeFilter(\"timestamp\") \nGROUP BY device_type\nORDER BY device_type DESC;", + "rawSql": "SELECT\n initcap(replace(tags['device_type'], '_', ' ')) AS device_type,\n COUNT(*) AS total\nFROM rtia.iot_data\nWHERE tags['status'] = 'warning'\nAND $__timeFilter(\"timestamp\") \nGROUP BY tags['device_type']\nORDER BY device_type DESC;", "sql": { "columns": [ { @@ -1705,7 +1705,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n device_id As \"Device ID\",\n initcap(replace(device_type, '_',' ')) As \"Device Type\",\n initcap(replace(plant_id, 'PLANT_','')) AS \"Plant ID\",\n ROUND(avg_recent, 2) AS \"Avg - Latest 6 hrs\",\n ROUND(avg_prior, 2) AS \"Avg - Prior 
6 hrs\",\n ROUND(avg_recent - avg_prior, 2) AS \"Abolute Delta\",\n ROUND((avg_recent - avg_prior) / NULLIF(avg_prior, 0) * 100, 1) AS \"% Change\"\n FROM (\n SELECT\n device_id,\n device_type,\n plant_id,\n AVG(CASE WHEN \"timestamp\" >= TIMESTAMP '2025-09-07 06:00:00'\n AND \"timestamp\" < TIMESTAMP '2025-09-07 12:00:00'\n THEN metric_value END) AS avg_recent,\n AVG(CASE WHEN \"timestamp\" >= TIMESTAMP '2025-09-07 00:00:00'\n AND \"timestamp\" < TIMESTAMP '2025-09-07 06:00:00'\n THEN metric_value END) AS avg_prior\n FROM rtia.iot_data\n WHERE \"timestamp\" >= TIMESTAMP '2025-09-07 00:00:00'\n AND \"timestamp\" < TIMESTAMP '2025-09-07 12:00:00'\n GROUP BY device_id, device_type, plant_id\n ) windows\n WHERE avg_prior IS NOT NULL\n AND (avg_recent - avg_prior) / NULLIF(avg_prior, 0) > 0.10\n ORDER BY \"% Change\" DESC\n LIMIT 20\n\n", + "rawSql": "SELECT\n device_id As \"Device ID\",\n initcap(replace(device_type, '_',' ')) As \"Device Type\",\n initcap(replace(plant_id, 'PLANT_','')) AS \"Plant ID\",\n ROUND(avg_recent, 2) AS \"Avg - Latest 6 hrs\",\n ROUND(avg_prior, 2) AS \"Avg - Prior 6 hrs\",\n ROUND(avg_recent - avg_prior, 2) AS \"Absolute Delta\",\n ROUND((avg_recent - avg_prior) / NULLIF(avg_prior, 0) * 100, 1) AS \"% Change\"\n FROM (\n SELECT\n tags['device_id'] AS device_id,\n tags['device_type'] AS device_type,\n tags['plant_id'] AS plant_id,\n AVG(CASE WHEN \"timestamp\" >= TIMESTAMP '2025-09-07 06:00:00'\n AND \"timestamp\" < TIMESTAMP '2025-09-07 12:00:00'\n THEN fields['metric_value'] END) AS avg_recent,\n AVG(CASE WHEN \"timestamp\" >= TIMESTAMP '2025-09-07 00:00:00'\n AND \"timestamp\" < TIMESTAMP '2025-09-07 06:00:00'\n THEN fields['metric_value'] END) AS avg_prior\n FROM rtia.iot_data\n WHERE \"timestamp\" >= TIMESTAMP '2025-09-07 00:00:00'\n AND \"timestamp\" < TIMESTAMP '2025-09-07 12:00:00'\n GROUP BY tags['device_id'], tags['device_type'], tags['plant_id']\n ) windows\n WHERE avg_prior IS NOT NULL\n AND (avg_recent - avg_prior) / 
NULLIF(avg_prior, 0) > 0.10\n ORDER BY \"% Change\" DESC\n LIMIT 20\n\n", "sql": { "columns": [ { @@ -1810,7 +1810,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n device_id As \"Device ID\",\n initcap(replace(device_type, '_',' ')) As \"Device Type\",\n initcap(replace(plant_id, 'PLANT_','')) AS \"Plant ID\",\n COUNT(*) AS \"Threshold Breaches\",\n ROUND(MAX(metric_value), 2) AS \"Peak Value\",\n ROUND(MIN(quality_score), 1) AS \"Lowest Quality\",\n MIN(\"timestamp\") AS \"First Breach At\"\nFROM rtia.iot_data\nWHERE \"timestamp\" >= TIMESTAMP '2025-09-07 00:00:00'\n AND \"timestamp\" < TIMESTAMP '2025-09-08 00:00:00'\n AND status IN ('warning', 'critical')\nGROUP BY device_id, device_type, plant_id\nHAVING COUNT(*) >= 10\nORDER BY \"Threshold Breaches\" DESC;", + "rawSql": "SELECT\n tags['device_id'] As \"Device ID\",\n initcap(replace(tags['device_type'], '_',' ')) As \"Device Type\",\n initcap(replace(tags['plant_id'], 'PLANT_','')) AS \"Plant ID\",\n COUNT(*) AS \"Threshold Breaches\",\n ROUND(MAX(fields['metric_value']), 2) AS \"Peak Value\",\n ROUND(MIN(fields['quality_score']), 1) AS \"Lowest Quality\",\n MIN(\"timestamp\") AS \"First Breach At\"\nFROM rtia.iot_data\nWHERE \"timestamp\" >= TIMESTAMP '2025-09-07 00:00:00'\n AND \"timestamp\" < TIMESTAMP '2025-09-08 00:00:00'\n AND tags['status'] IN ('warning', 'critical')\nGROUP BY tags['device_id'], tags['device_type'], tags['plant_id']\nHAVING COUNT(*) >= 10\nORDER BY \"Threshold Breaches\" DESC;", "sql": { "columns": [ { @@ -1909,7 +1909,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n device_id As \"Device ID\",\n initcap(replace(device_type, '_',' ')) As \"Device Type\",\n initcap(replace(plant_id, 'PLANT_','')) AS \"Plant ID\",\n ROUND(AVG(quality_score) FILTER (\n WHERE \"timestamp\" < TIMESTAMP '2025-09-07 00:00:00'\n ), 1) AS \"Baseline Quality\",\n ROUND(AVG(quality_score) FILTER (\n WHERE \"timestamp\" >= TIMESTAMP 
'2025-09-07 00:00:00'\n ), 1) AS \"Recent Quality\",\n ROUND(\n (\n AVG(quality_score) FILTER (WHERE \"timestamp\" >= TIMESTAMP '2025-09-07 00:00:00')\n - AVG(quality_score) FILTER (WHERE \"timestamp\" < TIMESTAMP '2025-09-07 00:00:00')\n )\n / NULLIF(\n AVG(quality_score) FILTER (WHERE \"timestamp\" < TIMESTAMP '2025-09-07 00:00:00'),\n 0\n ) * 100,\n 1\n ) AS \"Quality Change Pct\"\nFROM rtia.iot_data\nGROUP BY device_id, device_type, plant_id\nHAVING AVG(quality_score) FILTER (WHERE \"timestamp\" < TIMESTAMP '2025-09-07 00:00:00') IS NOT NULL\n AND AVG(quality_score) FILTER (WHERE \"timestamp\" >= TIMESTAMP '2025-09-07 00:00:00') IS NOT NULL\nORDER BY \"Quality Change Pct\" ASC\nLIMIT 20;", + "rawSql": "SELECT\n tags['device_id'] As \"Device ID\",\n initcap(replace(tags['device_type'], '_',' ')) As \"Device Type\",\n initcap(replace(tags['plant_id'], 'PLANT_','')) AS \"Plant ID\",\n ROUND(AVG(fields['quality_score']) FILTER (\n WHERE \"timestamp\" < TIMESTAMP '2025-09-07 00:00:00'\n ), 1) AS \"Baseline Quality\",\n ROUND(AVG(fields['quality_score']) FILTER (\n WHERE \"timestamp\" >= TIMESTAMP '2025-09-07 00:00:00'\n ), 1) AS \"Recent Quality\",\n ROUND(\n (\n AVG(fields['quality_score']) FILTER (WHERE \"timestamp\" >= TIMESTAMP '2025-09-07 00:00:00')\n - AVG(fields['quality_score']) FILTER (WHERE \"timestamp\" < TIMESTAMP '2025-09-07 00:00:00')\n )\n / NULLIF(\n AVG(fields['quality_score']) FILTER (WHERE \"timestamp\" < TIMESTAMP '2025-09-07 00:00:00'),\n 0\n ) * 100,\n 1\n ) AS \"Quality Change Pct\"\nFROM rtia.iot_data\nGROUP BY tags['device_id'], tags['device_type'], tags['plant_id']\nHAVING AVG(fields['quality_score']) FILTER (WHERE \"timestamp\" < TIMESTAMP '2025-09-07 00:00:00') IS NOT NULL\n AND AVG(fields['quality_score']) FILTER (WHERE \"timestamp\" >= TIMESTAMP '2025-09-07 00:00:00') IS NOT NULL\nORDER BY \"Quality Change Pct\" ASC\nLIMIT 20;", "sql": { "columns": [ { @@ -2008,7 +2008,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - 
"rawSql": "SELECT\n initcap(replace(device_type, '_', ' ')) AS device_type,\n COUNT(*) AS total\nFROM rtia.iot_data\nWHERE status = 'critical'\nAND $__timeFilter(\"timestamp\") \nGROUP BY device_type\nORDER BY device_type DESC;", + "rawSql": "SELECT\n initcap(replace(tags['device_type'], '_', ' ')) AS device_type,\n COUNT(*) AS total\nFROM rtia.iot_data\nWHERE tags['status'] = 'critical'\nAND $__timeFilter(\"timestamp\") \nGROUP BY tags['device_type']\nORDER BY device_type DESC;", "sql": { "columns": [ { @@ -2852,7 +2852,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n device_id AS \"Device ID\",\n device_type AS \"Device Type\",\n initcap(replace(plant_id,'PLANT_','')) AS \"Plant ID\",\n metric_unit AS \"Metric Unit\",\n metric_value AS \"Metric Value\",\n \"timestamp\",\n ROUND(DISTANCE(geo_location, (SELECT geo_location FROM rtia.locations WHERE location_name = '$location')) / 1000, 1) AS \"KM from $location\"\nFROM rtia.iot_data\nWHERE status = 'critical'\n AND DISTANCE(geo_location, (SELECT geo_location FROM rtia.locations WHERE location_name = '$location')) < ($km *1000)\nORDER BY \"KM from $location\", timestamp desc\nLIMIT 20;", + "rawSql": "SELECT\n tags['device_id'] AS \"Device ID\",\n tags['device_type'] AS \"Device Type\",\n initcap(replace(tags['plant_id'],'PLANT_','')) AS \"Plant ID\",\n tags['metric_unit'] AS \"Metric Unit\",\n fields['metric_value'] AS \"Metric Value\",\n \"timestamp\",\n ROUND(DISTANCE(geo_location, (SELECT geo_location FROM rtia.locations WHERE location_name = '$location')) / 1000, 1) AS \"KM from $location\"\nFROM rtia.iot_data\nWHERE tags['status'] = 'critical'\n AND DISTANCE(geo_location, (SELECT geo_location FROM rtia.locations WHERE location_name = '$location')) < ($km *1000)\nORDER BY \"KM from $location\", timestamp desc\nLIMIT 20;", "sql": { "columns": [ { @@ -2980,7 +2980,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n device_type AS \"Device 
Type\",\n COUNT(*) AS \"Critical Count\",\n ROUND(AVG(quality_score), 1) AS \"Avg Quality\"\nFROM rtia.iot_data\nWHERE status = 'critical'\n AND WITHIN(\n geo_location,\n (SELECT geo_area FROM rtia.locations WHERE location_name = '$area')\n )\nGROUP BY device_type\nORDER BY \"Critical Count\" DESC;", + "rawSql": "SELECT\n tags['device_type'] AS \"Device Type\",\n COUNT(*) AS \"Critical Count\",\n ROUND(AVG(fields['quality_score']), 1) AS \"Avg Quality\"\nFROM rtia.iot_data\nWHERE tags['status'] = 'critical'\n AND WITHIN(\n geo_location,\n (SELECT geo_area FROM rtia.locations WHERE location_name = '$area')\n )\nGROUP BY tags['device_type']\nORDER BY \"Critical Count\" DESC;", "sql": { "columns": [ { @@ -3079,7 +3079,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n p.city AS \"City\",\n p.federal_state AS \"Federal State\",\n p.industry_segment AS \"Industry Segment\",\n COUNT(*) AS \"Total Readings\",\n COUNT(*) FILTER (WHERE i.status = 'critical') AS \"Critical Count\",\n ROUND(AVG(i.quality_score), 1) AS \"Avg Quality\",\n ROUND(DISTANCE(p.geo_location, (SELECT geo_location FROM rtia.locations WHERE location_name = 'Stuttgart')) / 1000, 0) AS \"KM From Stuttgart\"\nFROM rtia.iot_data i\nJOIN rtia.plants p ON i.plant_id = p.plant_id\nGROUP BY p.city, p.federal_state, p.industry_segment, p.geo_location\nORDER BY \"Critical Count\";", + "rawSql": "SELECT\n p.city AS \"City\",\n p.federal_state AS \"Federal State\",\n p.industry_segment AS \"Industry Segment\",\n COUNT(*) AS \"Total Readings\",\n COUNT(*) FILTER (WHERE i.tags['status'] = 'critical') AS \"Critical Count\",\n ROUND(AVG(i.fields['quality_score']), 1) AS \"Avg Quality\",\n ROUND(DISTANCE(p.geo_location, (SELECT geo_location FROM rtia.locations WHERE location_name = 'Stuttgart')) / 1000, 0) AS \"KM From Stuttgart\"\nFROM rtia.iot_data i\nJOIN rtia.plants p ON i.tags['plant_id'] = p.plant_id\nGROUP BY p.city, p.federal_state, p.industry_segment, p.geo_location\nORDER BY 
\"Critical Count\";", "sql": { "columns": [ { @@ -3205,7 +3205,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n device_id As \"Device ID\",\n initcap(replace(device_type, '_',' ')) As \"Device Type\",\n initcap(replace(plant_id, 'PLANT_','')) AS \"Plant ID\",\n initcap(status) AS \"Status\",\n metric_value AS \"Value\",\n metric_unit AS \"Unit\",\n geo_location AS \"Location\",\n ROUND(DISTANCE(geo_location, (SELECT geo_location FROM rtia.locations WHERE location_name = '$location')) / 1000, 2) AS \"KM from $location\"\nFROM rtia.iot_data\nWHERE status IN ('warning', 'critical')\nORDER BY DISTANCE(geo_location, (SELECT geo_location FROM rtia.locations WHERE location_name = '$location'))\nLIMIT 10;", + "rawSql": "SELECT\n tags['device_id'] As \"Device ID\",\n initcap(replace(tags['device_type'], '_',' ')) As \"Device Type\",\n initcap(replace(tags['plant_id'], 'PLANT_','')) AS \"Plant ID\",\n initcap(tags['status']) AS \"Status\",\n fields['metric_value'] AS \"Value\",\n tags['metric_unit'] AS \"Unit\",\n geo_location AS \"Location\",\n ROUND(DISTANCE(geo_location, (SELECT geo_location FROM rtia.locations WHERE location_name = '$location')) / 1000, 2) AS \"KM from $location\"\nFROM rtia.iot_data\nWHERE tags['status'] IN ('warning', 'critical')\nORDER BY DISTANCE(geo_location, (SELECT geo_location FROM rtia.locations WHERE location_name = '$location'))\nLIMIT 10;", "sql": { "columns": [ { @@ -3393,7 +3393,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n DATE_TRUNC('hour', \"timestamp\") AS hour,\n COUNT(*) FILTER (WHERE status = 'critical') AS \"Critical Error Count\"\nFROM rtia.iot_data\nWHERE $__timeFilter(\"timestamp\") \nGROUP BY hour\nORDER BY hour;", + "rawSql": "SELECT\n DATE_TRUNC('hour', \"timestamp\") AS hour,\n COUNT(*) FILTER (WHERE tags['status'] = 'critical') AS \"Critical Error Count\"\nFROM rtia.iot_data\nWHERE $__timeFilter(\"timestamp\") \nGROUP BY hour\nORDER BY hour;", 
"sql": { "columns": [ { @@ -4117,7 +4117,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\ninitcap(replace(plant_id, 'PLANT_', ' ')) AS plant_id,\n ANY_VALUE(geo_location) AS location,\n COUNT(*) AS \"Critical Readings\"\n FROM rtia.iot_data\nWHERE status = 'critical'\nAND $__timeFilter(\"timestamp\") \nGROUP BY plant_id\nORDER BY \"Critical Readings\" DESC;", + "rawSql": "SELECT\ninitcap(replace(tags['plant_id'], 'PLANT_', ' ')) AS plant_id,\n ANY_VALUE(geo_location) AS location,\n COUNT(*) AS \"Critical Readings\"\n FROM rtia.iot_data\nWHERE tags['status'] = 'critical'\nAND $__timeFilter(\"timestamp\") \nGROUP BY tags['plant_id']\nORDER BY \"Critical Readings\" DESC;", "sql": { "columns": [ { @@ -4245,7 +4245,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n latitude(geo_location) as latitude,\n longitude(geo_location) as longitude,\n ROUND(AVG(quality_score), 1) AS avg_quality\nFROM rtia.iot_data\nWHERE $__timeFilter(\"timestamp\") \nGROUP BY plant_id,latitude(geo_location) ,\n longitude(geo_location) ", + "rawSql": "SELECT\n latitude(geo_location) as latitude,\n longitude(geo_location) as longitude,\n ROUND(AVG(fields['quality_score']), 1) AS avg_quality\nFROM rtia.iot_data\nWHERE $__timeFilter(\"timestamp\") \nGROUP BY tags['plant_id'],latitude(geo_location) ,\n longitude(geo_location) ", "sql": { "columns": [ { @@ -4495,7 +4495,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n DATE_TRUNC('hour', \"timestamp\") AS hour,\n COUNT(*) FILTER (WHERE status = 'critical') AS \"Critical Error Count\"\nFROM rtia.iot_data\nWHERE $__timeFilter(\"timestamp\") \nGROUP BY hour\nORDER BY hour;", + "rawSql": "SELECT\n DATE_TRUNC('hour', \"timestamp\") AS hour,\n COUNT(*) FILTER (WHERE tags['status'] = 'critical') AS \"Critical Error Count\"\nFROM rtia.iot_data\nWHERE $__timeFilter(\"timestamp\") \nGROUP BY hour\nORDER BY hour;", "sql": { "columns": [ { @@ -4597,7 +4597,7 
@@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n initcap(replace(p.industry_segment, '_', ' '))||' ('||ROUND(AVG(i.quality_score), 2)||'%)' AS industry_segment,\n initcap(replace(p.plant_name, '_', ' ')) AS plant_name,\n p.employee_count AS \"Employee Count\",\n COUNT(*) FILTER (WHERE i.status = 'critical') AS \"Critical Events\",\n COUNT(*) FILTER (WHERE i.status = 'warning') AS \"Warning Events\",\n ROUND(AVG(i.quality_score), 2) AS \"Avg Quality\"\nFROM rtia.iot_data i\nJOIN rtia.plants p ON i.plant_id = p.plant_id\nWHERE $__timeFilter(i.\"timestamp\") \nGROUP BY p.industry_segment, p.plant_name, p.employee_count\nORDER BY \"Avg Quality\" DESC;", + "rawSql": "SELECT\n initcap(replace(p.industry_segment, '_', ' '))||' ('||ROUND(AVG(i.fields['quality_score']), 2)||'%)' AS industry_segment,\n initcap(replace(p.plant_name, '_', ' ')) AS plant_name,\n p.employee_count AS \"Employee Count\",\n COUNT(*) FILTER (WHERE i.tags['status'] = 'critical') AS \"Critical Events\",\n COUNT(*) FILTER (WHERE i.tags['status'] = 'warning') AS \"Warning Events\",\n ROUND(AVG(i.fields['quality_score']), 2) AS \"Avg Quality\"\nFROM rtia.iot_data i\nJOIN rtia.plants p ON i.tags['plant_id'] = p.plant_id\nWHERE $__timeFilter(i.\"timestamp\") \nGROUP BY p.industry_segment, p.plant_name, p.employee_count\nORDER BY \"Avg Quality\" DESC;", "sql": { "columns": [ { @@ -4757,7 +4757,7 @@ "editorMode": "code", "format": "table", "rawQuery": true, - "rawSql": "SELECT\n EXTRACT(HOUR FROM \"timestamp\") AS hour_of_day,\n COUNT(*) FILTER (WHERE status = 'critical') AS \"Critical Error Count\"\nFROM rtia.iot_data\nWHERE $__timeFilter(\"timestamp\") \nGROUP BY hour_of_day\nORDER BY hour_of_day;", + "rawSql": "SELECT\n EXTRACT(HOUR FROM \"timestamp\") AS hour_of_day,\n COUNT(*) FILTER (WHERE tags['status'] = 'critical') AS \"Critical Error Count\"\nFROM rtia.iot_data\nWHERE $__timeFilter(\"timestamp\") \nGROUP BY hour_of_day\nORDER BY hour_of_day;", "sql": { "columns": 
[ { diff --git a/sql/rtia_advanced_queries.sql b/sql/rtia_advanced_queries.sql index 93a34f8..7e8a85b 100644 --- a/sql/rtia_advanced_queries.sql +++ b/sql/rtia_advanced_queries.sql @@ -21,6 +21,11 @@ -- CrateDB Industrial IoT — Advanced Queries -- Full-text search and geospatial capabilities -- Tables: iot_data · plants · devices · maintenance_log +-- +-- iot_data is stored in the Telegraf line-protocol shape: string dimensions in +-- the `tags` OBJECT (tags['device_id'], tags['status'], …) and numeric +-- measurements in `fields` (fields['metric_value'], fields['quality_score']). +-- geo_location is a GEO_POINT GENERATED from fields['geo_lon'] / fields['geo_lat']. -- ───────────────────────────────────────────────────────────────────────────── @@ -45,19 +50,19 @@ SELECT ROUND((avg_recent - avg_prior) / NULLIF(avg_prior, 0) * 100, 1) AS pct_change FROM ( SELECT - device_id, - device_type, - plant_id, + tags['device_id'] AS device_id, + tags['device_type'] AS device_type, + tags['plant_id'] AS plant_id, AVG(CASE WHEN "timestamp" >= TIMESTAMP '2025-09-07 06:00:00' AND "timestamp" < TIMESTAMP '2025-09-07 12:00:00' - THEN metric_value END) AS avg_recent, + THEN fields['metric_value'] END) AS avg_recent, AVG(CASE WHEN "timestamp" >= TIMESTAMP '2025-09-07 00:00:00' AND "timestamp" < TIMESTAMP '2025-09-07 06:00:00' - THEN metric_value END) AS avg_prior + THEN fields['metric_value'] END) AS avg_prior FROM rtia.iot_data WHERE "timestamp" >= TIMESTAMP '2025-09-07 00:00:00' AND "timestamp" < TIMESTAMP '2025-09-07 12:00:00' - GROUP BY device_id, device_type, plant_id + GROUP BY tags['device_id'], tags['device_type'], tags['plant_id'] ) windows WHERE avg_prior IS NOT NULL AND (avg_recent - avg_prior) / NULLIF(avg_prior, 0) > 0.10 @@ -69,21 +74,21 @@ SELECT WITH windows AS ( SELECT - device_id, - device_type, - plant_id, - AVG(metric_value) FILTER ( + tags['device_id'] AS device_id, + tags['device_type'] AS device_type, + tags['plant_id'] AS plant_id, + 
AVG(fields['metric_value']) FILTER ( WHERE "timestamp" >= TIMESTAMP '2025-09-07 06:00:00' AND "timestamp" < TIMESTAMP '2025-09-07 12:00:00' ) AS avg_recent, - AVG(metric_value) FILTER ( + AVG(fields['metric_value']) FILTER ( WHERE "timestamp" >= TIMESTAMP '2025-09-07 00:00:00' AND "timestamp" < TIMESTAMP '2025-09-07 06:00:00' ) AS avg_prior FROM rtia.iot_data WHERE "timestamp" >= TIMESTAMP '2025-09-07 00:00:00' AND "timestamp" < TIMESTAMP '2025-09-07 12:00:00' - GROUP BY device_id, device_type, plant_id + GROUP BY tags['device_id'], tags['device_type'], tags['plant_id'] ) SELECT device_id, @@ -105,18 +110,18 @@ LIMIT 20; -- day — distinguishing sustained degradation from transient noise spikes. SELECT - device_id, - device_type, - plant_id, - COUNT(*) AS threshold_exceedances, - ROUND(MAX(metric_value), 2) AS peak_value, - ROUND(MIN(quality_score), 1) AS lowest_quality, - MIN("timestamp") AS first_breach_at + tags['device_id'] AS device_id, + tags['device_type'] AS device_type, + tags['plant_id'] AS plant_id, + COUNT(*) AS threshold_exceedances, + ROUND(MAX(fields['metric_value']), 2) AS peak_value, + ROUND(MIN(fields['quality_score']), 1) AS lowest_quality, + MIN("timestamp") AS first_breach_at FROM rtia.iot_data WHERE "timestamp" >= TIMESTAMP '2025-09-07 00:00:00' AND "timestamp" < TIMESTAMP '2025-09-08 00:00:00' - AND status IN ('warning', 'critical') -GROUP BY device_id, device_type, plant_id + AND tags['status'] IN ('warning', 'critical') +GROUP BY tags['device_id'], tags['device_type'], tags['plant_id'] HAVING COUNT(*) >= 10 ORDER BY threshold_exceedances DESC; @@ -127,30 +132,30 @@ ORDER BY threshold_exceedances DESC; -- historical baseline. 
SELECT - device_id, - device_type, - plant_id, - ROUND(AVG(quality_score) FILTER ( + tags['device_id'] AS device_id, + tags['device_type'] AS device_type, + tags['plant_id'] AS plant_id, + ROUND(AVG(fields['quality_score']) FILTER ( WHERE "timestamp" < TIMESTAMP '2025-09-07 00:00:00' ), 1) AS baseline_quality, - ROUND(AVG(quality_score) FILTER ( + ROUND(AVG(fields['quality_score']) FILTER ( WHERE "timestamp" >= TIMESTAMP '2025-09-07 00:00:00' ), 1) AS recent_quality, ROUND( ( - AVG(quality_score) FILTER (WHERE "timestamp" >= TIMESTAMP '2025-09-07 00:00:00') - - AVG(quality_score) FILTER (WHERE "timestamp" < TIMESTAMP '2025-09-07 00:00:00') + AVG(fields['quality_score']) FILTER (WHERE "timestamp" >= TIMESTAMP '2025-09-07 00:00:00') + - AVG(fields['quality_score']) FILTER (WHERE "timestamp" < TIMESTAMP '2025-09-07 00:00:00') ) / NULLIF( - AVG(quality_score) FILTER (WHERE "timestamp" < TIMESTAMP '2025-09-07 00:00:00'), + AVG(fields['quality_score']) FILTER (WHERE "timestamp" < TIMESTAMP '2025-09-07 00:00:00'), 0 ) * 100, 1 ) AS quality_change_pct FROM rtia.iot_data -GROUP BY device_id, device_type, plant_id -HAVING AVG(quality_score) FILTER (WHERE "timestamp" < TIMESTAMP '2025-09-07 00:00:00') IS NOT NULL - AND AVG(quality_score) FILTER (WHERE "timestamp" >= TIMESTAMP '2025-09-07 00:00:00') IS NOT NULL +GROUP BY tags['device_id'], tags['device_type'], tags['plant_id'] +HAVING AVG(fields['quality_score']) FILTER (WHERE "timestamp" < TIMESTAMP '2025-09-07 00:00:00') IS NOT NULL + AND AVG(fields['quality_score']) FILTER (WHERE "timestamp" >= TIMESTAMP '2025-09-07 00:00:00') IS NOT NULL ORDER BY quality_change_pct ASC LIMIT 20; @@ -226,7 +231,8 @@ LIMIT 10; -- ───────────────────────────────────────────────────────────────────────────── -- GEOSPATIAL --- geo_location is a GEO_POINT stored as [longitude, latitude]. +-- geo_location is a GEO_POINT GENERATED from fields['geo_lon'] / fields['geo_lat'] +-- and stored as [longitude, latitude]. -- DISTANCE() returns meters. 
WITHIN() tests point-in-polygon. -- ───────────────────────────────────────────────────────────────────────────── @@ -246,15 +252,15 @@ ORDER BY km_from_stuttgart; -- All critical sensor events within 100 km of Frankfurt SELECT - device_id, - device_type, - plant_id, - metric_value, - metric_unit, + tags['device_id'] AS device_id, + tags['device_type'] AS device_type, + tags['plant_id'] AS plant_id, + fields['metric_value'] AS metric_value, + tags['metric_unit'] AS metric_unit, "timestamp", ROUND(DISTANCE(geo_location, (SELECT geo_location FROM rtia.locations WHERE location_name = 'Frankfurt')) / 1000, 1) AS km_from_frankfurt FROM rtia.iot_data -WHERE status = 'critical' +WHERE tags['status'] = 'critical' AND DISTANCE(geo_location, (SELECT geo_location FROM rtia.locations WHERE location_name = 'Frankfurt')) < 150000 ORDER BY km_from_frankfurt LIMIT 20; @@ -264,16 +270,16 @@ LIMIT 20; -- Critical alerts inside the Bavaria bounding box SELECT - device_type, + tags['device_type'] AS device_type, COUNT(*) AS critical_count, - ROUND(AVG(quality_score), 1) AS avg_quality + ROUND(AVG(fields['quality_score']), 1) AS avg_quality FROM rtia.iot_data -WHERE status = 'critical' +WHERE tags['status'] = 'critical' AND WITHIN( geo_location, (SELECT geo_area FROM rtia.locations WHERE location_name = 'Bavaria') ) -GROUP BY device_type +GROUP BY tags['device_type'] ORDER BY critical_count DESC; @@ -285,11 +291,11 @@ SELECT p.federal_state, p.industry_segment, COUNT(*) AS total_readings, - COUNT(*) FILTER (WHERE i.status = 'critical') AS critical_count, - ROUND(AVG(i.quality_score), 1) AS avg_quality, + COUNT(*) FILTER (WHERE i.tags['status'] = 'critical') AS critical_count, + ROUND(AVG(i.fields['quality_score']), 1) AS avg_quality, ROUND(DISTANCE(p.geo_location, (SELECT geo_location FROM rtia.locations WHERE location_name = 'Stuttgart')) / 1000, 0) AS km_from_stuttgart FROM rtia.iot_data i -JOIN rtia.plants p ON i.plant_id = p.plant_id +JOIN rtia.plants p ON i.tags['plant_id'] = 
p.plant_id GROUP BY p.city, p.federal_state, p.industry_segment, p.geo_location ORDER BY critical_count DESC; @@ -298,16 +304,16 @@ ORDER BY critical_count DESC; -- Find the closest device currently in warning or critical state to Munich SELECT - device_id, - device_type, - plant_id, - status, - metric_value, - metric_unit, + tags['device_id'] AS device_id, + tags['device_type'] AS device_type, + tags['plant_id'] AS plant_id, + tags['status'] AS status, + fields['metric_value'] AS metric_value, + tags['metric_unit'] AS metric_unit, geo_location, ROUND(DISTANCE(geo_location, (SELECT geo_location FROM rtia.locations WHERE location_name = 'Munich')) / 1000, 2) AS km_from_munich FROM rtia.iot_data -WHERE status IN ('warning', 'critical') +WHERE tags['status'] IN ('warning', 'critical') ORDER BY DISTANCE(geo_location, (SELECT geo_location FROM rtia.locations WHERE location_name = 'Munich')) LIMIT 10; diff --git a/sql/rtia_first_queries.sql b/sql/rtia_first_queries.sql index ddbaa03..9b1fbe0 100644 --- a/sql/rtia_first_queries.sql +++ b/sql/rtia_first_queries.sql @@ -21,6 +21,12 @@ -- CrateDB Industrial IoT — First Queries -- Dataset: 5 German plants · 500 devices · 500,000 sensor readings -- Tables: iot_data · plants · devices · maintenance_log +-- +-- iot_data is stored in the Telegraf line-protocol shape: the string dimensions +-- live in the `tags` OBJECT (tags['device_id'], tags['status'], tags['plant_id'], +-- the flattened tags['metadata_*'], …) and the numeric measurements in `fields` +-- (fields['metric_value'], fields['quality_score']). geo_location is a GEO_POINT +-- GENERATED from fields['geo_lon'] / fields['geo_lat']. 
-- ───────────────────────────────────────────────────────────────────────────── @@ -29,8 +35,8 @@ SELECT COUNT(*) AS total_readings, - COUNT(DISTINCT device_id) AS devices, - COUNT(DISTINCT plant_id) AS plants, + COUNT(DISTINCT tags['device_id']) AS devices, + COUNT(DISTINCT tags['plant_id']) AS plants, MIN("timestamp") AS earliest, MAX("timestamp") AS latest FROM rtia.iot_data; @@ -40,11 +46,11 @@ FROM rtia.iot_data; -- How healthy is the fleet right now? SELECT - status, + tags['status'] AS status, COUNT(*) AS readings, ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 1) AS pct FROM rtia.iot_data -GROUP BY status +GROUP BY tags['status'] ORDER BY readings DESC; @@ -52,13 +58,13 @@ ORDER BY readings DESC; -- Which sensor category generates the most alerts? SELECT - device_type, + tags['device_type'] AS device_type, COUNT(*) AS total, - COUNT(*) FILTER (WHERE status = 'warning') AS warnings, - COUNT(*) FILTER (WHERE status = 'critical') AS criticals, - ROUND(AVG(quality_score), 1) AS avg_quality + COUNT(*) FILTER (WHERE tags['status'] = 'warning') AS warnings, + COUNT(*) FILTER (WHERE tags['status'] = 'critical') AS criticals, + ROUND(AVG(fields['quality_score']), 1) AS avg_quality FROM rtia.iot_data -GROUP BY device_type +GROUP BY tags['device_type'] ORDER BY criticals DESC; @@ -66,8 +72,8 @@ ORDER BY criticals DESC; -- Critical events per hour — spot operational patterns SELECT - DATE_TRUNC('hour', "timestamp") AS hour, - COUNT(*) FILTER (WHERE status = 'critical') AS critical_count + DATE_TRUNC('hour', "timestamp") AS hour, + COUNT(*) FILTER (WHERE tags['status'] = 'critical') AS critical_count FROM rtia.iot_data GROUP BY hour ORDER BY hour; @@ -77,12 +83,12 @@ ORDER BY hour; -- Where on the map are faults clustering? 
SELECT - plant_id, - ANY_VALUE(geo_location) AS location, - COUNT(*) FILTER (WHERE status = 'critical') AS critical_readings, - ROUND(AVG(quality_score), 1) AS avg_quality + tags['plant_id'] AS plant_id, + ANY_VALUE(geo_location) AS location, + COUNT(*) FILTER (WHERE tags['status'] = 'critical') AS critical_readings, + ROUND(AVG(fields['quality_score']), 1) AS avg_quality FROM rtia.iot_data -GROUP BY plant_id +GROUP BY tags['plant_id'] ORDER BY critical_readings DESC; @@ -93,11 +99,11 @@ SELECT p.industry_segment, p.plant_name, p.employee_count, - COUNT(*) FILTER (WHERE i.status = 'critical') AS critical_events, - COUNT(*) FILTER (WHERE i.status = 'warning') AS warning_events, - ROUND(AVG(i.quality_score), 1) AS avg_quality + COUNT(*) FILTER (WHERE i.tags['status'] = 'critical') AS critical_events, + COUNT(*) FILTER (WHERE i.tags['status'] = 'warning') AS warning_events, + ROUND(AVG(i.fields['quality_score']), 1) AS avg_quality FROM rtia.iot_data i -JOIN rtia.plants p ON i.plant_id = p.plant_id +JOIN rtia.plants p ON i.tags['plant_id'] = p.plant_id GROUP BY p.industry_segment, p.plant_name, p.employee_count ORDER BY critical_events DESC; @@ -114,8 +120,8 @@ SELECT COUNT(*) AS critical_readings, MAX(i."timestamp") AS last_critical_at FROM rtia.iot_data i -JOIN rtia.devices d ON i.device_id = d.device_id -WHERE i.status = 'critical' +JOIN rtia.devices d ON i.tags['device_id'] = d.device_id +WHERE i.tags['status'] = 'critical' AND d.warranty_expiry < TIMESTAMP '2025-09-01' GROUP BY d.device_id, d.manufacturer, d.device_type, d.warranty_expiry, d.asset_value_eur ORDER BY critical_readings DESC @@ -131,9 +137,9 @@ SELECT d.plant_id, d.responsible_technician, d.next_maintenance_due, - COUNT(*) FILTER (WHERE i.status IN ('warning', 'critical')) AS fault_readings + COUNT(*) FILTER (WHERE i.tags['status'] IN ('warning', 'critical')) AS fault_readings FROM rtia.iot_data i -JOIN rtia.devices d ON i.device_id = d.device_id +JOIN rtia.devices d ON i.tags['device_id'] = 
d.device_id WHERE d.next_maintenance_due < TIMESTAMP '2025-09-01' GROUP BY d.device_id, d.device_type, d.plant_id, d.responsible_technician, d.next_maintenance_due ORDER BY fault_readings DESC @@ -144,7 +150,7 @@ LIMIT 15; -- iot_data + devices + maintenance_log → maintenance that did not hold SELECT - i.device_id, + i.tags['device_id'] AS device_id, d.manufacturer, d.device_type, m.maintenance_type, @@ -152,30 +158,32 @@ SELECT m.cost_eur, COUNT(*) AS fault_readings_after_service FROM rtia.iot_data i -JOIN rtia.devices d ON i.device_id = d.device_id -JOIN rtia.maintenance_log m ON i.device_id = m.device_id -WHERE i.status IN ('warning', 'critical') +JOIN rtia.devices d ON i.tags['device_id'] = d.device_id +JOIN rtia.maintenance_log m ON i.tags['device_id'] = m.device_id +WHERE i.tags['status'] IN ('warning', 'critical') AND m.status = 'completed' AND i."timestamp" > m.completed_date::TIMESTAMP -GROUP BY i.device_id, d.manufacturer, d.device_type, +GROUP BY i.tags['device_id'], d.manufacturer, d.device_type, m.maintenance_type, m.completed_date, m.cost_eur ORDER BY fault_readings_after_service DESC LIMIT 10; -- ── 11. OBJECT FIELD ACCESS: FAULT RATE BY FIRMWARE VERSION ───────────────── --- Demonstrates bracket notation on the metadata OBJECT column. +-- Demonstrates bracket notation on the tags OBJECT column. Telegraf flattens the +-- per-device metadata into tags['metadata_*'] (it can only carry flat strings), +-- so the firmware/model dimensions live there. -- Surfaces whether a specific firmware release correlates with higher fault rates. 
SELECT - metadata['firmware_version'] AS firmware_version, - metadata['model'] AS model, + tags['metadata_firmware_version'] AS firmware_version, + tags['metadata_model'] AS model, COUNT(*) AS total_readings, - COUNT(*) FILTER (WHERE status = 'critical') AS critical_count, - COUNT(*) FILTER (WHERE status = 'warning') AS warning_count, - ROUND(AVG(quality_score), 1) AS avg_quality + COUNT(*) FILTER (WHERE tags['status'] = 'critical') AS critical_count, + COUNT(*) FILTER (WHERE tags['status'] = 'warning') AS warning_count, + ROUND(AVG(fields['quality_score']), 1) AS avg_quality FROM rtia.iot_data -GROUP BY metadata['firmware_version'], metadata['model'] +GROUP BY tags['metadata_firmware_version'], tags['metadata_model'] ORDER BY critical_count DESC LIMIT 20; @@ -186,14 +194,14 @@ LIMIT 20; SELECT DATE_BIN('15 minutes'::INTERVAL, "timestamp", TIMESTAMP '2025-09-01') AS window_start, - device_type, - COUNT(*) AS readings, - COUNT(*) FILTER (WHERE status = 'critical') AS criticals, - ROUND(AVG(metric_value), 2) AS avg_value + tags['device_type'] AS device_type, + COUNT(*) AS readings, + COUNT(*) FILTER (WHERE tags['status'] = 'critical') AS criticals, + ROUND(AVG(fields['metric_value']), 2) AS avg_value FROM rtia.iot_data WHERE "timestamp" >= TIMESTAMP '2025-09-01 06:00:00' AND "timestamp" < TIMESTAMP '2025-09-01 14:00:00' -GROUP BY window_start, device_type +GROUP BY window_start, tags['device_type'] ORDER BY window_start, device_type; @@ -225,29 +233,29 @@ ORDER BY total_cost_eur DESC; -- OEE = Availability × Performance × Quality × 100 SELECT - i.plant_id, + i.tags['plant_id'] AS plant_id, p.plant_name, p.industry_segment, - i.device_type, + i.tags['device_type'] AS device_type, COUNT(*) AS total_readings, ROUND( - COUNT(*) FILTER (WHERE i.status != 'offline') * 100.0 + COUNT(*) FILTER (WHERE i.tags['status'] != 'offline') * 100.0 / NULLIF(COUNT(*), 0), 1 ) AS availability_pct, ROUND( - COUNT(*) FILTER (WHERE i.status = 'normal') * 100.0 - / NULLIF(COUNT(*) FILTER 
(WHERE i.status != 'offline'), 0), 1 + COUNT(*) FILTER (WHERE i.tags['status'] = 'normal') * 100.0 + / NULLIF(COUNT(*) FILTER (WHERE i.tags['status'] != 'offline'), 0), 1 ) AS performance_pct, ROUND( - AVG(i.quality_score) FILTER (WHERE i.status != 'offline'), 1 + AVG(i.fields['quality_score']) FILTER (WHERE i.tags['status'] != 'offline'), 1 ) AS quality_score_avg, ROUND( - (COUNT(*) FILTER (WHERE i.status != 'offline') * 1.0 / NULLIF(COUNT(*), 0)) - * (COUNT(*) FILTER (WHERE i.status = 'normal') * 1.0 / NULLIF(COUNT(*) FILTER (WHERE i.status != 'offline'), 0)) - * (AVG(i.quality_score) FILTER (WHERE i.status != 'offline') / 100.0) + (COUNT(*) FILTER (WHERE i.tags['status'] != 'offline') * 1.0 / NULLIF(COUNT(*), 0)) + * (COUNT(*) FILTER (WHERE i.tags['status'] = 'normal') * 1.0 / NULLIF(COUNT(*) FILTER (WHERE i.tags['status'] != 'offline'), 0)) + * (AVG(i.fields['quality_score']) FILTER (WHERE i.tags['status'] != 'offline') / 100.0) * 100, 1 ) AS oee_approx_pct FROM rtia.iot_data i -JOIN rtia.plants p ON i.plant_id = p.plant_id -GROUP BY i.plant_id, p.plant_name, p.industry_segment, i.device_type +JOIN rtia.plants p ON i.tags['plant_id'] = p.plant_id +GROUP BY i.tags['plant_id'], p.plant_name, p.industry_segment, i.tags['device_type'] ORDER BY oee_approx_pct ASC; diff --git a/sql/rtia_schema_create.sql b/sql/rtia_schema_create.sql index 398db7c..da6fbaf 100644 --- a/sql/rtia_schema_create.sql +++ b/sql/rtia_schema_create.sql @@ -81,31 +81,28 @@ CREATE TABLE IF NOT EXISTS rtia.maintenance_log ( -- ── 4. IOT_DATA ─────────────────────────────────────────────────────────────── --- Sensor readings — 500,000 rows across 500 devices and 5 plants --- Partitioned by month; clustered by device_id within each partition. +-- Sensor readings — 500,000 rows across 500 devices and 5 plants. 
+-- Telegraf line-protocol shape (hash_id / timestamp / name / tags / fields), so +-- the same table can be fed either by COPY FROM or by Telegraf's outputs.cratedb +-- (postgres/crate) plugin. That plugin can't write a GEO_POINT, so the geo coords +-- ride along as plain doubles in `fields` and geo_location is GENERATED from them. +-- Partitioned by day. CREATE TABLE IF NOT EXISTS rtia.iot_data ( - device_id TEXT, - device_type TEXT, - plant_id TEXT, - line_id TEXT, - "timestamp" TIMESTAMP WITH TIME ZONE, - month TIMESTAMP WITH TIME ZONE - GENERATED ALWAYS AS DATE_TRUNC('month', "timestamp"), - metric_value DOUBLE PRECISION, - metric_unit TEXT, - status TEXT, -- 'normal' | 'warning' | 'critical' | 'offline' - quality_score DOUBLE PRECISION, - geo_location GEO_POINT, - metadata OBJECT(STRICT) AS ( - firmware_version TEXT, - model TEXT, - installation_date TEXT, - last_calibration TEXT - ) -) -CLUSTERED BY (device_id) INTO 4 SHARDS -PARTITIONED BY (month); + hash_id BIGINT, + "timestamp" TIMESTAMP WITH TIME ZONE, + name TEXT, -- measurement name (= "iot_data") + tags OBJECT(DYNAMIC), -- device_id, status, metadata_*, ... (indexed strings) + fields OBJECT(DYNAMIC) AS ( + metric_value DOUBLE PRECISION, + quality_score DOUBLE PRECISION, + geo_lon DOUBLE PRECISION, + geo_lat DOUBLE PRECISION + ), + day TIMESTAMP WITH TIME ZONE GENERATED ALWAYS AS date_trunc('day', "timestamp"), + geo_location GEO_POINT GENERATED ALWAYS AS [fields['geo_lon'], fields['geo_lat']], + PRIMARY KEY (hash_id, "timestamp", day) +) PARTITIONED BY (day); -- ───────────────────────────────────────────────────────────────────────────── From 75f590094bb03339a229d5246974f0ae682b3995 Mon Sep 17 00:00:00 2001 From: srmadscience Date: Fri, 12 Jun 2026 14:51:11 +0100 Subject: [PATCH 2/2] gitignore: exclude the 240 MB RTIA sensor dataset iot_demo_dataset.json is loaded by COPY FROM from its canonical S3 copy, so it doesn't belong in git. 
Co-Authored-By: Claude Opus 4.8 --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 72c7f50..e7b5bcf 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,9 @@ # Per-run latency chart output (canonical screenshots live in doc/) src_weather/main/*/latency_histogram.png +# RTIA sensor dataset — 240 MB; canonical copy lives on S3 (COPY FROM reads it) +iot_demo_dataset.json + # macOS .DS_Store