diff --git a/NiFiDeploy.groovy b/NiFiDeploy.groovy index 8372fb8..9a4605d 100644 --- a/NiFiDeploy.groovy +++ b/NiFiDeploy.groovy @@ -97,29 +97,119 @@ def handleUndeploy() { } } + // reorder the processGroup arraylist and put 'root' at the end + // this makes sure the root process group is undeployed last, preventing + // dependency issues with process groups that connect to processors in + // the root process group + def rootIndex = conf.nifi?.undeploy?.processGroups?.indexOf('root') + if (rootIndex >= 0){ + assert conf.nifi?.undeploy?.processGroups?.remove(rootIndex) + assert conf.nifi?.undeploy?.processGroups?.add('root') + } + conf.nifi?.undeploy?.processGroups?.each { pgName -> println "Undeploying Process Group: $pgName" - def pg = processGroups.findAll { it.name == pgName } - if (pg.isEmpty()) { - println "[WARN] No such process group found in NiFi" - return + if (pgName == 'root'){ + resp = nifi.get ( + path: "controller/process-groups/root" + ) + assert resp.status == 200 + pg = resp.data.processGroup + } + else { + pgAll = processGroups.findAll { it.name == pgName } + if (pgAll.isEmpty()){ + println "[WARN] No such process group found in NiFi" + return + } + assert pgAll.size() == 1 : "Ambiguous process group name" + pg = pgAll[0] } - assert pg.size() == 1 : "Ambiguous process group name" - - def id = pg[0].id - - stopProcessGroup(id) - // now delete it + def id = pg.id + stopProcessGroup(pg) updateToLatestRevision() - resp = nifi.delete( - path: "controller/process-groups/root/process-group-references/$id", - query: [ - clientId: client, - version: currentRevision - ] - ) - assert resp.status == 200 + + if (pgName == 'root'){ + // get and delete all connections + resp = nifi.get( + path: "controller/process-groups/root/connections" + ) + assert resp.status == 200 + rootConnections = resp.data.connections + rootConnections.each { rootConnection -> + deleteConnection('root', rootConnection.id) + } + // get and delete all processors + resp = nifi.get( + path: "controller/process-groups/root/processors" + ) + assert resp.status == 200 + rootProcessors = resp.data.processors + rootProcessors.each { rootProcessor -> + stopProcessor('root',rootProcessor.id) + def resp = nifi.delete( + path: "controller/process-groups/root/processors/$rootProcessor.id", + query: [ + clientId: client, + version: currentRevision + ] + ) + assert resp.status == 200 + } + // get and delete all funnels + resp = nifi.get( + path: "controller/process-groups/root/funnels" + ) + assert resp.status == 200 + rootFunnels = resp.data.funnels + rootFunnels.each { rootFunnel -> + def resp = nifi.delete( + path: "controller/process-groups/root/funnels/$rootFunnel.id", + query: [ + clientId: client, + version: currentRevision + ] + ) + assert resp.status == 200 + } + return + } + else { + // get all incoming and outgoing connections and delete them + processGroupParentId = pg.parent.id + resp = nifi.get( + path: "controller/process-groups/$processGroupParentId/connections" + ) + assert resp.status == 200 + processGroupConnections = resp.data.connections + processGroupConnections.each { groupConnection -> + // if process group is destination for connection, stop and delete source connections + if (groupConnection.destination.groupId == id){ + if (groupConnection.destination.type != "FUNNEL"){ + stopProcessor(groupConnection.source.groupId,groupConnection.source.id) + } + deleteConnection(groupConnection.parentGroupId,groupConnection.id) + } + // if process group is source of connection, stop and delete destination connections + if (groupConnection.source.groupId == id){ + if (groupConnection.destination.type != "FUNNEL"){ + stopProcessor(groupConnection.destination.groupId,groupConnection.destination.id) + } + deleteConnection(groupConnection.parentGroupId,groupConnection.id) + } + } + // now delete the process group + updateToLatestRevision() + resp = nifi.delete( + path: "controller/process-groups/root/process-group-references/$id", + query: [ + clientId: client, + version: currentRevision + ] + ) + assert resp.status == 200 + } } conf.nifi?.undeploy?.templates?.each { tName -> @@ -261,9 +351,18 @@ def handleProcessGroup(Map.Entry pgConfig) { updateToLatestRevision() def pgName = pgConfig.key - def pg = processGroups.find { it.name == pgName } - assert pg : "Processing Group '$pgName' not found in this instance, check your deployment config?" - def pgId = pg.id + if (pgName == 'root'){ + resp = nifi.get ( + path: "controller/process-groups/root" + ) + assert resp.status == 200 + pg = resp.data.processGroup + } + else { + pg = processGroups.find { it.name == pgName } + assert pg : "Processing Group '$pgName' not found in this instance, check your deployment config?" + } + pgId = pg.id println "Process Group: $pgConfig.key ($pgId)" //println pgConfig @@ -289,7 +388,7 @@ def handleProcessGroup(Map.Entry pgConfig) { resp = nifi.put ( path: "controller/process-groups/$pgId", - body: builder.toPrettyString(), + body: builder.toString(), requestContentType: JSON ) assert resp.status == 200 @@ -331,7 +430,9 @@ def handleProcessGroup(Map.Entry pgConfig) { properties { procProps.each { p -> // check if it's a ${referenceToControllerServiceName} - def ref = p.value =~ /\$\{(.*)}/ + // does not work with function names like ${event_type:toLower():equals('ad')} + // so accept all characters except :, ( and ) + def ref = p.value =~ /\$\{([^:()]+)}/ if (ref) { def name = ref[0][1] // grab the first capture group (nested inside ArrayList) // lookup the CS by name and get the newly generated ID instead of the one in a template @@ -353,7 +454,7 @@ def handleProcessGroup(Map.Entry pgConfig) { resp = nifi.put ( path: "controller/process-groups/$pgId/processors/$procId", - body: builder.toPrettyString(), + body: builder.toString(), requestContentType: JSON ) assert resp.status == 200 @@ -368,7 +469,7 @@ def handleProcessGroup(Map.Entry pgConfig) { } println "Starting Process Group: $pgName ($pgId)" - startProcessGroup(pgId) + startProcessGroup(pg) } def handleControllerService(Map.Entry cfg) { @@ -412,7 +513,7 @@ def handleControllerService(Map.Entry cfg) { resp = nifi.put ( path: "controller/controller-services/NODE/$cs.id", - body: builder.toPrettyString(), + body: builder.toString(), requestContentType: JSON ) assert resp.status == 200 @@ -456,20 +557,21 @@ private _changeProcessorState(processGroupId, processorId, boolean running) { //println builder.toPrettyString() resp = nifi.put ( path: "controller/process-groups/$processGroupId/processors/$processorId", - body: builder.toPrettyString(), + body: builder.toString(), requestContentType: JSON ) assert resp.status == 200 currentRevision = resp.data.revision.version } -def startProcessGroup(pgId) { - _changeProcessGroupState(pgId, true) +def startProcessGroup(pg) { + _changeProcessGroupState(pg, true) } -def stopProcessGroup(pgId) { +def stopProcessGroup(pg) { + def pgId = pg.id print "Waiting for a Process Group to stop: $pgId " - _changeProcessGroupState(pgId, false) + _changeProcessGroupState(pg, false) int maxWait = 1000 * 30 // up to X seconds @@ -493,10 +595,13 @@ def stopProcessGroup(pgId) { } } -private _changeProcessGroupState(pgId, boolean running) { +private _changeProcessGroupState(pg, boolean running) { + def pgId = pg.id + if (pg.name == 'NiFi Flow'){ path = "controller/process-groups/root" } + else { path = "controller/process-groups/root/process-group-references/$pgId" } updateToLatestRevision() def resp = nifi.put( - path: "controller/process-groups/root/process-group-references/$pgId", + path: path, body: [ running: running, client: client, @@ -548,12 +653,23 @@ private _changeControllerServiceState(csId, boolean enabled) { resp = nifi.put( path: "controller/controller-services/NODE/$csId", - body: builder.toPrettyString(), + body: builder.toString(), requestContentType: JSON ) assert resp.status == 200 } +def deleteConnection(pGId,cId){ + resp = nifi.delete( + path: "controller/process-groups/$pGId/connections/$cId", + query: [ + clientId: client, + version: currentRevision + ] + ) + assert resp.status == 200 +} + // script flow below conf = new Yaml().load(new File(deploymentSpec).text)