From 79101fa93d4f93f1fb63b4dcf6c97686ef375184 Mon Sep 17 00:00:00 2001 From: Jan van Bemmelen Date: Tue, 17 May 2016 16:22:24 +0200 Subject: [PATCH 1/2] Trying to import this template results in a number of errors related to: - processors in the root process group. The import of templates with processors in the root process group fails because such a process group is internally know as 'NiFi Flow' but cannot be reached by that name through the API. This change ff ixes this issue by setting, depending on the pgName, the correct path in _changee ProcessGroupState(). - literal json blocks are not correctly accepted with toPrettyString(). I replacc ed those calls with toString(). Look for the '|-' string in sample_template.yml for examples of where this error occurs. - the controller service name regex lookup fails when a function name is used inn the name. For instance a name like ${event_type:toLower():equals('ad')} cannot be looked up and will result in an error. I changed the regex to accept any namee , except ones that hold :, ( or ) characters. --- NiFiDeploy.groovy | 49 +++++++++++++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 17 deletions(-) diff --git a/NiFiDeploy.groovy b/NiFiDeploy.groovy index 8372fb8..507a4bb 100644 --- a/NiFiDeploy.groovy +++ b/NiFiDeploy.groovy @@ -108,7 +108,7 @@ def handleUndeploy() { def id = pg[0].id - stopProcessGroup(id) + stopProcessGroup(pg) // now delete it updateToLatestRevision() @@ -261,9 +261,18 @@ def handleProcessGroup(Map.Entry pgConfig) { updateToLatestRevision() def pgName = pgConfig.key - def pg = processGroups.find { it.name == pgName } - assert pg : "Processing Group '$pgName' not found in this instance, check your deployment config?" - def pgId = pg.id + if (pgName == 'root'){ + resp = nifi.get ( + path: "controller/process-groups/root" + ) + assert resp.status == 200 + pg = resp.data.processGroup + } + else { + pg = processGroups.find { it.name == pgName } + assert pg : "Processing Group '$pgName' not found in this instance, check your deployment config?" + } + pgId = pg.id println "Process Group: $pgConfig.key ($pgId)" //println pgConfig @@ -289,7 +298,7 @@ def handleProcessGroup(Map.Entry pgConfig) { resp = nifi.put ( path: "controller/process-groups/$pgId", - body: builder.toPrettyString(), + body: builder.toString(), requestContentType: JSON ) assert resp.status == 200 @@ -331,7 +340,9 @@ def handleProcessGroup(Map.Entry pgConfig) { properties { procProps.each { p -> // check if it's a ${referenceToControllerServiceName} - def ref = p.value =~ /\$\{(.*)}/ + // does not work with function names like ${event_type:toLower():equals('ad')} + // so accept all characters except :, ( and ) + def ref = p.value =~ /\$\{([^:()]+)}/ if (ref) { def name = ref[0][1] // grab the first capture group (nested inside ArrayList) // lookup the CS by name and get the newly generated ID instead of the one in a template @@ -353,7 +364,7 @@ def handleProcessGroup(Map.Entry pgConfig) { resp = nifi.put ( path: "controller/process-groups/$pgId/processors/$procId", - body: builder.toPrettyString(), + body: builder.toString(), requestContentType: JSON ) assert resp.status == 200 @@ -368,7 +379,7 @@ def handleProcessGroup(Map.Entry pgConfig) { } println "Starting Process Group: $pgName ($pgId)" - startProcessGroup(pgId) + startProcessGroup(pg) } def handleControllerService(Map.Entry cfg) { @@ -412,7 +423,7 @@ def handleControllerService(Map.Entry cfg) { resp = nifi.put ( path: "controller/controller-services/NODE/$cs.id", - body: builder.toPrettyString(), + body: builder.toString(), requestContentType: JSON ) assert resp.status == 200 @@ -456,20 +467,21 @@ private _changeProcessorState(processGroupId, processorId, boolean running) { //println builder.toPrettyString() resp = nifi.put ( path: "controller/process-groups/$processGroupId/processors/$processorId", - body: builder.toPrettyString(), + body: builder.toString(), requestContentType: JSON ) assert resp.status == 200 currentRevision = resp.data.revision.version } -def startProcessGroup(pgId) { - _changeProcessGroupState(pgId, true) +def startProcessGroup(pg) { + _changeProcessGroupState(pg, true) } -def stopProcessGroup(pgId) { +def stopProcessGroup(pg) { + def pgId = pg.id print "Waiting for a Process Group to stop: $pgId " - _changeProcessGroupState(pgId, false) + _changeProcessGroupState(pg, false) int maxWait = 1000 * 30 // up to X seconds @@ -493,10 +505,13 @@ def stopProcessGroup(pgId) { } } -private _changeProcessGroupState(pgId, boolean running) { +private _changeProcessGroupState(pg, boolean running) { + def pgId = pg.id + if (pg.name == 'NiFi Flow'){ path = "controller/process-groups/root" } + else { path = "controller/process-groups/root/process-group-references/$pgId" } updateToLatestRevision() def resp = nifi.put( - path: "controller/process-groups/root/process-group-references/$pgId", + path: path, body: [ running: running, client: client, @@ -548,7 +563,7 @@ private _changeControllerServiceState(csId, boolean enabled) { resp = nifi.put( path: "controller/controller-services/NODE/$csId", - body: builder.toPrettyString(), + body: builder.toString(), requestContentType: JSON ) assert resp.status == 200 From 889ffe6866a892a36db1e8254f7d9b5fbb90372b Mon Sep 17 00:00:00 2001 From: Jan van Bemmelen Date: Thu, 19 May 2016 15:34:53 +0200 Subject: [PATCH 2/2] Get handleUndeploy() to correctly handle undeploying root PGs --- NiFiDeploy.groovy | 135 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 118 insertions(+), 17 deletions(-) diff --git a/NiFiDeploy.groovy b/NiFiDeploy.groovy index 507a4bb..9a4605d 100644 --- a/NiFiDeploy.groovy +++ b/NiFiDeploy.groovy @@ -97,29 +97,119 @@ def handleUndeploy() { } } + // reorder the processGroup arraylist and put 'root' at the end + // this makes sure the root process group is undeployed last, preventing + // dependency issues with process groups that connect to processors in + // the root process group + def rootIndex = conf.nifi?.undeploy?.processGroups?.indexOf('root') + if (rootIndex >= 0){ + assert conf.nifi?.undeploy?.processGroups?.remove(rootIndex) + assert conf.nifi?.undeploy?.processGroups?.add('root') + } + conf.nifi?.undeploy?.processGroups?.each { pgName -> println "Undeploying Process Group: $pgName" - def pg = processGroups.findAll { it.name == pgName } - if (pg.isEmpty()) { - println "[WARN] No such process group found in NiFi" - return + if (pgName == 'root'){ + resp = nifi.get ( + path: "controller/process-groups/root" + ) + assert resp.status == 200 + pg = resp.data.processGroup + } + else { + pgAll = processGroups.findAll { it.name == pgName } + if (pgAll.isEmpty()){ + println "[WARN] No such process group found in NiFi" + return + } + assert pgAll.size() == 1 : "Ambiguous process group name" + pg = pgAll[0] } - assert pg.size() == 1 : "Ambiguous process group name" - - def id = pg[0].id + def id = pg.id stopProcessGroup(pg) - - // now delete it updateToLatestRevision() - resp = nifi.delete( - path: "controller/process-groups/root/process-group-references/$id", - query: [ - clientId: client, - version: currentRevision - ] - ) - assert resp.status == 200 + + if (pgName == 'root'){ + // get and delete all connections + resp = nifi.get( + path: "controller/process-groups/root/connections" + ) + assert resp.status == 200 + rootConnections = resp.data.connections + rootConnections.each { rootConnection -> + deleteConnection('root', rootConnection.id) + } + // get and delete all processors + resp = nifi.get( + path: "controller/process-groups/root/processors" + ) + assert resp.status == 200 + rootProcessors = resp.data.processors + rootProcessors.each { rootProcessor -> + stopProcessor('root',rootProcessor.id) + def resp = nifi.delete( + path: "controller/process-groups/root/processors/$rootProcessor.id", + query: [ + clientId: client, + version: currentRevision + ] + ) + assert resp.status == 200 + } + // get and delete all funnels + resp = nifi.get( + path: "controller/process-groups/root/funnels" + ) + assert resp.status == 200 + rootFunnels = resp.data.funnels + rootFunnels.each { rootFunnel -> + def resp = nifi.delete( + path: "controller/process-groups/root/funnels/$rootFunnel.id", + query: [ + clientId: client, + version: currentRevision + ] + ) + assert resp.status == 200 + } + return + } + else { + // get all incoming and outgoing connections and delete them + processGroupParentId = pg.parent.id + resp = nifi.get( + path: "controller/process-groups/$processGroupParentId/connections" + ) + assert resp.status == 200 + processGroupConnections = resp.data.connections + processGroupConnections.each { groupConnection -> + // if process group is destination for connection, stop and delete source connections + if (groupConnection.destination.groupId == id){ + if (groupConnection.destination.type != "FUNNEL"){ + stopProcessor(groupConnection.source.groupId,groupConnection.source.id) + } + deleteConnection(groupConnection.parentGroupId,groupConnection.id) + } + // if process group is source of connection, stop and delete destination connections + if (groupConnection.source.groupId == id){ + if (groupConnection.destination.type != "FUNNEL"){ + stopProcessor(groupConnection.destination.groupId,groupConnection.destination.id) + } + deleteConnection(groupConnection.parentGroupId,groupConnection.id) + } + } + // now delete the process group + updateToLatestRevision() + resp = nifi.delete( + path: "controller/process-groups/root/process-group-references/$id", + query: [ + clientId: client, + version: currentRevision + ] + ) + assert resp.status == 200 + } } conf.nifi?.undeploy?.templates?.each { tName -> @@ -569,6 +659,17 @@ private _changeControllerServiceState(csId, boolean enabled) { assert resp.status == 200 } +def deleteConnection(pGId,cId){ + resp = nifi.delete( + path: "controller/process-groups/$pGId/connections/$cId", + query: [ + clientId: client, + version: currentRevision + ] + ) + assert resp.status == 200 +} + // script flow below conf = new Yaml().load(new File(deploymentSpec).text)