From 4cd8d919fff98a124252c7bbc0d09d4d006feebe Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Thu, 11 Jun 2026 15:44:43 -0400 Subject: [PATCH 1/7] WIP - completing the implementation of colocated sites. --- charts/helmfile/resources/db-setup.sql | 4 +- .../management-controller/src/api-admin.js | 39 -- .../management-controller/src/colo-sync.js | 426 +++++++++++++----- .../management-controller/src/notify.js | 27 +- .../src/resource-templates.js | 26 +- .../src/site-deployment-state.js | 2 + .../src/sync-management.js | 11 +- components/site-controller/src/ingress-v2.js | 3 + modules/src/common.js | 1 + modules/src/kube.js | 69 +-- 10 files changed, 424 insertions(+), 184 deletions(-) diff --git a/charts/helmfile/resources/db-setup.sql b/charts/helmfile/resources/db-setup.sql index 011d073..197665a 100644 --- a/charts/helmfile/resources/db-setup.sql +++ b/charts/helmfile/resources/db-setup.sql @@ -63,7 +63,7 @@ CREATE TYPE LifecycleType AS ENUM ('partial', 'new', 'skx_cr_created', 'cm_cert_ -- ready-automatic The site is ready to be deployed by the automatic process -- deployed The site is deployed and has checked in with the management plane -- -CREATE TYPE DeploymentStateType AS ENUM ('not-ready', 'ready-bootstrap', 'ready-bootfinish', 'ready-automatic', 'deployed'); +CREATE TYPE DeploymentStateType AS ENUM ('not-ready', 'ready-bootstrap', 'ready-bootfinish', 'ready-automatic', 'colo-automatic', 'deployed'); -- -- ApplicationNetworkType @@ -192,7 +192,7 @@ CREATE TABLE InteriorSites ( Backbone UUID REFERENCES Backbones, CoLocated boolean DEFAULT false, - Owner UUID REFERENCES Users, + Owner UUID REFERENCES Users, -- TODO - Remove these. It doesn't make sense for a site to have a different owner than the backbone OwnerGroup text ); diff --git a/components/management-controller/src/api-admin.js b/components/management-controller/src/api-admin.js index b3c58a9..856d4a4 100644 --- a/components/management-controller/src/api-admin.js +++ b/components/management-controller/src/api-admin.js @@ -25,7 +25,6 @@ import { SiteIngressChanged, LinkChanged, SiteDeleted } from './sync-management. import { Log } from '@skupperx/modules/log' import { ManageIngressAdded, LinkAddedOrDeleted, ManageIngressDeleted } from './site-deployment-state.js'; import { ValidateAndNormalizeFields, IsValidUuid, UniquifyName } from '@skupperx/modules/util'; -import { processColoBackbones } from './colo-sync.js'; import { NotifyTransaction } from './notify.js'; const API_PREFIX = '/api/v1alpha1/'; @@ -54,21 +53,9 @@ const createBackbone = async function(req, res) { ); backboneId = result.rows[0].id; notify.add('Backbones', backboneId); - if (!!norm.coLocatedNamespace) { - const site_result = await client.query(`INSERT INTO InteriorSites(Name, TargetPlatform, CoLocated, Backbone, Owner, OwnerGroup) ` + - `VALUES ('co-located', 'sk2', true, $1, $2, $3) RETURNING Id`, - [backboneId, userInfo.userId, norm.ownerGroup]); - siteId = site_result.rows[0].id; - notify.add('InteriorSites', siteId); - const ap_result = await client.query(`INSERT INTO BackboneAccessPoints(Name, Kind, InteriorSite, Owner, OwnerGroup, AccessType) ` + - `VALUES ('manage', 'manage', $1, $2, $3, 'local') RETURNING Id`, - [siteId, userInfo.userId, norm.ownerGroup]); - notify.add('BackboneAccessPoints', ap_result.rows[0].id); - } }); await notify.commit(); returnStatus = 201; - await processColoBackbones(); res.status(returnStatus).json({id: backboneId}); } catch (error) { returnStatus = 500; @@ -485,35 +472,10 @@ const deleteBackbone = async function(req, res) { throw new Error('Cannot delete a backbone with active application networks'); } const siteResult = await client.query("SELECT Id, Certificate, CoLocated FROM InteriorSites WHERE Backbone = $1", [bid]); - let coLocatedOnly = false; - let siteId = null; - let siteCertificate = null; if (siteResult.rowCount > 0) { if (siteResult.rowCount > 1 || !siteResult.rows[0].colocated) { throw new Error('Cannot delete a backbone with interior sites'); } - coLocatedOnly = true; - siteId = siteResult.rows[0].id; - siteCertificate = siteResult.rows[0].certificate; - } - if (coLocatedOnly) { - const apResult = await client.query("SELECT Id, Certificate FROM BackboneAccessPoints WHERE InteriorSite = $1", [siteId]); - for (const row of apResult.rows) { - if (row.certificate) { - await client.query("UPDATE BackboneAccessPoints SET Certificate = NULL WHERE Id = $1", [row.id]); - await client.query("DELETE FROM TlsCertificates WHERE Id = $1", [row.certificate]); - // not needed: notify.update('BackboneAccessPoints', row.id); - notify.delete('TlsCertificates', row.certificate); - } - await client.query("DELETE FROM BackboneAccessPoints WHERE Id = $1", [row.id]); - notify.delete('BackboneAccessPoints', row.id); - } - await client.query("DELETE FROM InteriorSites WHERE Id = $1", [siteId]); - notify.delete('InteriorSites', siteId); - if (siteCertificate) { - await client.query("DELETE FROM TlsCertificates WHERE Id = $1", [siteCertificate]) - notify.delete('TlsCertificates', siteCertificate); - } } const bbResult = await client.query("DELETE FROM Backbones WHERE Id = $1 RETURNING Certificate", [bid]); notify.delete('Backbones', bid); @@ -527,7 +489,6 @@ const deleteBackbone = async function(req, res) { }); res.status(returnStatus).end(); await notify.commit(); - await processColoBackbones(); } catch (error) { returnStatus = 400; res.status(returnStatus).send(error.message); diff --git a/components/management-controller/src/colo-sync.js b/components/management-controller/src/colo-sync.js index c8e1885..093a103 100644 --- a/components/management-controller/src/colo-sync.js +++ b/components/management-controller/src/colo-sync.js @@ -26,140 +26,364 @@ import * as kube from "@skupperx/modules/kube" import { Log } from "@skupperx/modules/log" import { ClientFromPool } from "./db.js" -import { META_ANNOTATION_SKUPPERX_CONTROLLED } from "@skupperx/modules/common" import * as resourceTemplates from "./resource-templates.js" import * as sync from "./sync-management.js" import * as common from "@skupperx/modules/common" +import { NotifyTransaction, RegisterNotification } from "./notify.js" + +const coloNamespaces = {}; // {namespace-name: {backbone, site, accesspoint}} +const backbonesWithNoNamespace = []; +const siteIndex = {}; // {siteId: namespace-name} +const apIndex = {}; // {apId: namespace-name} /** * Start the colo sync module * @returns {Promise} */ export async function Start() { - Log("[Colo-Sync Module Started]") - // sync k8s state with database state on startup and every 60 seconds thereafter (additionally on backbone creation and deletion) - await processColoBackbones() + Log("[Colo-Sync Module Started]"); + + // + // Pre-load the local list of colocated site namespaces. + // + const nsList = await kube.GetNamespaces().then(namespaces => namespaces.map(ns => ({name: ns.metadata.name, annotations: ns.metadata.annotations || {}}))); + for (const ns of nsList) { + if (ns.annotations[common.META_ANNOTATION_SKUPPERX_CONTROLLED]) { + coloNamespaces[ns.name] = { + backbone : null, + site : null, + accesspoint : null, + }; + } + } + + // + // Register the data-change notification handlers, requesting an initial sweep of all backbones for reconciliation. + // + await RegisterNotification('Backbones', onBackboneChange, true); + await RegisterNotification('InteriorSites', onSiteChange, false); + await RegisterNotification('BackboneAccessPoints', onAccessPointChange, false); + + setTimeout(visitIncompleteSites, 5000); } -/** - * Process colo backbones and reconcile namespaces - * @returns {Promise} - */ -export async function processColoBackbones() { - const client = await ClientFromPool('system') - try { - // get all backbones with colo namespaces - const coloBackbones = await client.query(`SELECT Id, CoLocatedNamespace FROM Backbones WHERE CoLocatedNamespace IS NOT NULL`).then(res => res.rows) - // sync k8s state with database state - if (coloBackbones.length > 0) { - await reconcileNamespaces(coloBackbones) +async function visitIncompleteSites() { + for (const [ns, data] of Object.entries(coloNamespaces)) { + if (data?.site?.deploymentstate != 'deployed') { + await visitNamespace(ns); } - } catch (err) { - Log(`[Colo-Sync] Error in colo backbone processing: ${err.stack || err}`) - } finally { - client.release() - setTimeout(processColoBackbones, 60000) } + + setTimeout(visitIncompleteSites, 5000); } +async function onSiteChange(action, tableName, sid) { + console.log(`onSiteChange: ${action}, ${sid}`); + const ns = siteIndex[sid]; + if (ns) { + if (action === 'UPDATE') { + const client = await ClientFromPool('system'); + try { + const result = await client.query("SELECT * FROM InteriorSites WHERE Id = $1", [sid]); + if (result.rowCount == 1) { + coloNamespaces[ns].site = result.rows[0]; + await visitNamespace(ns); + } + } catch (error) { + throw error; + } finally { + client.release(); + } + } else if (action === 'DELETE') { + coloNamespaces[ns].site = null; + await visitNamespace(ns); + } + } +} -/** - * Reconcile Kubernetes namespaces for the colo backbones - * @param {Array} coloBackbones - The colo backbones with their colo namespaces - * @returns {Promise} - */ -async function reconcileNamespaces(coloBackbones) { - const existingNamespaces = await kube.GetNamespaces().then(namespace => namespace.items.map(ns => ({name: ns.metadata.name, annotations: ns.metadata.annotations}))) - const coloNamespaces = new Set(coloBackbones.map(bb => bb.colocatednamespace)) - // create colocated namespaces if they don't exist on the cluster - for (const bb of coloBackbones) { - if (!existingNamespaces.some(existingNs => existingNs.name === bb.colocatednamespace)) { - await deploySite(bb.id, bb.colocatednamespace) +async function onAccessPointChange(action, tableName, apid) { + console.log(`onAccessPointChange: ${action}, ${apid}`); + const ns = apIndex[apid]; + if (ns) { + if (action === 'UPDATE') { + const client = await ClientFromPool('system'); + try { + const result = await client.query("SELECT * FROM BackboneAccessPoints WHERE Id = $1", [apid]); + console.log(`....loaded updated access points, rowCount: ${result.rowCount}`); + if (result.rowCount == 1) { + coloNamespaces[ns].accesspoint = result.rows[0]; + await visitNamespace(ns); + } + } catch (error) { + throw error; + } finally { + client.release(); + } + } else if (action === 'DELETE') { + coloNamespaces[ns].accesspoint = null; + await visitNamespace(ns); } } - - const vmsManagedNamespaces = existingNamespaces.filter(ns => ns.annotations?.[META_ANNOTATION_SKUPPERX_CONTROLLED] == "true").map(ns => ns.name) - // delete vms managed colocated namespaces if they are not in the database - for (const ns of vmsManagedNamespaces) { - if (!coloNamespaces.has(ns)) { - Log(`[Colo-Sync] deleting namespace ${ns}`) - await kube.deleteNamespace(ns) +} + +async function onBackboneChange(action, tableName, id, backbone) { + switch (action) { + case 'EXISTS': + const ns = backbone.colocatednamespace; + if (ns) { + if (coloNamespaces[ns]) { + coloNamespaces[ns].backbone = backbone; + } else { + backbonesWithNoNamespace.push(backbone); + } + } + break; + case 'EXISTS_COMPLETE': + await doInitialReconcile(); + break; + case 'ADD': { + const client = await ClientFromPool('system'); + try { + const backbone = await client.query("SELECT * FROM Backbones WHERE Id = $1", [id]).then(result => result.rows[0]); + await addColoNamespace(backbone); + } catch (error) { + Log('Exception in onBackbonesChange(ADD)'); + throw error; + } finally { + client.release(); + } + break; } + case 'DELETE': + await handleDeletedBackbone(id); + break; + case 'UPDATE': + // Ignore updates + break; } } -/** - * If the colocated site is ready, create the colo namespace and deploy the site in it - * @param {string} backboneId - The backbone id - * @param {string} ns - The namespace to deploy the site in - * @returns {Promise} - */ -async function deploySite(backboneId, ns) { - const client = await ClientFromPool('system') - try { - const siteId = await client.query(`SELECT Id FROM InteriorSites WHERE Backbone = $1 AND CoLocated = true AND Lifecycle = 'ready'`, [backboneId]).then(res => res.rows[0]?.id) - if (siteId) { - Log(`[Colo-Sync] deploying namespace ${ns}`) - await kube.createNamespace(ns) - - const siteYamlObjects = await fetchSiteYaml(siteId); - - Log(`[Colo-Sync] deploying site in namespace ${ns}`) - for (const obj of siteYamlObjects) { - await kube.ApplyObject(obj, ns) +async function doInitialReconcile() { + for (const backbone of backbonesWithNoNamespace) { + await addColoNamespace(backbone); + } + backbonesWithNoNamespace.length = 0; + + for (const [ns, data] of Object.entries(coloNamespaces)) { + if (!data.backbone) { + await kube.deleteNamespace(ns); + } else { + const client = await ClientFromPool('system'); + try { + const siteResult = await client.query( + "SELECT * FROM InteriorSites WHERE CoLocated = true AND Backbone = $1", + [data.backbone.id] + ); + if (siteResult.rowCount == 1) { + coloNamespaces[ns].site = siteResult.rows[0]; + siteIndex[siteResult.rows[0].id] = ns; + const apResult = await client.query( + "SELECT * FROM BackboneAccessPoints WHERE InteriorSite = $1 AND Kind = 'manage'", + [siteResult.rows[0].id] + ); + if (apResult.rowCount == 1) { + coloNamespaces[ns].accesspoint = apResult.rows[0]; + apIndex[apResult.rows[0].id] = ns; + } + } + await visitNamespace(ns); + } catch (error) { + Log(`Exception in doInitialReconcile: ${error.stack}`); + } finally { + client.release(); } } - } catch (err) { - Log(`[Colo-Sync] Error in deploying site in namespace ${ns}: ${err.stack || err}`) - } finally { - client.release() } } -/** - * Fetch the site yaml objects for the colocatedsite - * @param {string} siteId - The site id - * @returns {Promise>} - The site yaml objects - */ -async function fetchSiteYaml(siteId) { - const client = await ClientFromPool('system') +async function addColoNamespace(backbone) { + await kube.createNamespace(backbone.colocatednamespace); + coloNamespaces[backbone.colocatednamespace] = { + backbone : backbone, + site : null, + accesspoint : null, + }; + console.log(`Created colocated namespace: ${backbone.colocatednamespace}`); + await visitNamespace(backbone.colocatednamespace); +} + +async function handleDeletedBackbone(bbid) { + for (const [ns, data] of Object.entries(coloNamespaces)) { + if (data.backbone?.id === bbid) { + const client = await ClientFromPool('system'); + const notify = new NotifyTransaction(); + try { + await client.query("BEGIN"); + if (data.accesspoint) { + await client.query("DELETE FROM BackboneAccessPoints WHERE Id = $1", [data.accesspoint.id]); + notify.delete('BackboneAccessPoints', data.accesspoint.id); + delete apIndex[data.accesspoint.id]; + } + if (data.site) { + await client.query("DELETE FROM InteriorSites WHERE Id = $1", [data.site.id]); + notify.delete('InteriorSites', data.site.id); + delete siteIndex[data.site.id]; + } + await kube.deleteNamespace(ns); + delete coloNamespaces[ns]; + console.log(`Deleted colocated namespace: ${ns}`); + await client.query("COMMIT"); + await notify.commit(); + } catch (error) { + await client.query("ROLLBACK"); + throw error; + } finally { + client.release(); + } + break; + } + } +} + +async function visitNamespace(ns) { + // + // Conditions to ensure, in order: + // - site record exists in database (else create it) + // - accesspoint record exists in database (else create it) + // - Site CR is installed in the namespace (else apply it) + // - if the site record is in READY or ACTIVE state, the site certificate is installed in namespace (else apply it) + // - RouterAccess CR is installed in the namespace (else apply it) + // - accesspoint has host/port attributes matching the RouterAccess CR (else set accesspoint host/port and status to NEW) + // - accesspoint is in READY state and the server certificate is installed in namespace (else apply it) + // + console.log(`visitNamespace[${ns}]`); + const client = await ClientFromPool('system'); + const notify = new NotifyTransaction(); + const undoSite = coloNamespaces[ns].site === null; + const undoAp = coloNamespaces[ns].accesspoint === null; try { - const result = await client.query( - "SELECT Name, DeploymentState, Certificate, TlsCertificates.ObjectName " + - "FROM InteriorSites " + - "JOIN TlsCertificates ON Certificate = TlsCertificates.Id " + - "WHERE Interiorsites.Id = $1", [siteId]); - - if (result.rowCount != 1) { - throw new Error('Site secret not found'); + await client.query("BEGIN"); + + // + // Ensure site record exists in database (else create it) + // + if (!coloNamespaces[ns].site) { + const result = await client.query( + "INSERT INTO InteriorSites(Name, TargetPlatform, CoLocated, Backbone) " + + "VALUES ('co-located', 'sk2', true, $1) RETURNING *", + [coloNamespaces[ns].backbone.id] + ); + const site = result.rows[0]; + coloNamespaces[ns].site = site; + siteIndex[site.id] = ns; + notify.add('InteriorSites', site.id); + console.log('....site not found in record, created'); } - const site = result.rows[0]; - - if (site.deploymentstate == 'not-ready') { - throw new Error("Not permitted, site not ready for deployment"); + // + // Ensure accesspoint record exists in database (else create it) + // + if (!coloNamespaces[ns].accesspoint) { + const result = await client.query( + "INSERT INTO BackboneAccessPoints(Name, Kind, InteriorSite, AccessType) " + + "VALUES ('manage', 'manage', $1, 'local') RETURNING *", + [coloNamespaces[ns].site.id] + ); + const ap = result.rows[0]; + coloNamespaces[ns].accesspoint = ap; + apIndex[ap.id] = ns; + notify.add('BackboneAccessPoints', ap.id); + console.log('....access point not found in record, created'); } - const secret = await kube.LoadSecret(site.objectname); - let output = [ - resourceTemplates.ServiceAccount(), - resourceTemplates.BackboneRole(), - resourceTemplates.RoleBinding(), - resourceTemplates.Deployment(siteId, true, 'sk2'), - resourceTemplates.Secret(secret, `skx-site-${siteId}`, common.INJECT_TYPE_SITE, `tls-site-${siteId}`), - resourceTemplates.BackboneSite(site.name, siteId), - resourceTemplates.NetworkCR('mbone'), - ]; - - const accessPoints = await sync.GetBackboneAccessPoints_TX(client, siteId, true); - for (const [apId, apData] of Object.entries(accessPoints)) { - if (apData.kind == 'manage') { - output.push(resourceTemplates.AccessPointConfigMap(apId, apData)); + + // + // Ensure Site CR is installed in the namespace (else apply it) + // + const sitecrs = await kube.GetSites(ns); + if (sitecrs.length == 0) { + const resources = [ + resourceTemplates.ServiceAccount(), + resourceTemplates.BackboneRole(), + resourceTemplates.RoleBinding(), + resourceTemplates.Deployment(coloNamespaces[ns].site.id, true, 'sk2'), + resourceTemplates.BackboneSite(coloNamespaces[ns].site.name, coloNamespaces[ns].site.id), + resourceTemplates.NetworkCR('mbone'), + ]; + for (const obj of resources) { + await kube.ApplyObject(obj, ns) } + console.log('....Site CR missing, applied site resources to colo namespace'); + } + + // + // Ensure that if the site record is in READY or ACTIVE state, the site certificate is installed in namespace (else apply it) + // + // TODO: Check the contents of the secret to see if it needs to be updated (for certificate rotation) + // + if (['ready', 'active'].indexOf(coloNamespaces[ns].site.lifecycle) >= 0) { + const siteSecretName = `skx-site-${coloNamespaces[ns].site.id}`; + const siteSecret = await kube.LoadSecret(siteSecretName, ns); + if (!siteSecret) { + const cert = await client.query("SELECT objectname FROM TlsCertificates WHERE Id = $1", [coloNamespaces[ns].site.certificate]).then(res => res.rows[0]); + const secret = await kube.LoadSecret(cert.objectname); + const resource = resourceTemplates.Secret(secret, siteSecretName); + await kube.ApplyObject(resource, ns); + console.log('....Site client certificate not found in namespace, applied'); + } + } + + // + // Ensure RouterAccess CR is installed in the namespace (else apply it) + // + const apName = 'vms-colo-manage'; + const apSecretName = 'vms-colo-manage'; + let ap = await kube.LoadRouterAccess(apName, ns); + if (!ap) { + const resource = resourceTemplates.RouterAccessColoManage(apName, apSecretName); + await kube.ApplyObject(resource, ns); + console.log('....RouterAccess resource not found in namespace, applied'); } - return output; - } catch (err) { - throw new Error('Failed to fetch site yaml: ' + err.message); + + // + // Ensure accesspoint has host/port attributes matching the RouterAccess CR (else set accesspoint host/port and status to NEW) + // + if (!!ap + && ap.status?.endpoints?.length == 1 + && (ap.status.endpoints[0].host != coloNamespaces[ns].accesspoint.hostname + || ap.status.endpoints[0].port != coloNamespaces[ns].accesspoint.port) + ) { + const ep = ap.status.endpoints[0]; + const result = await client.query( + "UPDATE BackboneAccessPoints SET hostname = $2, port = $3, lifecycle = $4 WHERE Id = $1 RETURNING *", + [coloNamespaces[ns].accesspoint.id, ep.host, ep.port, 'new'] + ); + notify.update('BackboneAccessPoints', coloNamespaces[ns].accesspoint.id); + coloNamespaces[ns].accesspoint = result.rows[0]; + console.log('....Host/Port for the access point was mismatched. Updated database record'); + } + + // + // Ensure that if accesspoint is in READY state, the server certificate is installed in namespace (else apply it) + // + if (coloNamespaces[ns].accesspoint.lifecycle === 'ready') { + const apSecret = await kube.LoadSecret(apSecretName, ns); + if (!apSecret) { + const cert = await client.query("SELECT objectname FROM TlsCertificates WHERE Id = $1", [coloNamespaces[ns].accesspoint.certificate]).then(res => res.rows[0]); + const secret = await kube.LoadSecret(cert.objectname); + const resource = resourceTemplates.Secret(secret, apSecretName); + await kube.ApplyObject(resource, ns); + console.log('....Access point server certificate not found in namespace, applied'); + } + } + + await client.query("COMMIT"); + await notify.commit(); + } catch (error) { + await client.query("ROLLBACK"); + if (undoSite) { coloNamespaces[ns].site = null; } + if (undoAp) { coloNamespaces[ns].accesspoint = null; } + Log(`Exception in visitNamespace(${ns}): ${error.stack}`); } finally { - client.release() + client.release(); } } diff --git a/components/management-controller/src/notify.js b/components/management-controller/src/notify.js index 7d0d521..52a4dab 100644 --- a/components/management-controller/src/notify.js +++ b/components/management-controller/src/notify.js @@ -23,10 +23,18 @@ * This module is the central clearinghouse for database change updates. * Any module may register a handler for notification of data changes. * - * The notification handler has the arguments: (action, tableName, id) - * Where action is ADD, EXISTS, DELETE, UPDATE + * The notification handler has the arguments: (action, tableName, id, data) + * Where action is ADD, DELETE, UPDATE, EXISTS, EXISTS_COMPLETE * tableName is the name of the database table that was modified * id is the unique key of the changed row in the database + * data is the entire data record (only supplied in EXISTS notifications) + * + * Action: + * ADD - A new data row was created + * DELETE - A data row was deleted + * UPDATE - An existing data row was modified + * EXISTS - during initial-notification, indicates that a row exists in the database + * EXISTS_COMPLETE - initial-notification is complete. No further EXISTS events will occur on this handler. * * Triggering notifications mirrors the database transaction lifecycle. * Start by allocating a NotifyTransaction. In the body of the transaction, @@ -61,10 +69,11 @@ export async function RegisterNotification(tableName, handler, initialNotificati setTimeout(async () => { const client = await ClientFromPool('system'); try { - const rows = await client.query(`SELECT Id FROM ${tableName}`).then(result => result.rows); + const rows = await client.query(`SELECT * FROM ${tableName}`).then(result => result.rows); for (const row of rows) { - await handler('EXISTS', tableName, row.id); + await handler('EXISTS', tableName, row.id, row); } + await handler('EXISTS_COMPLETE', tableName); } catch (error) { Log(`Exception in initial notification: ${error.message}`); } finally { @@ -107,10 +116,14 @@ export class NotifyTransaction { for (const item of this.events) { const handlers = registeredHandlers[item.tableName] || []; for (const h of handlers) { -// Log(`Calling notify handler for table ${item.tableName}, id ${item.id}`); - await h(item.action, item.tableName, item.id); - await WatchNotify(item.tableName, item.id); + try { + await h(item.action, item.tableName, item.id); + } catch (error) { + Log('Exception in notification handler:', item); + Log(error.stack); + } } + await WatchNotify(item.tableName, item.id); } } } diff --git a/components/management-controller/src/resource-templates.js b/components/management-controller/src/resource-templates.js index a6b159f..54eda44 100644 --- a/components/management-controller/src/resource-templates.js +++ b/components/management-controller/src/resource-templates.js @@ -85,11 +85,6 @@ export function BackboneSite(name, siteId) { }, spec : { linkAccess : 'none', - settings : { - 'management-plane' : 'skupperx', - 'skupperx-site-id' : siteId, - 'skupperx-site-type' : 'backbone', - } }, }; } @@ -188,6 +183,27 @@ function getRouterAccessRole(kind) { } } +export function RouterAccessColoManage(name, secretName) { + return { + apiVersion : 'skupper.io/v2alpha1', + kind : 'RouterAccess', + metadata : { + name : name, + annotations : { + [META_ANNOTATION_SKUPPERX_CONTROLLED] : 'true', + }, + }, + spec: { + tlsCredentials: secretName, + generateTlsCredentials: false, + roles : [{ + name: getRouterAccessRole('manage'), + }], + accessType: 'local', + }, + }; +} + const accessPointRouterAccess = function(apId, data) { const name = short_access_name(`${data.kind}-${apId}`); let routerAccess = { diff --git a/components/management-controller/src/site-deployment-state.js b/components/management-controller/src/site-deployment-state.js index 0947996..0b5ac56 100644 --- a/components/management-controller/src/site-deployment-state.js +++ b/components/management-controller/src/site-deployment-state.js @@ -33,6 +33,8 @@ const evaluateSingleSite_TX = async function (client, notify, site) { if (site.lifecycle == 'active') { state = 'deployed'; + } else if (site.colocated) { + state = 'colo-automatic'; } else if (site.lifecycle == 'ready') { // // Find the links which come from this site and go to access points on sites with deployed state diff --git a/components/management-controller/src/sync-management.js b/components/management-controller/src/sync-management.js index 356f6aa..6b0f90b 100644 --- a/components/management-controller/src/sync-management.js +++ b/components/management-controller/src/sync-management.js @@ -112,8 +112,11 @@ async function onNewBackboneSite(peerId) { throw new Error(`InteriorSite not found using id ${peerId}`); } const site = siteResult.rows[0]; - const secret = await LoadSecret(site.objectname); - localState[`tls-site-${peerId}`] = HashOfSecret(secret); + if (!site.colocated) { + // Don't sync the site secret to colocated sites. + const secret = await LoadSecret(site.objectname); + localState[`tls-site-${peerId}`] = HashOfSecret(secret); + } // // Find all of the access points associated with this backbone site. @@ -121,6 +124,10 @@ async function onNewBackboneSite(peerId) { // const accessResult = await client.query("SELECT Id, Lifecycle, Certificate, Kind, BindHost, AccessType, Hostname, Port FROM BackboneAccessPoints WHERE InteriorSite = $1", [peerId]); for (const accessPoint of accessResult.rows) { + if (accessPoint.kind == 'manage' && site.colocated) { + // Don't sync the manage access point to colocated sites. + continue; + } let apData = { kind : accessPoint.kind, }; diff --git a/components/site-controller/src/ingress-v2.js b/components/site-controller/src/ingress-v2.js index 3f66cf1..3b2a0c3 100644 --- a/components/site-controller/src/ingress-v2.js +++ b/components/site-controller/src/ingress-v2.js @@ -105,6 +105,9 @@ const handleAccessResource = async function(oper, access) { return; } const apId = Annotation(access, META_ANNOTATION_STATE_ID); + if (!apId) { + return; + } const apKind = getAccessPointKindFromAccess(access); const kind = access.kind; const name = access.metadata.name; diff --git a/modules/src/common.js b/modules/src/common.js index f7bffd0..c7734b7 100644 --- a/modules/src/common.js +++ b/modules/src/common.js @@ -32,6 +32,7 @@ export const APPLICATION_ROUTER_LABEL = "skx-router" // Kubernetes annotation keys // export const META_ANNOTATION_SKUPPERX_CONTROLLED = "skupper.io/skupperx-controlled" +export const META_ANNOTATION_BACKBONE_ID = "skupper.io/backbone-id" export const META_ANNOTATION_STATE_HASH = "skx/state-hash" export const META_ANNOTATION_STATE_KEY = "skx/state-key" export const META_ANNOTATION_STATE_DIR = "skx/state-dir" diff --git a/modules/src/kube.js b/modules/src/kube.js index 175d348..09be535 100644 --- a/modules/src/kube.js +++ b/modules/src/kube.js @@ -164,11 +164,11 @@ export async function GetSecrets() { return list.items } -export async function LoadSecret(name) { +export async function LoadSecret(name, ns) { try { return await v1Api.readNamespacedSecret({ name: name, - namespace: namespace, + namespace: ns || namespace, }) } catch (e) {} return undefined @@ -216,7 +216,7 @@ export async function DeleteConfigmap(name) { export async function GetNamespaces() { try { - return await v1Api.listNamespace() + return await v1Api.listNamespace().then(data => data.items); } catch { Log("Error listing namespaces") } @@ -229,7 +229,7 @@ export async function createNamespace(name) { metadata: { name: name, annotations: { - [common.META_ANNOTATION_SKUPPERX_CONTROLLED]: "true" + [common.META_ANNOTATION_SKUPPERX_CONTROLLED] : "true", } }, } @@ -377,11 +377,11 @@ export async function DeleteDeployment(name) { }) } -export async function GetSites() { +export async function GetSites(ns) { let list = await customApi.listNamespacedCustomObject({ group: "skupper.io", version: "v2alpha1", - namespace: namespace, + namespace: ns || namespace, plural: "sites", }) return list.items @@ -408,25 +408,28 @@ export async function LoadNetworkAccess(name) { return resource } -export async function GetRouterAccesses() { +export async function GetRouterAccesses(ns) { let list = await customApi.listNamespacedCustomObject({ group: "skupper.io", version: "v2alpha1", - namespace: namespace, + namespace: ns || namespace, plural: "routeraccesses", }) return list.items } -export async function LoadRouterAccess(name) { - let resource = await customApi.getNamespacedCustomObject({ - group: "skupper.io", - version: "v2alpha1", - name: name, - namespace: namespace, - plural: "routeraccesses", - }) - return resource +export async function LoadRouterAccess(name, ns) { + try { + let resource = await customApi.getNamespacedCustomObject({ + group: "skupper.io", + version: "v2alpha1", + name: name, + namespace: ns || namespace, + plural: "routeraccesses", + }) + return resource + } catch (error) {} + return undefined; } export async function DeleteSkupperResource(plural, name) { @@ -662,30 +665,40 @@ export function WatchPods(callback) { } } -var routerAccessWatches = [] -const startWatchRouterAccesses = function () { +var routerAccessWatches = {} // {namespace => list of watches} +const startWatchRouterAccesses = function (ns) { routerAccessWatch.watch( - `/apis/skupper.io/v2alpha1/namespaces/${namespace}/routeraccesses`, + `/apis/skupper.io/v2alpha1/namespaces/${ns}/routeraccesses`, {}, (type, apiObj, watchObj) => { - for (const callback of routerAccessWatches) { - callback(type, apiObj) + const toDelete = []; + for (const callback of routerAccessWatches[ns]) { + if (callback(type, apiObj) === 'cancel') { + toDelete.push(callback); + } } + routerAccessWatches[ns] = routerAccessWatches[ns].filter(item => toDelete.indexOf(item) == -1); }, (err) => { if (err) { watchErrorCount++ lastWatchError = `RouterAccesses: ${err}` } - startWatchRouterAccesses() + if (routerAccessWatches[ns].length > 0) { + startWatchRouterAccesses(ns); + } }, - ) + ); } -export function startWatchRouterAccessesFn(callback) { - routerAccessWatches.push(callback) - if (routerAccessWatches.length == 1) { - startWatchRouterAccesses() +export function startWatchRouterAccessesFn(callback, ns) { + ns = ns || namespace; + if (!routerAccessWatches[ns]) { + routerAccessWatches[ns] = []; + } + routerAccessWatches[ns].push(callback) + if (routerAccessWatches[ns].length == 1) { + startWatchRouterAccesses(ns) } } From 70a35d872deccc1229eca10164c6b6ae28ad81d9 Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Fri, 12 Jun 2026 13:57:53 -0400 Subject: [PATCH 2/7] Fixed broken links (site certificate was not recognized by the site controller) --- .../src/backbone-links.js | 24 ++++++++++++++++--- .../management-controller/src/colo-sync.js | 3 ++- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/components/management-controller/src/backbone-links.js b/components/management-controller/src/backbone-links.js index 3f72a57..7f45a9a 100644 --- a/components/management-controller/src/backbone-links.js +++ b/components/management-controller/src/backbone-links.js @@ -27,6 +27,7 @@ import { LoadSecret } from '@skupperx/modules/kube' import { Log } from '@skupperx/modules/log' import { ClientFromPool } from './db.js'; import { OpenConnection, CloseConnection } from '@skupperx/modules/amqp' +import { NotifyTransaction, RegisterNotification } from './notify.js'; let controller_name; let tls_ca; @@ -67,8 +68,14 @@ async function deleteConnection(apid) { } } +async function periodicCheck() { + const normal_period = 30000; + const startup_period = 2000; + await reconcileBackboneConnections(); + setTimeout(periodicCheck, !!tls_cert ? normal_period : startup_period); +} + async function reconcileBackboneConnections() { - let reschedule_delay = 30000; const client = await ClientFromPool('system'); try { await client.query('BEGIN'); @@ -99,7 +106,6 @@ async function reconcileBackboneConnections() { reschedule_delay = 10000; } finally { client.release(); - setTimeout(reconcileBackboneConnections, reschedule_delay); } } @@ -153,17 +159,20 @@ async function resolveTLSData() { async function resolveControllerRecord() { let reschedule_delay = -1; const client = await ClientFromPool('system'); + const notify = new NotifyTransaction(); try { await client.query('BEGIN'); const result = await client.query("SELECT * FROM ManagementControllers WHERE Name = $1", [controller_name]); if (result.rowCount == 1) { setTimeout(resolveTLSData, 0); } else { - await client.query("INSERT INTO ManagementControllers (Name) VALUES ($1)", [controller_name]); + const addResult = await client.query("INSERT INTO ManagementControllers (Name) VALUES ($1) RETURNING Id", [controller_name]); + notify.add('ManagementControllers', addResult.rows[0].id); setTimeout(resolveTLSData, 1000); Log(`No management controller found for '${controller_name}', created new record`); } await client.query('COMMIT'); + await notify.commit(); } catch (err) { Log(`Rolling back resolveControllerRecord transaction: ${err.stack}`); await client.query('ROLLBACK'); @@ -176,6 +185,13 @@ async function resolveControllerRecord() { } } +async function onAccessPointChange(action, tableName, id) { + console.log(`onAccessPointChange ${action}, ${tableName}, ${id}`); + if ((action == 'DELETE' || action == 'UPDATE') && id in manageConnections) { + await reconcileBackboneConnections(); + } +} + export async function RegisterHandler(onAdded, onDeleted) { for (const [key, value] of Object.entries(manageConnections)) { await onAdded(key, value.conn); @@ -191,4 +207,6 @@ export async function Start(name) { Log(`[Backbone-links module starting for controller: ${name}]`); controller_name = name; await resolveControllerRecord(); + RegisterNotification('BackboneAccessPoints', onAccessPointChange, false); + setTimeout(periodicCheck, 5000); } diff --git a/components/management-controller/src/colo-sync.js b/components/management-controller/src/colo-sync.js index 093a103..c963a57 100644 --- a/components/management-controller/src/colo-sync.js +++ b/components/management-controller/src/colo-sync.js @@ -172,6 +172,7 @@ async function doInitialReconcile() { for (const [ns, data] of Object.entries(coloNamespaces)) { if (!data.backbone) { await kube.deleteNamespace(ns); + delete coloNamespaces[ns]; } else { const client = await ClientFromPool('system'); try { @@ -326,7 +327,7 @@ async function visitNamespace(ns) { if (!siteSecret) { const cert = await client.query("SELECT objectname FROM TlsCertificates WHERE Id = $1", [coloNamespaces[ns].site.certificate]).then(res => res.rows[0]); const secret = await kube.LoadSecret(cert.objectname); - const resource = resourceTemplates.Secret(secret, siteSecretName); + const resource = resourceTemplates.Secret(secret, siteSecretName, common.INJECT_TYPE_SITE); await kube.ApplyObject(resource, ns); console.log('....Site client certificate not found in namespace, applied'); } From 19352fafc447b9a0541929e01e744543aad2a67d Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Fri, 12 Jun 2026 15:51:01 -0400 Subject: [PATCH 3/7] Fixed the problem with deleting backbones with co-located sites. --- charts/helmfile/resources/db-setup.sql | 4 ++-- .../management-controller/src/api-admin.js | 20 +++++++++---------- .../management-controller/src/colo-sync.js | 7 +++++++ 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/charts/helmfile/resources/db-setup.sql b/charts/helmfile/resources/db-setup.sql index 197665a..e909fcb 100644 --- a/charts/helmfile/resources/db-setup.sql +++ b/charts/helmfile/resources/db-setup.sql @@ -164,7 +164,7 @@ CREATE TABLE Backbones ( Name text UNIQUE, Lifecycle LifecycleType DEFAULT 'new', Failure text, - Certificate UUID REFERENCES TlsCertificates, + Certificate UUID REFERENCES TlsCertificates ON DELETE CASCADE, CoLocatedNamespace text UNIQUE DEFAULT NULL, Owner UUID REFERENCES Users, OwnerGroup text @@ -181,7 +181,7 @@ CREATE TABLE InteriorSites ( Name text, Lifecycle LifecycleType DEFAULT 'new', Failure text, - Certificate UUID REFERENCES TlsCertificates, + Certificate UUID REFERENCES TlsCertificates ON DELETE CASCADE, DeploymentState DeploymentStateType DEFAULT 'not-ready', TargetPlatform text REFERENCES TargetPlatforms, diff --git a/components/management-controller/src/api-admin.js b/components/management-controller/src/api-admin.js index 856d4a4..388ccc5 100644 --- a/components/management-controller/src/api-admin.js +++ b/components/management-controller/src/api-admin.js @@ -471,21 +471,19 @@ const deleteBackbone = async function(req, res) { if (vanResult.rowCount > 0) { throw new Error('Cannot delete a backbone with active application networks'); } - const siteResult = await client.query("SELECT Id, Certificate, CoLocated FROM InteriorSites WHERE Backbone = $1", [bid]); + const siteResult = await client.query("SELECT Id, Certificate FROM InteriorSites WHERE Backbone = $1 AND CoLocated = false", [bid]); + console.log(siteResult.rowCount); if (siteResult.rowCount > 0) { - if (siteResult.rowCount > 1 || !siteResult.rows[0].colocated) { - throw new Error('Cannot delete a backbone with interior sites'); - } + throw new Error('Cannot delete a backbone with interior sites'); + } + const coloResult = await client.query("DELETE FROM InteriorSites WHERE Backbone = $1 AND CoLocated = true RETURNING Id, Certificate", [bid]); + console.log(coloResult.rowCount); + if (coloResult.rowCount == 1) { + const colo = coloResult.rows[0]; + notify.delete('InteriorSites', colo.id); } const bbResult = await client.query("DELETE FROM Backbones WHERE Id = $1 RETURNING Certificate", [bid]); notify.delete('Backbones', bid); - if (bbResult.rowCount == 1) { - const row = bbResult.rows[0]; - if (row.certificate) { - await client.query("DELETE FROM TlsCertificates WHERE Id = $1", [row.certificate]); - notify.delete('TlsCertificates', row.certificate); - } - } }); res.status(returnStatus).end(); await notify.commit(); diff --git a/components/management-controller/src/colo-sync.js b/components/management-controller/src/colo-sync.js index c963a57..06b1de2 100644 --- a/components/management-controller/src/colo-sync.js +++ b/components/management-controller/src/colo-sync.js @@ -53,6 +53,7 @@ export async function Start() { backbone : null, site : null, accesspoint : null, + deleting : false, }; } } @@ -96,6 +97,7 @@ async function onSiteChange(action, tableName, sid) { } } else if (action === 'DELETE') { coloNamespaces[ns].site = null; + coloNamespaces[ns].deleting = true; await visitNamespace(ns); } } @@ -121,6 +123,7 @@ async function onAccessPointChange(action, tableName, apid) { } } else if (action === 'DELETE') { coloNamespaces[ns].accesspoint = null; + coloNamespaces[ns].deleting = true; await visitNamespace(ns); } } @@ -208,6 +211,7 @@ async function addColoNamespace(backbone) { backbone : backbone, site : null, accesspoint : null, + deleting : false, }; console.log(`Created colocated namespace: ${backbone.colocatednamespace}`); await visitNamespace(backbone.colocatednamespace); @@ -258,6 +262,9 @@ async function visitNamespace(ns) { // - accesspoint is in READY state and the server certificate is installed in namespace (else apply it) // console.log(`visitNamespace[${ns}]`); + if (!coloNamespaces[ns] || coloNamespaces[ns].deleting) { + return; + } const client = await ClientFromPool('system'); const notify = new NotifyTransaction(); const undoSite = coloNamespaces[ns].site === null; From 6b5c30f2366200cc9b89410ad2899fa6426bd69a Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Fri, 12 Jun 2026 16:07:08 -0400 Subject: [PATCH 4/7] Fixed problem with incorrect deployment states on co-located sites. --- .../management-controller/src/site-deployment-state.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/components/management-controller/src/site-deployment-state.js b/components/management-controller/src/site-deployment-state.js index 0b5ac56..023b618 100644 --- a/components/management-controller/src/site-deployment-state.js +++ b/components/management-controller/src/site-deployment-state.js @@ -69,7 +69,7 @@ const evaluateSingleSite_TX = async function (client, notify, site) { } export async function SiteLifecycleChanged_TX(client, notify, siteId, newState) { - const result = await client.query("SELECT Id, Lifecycle, DeploymentState FROM InteriorSites WHERE Id = $1", [siteId]); + const result = await client.query("SELECT Id, Lifecycle, DeploymentState, CoLocated FROM InteriorSites WHERE Id = $1", [siteId]); if (result.rowCount == 1) { const site = result.rows[0]; await evaluateSingleSite_TX(client, notify, site); @@ -81,7 +81,7 @@ export async function SiteLifecycleChanged_TX(client, notify, siteId, newState) "JOIN BackboneAccessPoints ON BackboneAccessPoints.Id = AccessPoint " + "WHERE BackboneAccessPoints.InteriorSite = $1", [siteId]); for (const row of connected.rows) { - const siteResult = await client.query("SELECT Id, Lifecycle, DeploymentState FROM InteriorSites WHERE Id = $1", [row.connectinginteriorsite]); + const siteResult = await client.query("SELECT Id, Lifecycle, DeploymentState, CoLocated FROM InteriorSites WHERE Id = $1", [row.connectinginteriorsite]); if (siteResult.rowCount == 1) { await evaluateSingleSite_TX(client, notify, siteResult.rows[0]); } @@ -102,7 +102,7 @@ export async function LinkAddedOrDeleted(connectingSiteId, accessPointId) { "JOIN InteriorSites ON InteriorSites.Id = InteriorSite " + "WHERE BackboneAccessPoints.Id = $1", [accessPointId]); if (lResult.rowCount == 1 && lResult.rows[0].deploymentstate == 'deployed') { - const cResult = await client.query("SELECT Id, Lifecycle, DeploymentState FROM InteriorSites WHERE Id = $1", [connectingSiteId]); + const cResult = await client.query("SELECT Id, Lifecycle, DeploymentState, CoLocated FROM InteriorSites WHERE Id = $1", [connectingSiteId]); if (cResult.rowCount == 1) { await evaluateSingleSite_TX(client, notify, cResult.rows[0]); } @@ -123,7 +123,7 @@ export async function ManageIngressAdded(siteId) { const notify = new NotifyTransaction(); try { await client.query("BEGIN"); - const result = await client.query("SELECT Id, Lifecycle, DeploymentState FROM InteriorSites WHERE Id = $1", [siteId]); + const result = await client.query("SELECT Id, Lifecycle, DeploymentState, CoLocated FROM InteriorSites WHERE Id = $1", [siteId]); if (result.rowCount == 1) { const site = result.rows[0]; if (site.deploymentstate == 'not-ready') { @@ -146,7 +146,7 @@ export async function ManageIngressDeleted(siteId) { const notify = new NotifyTransaction(); try { await client.query("BEGIN"); - const result = await client.query("SELECT Id, Lifecycle, DeploymentState FROM InteriorSites WHERE Id = $1", [siteId]); + const result = await client.query("SELECT Id, Lifecycle, DeploymentState, CoLocated FROM InteriorSites WHERE Id = $1", [siteId]); if (result.rowCount == 1) { const site = result.rows[0]; if (site.deploymentstate == 'ready-bootstrap') { From 1f63bb225591ceffaecd5d553c5ccc3426fbfacb Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Mon, 15 Jun 2026 13:31:58 -0400 Subject: [PATCH 5/7] Removed a bunch of console.log outputs used in initial debugging. --- components/management-controller/src/api-admin.js | 2 -- components/management-controller/src/colo-sync.js | 15 ++------------- 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/components/management-controller/src/api-admin.js b/components/management-controller/src/api-admin.js index 388ccc5..d9a9943 100644 --- a/components/management-controller/src/api-admin.js +++ b/components/management-controller/src/api-admin.js @@ -472,12 +472,10 @@ const deleteBackbone = async function(req, res) { throw new Error('Cannot delete a backbone with active application networks'); } const siteResult = await client.query("SELECT Id, Certificate FROM InteriorSites WHERE Backbone = $1 AND CoLocated = false", [bid]); - console.log(siteResult.rowCount); if (siteResult.rowCount > 0) { throw new Error('Cannot delete a backbone with interior sites'); } const coloResult = await client.query("DELETE FROM InteriorSites WHERE Backbone = $1 AND CoLocated = true RETURNING Id, Certificate", [bid]); - console.log(coloResult.rowCount); if (coloResult.rowCount == 1) { const colo = coloResult.rows[0]; notify.delete('InteriorSites', colo.id); diff --git a/components/management-controller/src/colo-sync.js b/components/management-controller/src/colo-sync.js index 06b1de2..3f5c8b4 100644 --- a/components/management-controller/src/colo-sync.js +++ b/components/management-controller/src/colo-sync.js @@ -79,7 +79,6 @@ async function visitIncompleteSites() { } async function onSiteChange(action, tableName, sid) { - console.log(`onSiteChange: ${action}, ${sid}`); const ns = siteIndex[sid]; if (ns) { if (action === 'UPDATE') { @@ -104,14 +103,12 @@ async function onSiteChange(action, tableName, sid) { } async function onAccessPointChange(action, tableName, apid) { - console.log(`onAccessPointChange: ${action}, ${apid}`); const ns = apIndex[apid]; if (ns) { if (action === 'UPDATE') { const client = await ClientFromPool('system'); try { const result = await client.query("SELECT * FROM BackboneAccessPoints WHERE Id = $1", [apid]); - console.log(`....loaded updated access points, rowCount: ${result.rowCount}`); if (result.rowCount == 1) { coloNamespaces[ns].accesspoint = result.rows[0]; await visitNamespace(ns); @@ -213,7 +210,7 @@ async function addColoNamespace(backbone) { accesspoint : null, deleting : false, }; - console.log(`Created colocated namespace: ${backbone.colocatednamespace}`); + Log(`Created colocated namespace: ${backbone.colocatednamespace}`); await visitNamespace(backbone.colocatednamespace); } @@ -236,7 +233,7 @@ async function handleDeletedBackbone(bbid) { } await kube.deleteNamespace(ns); delete coloNamespaces[ns]; - console.log(`Deleted colocated namespace: ${ns}`); + Log(`Deleted colocated namespace: ${ns}`); await client.query("COMMIT"); await notify.commit(); } catch (error) { @@ -261,7 +258,6 @@ async function visitNamespace(ns) { // - accesspoint has host/port attributes matching the RouterAccess CR (else set accesspoint host/port and status to NEW) // - accesspoint is in READY state and the server certificate is installed in namespace (else apply it) // - console.log(`visitNamespace[${ns}]`); if (!coloNamespaces[ns] || coloNamespaces[ns].deleting) { return; } @@ -285,7 +281,6 @@ async function visitNamespace(ns) { coloNamespaces[ns].site = site; siteIndex[site.id] = ns; notify.add('InteriorSites', site.id); - console.log('....site not found in record, created'); } // @@ -301,7 +296,6 @@ async function visitNamespace(ns) { coloNamespaces[ns].accesspoint = ap; apIndex[ap.id] = ns; notify.add('BackboneAccessPoints', ap.id); - console.log('....access point not found in record, created'); } // @@ -320,7 +314,6 @@ async function visitNamespace(ns) { for (const obj of resources) { await kube.ApplyObject(obj, ns) } - console.log('....Site CR missing, applied site resources to colo namespace'); } // @@ -336,7 +329,6 @@ async function visitNamespace(ns) { const secret = await kube.LoadSecret(cert.objectname); const resource = resourceTemplates.Secret(secret, siteSecretName, common.INJECT_TYPE_SITE); await kube.ApplyObject(resource, ns); - console.log('....Site client certificate not found in namespace, applied'); } } @@ -349,7 +341,6 @@ async function visitNamespace(ns) { if (!ap) { const resource = resourceTemplates.RouterAccessColoManage(apName, apSecretName); await kube.ApplyObject(resource, ns); - console.log('....RouterAccess resource not found in namespace, applied'); } // @@ -367,7 +358,6 @@ async function visitNamespace(ns) { ); notify.update('BackboneAccessPoints', coloNamespaces[ns].accesspoint.id); coloNamespaces[ns].accesspoint = result.rows[0]; - console.log('....Host/Port for the access point was mismatched. Updated database record'); } // @@ -380,7 +370,6 @@ async function visitNamespace(ns) { const secret = await kube.LoadSecret(cert.objectname); const resource = resourceTemplates.Secret(secret, apSecretName); await kube.ApplyObject(resource, ns); - console.log('....Access point server certificate not found in namespace, applied'); } } From d4316b756a776c92f6c2b395246aa69830c14353 Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Mon, 15 Jun 2026 15:57:35 -0400 Subject: [PATCH 6/7] Fixes #130 - Sync Listeners into co-located sites for each managed VAN. --- .../src/resource-templates.js | 2 +- .../src/sync-management.js | 75 ++++++++++++++++++- .../site-controller/src/sync-site-kube.js | 39 +++++++--- modules/src/common.js | 1 + modules/src/kube.js | 24 ++++++ 5 files changed, 127 insertions(+), 14 deletions(-) diff --git a/components/management-controller/src/resource-templates.js b/components/management-controller/src/resource-templates.js index 54eda44..b91c852 100644 --- a/components/management-controller/src/resource-templates.js +++ b/components/management-controller/src/resource-templates.js @@ -341,7 +341,7 @@ export function BackboneRole() { }, { apiGroups : ["skupper.io"], - resources : ["sites", "links", "networkaccesses", "routeraccesses"], + resources : ["sites", "links", "networkaccesses", "routeraccesses", "listeners"], verbs : ["get", "list", "watch", "create", "update", "delete", "patch"], }, ], diff --git a/components/management-controller/src/sync-management.js b/components/management-controller/src/sync-management.js index 6b0f90b..656380f 100644 --- a/components/management-controller/src/sync-management.js +++ b/components/management-controller/src/sync-management.js @@ -35,7 +35,7 @@ import { onMewMember, StateRequest } from './sync-application.js'; import { RegisterHandler } from './backbone-links.js'; import { HashOfSecret, HashOfData } from './resource-templates.js'; import { SiteLifecycleChanged_TX } from './site-deployment-state.js'; -import { NotifyTransaction } from './notify.js'; +import { NotifyTransaction, RegisterNotification } from './notify.js'; var peers = {}; // {peerId: {pClass: <>, stuff}} @@ -90,6 +90,7 @@ async function onNewBackboneSite(peerId) { // - tls-server- - Certificates/CAs for the backbone's access points [ id => AccessPoint ] // - access- - Access point {kind: <>, bindHost?: <>, accessType?: <>, tls: } [ id => AccessPoint ] // - link- - Link {host: <>, port: <>, cost: <>} [ id => InterRouterLink ] + // - van- - VAN Endpoints (van-id) [ only sent to co-located bb sites ] [ id => ApplicationNetwork ] // // Remote state: // - accessstatus- - Host/Port for an access point {host: <>, port: <>} @@ -105,9 +106,13 @@ async function onNewBackboneSite(peerId) { // // Query for the site's client certificate // - const siteResult = await client.query("SELECT InteriorSites.Lifecycle, InteriorSites.FirstActiveTime, InteriorSites.Certificate, InteriorSites.CoLocated, TlsCertificates.ObjectName FROM InteriorSites " + - "JOIN TlsCertificates ON TlsCertificates.Id = InteriorSites.Certificate " + - "WHERE InteriorSites.Id = $1", [peerId]); + const siteResult = await client.query( + "SELECT S.Lifecycle, S.FirstActiveTime, S.Certificate, S.CoLocated, S.Backbone, C.ObjectName " + + "FROM InteriorSites AS S " + + "JOIN TlsCertificates AS C ON C.Id = S.Certificate " + + "WHERE S.Id = $1", + [peerId] + ); if (siteResult.rowCount != 1) { throw new Error(`InteriorSite not found using id ${peerId}`); } @@ -116,8 +121,16 @@ async function onNewBackboneSite(peerId) { // Don't sync the site secret to colocated sites. const secret = await LoadSecret(site.objectname); localState[`tls-site-${peerId}`] = HashOfSecret(secret); + } else { + // Do sync the list of managed VANs on the site's backbone + const vanResult = await client.query("SELECT Id, VanId FROM ApplicationNetworks WHERE Backbone = $1", [site.backbone]); + for (const van of vanResult.rows) { + localState[`van-${van.id}`] = HashOfData({vanid: van.vanid}); + } } + peers[peerId].colocated = site.colocated; + // // Find all of the access points associated with this backbone site. // If the access point is 'ready', include its certificate and include remote state for its host/port. @@ -393,6 +406,26 @@ async function getStateMemberLink(linkId) { return [hash, data]; } +async function getStateVanIds(vid) { + let hash = null; + let data = null; + const client = await ClientFromPool('system'); + try { + const vanResult = await client.query("SELECT VanId FROM ApplicationNetworks WHERE Id = $1", [vid]); + if (vanResult.rowCount == 1) { + data = { + vanid : vanResult.rows[0].vanid, + }; + hash = HashOfData(data); + } + } catch (error) { + Log(`Exception in getStateVanIds: ${error.stack}`); + } finally { + client.release(); + } + return [hash, data]; +} + async function onStateRequestBackbone(peerId, stateKey) { var hash = null; var data = null; @@ -405,6 +438,8 @@ async function onStateRequestBackbone(peerId, stateKey) { [hash, data] = await getStateAccessPoint(stateKey.substring(7)); } else if (stateKey.substring(0, 5) == 'link-') { [hash, data] = await getStateBackboneLink(stateKey.substring(5)); + } else if (stateKey.substring(0, 4) == 'van-') { + [hash, data] = await getStateVanIds(stateKey.substring(4)); } else { Log(`Invalid stateKey for onStateRequestBackbone processing: ${stateKey}`); } @@ -735,6 +770,37 @@ export async function LinkChanged(connectingSiteId, linkId) { } } +async function onApplicationNetworkChange(action, tableName, id) { + let hash = null; + let doUpdate = false; + + if (action == 'ADD') { + const client = await ClientFromPool('system'); + try { + const result = await client.query("SELECT vanId FROM ApplicationNetworks WHERE Id = $1", [id]); + if (result.rowCount == 1) { + hash = HashOfData({vanid: result.rows[0].vanid}); + } + doUpdate = true; + } catch (error) { + Log(`Exception in onApplicationNetworkChange: ${error.stack}`); + } finally { + client.release(); + } + doUpdate = true; + } else if (action == 'DELETE') { + doUpdate = true; + } + + if (doUpdate) { + for (const [peerId, peerData] of Object.entries(peers)) { + if (peerData.colocated) { + await UpdateLocalState(peerId, `van-${id}`, hash); + } + } + } +} + export async function NewIngressAvailable(siteId) { // // Update the links/outgoing hash for each site that connects to the indicated site @@ -764,4 +830,5 @@ export async function SiteDeleted(siteId) { export async function Start() { await StateSyncStart(CLASS_MANAGEMENT, 'mc', API_CONTROLLER_ADDRESS, onNewPeer, onPeerLost, onStateChange, onStateRequest, onPing); await RegisterHandler(onLinkAdded, onLinkDeleted); + await RegisterNotification('ApplicationNetworks', onApplicationNetworkChange, false); } diff --git a/components/site-controller/src/sync-site-kube.js b/components/site-controller/src/sync-site-kube.js index d176b18..22aa283 100644 --- a/components/site-controller/src/sync-site-kube.js +++ b/components/site-controller/src/sync-site-kube.js @@ -45,7 +45,8 @@ import { META_ANNOTATION_STATE_TYPE, META_ANNOTATION_STATE_ID, META_ANNOTATION_TLS_INJECT, - API_CONTROLLER_ADDRESS + API_CONTROLLER_ADDRESS, + STATE_TYPE_LISTENER } from '@skupperx/modules/common' import { Annotation, @@ -69,6 +70,9 @@ import { DeleteNetworkAccess, LoadRouterAccess, LoadNetworkAccess, + GetListeners, + LoadListener, + DeleteListener, } from '@skupperx/modules/kube' import { UpdateLocalState as StateSyncUpdateLocalState, @@ -134,7 +138,7 @@ const kubeObjectForState = function(stateKey, data=null) { objName = apKind + '-' + stateId.split('-')[0]; break; case 'link': - apiVersion = 'skupper.io/v2alpha1' + apiVersion = 'skupper.io/v2alpha1'; objKind = 'Link'; stateType = STATE_TYPE_LINK; stateId = stateKey.substring(5); // text following 'link-' @@ -143,13 +147,11 @@ const kubeObjectForState = function(stateKey, data=null) { objKind = 'InMemory'; objDir = 'local'; break; - case 'component': - objKind = 'Spec'; - stateId = stateKey.substring(10); // text following 'component-' - break; - case 'iface': - objKind = 'ConfigMap'; - const role = elements[1]; + case 'van': + apiVersion = 'skupper.io/v2alpha1'; + objKind = 'Listener'; + stateType = STATE_TYPE_LISTENER; + stateId = stateKey.substring(4); // text following 'van-' break; default: throw(Error(`Invalid stateKey prefix: ${elements[0]}`)) @@ -189,10 +191,12 @@ const getInitialHashState = async function() { const configmaps = await GetConfigmaps(); const deployments = await GetDeployments(); const pods = await GetPods(); + const listeners = await GetListeners(); [local, remote] = stateForList(secrets, local, remote); [local, remote] = stateForList(configmaps, local, remote); [local, remote] = stateForList(deployments, local, remote); [local, remote] = stateForList(pods, local, remote); + [local, remote] = stateForList(listeners, local, remote); if (backbone_mode) { const ingressState = await GetInitialState(); for (const [apid, state] of Object.entries(ingressState)) { @@ -218,6 +222,9 @@ const doStateChangeSpec = async function(obj, data) { case "NetworkAccess": await syncNetworkAccessSpec(obj, data); break; + case "Listener": + await syncListenerSpec(obj, data); + break; } } } @@ -244,6 +251,8 @@ const retrieveLatest = async function(apiVersion, objKind, objName) { return await LoadRouterAccess(objName); case "NetworkAccess": return await LoadNetworkAccess(objName); + case "Listener": + return await LoadListener(objName); } } catch (ex) { if ('code' in ex && ex.code != 404) { @@ -316,6 +325,16 @@ async function syncNetworkAccessSpec(obj, data) { } } +async function syncListenerSpec(obj, data) { + const vanId = data.vanid; + obj.spec = { + observer : 'none', + host : `skupper-console-${vanId}`, + port : 8080, + routingKey : `skupper-console-${vanId}`, + }; +} + async function getBackboneClientSecret() { if (!!backboneClientSecret) { return backboneClientSecret; @@ -409,6 +428,8 @@ const onStateChange = async function(peerId, stateKey, hash, data) { await DeleteRouterAccess(objName); } else if (objKind == "NetworkAccess") { await DeleteNetworkAccess(objName); + } else if (objKind == 'Listener') { + await DeleteListener(objName); } } } diff --git a/modules/src/common.js b/modules/src/common.js index c7734b7..752f172 100644 --- a/modules/src/common.js +++ b/modules/src/common.js @@ -45,6 +45,7 @@ export const META_ANNOTATION_TLS_INJECT = "skx/tls-inject" // export const STATE_TYPE_LINK = "link" export const STATE_TYPE_ACCESS_POINT = "accesspoint" +export const STATE_TYPE_LISTENER = "listener" export const INJECT_TYPE_ACCESS_POINT = "accesspoint" export const INJECT_TYPE_SITE = "site" diff --git a/modules/src/kube.js b/modules/src/kube.js index 09be535..e6a7230 100644 --- a/modules/src/kube.js +++ b/modules/src/kube.js @@ -432,6 +432,30 @@ export async function LoadRouterAccess(name, ns) { return undefined; } +export async function GetListeners(ns) { + let list = await customApi.listNamespacedCustomObject({ + group: "skupper.io", + version: "v2alpha1", + namespace: ns || namespace, + plural: "listeners", + }) + return list.items; +} + +export async function LoadListener(name, ns) { + return await customApi.getNamespacedCustomObject({ + group: "skupper.io", + version: "v2alpha1", + namespace: ns || namespace, + plural: "listeners", + name: name, + }) +} + +export async function DeleteListener(name) { + await DeleteSkupperResource("listeners", name) +} + export async function DeleteSkupperResource(plural, name) { await customApi.deleteNamespacedCustomObject({ group: "skupper.io", From 3898f1b0c889decaed0dd937256abcb2b18f4af9 Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Tue, 16 Jun 2026 16:45:01 -0400 Subject: [PATCH 7/7] Addressed changes requested in review. --- charts/management-server/templates/rbac.yaml | 1 + .../src/backbone-links.js | 4 +--- components/management-controller/src/certs.js | 18 +++++++++--------- .../management-controller/src/colo-sync.js | 18 ++++++++++-------- .../management-controller/src/mc-apiserver.js | 1 - components/management-controller/src/notify.js | 8 ++++---- .../src/sync-management.js | 2 +- 7 files changed, 26 insertions(+), 26 deletions(-) diff --git a/charts/management-server/templates/rbac.yaml b/charts/management-server/templates/rbac.yaml index 3f7ed1c..2d75c5e 100644 --- a/charts/management-server/templates/rbac.yaml +++ b/charts/management-server/templates/rbac.yaml @@ -109,6 +109,7 @@ rules: - networks - networkaccesses - routeraccesses + - listeners verbs: - get - list diff --git a/components/management-controller/src/backbone-links.js b/components/management-controller/src/backbone-links.js index 7f45a9a..d803b59 100644 --- a/components/management-controller/src/backbone-links.js +++ b/components/management-controller/src/backbone-links.js @@ -103,7 +103,6 @@ async function reconcileBackboneConnections() { } catch (err) { Log(`Rolling back reconcile-backbone-connections transaction: ${err.stack}`); await client.query('ROLLBACK'); - reschedule_delay = 10000; } finally { client.release(); } @@ -185,8 +184,7 @@ async function resolveControllerRecord() { } } -async function onAccessPointChange(action, tableName, id) { - console.log(`onAccessPointChange ${action}, ${tableName}, ${id}`); +async function onAccessPointChange(action, id) { if ((action == 'DELETE' || action == 'UPDATE') && id in manageConnections) { await reconcileBackboneConnections(); } diff --git a/components/management-controller/src/certs.js b/components/management-controller/src/certs.js index 114ff4a..23721f4 100644 --- a/components/management-controller/src/certs.js +++ b/components/management-controller/src/certs.js @@ -33,7 +33,7 @@ import { NotifyTransaction, RegisterNotification } from './notify.js'; // // When new management controllers are created, add a certificate request. // -async function onManagementControllersChange(action, tableName, id) { +async function onManagementControllersChange(action, id) { if (action != 'DELETE') { const client = await ClientFromPool('system'); try { @@ -68,7 +68,7 @@ async function onManagementControllersChange(action, tableName, id) { // // When new backbones are created, add a certificate request to begin the full setup of the network. // -async function onBackbonesChange(action, tableName, id) { +async function onBackbonesChange(action, id) { const client = await ClientFromPool('system'); try { await client.query('BEGIN'); @@ -101,7 +101,7 @@ async function onBackbonesChange(action, tableName, id) { // // // -async function onAccessPointsChange(action, tableName, id) { +async function onAccessPointsChange(action, id) { const client = await ClientFromPool('system'); try { await client.query('BEGIN'); @@ -144,7 +144,7 @@ async function onAccessPointsChange(action, tableName, id) { // // When new networks are created, add a certificate request to begin the full setup of the network. // -async function onApplicationNetworksChange(action, tableName, id) { +async function onApplicationNetworksChange(action, id) { const client = await ClientFromPool('system'); const notify = new NotifyTransaction(); try { @@ -188,7 +188,7 @@ async function onApplicationNetworksChange(action, tableName, id) { // // processNewInteriorSites // -async function onInteriorSitesChange(action, tableName, id) { +async function onInteriorSitesChange(action, id) { const client = await ClientFromPool('system'); const notify = new NotifyTransaction(); try { @@ -223,7 +223,7 @@ async function onInteriorSitesChange(action, tableName, id) { // // processNewInvitations // -const onInvitationsChange = async function(action, tableName, id) { +const onInvitationsChange = async function(action, id) { const client = await ClientFromPool('system'); const notify = new NotifyTransaction(); try { @@ -258,7 +258,7 @@ const onInvitationsChange = async function(action, tableName, id) { // // processNewMemberSites // -async function onMemberSitesChange(action, tableName, id) { +async function onMemberSitesChange(action, id) { const client = await ClientFromPool('system'); const notify = new NotifyTransaction(); try { @@ -291,7 +291,7 @@ async function onMemberSitesChange(action, tableName, id) { } -async function onNetworkCredentialsChange(action, tableName, id) { +async function onNetworkCredentialsChange(action, id) { const client = await ClientFromPool('system'); const notify = new NotifyTransaction(); try { @@ -331,7 +331,7 @@ async function onNetworkCredentialsChange(action, tableName, id) { // // When new networks are created, add a certificate request to begin the full setup of the network. // -async function onCertificateRequestsChange(action, tableName, id) { +async function onCertificateRequestsChange(action, id) { const client = await ClientFromPool('system'); const notify = new NotifyTransaction(); try { diff --git a/components/management-controller/src/colo-sync.js b/components/management-controller/src/colo-sync.js index 3f5c8b4..b0a6e72 100644 --- a/components/management-controller/src/colo-sync.js +++ b/components/management-controller/src/colo-sync.js @@ -27,7 +27,6 @@ import * as kube from "@skupperx/modules/kube" import { Log } from "@skupperx/modules/log" import { ClientFromPool } from "./db.js" import * as resourceTemplates from "./resource-templates.js" -import * as sync from "./sync-management.js" import * as common from "@skupperx/modules/common" import { NotifyTransaction, RegisterNotification } from "./notify.js" @@ -78,7 +77,7 @@ async function visitIncompleteSites() { setTimeout(visitIncompleteSites, 5000); } -async function onSiteChange(action, tableName, sid) { +async function onSiteChange(action, sid) { const ns = siteIndex[sid]; if (ns) { if (action === 'UPDATE') { @@ -102,7 +101,7 @@ async function onSiteChange(action, tableName, sid) { } } -async function onAccessPointChange(action, tableName, apid) { +async function onAccessPointChange(action, apid) { const ns = apIndex[apid]; if (ns) { if (action === 'UPDATE') { @@ -126,9 +125,9 @@ async function onAccessPointChange(action, tableName, apid) { } } -async function onBackboneChange(action, tableName, id, backbone) { +async function onBackboneChange(action, id, unusedTableName, backbone) { switch (action) { - case 'EXISTS': + case 'EXISTS': { const ns = backbone.colocatednamespace; if (ns) { if (coloNamespaces[ns]) { @@ -138,14 +137,17 @@ async function onBackboneChange(action, tableName, id, backbone) { } } break; + } case 'EXISTS_COMPLETE': await doInitialReconcile(); break; case 'ADD': { const client = await ClientFromPool('system'); try { - const backbone = await client.query("SELECT * FROM Backbones WHERE Id = $1", [id]).then(result => result.rows[0]); - await addColoNamespace(backbone); + const bbResult = await client.query("SELECT * FROM Backbones WHERE Id = $1 AND ColocatedNamespace IS NOT NULL", [id]); + if (bbResult.rowCount == 1) { + await addColoNamespace(bbResult.rows[0]); + } } catch (error) { Log('Exception in onBackbonesChange(ADD)'); throw error; @@ -321,7 +323,7 @@ async function visitNamespace(ns) { // // TODO: Check the contents of the secret to see if it needs to be updated (for certificate rotation) // - if (['ready', 'active'].indexOf(coloNamespaces[ns].site.lifecycle) >= 0) { + if (['ready', 'active'].includes(coloNamespaces[ns].site.lifecycle)) { const siteSecretName = `skx-site-${coloNamespaces[ns].site.id}`; const siteSecret = await kube.LoadSecret(siteSecretName, ns); if (!siteSecret) { diff --git a/components/management-controller/src/mc-apiserver.js b/components/management-controller/src/mc-apiserver.js index 366861d..729df98 100644 --- a/components/management-controller/src/mc-apiserver.js +++ b/components/management-controller/src/mc-apiserver.js @@ -158,7 +158,6 @@ const fetchBackboneSiteSkupper2 = async function (req, res) { throw new Error("Not permitted, site not ready for deployment"); } const secret = await LoadSecret(site.objectname); - console.log(site.name, site.certificate, site.objectname); let output = []; output.push(resourceTemplates.ServiceAccount()); output.push(resourceTemplates.BackboneRole()); diff --git a/components/management-controller/src/notify.js b/components/management-controller/src/notify.js index 52a4dab..353df52 100644 --- a/components/management-controller/src/notify.js +++ b/components/management-controller/src/notify.js @@ -23,7 +23,7 @@ * This module is the central clearinghouse for database change updates. * Any module may register a handler for notification of data changes. * - * The notification handler has the arguments: (action, tableName, id, data) + * The notification handler has the arguments: (action, id, tableName, data) * Where action is ADD, DELETE, UPDATE, EXISTS, EXISTS_COMPLETE * tableName is the name of the database table that was modified * id is the unique key of the changed row in the database @@ -71,9 +71,9 @@ export async function RegisterNotification(tableName, handler, initialNotificati try { const rows = await client.query(`SELECT * FROM ${tableName}`).then(result => result.rows); for (const row of rows) { - await handler('EXISTS', tableName, row.id, row); + await handler('EXISTS', row.id, tableName, row); } - await handler('EXISTS_COMPLETE', tableName); + await handler('EXISTS_COMPLETE', null, tableName); } catch (error) { Log(`Exception in initial notification: ${error.message}`); } finally { @@ -117,7 +117,7 @@ export class NotifyTransaction { const handlers = registeredHandlers[item.tableName] || []; for (const h of handlers) { try { - await h(item.action, item.tableName, item.id); + await h(item.action, item.id, item.tableName); } catch (error) { Log('Exception in notification handler:', item); Log(error.stack); diff --git a/components/management-controller/src/sync-management.js b/components/management-controller/src/sync-management.js index 656380f..9230bcf 100644 --- a/components/management-controller/src/sync-management.js +++ b/components/management-controller/src/sync-management.js @@ -770,7 +770,7 @@ export async function LinkChanged(connectingSiteId, linkId) { } } -async function onApplicationNetworkChange(action, tableName, id) { +async function onApplicationNetworkChange(action, id) { let hash = null; let doUpdate = false;