Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 150 additions & 34 deletions NiFiDeploy.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -97,29 +97,119 @@ def handleUndeploy() {
}
}

// reorder the processGroup arraylist and put 'root' at the end
// this makes sure the root process group is undeployed last, preventing
// dependency issues with process groups that connect to processors in
// the root process group
def rootIndex = conf.nifi?.undeploy?.processGroups?.indexOf('root')
if (rootIndex >= 0){
assert conf.nifi?.undeploy?.processGroups?.remove(rootIndex)
assert conf.nifi?.undeploy?.processGroups?.add('root')
}

conf.nifi?.undeploy?.processGroups?.each { pgName ->
println "Undeploying Process Group: $pgName"
def pg = processGroups.findAll { it.name == pgName }
if (pg.isEmpty()) {
println "[WARN] No such process group found in NiFi"
return
if (pgName == 'root'){
resp = nifi.get (
path: "controller/process-groups/root"
)
assert resp.status == 200
pg = resp.data.processGroup
}
else {
pgAll = processGroups.findAll { it.name == pgName }
if (pgAll.isEmpty()){
println "[WARN] No such process group found in NiFi"
return
}
assert pgAll.size() == 1 : "Ambiguous process group name"
pg = pgAll[0]
}
assert pg.size() == 1 : "Ambiguous process group name"

def id = pg[0].id

stopProcessGroup(id)

// now delete it
def id = pg.id
stopProcessGroup(pg)
updateToLatestRevision()
resp = nifi.delete(
path: "controller/process-groups/root/process-group-references/$id",
query: [
clientId: client,
version: currentRevision
]
)
assert resp.status == 200

if (pgName == 'root'){
// get and delete all connections
resp = nifi.get(
path: "controller/process-groups/root/connections"
)
assert resp.status == 200
rootConnections = resp.data.connections
rootConnections.each { rootConnection ->
deleteConnection('root', rootConnection.id)
}
// get and delete all processors
resp = nifi.get(
path: "controller/process-groups/root/processors"
)
assert resp.status == 200
rootProcessors = resp.data.processors
rootProcessors.each { rootProcessor ->
stopProcessor('root',rootProcessor.id)
def resp = nifi.delete(
path: "controller/process-groups/root/processors/$rootProcessor.id",
query: [
clientId: client,
version: currentRevision
]
)
assert resp.status == 200
}
// get and delete all funnels
resp = nifi.get(
path: "controller/process-groups/root/funnels"
)
assert resp.status == 200
rootFunnels = resp.data.funnels
rootFunnels.each { rootFunnel ->
def resp = nifi.delete(
path: "controller/process-groups/root/funnels/$rootFunnel.id",
query: [
clientId: client,
version: currentRevision
]
)
assert resp.status == 200
}
return
}
else {
// get all incoming and outgoing connections and delete them
processGroupParentId = pg.parent.id
resp = nifi.get(
path: "controller/process-groups/$processGroupParentId/connections"
)
assert resp.status == 200
processGroupConnections = resp.data.connections
processGroupConnections.each { groupConnection ->
// if process group is destination for connection, stop and delete source connections
if (groupConnection.destination.groupId == id){
if (groupConnection.destination.type != "FUNNEL"){
stopProcessor(groupConnection.source.groupId,groupConnection.source.id)
}
deleteConnection(groupConnection.parentGroupId,groupConnection.id)
}
// if process group is source of connection, stop and delete destination connections
if (groupConnection.source.groupId == id){
if (groupConnection.destination.type != "FUNNEL"){
stopProcessor(groupConnection.destination.groupId,groupConnection.destination.id)
}
deleteConnection(groupConnection.parentGroupId,groupConnection.id)
}
}
// now delete the process group
updateToLatestRevision()
resp = nifi.delete(
path: "controller/process-groups/root/process-group-references/$id",
query: [
clientId: client,
version: currentRevision
]
)
assert resp.status == 200
}
}

conf.nifi?.undeploy?.templates?.each { tName ->
Expand Down Expand Up @@ -261,9 +351,18 @@ def handleProcessGroup(Map.Entry pgConfig) {
updateToLatestRevision()

def pgName = pgConfig.key
def pg = processGroups.find { it.name == pgName }
assert pg : "Processing Group '$pgName' not found in this instance, check your deployment config?"
def pgId = pg.id
if (pgName == 'root'){
resp = nifi.get (
path: "controller/process-groups/root"
)
assert resp.status == 200
pg = resp.data.processGroup
}
else {
pg = processGroups.find { it.name == pgName }
assert pg : "Processing Group '$pgName' not found in this instance, check your deployment config?"
}
pgId = pg.id

println "Process Group: $pgConfig.key ($pgId)"
//println pgConfig
Expand All @@ -289,7 +388,7 @@ def handleProcessGroup(Map.Entry pgConfig) {

resp = nifi.put (
path: "controller/process-groups/$pgId",
body: builder.toPrettyString(),
body: builder.toString(),
requestContentType: JSON
)
assert resp.status == 200
Expand Down Expand Up @@ -331,7 +430,9 @@ def handleProcessGroup(Map.Entry pgConfig) {
properties {
procProps.each { p ->
// check if it's a ${referenceToControllerServiceName}
def ref = p.value =~ /\$\{(.*)}/
// does not work with function names like ${event_type:toLower():equals('ad')}
// so accept all characters except :, ( and )
def ref = p.value =~ /\$\{([^:()]+)}/
if (ref) {
def name = ref[0][1] // grab the first capture group (nested inside ArrayList)
// lookup the CS by name and get the newly generated ID instead of the one in a template
Expand All @@ -353,7 +454,7 @@ def handleProcessGroup(Map.Entry pgConfig) {

resp = nifi.put (
path: "controller/process-groups/$pgId/processors/$procId",
body: builder.toPrettyString(),
body: builder.toString(),
requestContentType: JSON
)
assert resp.status == 200
Expand All @@ -368,7 +469,7 @@ def handleProcessGroup(Map.Entry pgConfig) {
}

println "Starting Process Group: $pgName ($pgId)"
startProcessGroup(pgId)
startProcessGroup(pg)
}

def handleControllerService(Map.Entry cfg) {
Expand Down Expand Up @@ -412,7 +513,7 @@ def handleControllerService(Map.Entry cfg) {

resp = nifi.put (
path: "controller/controller-services/NODE/$cs.id",
body: builder.toPrettyString(),
body: builder.toString(),
requestContentType: JSON
)
assert resp.status == 200
Expand Down Expand Up @@ -456,20 +557,21 @@ private _changeProcessorState(processGroupId, processorId, boolean running) {
//println builder.toPrettyString()
resp = nifi.put (
path: "controller/process-groups/$processGroupId/processors/$processorId",
body: builder.toPrettyString(),
body: builder.toString(),
requestContentType: JSON
)
assert resp.status == 200
currentRevision = resp.data.revision.version
}

def startProcessGroup(pgId) {
_changeProcessGroupState(pgId, true)
def startProcessGroup(pg) {
_changeProcessGroupState(pg, true)
}

def stopProcessGroup(pgId) {
def stopProcessGroup(pg) {
def pgId = pg.id
print "Waiting for a Process Group to stop: $pgId "
_changeProcessGroupState(pgId, false)
_changeProcessGroupState(pg, false)


int maxWait = 1000 * 30 // up to X seconds
Expand All @@ -493,10 +595,13 @@ def stopProcessGroup(pgId) {
}
}

private _changeProcessGroupState(pgId, boolean running) {
private _changeProcessGroupState(pg, boolean running) {
def pgId = pg.id
if (pg.name == 'NiFi Flow'){ path = "controller/process-groups/root" }
else { path = "controller/process-groups/root/process-group-references/$pgId" }
updateToLatestRevision()
def resp = nifi.put(
path: "controller/process-groups/root/process-group-references/$pgId",
path: path,
body: [
running: running,
client: client,
Expand Down Expand Up @@ -548,12 +653,23 @@ private _changeControllerServiceState(csId, boolean enabled) {

resp = nifi.put(
path: "controller/controller-services/NODE/$csId",
body: builder.toPrettyString(),
body: builder.toString(),
requestContentType: JSON
)
assert resp.status == 200
}

def deleteConnection(pGId,cId){
resp = nifi.delete(
path: "controller/process-groups/$pGId/connections/$cId",
query: [
clientId: client,
version: currentRevision
]
)
assert resp.status == 200
}

// script flow below

conf = new Yaml().load(new File(deploymentSpec).text)
Expand Down