From dbbfbdf1f9d2e3a102150475ab2bb478655f5526 Mon Sep 17 00:00:00 2001 From: Jan van Bemmelen Date: Thu, 19 May 2016 16:37:36 +0200 Subject: [PATCH] gracefullShutdown: wait for queues to empty before undeploying This change allows the deployment of a template to be dependable on the status of the queues. If gracefullShutdown is set to true it will wait for all queues to become empty before continuing with the undeployment. Here's the flow of the gracefullShutdown() function: - find all processors that are only listed as source and not as destination - stop the found processors. This should stop the input from external systems into the NiFi flow. - wait for a maximum amount of time for all queues to empty. - with all queues empty the undeploy and import of the template can continue. --- NiFiDeploy.groovy | 90 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/NiFiDeploy.groovy b/NiFiDeploy.groovy index 8372fb8..d39cc2c 100644 --- a/NiFiDeploy.groovy +++ b/NiFiDeploy.groovy @@ -67,6 +67,94 @@ if (opts.debug) { // implementation methods below +def handleGracefullShutdown() { + if (!conf.nifi.gracefullShutdown){ + return + } + + println "Attempting to gracefully shut down the pipeline" + + // get the processGroups that have 0 incoming connections + // and add to ArrayList + def checkProcessGroups = ['root'] + processGroups.each { pcGroup -> + if (pcGroup.inputPortCount == 0){ + checkProcessGroups.add(pcGroup.id) + } + } + + // get all processors in the found processGroups and add them to hashmap + // processorConnectionMap. This will be used to count the number of + // destinations + // the processorPgMap hashmap will be used to lookup processorIds with + // processGroupIds + def processorConnectionMap = [:] + def processorPgMap = [:] + checkProcessGroups.each { pcGroupId -> + def resp = nifi.get( + path : "controller/process-groups/$pcGroupId/processors" + ) + assert resp.status == 200 + processors = resp.data.processors + } + processors.each { processor -> + processorConnectionMap.put(processor.id,0) + processorPgMap.put(processor.id, processor.parentGroupId) + } + + // get all connections in the found processGroups + def connections = "" + checkProcessGroups.each { pcGroupId -> + def resp = nifi.get( + path : "controller/process-groups/$pcGroupId/connections" + ) + assert resp.status == 200 + connections = resp.data.connections + } + + // count destinations per processor + connections.each { connection -> + if (processorConnectionMap.containsKey(connection.destination.id)){ + processorConnectionMap[connection.destination.id] = processorConnectionMap[connection.destination.id] + 1 + } + } + + // shut down each processor that is not a destination (destination count == 0) + processorConnectionMap.each { processor -> + currentProcessGroup = processorPgMap[processor.key] + if (processor.value == 0){ + println "Stopping processor $processor.key in processGroup $currentProcessGroup" + stopProcessor(currentProcessGroup, processor.key) + } + } + + int maxWait = 1000 * 300 // up to 5 minutes + long start = System.currentTimeMillis() + // wait till all queues are empty or maxWait is reached + while ((System.currentTimeMillis() < (start + maxWait)) && (getNumberOfQueuedFiles() > 0)){ + println "Number of queued files: $numberOfQueuedFiles" + sleep(5000) + } + if (getNumberOfQueuedFiles() == 0){ + println "All queues are empty" + } + else { + println "[ERROR] Max wait time reached waiting for queues to flush. Exiting." + System.exit(-1) + } +} + +def getNumberOfQueuedFiles() { + def resp = nifi.get( + path : "controller/status" + ) + assert resp.status == 200 + status = resp.data + queued = status.controllerStatus.queued.split(' ') + numberOfQueuedFiles = queued[0].toInteger() + return numberOfQueuedFiles +} + def handleUndeploy() { if (!conf.nifi.undeploy) { return @@ -587,6 +675,8 @@ currentRevision = -1 // used for optimistic concurrency throughout the REST API processGroups = null loadProcessGroups() +handleGracefullShutdown() + handleUndeploy() templateId = null // will be assigned on import into NiFi