From caa36f55d974b6b2e1f20908edbe92dde9598036 Mon Sep 17 00:00:00 2001 From: Vladimir Bulyga Date: Tue, 22 Apr 2014 17:00:46 +0400 Subject: [PATCH 1/2] Cannot fetch job status after 24h --- bin/mjob | 53 ++------------------------------------------------- lib/client.js | 36 +++++++++++++++++++++++++++++----- 2 files changed, 33 insertions(+), 56 deletions(-) diff --git a/bin/mjob b/bin/mjob index fd97767..75b5da5 100755 --- a/bin/mjob +++ b/bin/mjob @@ -183,31 +183,6 @@ function jobDataCommon(opts, callback) { worker: function getJobData(id, cb) { cb = once(cb); - function handleError(func_err) { - if (func_err.statusCode !== 404) { - cb(func_err); - return; - } - - var p = sprintf(JOB_DATA_FMT, opts.user, id, opts.type); - opts.client.get(p, function (err, stream) { - if (err) { - cb(err); - return; - } - - var body = ''; - stream.setEncoding('utf8'); - stream.on('data', function (chunk) { - body += chunk; - }); - stream.once('end', function () { - process.stdout.write(body); - cb(); - }); - }); - } - opts.client[opts.func](id, function (err, out) { if (err) { handleError(err); @@ -216,7 +191,7 @@ function jobDataCommon(opts, callback) { var body = ''; - out.once('error', handleError); + out.once('error', cb); out.on(opts.watch, function (obj) { if (opts.func === 'jobErrors') { body += JSON.stringify(obj) + '\n'; @@ -401,31 +376,7 @@ function getJobInfo(opts, cb) { assert.optionalFunc(cb, 'callback'); cb = once(cb); - opts.client.job(opts.id, function (err, job) { - if (err) { - if (err.statusCode !== 404) { - cb(err); - } else { - var p = manta.jobPath(opts.id, opts.user) + '/job.json'; - opts.client.get(p, function (_err, stream) { - if (_err) { - cb(_err); - } else { - var body = ''; - stream.setEncoding('utf8'); - stream.on('data', function (chunk) { - body += chunk; - }); - stream.once('end', function () { - cb(null, JSON.parse(body)); - }); - } - }); - } - } else { - cb(null, job); - } - }); + opts.client.job(opts.id, cb); } diff --git a/lib/client.js b/lib/client.js index 02c8fb8..ea72150 100644 --- a/lib/client.js +++ b/lib/client.js @@ -320,6 +320,10 @@ function onResultLineStreamCallback(opts) { function onResult(err, res) { if (err) { + if (err.statusCode === 404 && typeof (opts.secondTry) === 'function') { + opts.secondTry(); + return; + } readError(err, res, function () { log.debug(err, '%s: error', name); emitter.emit('error', err, res); @@ -1931,7 +1935,7 @@ MantaClient.prototype.job = function getJob(j, opts, cb) { } assert.func(cb, 'callback'); - var _path = getJobPath(j, this.user) + '/live/status'; + var _path = getJobPath(j, this.user) + (opts.path || '/live/status'); var options = createOptions({ accept: 'application/json', path: _path @@ -1957,6 +1961,11 @@ MantaClient.prototype.job = function getJob(j, opts, cb) { self.jsonClient.get(options, function (err2, _, __, obj) { if (err2) { + if (err2.statusCode === 404 && !opts.path) { + opts.path = '/job.json'; + self.job(j, opts, cb); + return; + } log.debug(err, 'getJob: failed'); cb(err2); } else { @@ -2318,7 +2327,7 @@ MantaClient.prototype.jobInput = function getJobInput(j, opts, cb) { } assert.func(cb, 'callback'); - var _path = getJobPath(j, this.user) + '/live/in'; + var _path = getJobPath(j, this.user) + (opts.path || '/live/in'); var emitter = new EventEmitter(); var options = createOptions({ accept: 'application/x-json-stream', @@ -2349,6 +2358,10 @@ MantaClient.prototype.jobInput = function getJobInput(j, opts, cb) { log: log, name: 'jobInput', onResult: onResultLineStreamCallback({ + secondTry: !opts.path && function () { + opts.path = '/in.txt'; + self.jobInput(j, opts, cb); + }, emitter: emitter, emitCb: function (res, line) { line = line.replace(/\r?\n$/, ''); @@ -2395,7 +2408,7 @@ MantaClient.prototype.jobOutput = function getJobOutput(j, opts, cb) { } assert.func(cb, 'callback'); - var _path = getJobPath(j, this.user) + '/live/out'; + var _path = getJobPath(j, this.user) + (opts.path || '/live/out'); var emitter = new EventEmitter(); var options = createOptions({ accept: 'application/x-json-stream', @@ -2425,6 +2438,11 @@ MantaClient.prototype.jobOutput = function getJobOutput(j, opts, cb) { log: log, name: 'jobOutput', onResult: onResultLineStreamCallback({ + secondTry: !opts.path && function () { + opts.path = '/out.txt'; + opts.headers = {accept: 'text/plain'}; + self.jobOutput(j, opts, cb); + }, emitter: emitter, emitCb: function (res, line) { line = line.replace(/\r?\n$/, ''); @@ -2471,7 +2489,7 @@ MantaClient.prototype.jobFailures = function getJobFailures(j, opts, cb) { } assert.func(cb, 'callback'); - var _path = getJobPath(j, this.user) + '/live/fail'; + var _path = getJobPath(j, this.user) + (opts.path || '/live/fail'); var emitter = new EventEmitter(); var options = createOptions({ accept: 'application/x-json-stream', @@ -2501,6 +2519,10 @@ MantaClient.prototype.jobFailures = function getJobFailures(j, opts, cb) { log: log, name: 'jobFailures', onResult: onResultLineStreamCallback({ + secondTry: !opts.path && function () { + opts.path = '/fail.txt'; + self.jobFailures(j, opts, cb); + }, emitter: emitter, emitCb: function (res, line) { line = line.replace(/\r?\n$/, ''); @@ -2547,7 +2569,7 @@ MantaClient.prototype.jobErrors = function getJobErrors(j, opts, cb) { } assert.func(cb, 'callback'); - var _path = getJobPath(j, this.user) + '/live/err'; + var _path = getJobPath(j, this.user) + (opts.path || '/live/err'); var emitter = new EventEmitter(); var options = createOptions({ accept: 'application/x-json-stream', @@ -2588,6 +2610,10 @@ MantaClient.prototype.jobErrors = function getJobErrors(j, opts, cb) { log: log, name: 'jobErrors', onResult: onResultLineStreamCallback({ + secondTry: !opts.path && function () { + opts.path = '/err.txt'; + self.jobErrors(j, opts, cb); + }, emitter: emitter, emitCb: onLine, log: log, From 8d96a21b53bd5d4ced31191b1a4a6ee518cfc057 Mon Sep 17 00:00:00 2001 From: Vladimir Bulyga Date: Mon, 12 May 2014 19:24:04 +0400 Subject: [PATCH 2/2] useArchive instead "path" option --- lib/client.js | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/lib/client.js b/lib/client.js index ea72150..b024641 100644 --- a/lib/client.js +++ b/lib/client.js @@ -1935,7 +1935,7 @@ MantaClient.prototype.job = function getJob(j, opts, cb) { } assert.func(cb, 'callback'); - var _path = getJobPath(j, this.user) + (opts.path || '/live/status'); + var _path = getJobPath(j, this.user) + (opts.useArchive ? '/job.json' : '/live/status'); var options = createOptions({ accept: 'application/json', path: _path @@ -1961,8 +1961,8 @@ MantaClient.prototype.job = function getJob(j, opts, cb) { self.jsonClient.get(options, function (err2, _, __, obj) { if (err2) { - if (err2.statusCode === 404 && !opts.path) { - opts.path = '/job.json'; + if (err2.statusCode === 404 && !opts.useArchive) { + opts.useArchive = true; self.job(j, opts, cb); return; } @@ -2327,7 +2327,7 @@ MantaClient.prototype.jobInput = function getJobInput(j, opts, cb) { } assert.func(cb, 'callback'); - var _path = getJobPath(j, this.user) + (opts.path || '/live/in'); + var _path = getJobPath(j, this.user) + (opts.useArchive ? '/in.txt' : '/live/in'); var emitter = new EventEmitter(); var options = createOptions({ accept: 'application/x-json-stream', @@ -2358,8 +2358,8 @@ MantaClient.prototype.jobInput = function getJobInput(j, opts, cb) { log: log, name: 'jobInput', onResult: onResultLineStreamCallback({ - secondTry: !opts.path && function () { - opts.path = '/in.txt'; + secondTry: !opts.useArchive && function () { + opts.useArchive = true; self.jobInput(j, opts, cb); }, emitter: emitter, @@ -2408,7 +2408,7 @@ MantaClient.prototype.jobOutput = function getJobOutput(j, opts, cb) { } assert.func(cb, 'callback'); - var _path = getJobPath(j, this.user) + (opts.path || '/live/out'); + var _path = getJobPath(j, this.user) + (opts.useArchive ? '/out.txt' : '/live/out'); var emitter = new EventEmitter(); var options = createOptions({ accept: 'application/x-json-stream', @@ -2438,8 +2438,8 @@ MantaClient.prototype.jobOutput = function getJobOutput(j, opts, cb) { log: log, name: 'jobOutput', onResult: onResultLineStreamCallback({ - secondTry: !opts.path && function () { - opts.path = '/out.txt'; + secondTry: !opts.useArchive && function () { + opts.useArchive = true; opts.headers = {accept: 'text/plain'}; self.jobOutput(j, opts, cb); }, @@ -2489,7 +2489,7 @@ MantaClient.prototype.jobFailures = function getJobFailures(j, opts, cb) { } assert.func(cb, 'callback'); - var _path = getJobPath(j, this.user) + (opts.path || '/live/fail'); + var _path = getJobPath(j, this.user) + (opts.useArchive ? '/fail.txt' : '/live/fail'); var emitter = new EventEmitter(); var options = createOptions({ accept: 'application/x-json-stream', @@ -2519,8 +2519,8 @@ MantaClient.prototype.jobFailures = function getJobFailures(j, opts, cb) { log: log, name: 'jobFailures', onResult: onResultLineStreamCallback({ - secondTry: !opts.path && function () { - opts.path = '/fail.txt'; + secondTry: !opts.useArchive && function () { + opts.useArchive = true; self.jobFailures(j, opts, cb); }, emitter: emitter, @@ -2569,7 +2569,7 @@ MantaClient.prototype.jobErrors = function getJobErrors(j, opts, cb) { } assert.func(cb, 'callback'); - var _path = getJobPath(j, this.user) + (opts.path || '/live/err'); + var _path = getJobPath(j, this.user) + (opts.useArchive ? '/err.txt' : '/live/err'); var emitter = new EventEmitter(); var options = createOptions({ accept: 'application/x-json-stream', @@ -2610,8 +2610,8 @@ MantaClient.prototype.jobErrors = function getJobErrors(j, opts, cb) { log: log, name: 'jobErrors', onResult: onResultLineStreamCallback({ - secondTry: !opts.path && function () { - opts.path = '/err.txt'; + secondTry: !opts.useArchive && function () { + opts.useArchive = true; self.jobErrors(j, opts, cb); }, emitter: emitter,