diff --git a/NiFiDeploy.groovy b/NiFiDeploy.groovy index 8372fb8..799917d 100644 --- a/NiFiDeploy.groovy +++ b/NiFiDeploy.groovy @@ -13,33 +13,33 @@ import static groovyx.net.http.Method.POST @Grab(group='org.codehaus.groovy.modules.http-builder', - module='http-builder', - version='0.7.1') + module='http-builder', + version='0.7.1') @Grab(group='org.yaml', - module='snakeyaml', - version='1.17') + module='snakeyaml', + version='1.17') @Grab(group='org.apache.httpcomponents', - module='httpmime', - version='4.2.1') + module='httpmime', + version='4.2.1') // see actual script content at the bottom of the text, // after every implementation method. Groovy compiler likes these much better def cli = new CliBuilder(usage: 'groovy NiFiDeploy.groovy [options]', - header: 'Options:') + header: 'Options:') cli.with { f longOpt: 'file', - 'Deployment specification file in a YAML format', - args:1, argName:'name', type:String.class + 'Deployment specification file in a YAML format', + args:1, argName:'name', type:String.class h longOpt: 'help', 'This usage screen' d longOpt: 'debug', 'Debug underlying HTTP wire communication' n longOpt: 'nifi-api', 'NiFi REST API (override), e.g. http://example.com:9090', - args:1, argName:'http://host:port', type:String.class + args:1, argName:'http://host:port', type:String.class t longOpt: 'template', 'Template URI (override)', - args:1, argName:'uri', type:String.class + args:1, argName:'uri', type:String.class c longOpt: 'client-id', 'Client ID for API calls, any unique string (override)', - args:1, argName:'id', type:String.class + args:1, argName:'id', type:String.class } def opts = cli.parse(args) @@ -71,115 +71,121 @@ def handleUndeploy() { if (!conf.nifi.undeploy) { return } + processGroups = loadProcessGroups() + // delete templates // stop & remove controller services // stop & remove process groups - // delete templates - // TODO not optimal (would rather save all CS in state), but ok for now - conf.nifi?.undeploy?.controllerServices?.each { csName -> - print "Undeploying Controller Service: $csName" - def cs = lookupControllerService(csName) - if (cs) { - println " ($cs.id)" - stopControllerService(cs.id) - updateToLatestRevision() + + conf.nifi?.undeploy?.templates?.each { tName -> + println "Deleting template: $tName" + def t = lookupTemplate(tName) + if (t) { def resp = nifi.delete( - path: "controller/controller-services/NODE/$cs.id", - query: [ - clientId: client, - version: currentRevision - ] + path: "templates/$t.id" ) assert resp.status == 200 - } else { - println '' + println "Deleted template : $tName" } } - conf.nifi?.undeploy?.processGroups?.each { pgName -> + + conf.nifi?.undeploy?.processGroups?.each { pgConfig -> + pgName = pgConfig.key println "Undeploying Process Group: $pgName" - def pg = processGroups.findAll { it.name == pgName } + + // getting pgId + def pg = processGroups.findAll { it.component.name == pgName } if (pg.isEmpty()) { println "[WARN] No such process group found in NiFi" return } assert pg.size() == 1 : "Ambiguous process group name" - def id = pg[0].id + def pgId = pg[0].id + + pgConfig.value.controllerServices.each { csName -> + print "Undeploying Controller Service: $csName" + def cs = lookupControllerService(pgId, csName) + if (cs) { + println " ($cs.id)" + stopControllerService(cs.id) + + def resp_get = nifi.get ( + path: "controller-services/$cs.id" + ) + + def resp = nifi.delete( + path: "controller-services/$cs.id", + query: [ + clientId: client, + version: resp_get.data.revision.version + ] + ) + assert resp.status == 200 + } else { + println '' + } + } - stopProcessGroup(id) + stopProcessGroup(pgId) // now delete it - updateToLatestRevision() resp = nifi.delete( - path: "controller/process-groups/root/process-group-references/$id", - query: [ - clientId: client, - version: currentRevision - ] + path: "process-groups/$pgId", + query: [ + clientId: client, + version: pg[0].revision.version + ] ) + println "[INFO] Deleted process group ${pgName}" assert resp.status == 200 } - conf.nifi?.undeploy?.templates?.each { tName -> - println "Deleting template: $tName" - def t = lookupTemplate(tName) - if (t) { - updateToLatestRevision() - def resp = nifi.delete( - path: "controller/templates/$t.id", - query: [ - clientId: client, - version: currentRevision - ] - ) - assert resp.status == 200 - } - } } /** - Returns a json-backed controller service structure from NiFi -*/ -def lookupControllerService(String name) { + Returns a json-backed controller service structure from NiFi + */ +def lookupControllerService(pgId, name) { def resp = nifi.get( - path: 'controller/controller-services/NODE' + path: "flow/process-groups/${pgId}/controller-services" ) assert resp.status == 200 - if (resp.data.controllerServices.name.grep(name).isEmpty()) { + if (resp.data.controllerServices.component.name.grep(name).isEmpty()) { return } - assert resp.data.controllerServices.name.grep(name).size() == 1 : - "Multiple controller services found named '$name'" + assert resp.data.controllerServices.component.name.grep(name).size().toInteger() == 1 : + "Multiple controller services found named '$name'" // println prettyPrint(toJson(resp.data)) - def cs = resp.data.controllerServices.find { it.name == name } + def cs = resp.data.controllerServices.component.find { it.name == name } assert cs != null return cs } /** - Returns a json-backed template structure from NiFi. Null if not found. -*/ + Returns a json-backed template structure from NiFi. Null if not found. + */ def lookupTemplate(String name) { def resp = nifi.get( - path: 'controller/templates' + path: 'flow/templates' ) assert resp.status == 200 - if (resp.data.templates.name.grep(name).isEmpty()) { + if (resp.data.templates.template.name.grep(name).isEmpty()) { return null } - assert resp.data.templates.name.grep(name).size() == 1 : - "Multiple templates found named '$name'" + assert resp.data.templates.template.name.grep(name).size().toInteger() == 1 : + "Multiple templates found named '$name'" // println prettyPrint(toJson(resp.data)) - def t = resp.data.templates.find { it.name == name } + def t = resp.data.templates.find { it.template.name == name } assert t != null return t @@ -190,7 +196,7 @@ def importTemplate(String templateUri) { def templateBody = templateUri.toURL().text nifi.request(POST) { request -> - uri.path = '/nifi-api/controller/templates' + uri.path = 'process-groups/root/templates/upload' requestContentType = 'multipart/form-data' MultipartEntity entity = new MultipartEntity() @@ -201,15 +207,14 @@ def importTemplate(String templateUri) { switch (resp.statusLine.statusCode) { case 200: println "[WARN] Template already exists, skipping for now" +// templateId = "aa11dd74-7898-4ee8-9ee5-c3c04c5985c8" // TODO delete template, CS and, maybe a PG break case 201: // grab the trailing UUID part of the location URL header - def location = resp.headers.Location - templateId = location[++location.lastIndexOf('/')..-1] + templateId = xml.template.id.text() println "Template successfully imported into NiFi. ID: $templateId" - updateToLatestRevision() // ready to make further changes - break + return templateId default: throw new Exception("Error importing template") break @@ -219,17 +224,15 @@ def importTemplate(String templateUri) { } def instantiateTemplate(String id) { - updateToLatestRevision() def resp = nifi.post ( - path: 'controller/process-groups/root/template-instance', - body: [ - templateId: id, - // TODO add slight randomization to the XY to avoid hiding PG behind each other - originX: 100, - originY: 100, - version: currentRevision - ], - requestContentType: URLENC + path: 'process-groups/root/template-instance', + body: [ + templateId: id, + // TODO add slight randomization to the XY to avoid hiding PG behind each other + originX: 100, + originY: 100 + ], + requestContentType: JSON ) assert resp.status == 201 @@ -238,46 +241,53 @@ def instantiateTemplate(String id) { def loadProcessGroups() { println "Loading Process Groups from NiFi" def resp = nifi.get( - path: 'controller/process-groups/root/process-group-references' + path: 'process-groups/root/process-groups' ) assert resp.status == 200 // println resp.data - processGroups = resp.data.processGroups + return resp.data.processGroups +} + +def findProcessGroup(processGroups, name) { + processGroups.find { it.component.name == name} } /** - read the desired pgConfig - locate the processor according to the nesting structure in YAML - (intentionally not using 'search') to pick up a specific PG->Proc + (intentionally not using 'search') to pick up a specific PG->Proc - update via a partial PUT constructed from the pgConfig -*/ + */ def handleProcessGroup(Map.Entry pgConfig) { //println pgConfig + processGroups = loadProcessGroups() + if (!pgConfig.value) { return } - updateToLatestRevision() - def pgName = pgConfig.key - def pg = processGroups.find { it.name == pgName } + def pg = findProcessGroup(processGroups, pgName) assert pg : "Processing Group '$pgName' not found in this instance, check your deployment config?" def pgId = pg.id + // handle the controller services + pgConfig.value.controllerServices.each {handleControllerService(pgId, it) } + println "Process Group: $pgConfig.key ($pgId)" //println pgConfig if (!pg.comments) { - updateToLatestRevision() // update process group comments with a deployment timestamp def builder = new JsonBuilder() builder { revision { clientId client - version currentRevision + version pg.revision.version } - processGroup { + id pgId + component { id pgId comments defaultComment } @@ -285,23 +295,21 @@ def handleProcessGroup(Map.Entry pgConfig) { // println builder.toPrettyString() - updateToLatestRevision() - resp = nifi.put ( - path: "controller/process-groups/$pgId", - body: builder.toPrettyString(), - requestContentType: JSON + path: "process-groups/$pgId", + body: builder.toPrettyString(), + requestContentType: JSON ) assert resp.status == 200 } // load processors in this group - resp = nifi.get(path: "controller/process-groups/$pgId/processors") + resp = nifi.get(path: "process-groups/$pgId/processors") assert resp.status == 200 // construct a quick map of "procName -> [id, fullUri]" def processors = resp.data.processors.collectEntries { - [(it.name): [it.id, it.uri, it.comments]] + [(it.component.name): [it.id, it.uri, it.component.config.comments]] } pgConfig.value.processors.each { proc -> @@ -318,13 +326,18 @@ def handleProcessGroup(Map.Entry pgConfig) { def procProps = proc.value.config.entrySet() println "Applying processor configuration" + + resp_get = nifi.get ( + path: "processors/$procId" + ) + def builder = new JsonBuilder() builder { revision { clientId client - version currentRevision + version resp_get.data.revision.version } - processor { + component { id procId config { comments existingComments ?: defaultComment @@ -349,12 +362,10 @@ def handleProcessGroup(Map.Entry pgConfig) { // println builder.toPrettyString() - updateToLatestRevision() - resp = nifi.put ( - path: "controller/process-groups/$pgId/processors/$procId", - body: builder.toPrettyString(), - requestContentType: JSON + path: "processors/$procId", + body: builder.toPrettyString(), + requestContentType: JSON ) assert resp.status == 200 @@ -371,13 +382,12 @@ def handleProcessGroup(Map.Entry pgConfig) { startProcessGroup(pgId) } -def handleControllerService(Map.Entry cfg) { +def handleControllerService(String pgId, Map.Entry cfg) { //println config def name = cfg.key println "Looking up a controller service '$name'" - def cs = lookupControllerService(name) - updateToLatestRevision() + def cs = lookupControllerService(pgId, name) println "Found the controller service '$cs.name'. Current state is ${cs.state}." @@ -388,13 +398,18 @@ def handleControllerService(Map.Entry cfg) { if (cfg.value?.config) { println "Applying controller service '$cs.name' configuration" + + def resp_get = nifi.get ( + path: "controller-services/$cs.id" + ) + def builder = new JsonBuilder() builder { revision { clientId client - version currentRevision + version resp_get.data.revision.version } - controllerService { + component { id cs.id comments cs.comments ?: defaultComment properties { @@ -405,15 +420,10 @@ def handleControllerService(Map.Entry cfg) { } } - - // println builder.toPrettyString() - - updateToLatestRevision() - resp = nifi.put ( - path: "controller/controller-services/NODE/$cs.id", - body: builder.toPrettyString(), - requestContentType: JSON + path: "controller-services/$cs.id", + body: builder.toPrettyString(), + requestContentType: JSON ) assert resp.status == 200 } @@ -423,13 +433,6 @@ def handleControllerService(Map.Entry cfg) { startControllerService(cs.id) } -def updateToLatestRevision() { - def resp = nifi.get( - path: 'controller/revision' - ) - assert resp.status == 200 - currentRevision = resp.data.revision.version -} def stopProcessor(processGroupId, processorId) { _changeProcessorState(processGroupId, processorId, false) @@ -440,52 +443,57 @@ def startProcessor(processGroupId, processorId) { } private _changeProcessorState(processGroupId, processorId, boolean running) { - updateToLatestRevision() + + resp_get = nifi.get ( + path: "processors/$processorId" + ) + def builder = new JsonBuilder() builder { - revision { - clientId client - version currentRevision - } - processor { - id processorId - state running ? 'RUNNING' : 'STOPPED' - } + revision { + clientId client + version resp_get.data.revision.version + } + id processorId + component { + id processorId + state running ? 'RUNNING' : 'STOPPED' + } } + //println builder.toPrettyString() resp = nifi.put ( - path: "controller/process-groups/$processGroupId/processors/$processorId", - body: builder.toPrettyString(), - requestContentType: JSON + path: "processors/$processorId", + body: builder.toPrettyString(), + requestContentType: JSON ) assert resp.status == 200 - currentRevision = resp.data.revision.version } def startProcessGroup(pgId) { - _changeProcessGroupState(pgId, true) + _changeProcessGroupState(pgId, "RUNNING") } def stopProcessGroup(pgId) { print "Waiting for a Process Group to stop: $pgId " - _changeProcessGroupState(pgId, false) + _changeProcessGroupState(pgId, "STOPPED") int maxWait = 1000 * 30 // up to X seconds - def resp = nifi.get(path: "controller/process-groups/$pgId/status") + def resp = nifi.get(path: "process-groups/$pgId") assert resp.status == 200 long start = System.currentTimeMillis() // keep polling till active threads shut down, but no more than maxWait time while ((System.currentTimeMillis() < (start + maxWait)) && - resp.data.processGroupStatus.activeThreadCount > 0) { + resp.data.runningCount > 0) { sleep(1000) - resp = nifi.get(path: "controller/process-groups/$pgId/status") + resp = nifi.get(path: "process-groups/$pgId") assert resp.status == 200 print '.' } - if (resp.data.processGroupStatus.activeThreadCount == 0) { + if (resp.data.runningCount == 0) { println 'Done' } else { println "Failed to stop the processing group, request timed out after ${maxWait/1000} seconds" @@ -493,18 +501,17 @@ def stopProcessGroup(pgId) { } } -private _changeProcessGroupState(pgId, boolean running) { - updateToLatestRevision() +private _changeProcessGroupState(pgId, String state) { def resp = nifi.put( - path: "controller/process-groups/root/process-group-references/$pgId", - body: [ - running: running, - client: client, - version: currentRevision - ], - requestContentType: URLENC + path: "flow/process-groups/$pgId", + body: [ + id: pgId, + state: state + ], + requestContentType: JSON ) assert resp.status == 200 + return } def stopControllerService(csId) { @@ -516,18 +523,24 @@ def startControllerService(csId) { } private _changeControllerServiceState(csId, boolean enabled) { - updateToLatestRevision() + + + def resp_get = nifi.get ( + path: "controller-services/$csId" + ) + + def test = resp_get.data.component.referencingComponents.collect{[(it.id): it.revision]}.collectEntries{ it } if (!enabled) { // gotta stop all CS references first when disabling a CS def resp = nifi.put ( - path: "controller/controller-services/node/$csId/references", - body: [ - clientId: client, - version: currentRevision, - state: 'STOPPED' - ], - requestContentType: URLENC + path: "controller-services/$csId/references", + body: [ + referencingComponentRevisions: test, + id: csId, + state: 'STOPPED' + ], + requestContentType: JSON ) assert resp.status == 200 } @@ -536,9 +549,9 @@ private _changeControllerServiceState(csId, boolean enabled) { builder { revision { clientId client - version currentRevision + version resp_get.data.revision.version } - controllerService { + component { id csId state enabled ? 'ENABLED' : 'DISABLED' } @@ -547,9 +560,9 @@ private _changeControllerServiceState(csId, boolean enabled) { // println builder.toPrettyString() resp = nifi.put( - path: "controller/controller-services/NODE/$csId", - body: builder.toPrettyString(), - requestContentType: JSON + path: "controller-services/$csId", + body: builder.toPrettyString(), + requestContentType: JSON ) assert resp.status == 200 } @@ -569,10 +582,10 @@ assert nifiHostPort : "No NiFI REST API endpoint provided" nifi = new RESTClient("$nifiHostPort/nifi-api/") nifi.handler.failure = { resp, data -> - resp.setData(data?.text) - println "[ERROR] HTTP call failed. Status code: $resp.statusLine: $resp.data" - // fail gracefully with a more sensible groovy stacktrace - assert null : "Terminated script execution" + resp.setData(data?.text) + println "[ERROR] HTTP call failed. Status code: $resp.statusLine: $resp.data" + // fail gracefully with a more sensible groovy stacktrace + assert null : "Terminated script execution" } @@ -582,28 +595,19 @@ assert client : 'Client ID must be provided' thisHost = InetAddress.localHost defaultComment = "Last updated by '$client' on ${new Date()} from $thisHost" -currentRevision = -1 // used for optimistic concurrency throughout the REST API - -processGroups = null -loadProcessGroups() handleUndeploy() -templateId = null // will be assigned on import into NiFi def tUri = opts.template ?: conf.nifi.templateUri assert tUri : "Template URI not provided" -importTemplate(tUri) +templateId = importTemplate(tUri) instantiateTemplate(templateId) -// reload after template instantiation -loadProcessGroups() - println "Configuring Controller Services" // controller services are dependencies of processors, // configure them first -conf.controllerServices.each { handleControllerService(it) } println "Configuring Process Groups and Processors" conf.processGroups.each { handleProcessGroup(it) } diff --git a/README.md b/README.md index eb9ddbf..9d1c19a 100644 --- a/README.md +++ b/README.md @@ -65,14 +65,13 @@ nifi: # Tell NiFi we want some things removed to make way for this (re-) deployment undeploy: - # Names of controller services to remove. Ignores any missing ones - controllerServices: - - StandardHttpContextMap - - SomeOtherControllerService - # Names of process groups to remove. These are in your template processGroups: - - Hello NiFi Web Service + Hello NiFi Web Service: + # Names of controller services to remove. Ignores any missing ones + controllerServices: + - StandardHttpContextMap + - SomeOtherControllerService # Template names to remove. Because we're updating with a new version templates: @@ -82,10 +81,6 @@ nifi: Next, one describes what configuration changes need to be applied to the template in this deployment: ``` -# Instantiate these controller services, our template uses them -controllerServices: - StandardHttpContextMap: - state: ENABLED # Processors belong to process groups. # This way random ones won't be picked up (unlike a search api, @@ -113,6 +108,10 @@ processGroups: "Update Request Body with a greeting!": config: Replacement Value: Dynamically Configured NiFi! + # Instantiate these controller services, our template uses them + controllerServices: + StandardHttpContextMap: + state: ENABLED ``` diff --git a/nifi-deploy.yml b/nifi-deploy.yml index 55bd103..d25d38f 100644 --- a/nifi-deploy.yml +++ b/nifi-deploy.yml @@ -1,23 +1,17 @@ nifi: - url: http://192.168.99.103:9091 + url: http://127.0.0.1:8080 clientId: Deployment Script v1 templateUri: "https://cwiki.apache.org/confluence/download/attachments/57904847/Hello_NiFi_Web_Service.xml?version=1&modificationDate=1449369797000&api=v2" # templateUri: file:./Hello_nifi.xml undeploy: - controllerServices: - - StandardHttpContextMap - - SomeOtherControllerService processGroups: - - Hello NiFi Web Service + Hello NiFi Web Service: + controllerServices: + - StandardHttpContextMap + - SomeOtherControllerService templates: - Hello NiFi Web Service -controllerServices: - StandardHttpContextMap: - state: ENABLED - config: - Maximum Outstanding Requests: 20 - processGroups: root: ~ @@ -31,3 +25,8 @@ processGroups: "Update Request Body with a greeting!": config: Replacement Value: Dynamically Configured NiFi! + controllerServices: + StandardHttpContextMap: + state: ENABLED + config: + Maximum Outstanding Requests: 20