From 98738528ca832a5b7a8e88a37d96eb3fde8b308c Mon Sep 17 00:00:00 2001 From: Fotis Paraskevopoulos Date: Thu, 5 Feb 2026 15:24:32 +0200 Subject: [PATCH] InfluxDB Application Control Plane This commit adds the ability of requesting the InfluxDB crendentials based on each specific application --- modules/application/index.js | 40 ++++++++++++++++++++++++++++++++++-- modules/exn/index.js | 28 +++++++++++++++++++++++++ modules/influxdb/index.js | 29 +++++++++++--------------- 3 files changed, 78 insertions(+), 19 deletions(-) diff --git a/modules/application/index.js b/modules/application/index.js index 4c6fb43..f32f356 100644 --- a/modules/application/index.js +++ b/modules/application/index.js @@ -514,6 +514,30 @@ module.exports = { console.error("Couldn't run migration ", e) } }); + self.apos.migration.add('fix-slo-violations', async () => { + try { + await self.apos.migration.eachDoc({ + type: 'application', + }, async (doc) => { + if(doc.sloViolations) { + try{ + let creations = JSON.parse(doc.sloViolations) + if(!Array.isArray(creations)) { + creations = [creations] + } + await self.apos.doc.db.updateOne({ + _id: doc._id + }, { + $set: {sloViolations: creations}, + }); + }catch (error) {} + } + }); + } catch (e) { + console.error("Couldn't run migration ", e) + } + }); + }, handlers(self) { return { @@ -1355,7 +1379,13 @@ module.exports = { } try { - return await self.apos.modules.influxdb.getAvailableMeasurements(doc.uuid) + const credentials = await self.apos.modules.exn.getApplicationInfluxDBCredentials(doc.uuid) + + if (!credentials) { + throw self.apos.error('error', "Could not retrieve credentials"); + } + + return await self.apos.modules.influxdb.getAvailableMeasurements(credentials, doc.uuid) } catch (error) { throw self.apos.error('error', error.message); } @@ -1385,7 +1415,13 @@ module.exports = { try { const measurements = req.query.measurement || [] const interval = req.query.interval || '-30d' - return await self.apos.modules.influxdb.getTimeSeriesForMeasurements(uuid, measurements, interval) + + const credentials = await self.apos.modules.exn.getApplicationInfluxDBCredentials(doc.uuid) + if (!credentials) { + throw self.apos.error('error', "Could not retrieve credentials"); + } + + return await self.apos.modules.influxdb.getTimeSeriesForMeasurements(credentials, uuid, measurements, interval ) } catch (error) { throw self.apos.error('error', error.message); } diff --git a/modules/exn/index.js b/modules/exn/index.js index 3edfedb..a340864 100644 --- a/modules/exn/index.js +++ b/modules/exn/index.js @@ -46,6 +46,8 @@ let sender_ui_policies_model_upsert; let sender_bqa_validate_slos; +let sender_app_influxdb; + let sender_ui_application_user_info; let sender_ui_application_info; @@ -156,6 +158,7 @@ module.exports = { sender_ui_application_info = context.connection.open_sender('topic://eu.nebulouscloud.ui.app.get.reply'); sender_bqa_validate_slos = context.connection.open_sender('topic://eu.nebulouscloud.ontology.bqa'); + sender_app_influxdb = context.connection.open_sender('topic://eu.nebulouscloud.app_cluster.influxdb.get'); }); @@ -387,6 +390,31 @@ module.exports = { sender_bqa_validate_slos.send(message) }) }, + async getApplicationInfluxDBCredentials(uuid) { + return new Promise((resolve, reject) => { + + const correlation_id = uuidv4() + correlations[correlation_id] = { + 'resolve': resolve, 'reject': reject, + }; + const req = aposSelf.apos.task.getReq() + const message = { + to: sender_app_influxdb.options.target.address, + correlation_id: correlation_id, + message_annotations: {application: uuid}, + application_properties: {application: uuid} + } + const timer = setTimeout(() => { + console.warn("InfluxDB Crendetials not retrieved for application = ",uuid) + resolve({ + 'valid':true + }) + }, 7000); + + console.log("[getApplicationInfluxDBCrendetials] Send ", JSON.stringify( message)) + sender_app_influxdb.send(message) + }) + }, get_cloud_candidates() { return new Promise((resolve, reject) => { diff --git a/modules/influxdb/index.js b/modules/influxdb/index.js index 29de653..f9a5b2b 100644 --- a/modules/influxdb/index.js +++ b/modules/influxdb/index.js @@ -1,22 +1,18 @@ require('dotenv').config(); const {InfluxDB} = require('@influxdata/influxdb-client'); -const connection_options = { - 'url': process.env.INFLUX_URL, - 'token': process.env.INFLUX_TOKEN, - 'organization': process.env.INFLUX_ORGANIZATION, -} module.exports = { methods(self) { return { - getAvailableMeasurements(uuid) { + getAvailableMeasurements(connection_options, uuid) { + const influxDB = new InfluxDB({ - url: connection_options.url, - token: connection_options.token, + url: connection_options.INFLUXDB_URL, + token: connection_options.INFLUXDB_TOKEN, }); - const queryApi = influxDB.getQueryApi(connection_options.organization); + const queryApi = influxDB.getQueryApi(connection_options.INFLUXDB_ORG); return new Promise((resolve, reject) => { @@ -27,7 +23,7 @@ module.exports = { import "influxdata/influxdb/schema" schema.measurements( - bucket: "nebulous_${uuid}_bucket" + bucket: "${connection_options.INFLUXDB_BUCKET}" ) `; @@ -48,17 +44,16 @@ module.exports = { }); }) }, - getTimeSeriesForMeasurements(uuid, measurements = [], time = '-3h') { + getTimeSeriesForMeasurements(connection_options, uuid, measurements = [], time = '-3h') { return new Promise((resolve, reject) => { - - try { const influxDB = new InfluxDB({ - url: connection_options.url, - token: connection_options.token, + url: connection_options.INFLUXDB_URL, + token: connection_options.INFLUXDB_TOKEN, }); - const queryApi = influxDB.getQueryApi(connection_options.organization); + + const queryApi = influxDB.getQueryApi(connection_options.INFLUXDB_ORG); const timeSeriesData = []; @@ -71,7 +66,7 @@ module.exports = { // Query to get time series data const fluxQuery = ` - from(bucket: "nebulous_${uuid}_bucket") + from(bucket: "${connection_options.INFLUXDB_BUCKET}") |> range(start: ${time}) ${measurementFilter} |> filter(fn: (r) => r._field == "metricValue")