Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions NiFiDeploy.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,94 @@ if (opts.debug) {

// implementation methods below

def handleGracefullShutdown() {
if (!conf.nifi.gracefullShutdown){
return
}

println "Attempting to gracefully shut down the pipeline"

// get the processGroups that have 0 incoming connections
// and add to ArrayList
def checkProcessGroups = ['root']
processGroups.each { pcGroup ->
if (pcGroup.inputPortCount == 0){
checkProcessGroups.add(pcGroup.id)
}
}

// get all processors in the found processGroups and add them to hashmap
// processorConnectionMap. This will be used to count the number of
// destinations
// the processorPgMap hashmap will be used to lookup processorIds with
// processGroupIds
def processorConnectionMap = [:]
def processorPgMap = [:]
checkProcessGroups.each { pcGroupId ->
def resp = nifi.get(
path : "controller/process-groups/$pcGroupId/processors"
)
assert resp.status == 200
processors = resp.data.processors
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

} should be outside of processors loop to get all processors in the map

processors.each { processor ->
processorConnectionMap.put(processor.id,0)
processorPgMap.put(processor.id, processor.parentGroupId)
}

// get all connections in the found processGroups
def connections = ""
checkProcessGroups.each { pcGroupId ->
def resp = nifi.get(
path : "controller/process-groups/$pcGroupId/connections"
)
assert resp.status == 200
connections = resp.data.connections
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

} should be outside of connections loop to count for connections of each process groups


// count destinations per processor
connections.each { connection ->
if (processorConnectionMap.containsKey(connection.destination.id)){
processorConnectionMap[connection.destination.id] = processorConnectionMap[connection.destination.id] + 1
}
}

// shut down each processor that is not a destination (destination count == 0)
processorConnectionMap.each { processor ->
currentProcessGroup = processorPgMap[processor.key]
if (processor.value == 0){
println "Stopping processor $processor.key in processGroup $currentProcessGroup"
stopProcessor(currentProcessGroup, processor.key)
}
}

int maxWait = 1000 * 300 // up to 5 minutes
long start = System.currentTimeMillis()
// wait till all queues are empty or maxWait is reached
while ((System.currentTimeMillis() < (start + maxWait)) && (getNumberOfQueuedFiles() > 0)){
println "Number of queued files: $numberOfQueuedFiles"
sleep(5000)
}
if (getNumberOfQueuedFiles() == 0){
println "All queues are empty"
}
else {
println "[ERROR] Max wait time reached waiting for queues to flush. Exiting."
System.exit(-1)
}
}

def getNumberOfQueuedFiles() {
def resp = nifi.get(
path : "controller/status"
)
assert resp.status == 200
status = resp.data
queued = status.controllerStatus.queued.split(' ')
numberOfQueuedFiles = queued[0].toInteger()
return numberOfQueuedFiles
}

def handleUndeploy() {
if (!conf.nifi.undeploy) {
return
Expand Down Expand Up @@ -587,6 +675,8 @@ currentRevision = -1 // used for optimistic concurrency throughout the REST API
processGroups = null
loadProcessGroups()

handleGracefullShutdown()

handleUndeploy()

templateId = null // will be assigned on import into NiFi
Expand Down