diff --git a/NiFiDeploy.groovy b/NiFiDeploy.groovy index 8372fb8..d39cc2c 100644 --- a/NiFiDeploy.groovy +++ b/NiFiDeploy.groovy @@ -67,6 +67,94 @@ if (opts.debug) { // implementation methods below +def handleGracefullShutdown() { + if (!conf.nifi.gracefullShutdown){ + return + } + + println "Attempting to gracefully shut down the pipeline" + + // get the processGroups that have 0 incoming connections + // and add to ArrayList + def checkProcessGroups = ['root'] + processGroups.each { pcGroup -> + if (pcGroup.inputPortCount == 0){ + checkProcessGroups.add(pcGroup.id) + } + } + + // get all processors in the found processGroups and add them to hashmap + // processorConnectionMap. This will be used to count the number of + // destinations + // the processorPgMap hashmap will be used to lookup processorIds with + // processGroupIds + def processorConnectionMap = [:] + def processorPgMap = [:] + checkProcessGroups.each { pcGroupId -> + def resp = nifi.get( + path : "controller/process-groups/$pcGroupId/processors" + ) + assert resp.status == 200 + processors = resp.data.processors + } + processors.each { processor -> + processorConnectionMap.put(processor.id,0) + processorPgMap.put(processor.id, processor.parentGroupId) + } + + // get all connections in the found processGroups + def connections = "" + checkProcessGroups.each { pcGroupId -> + def resp = nifi.get( + path : "controller/process-groups/$pcGroupId/connections" + ) + assert resp.status == 200 + connections = resp.data.connections + } + + // count destinations per processor + connections.each { connection -> + if (processorConnectionMap.containsKey(connection.destination.id)){ + processorConnectionMap[connection.destination.id] = processorConnectionMap[connection.destination.id] + 1 + } + } + + // shut down each processor that is not a destination (destination count == 0) + processorConnectionMap.each { processor -> + currentProcessGroup = processorPgMap[processor.key] + if (processor.value == 0){ + println "Stopping processor $processor.key in processGroup $currentProcessGroup" + stopProcessor(currentProcessGroup, processor.key) + } + } + + int maxWait = 1000 * 300 // up to 5 minutes + long start = System.currentTimeMillis() + // wait till all queues are empty or maxWait is reached + while ((System.currentTimeMillis() < (start + maxWait)) && (getNumberOfQueuedFiles() > 0)){ + println "Number of queued files: $numberOfQueuedFiles" + sleep(5000) + } + if (getNumberOfQueuedFiles() == 0){ + println "All queues are empty" + } + else { + println "[ERROR] Max wait time reached waiting for queues to flush. Exiting." + System.exit(-1) + } +} + +def getNumberOfQueuedFiles() { + def resp = nifi.get( + path : "controller/status" + ) + assert resp.status == 200 + status = resp.data + queued = status.controllerStatus.queued.split(' ') + numberOfQueuedFiles = queued[0].toInteger() + return numberOfQueuedFiles +} + def handleUndeploy() { if (!conf.nifi.undeploy) { return @@ -587,6 +675,8 @@ currentRevision = -1 // used for optimistic concurrency throughout the REST API processGroups = null loadProcessGroups() +handleGracefullShutdown() + handleUndeploy() templateId = null // will be assigned on import into NiFi