NOTE: this patch was re-flowed from a copy whose line breaks had been
collapsed. Blank context lines and the indentation of context lines are
reconstructed and may need adjusting before the patch applies cleanly.

diff --git a/bin/mmpu b/bin/mmpu
index 306e00a..419c1de 100755
--- a/bin/mmpu
+++ b/bin/mmpu
@@ -11,6 +11,7 @@ var fs = require('fs');
 var path = require('path');
 var strsplit = require('strsplit');
 var util = require('util');
+var vasync = require('vasync');
 
 var bunyan = require('bunyan');
 var cmdln = require('cmdln');
@@ -29,6 +30,7 @@ var LOG = bunyan.createLogger({
     stream: process.stderr
 });
 
+var DEFAULT_PART_SIZE = 5242880;
 
 ///-- Helpers
 
@@ -272,7 +274,6 @@ MMpu.prototype.do_upload = function do_upload(subcmd, opts, args, cb) {
                 console.log(res.headers.etag);
                 cb();
             }
-            client.close();
         });
     }
 
@@ -477,6 +478,298 @@ MMpu.prototype.do_commit.help = [
     ''
 ].join('\n');
 
+//-- multi-Put
+
+/*
+ * mmpu put: upload a local file to Manta as a multipart upload in one step.
+ *
+ * The stages below run in order in a vasync pipeline and share one context
+ * object (pipelineCtx):
+ *
+ *   splitFile     stat the file and record the byte range of every part
+ *   initUpload    create the upload (and the progress bar, when wanted)
+ *   doUploads     upload the parts one at a time, collecting their etags
+ *   commitUpload  commit the upload using the collected etags
+ *
+ * Usage problems are handled before a client is created; after that the
+ * client is closed once, when the pipeline finishes.
+ */
+MMpu.prototype.do_put = function do_put(subcmd, opts, args, cb) {
+    if (!assertOpts(this, subcmd, opts, cb)) {
+        return;
+    }
+    if (args.length < 1) {
+        cb(new Error('Object path must be specified'));
+        return;
+    } else if (args.length > 1) {
+        this.do_help('help', {}, [subcmd], cb);
+        return;
+    }
+
+    var objectPath = args[0];
+
+    var filename = opts.file;
+    if (!filename) {
+        cb(new Error('Need a file to send'));
+        return;
+    }
+
+    // --partsize is the declared option name; a part may not be smaller than
+    // the default size.
+    var partSize = opts.partsize || DEFAULT_PART_SIZE;
+    if (partSize < DEFAULT_PART_SIZE) {
+        cb(new Error('Invalid part size'));
+        return;
+    }
+
+    // A plain loop rather than forEach(): "return" must leave do_put itself
+    // when a header is malformed, not just the iteration callback.
+    var headers = {};
+    var rawHeaders = opts.header || [];
+    for (var i = 0; i < rawHeaders.length; i++) {
+        if (rawHeaders[i].indexOf(':') === -1) {
+            cb(new Error('Header must be of the form "[header]: value"'));
+            return;
+        }
+        var tmp = strsplit(rawHeaders[i], ':', 2);
+        headers[tmp[0]] = tmp[1].trim();
+    }
+
+    var client = createClient(opts);
+
+    // Store the base64 md5 digest of the file's first ctx.opts.size bytes in
+    // ctx.calculatedMd5.
+    function calculateMd5(ctx, callback) {
+        var f_opts = {};
+        if (ctx.opts.size > 0) {
+            // An empty file has no valid byte range to ask for.
+            f_opts.start = 0;
+            f_opts.end = ctx.opts.size - 1;
+        }
+        var fstream = fs.createReadStream(ctx.filename, f_opts);
+        var hash = crypto.createHash('md5');
+        fstream.on('data', hash.update.bind(hash));
+        fstream.once('error', callback);
+        fstream.once('end', function () {
+            ctx.calculatedMd5 = hash.digest('base64');
+            callback();
+        });
+    }
+
+    // Stat the file, record its size in ctx.opts.size and fill ctx.parts
+    // with the inclusive {start, end} byte range of each part.
+    function splitFile(ctx, callback) {
+        fs.stat(ctx.filename, function (err, stats) {
+            if (err) {
+                callback(err);
+                return;
+            }
+            if (!stats.isFile()) {
+                // No client.close() here: the final pipeline callback
+                // closes the client on every path.
+                callback(new Error(ctx.filename + ' is not a file'));
+                return;
+            }
+            var size = stats.size;
+            ctx.opts.size = size;
+            for (var offset = 0; offset < size; offset += partSize) {
+                ctx.parts.push({
+                    start: offset,
+                    end: Math.min(offset + partSize, size) - 1
+                });
+            }
+            if (ctx.opts.md5) {
+                calculateMd5(ctx, callback);
+            } else {
+                callback();
+            }
+        });
+    }
+
+    // Create the upload and keep its id in ctx.multiput_obj.
+    function initUpload(ctx, callback) {
+        var createUploadOpts = {
+            headers: headers,
+            account: ctx.opts.account,
+            copies: ctx.opts.copies,
+            size: ctx.opts.size // populated by splitFile().
+        };
+
+        var drawProgBar = !ctx.opts.quiet && Boolean(process.stderr.isTTY);
+        if (ctx.opts.progress || drawProgBar) {
+            ctx.progbar = new manta.ProgressBar({
+                filename: ctx.filename,
+                size: ctx.opts.size,
+                nosize: false,
+                devtty: ctx.opts.progress
+            });
+        }
+
+        if (ctx.opts.md5) {
+            assert(ctx.calculatedMd5);
+            createUploadOpts.md5 = ctx.calculatedMd5;
+        }
+
+        client.createUpload(ctx.objectPath, createUploadOpts,
+            function (err, obj) {
+            if (err) {
+                // If we failed in create, there isn't anything to clean up.
+                callback(err);
+            } else {
+                ctx.multiput_obj = obj.id;
+                callback();
+            }
+        });
+    }
+
+    function logUploadWarning(index, objectId) {
+        console.log('Warning: upload of part %d for object %s failed',
+            index, objectId);
+    }
+
+    // Upload the parts one after another, in part order, appending each
+    // part's etag to ctx.etags.
+    function doUploads(ctx, callback) {
+        var funcs = ctx.parts.map(function (part, partNum) {
+            return function uploadOnePart(_, partCallback) {
+                // Create the stream when this part's turn comes instead of
+                // creating one stream per part up front.
+                var partStream = fs.createReadStream(ctx.filename, part);
+                ctx.client.uploadPart(partStream, ctx.multiput_obj, partNum,
+                    ctx.opts, function (err, res) {
+                    if (err) {
+                        logUploadWarning(partNum, ctx.multiput_obj);
+                        partCallback(err);
+                        return;
+                    }
+                    ctx.etags.push(res.headers.etag);
+                    if (ctx.progbar) {
+                        // part.end is inclusive, hence the + 1.
+                        ctx.progbar.advance(part.end - part.start + 1);
+                    }
+                    partCallback();
+                });
+            };
+        });
+        vasync.pipeline({
+            'funcs': funcs
+        }, function (err, results) {
+            // Call back exactly once; the final callback of do_put logs
+            // the failure.
+            callback(err);
+        });
+    }
+
+    // Commit the upload; the progress bar, if any, is ended either way.
+    function commitUpload(ctx, callback) {
+        var commitUploadOpts = {
+            account: ctx.opts.account
+        };
+
+        client.commitUpload(ctx.multiput_obj, ctx.etags, commitUploadOpts,
+            function (err) {
+            if (ctx.progbar) {
+                ctx.progbar.end();
+            }
+            if (err) {
+                // abort ?
+                console.log('Warning: mpu commit failed for object %s.',
+                    ctx.multiput_obj);
+                callback(err);
+            } else {
+                callback();
+            }
+        });
+    }
+
+    var pipelineCtx = {
+        opts: opts,
+        filename: filename,
+        objectPath: objectPath,
+        parts: [],
+        etags: [],
+        multiput_obj: null,
+        client: client
+    };
+
+    // Do our operations in order.
+    vasync.pipeline({
+        'arg' : pipelineCtx,
+        'funcs' : [
+            splitFile,
+            initUpload,
+            doUploads,
+            commitUpload
+        ]
+    }, function (err, results) {
+        if (err) {
+            console.log('failed: ', err);
+        }
+
+        client.close();
+        cb(err);
+    });
+};
+
+
+MMpu.prototype.do_put.options = manta.DEFAULT_CLI_OPTIONS.concat([
+    {
+        group: 'mmpu put options'
+    },
+    {
+        names: ['copies', 'c'],
+        type: 'positiveInteger',
+        default: 2,
+        help: 'number of copies to make',
+        helpArg: 'COPIES'
+    },
+    {
+        names: ['file', 'f'],
+        type: 'string',
+        help: 'local file to upload',
+        helpArg: 'FILE',
+        completionType: 'file'
+    },
+    {
+        names: ['partsize', 'S'],
+        type: 'positiveInteger',
+        help: 'object part size in bytes (default and minimum: ' +
+            DEFAULT_PART_SIZE + ')',
+        helpArg: 'PARTSIZE'
+    },
+    {
+        names: ['header', 'H'],
+        type: 'arrayOfString',
+        help: 'HTTP headers to include',
+        helpArg: 'HEADER'
+    },
+    {
+        names: ['md5', 'm'],
+        type: 'bool',
+        help: 'Calculate the md5 sum of the local file and send it ' +
+            'when creating the upload. The server will validate ' +
+            'this md5 on commit, and will reject the commit if ' +
+            'it does not match.'
+    }
+]);
+
+MMpu.prototype.do_put.help = [
+    'Given a local file and a path in Manta, upload the file to Manta as',
+    'a multipart upload.',
+    '',
+    'The only argument to this command is the object path; the local file',
+    'to upload is named with the -f/--file option.',
+    '',
+    'Additionally, mmpu put accepts most of the same options as accepted',
+    'by mmpu create (the --md5 option differs: it matches the mput',
+    'semantics.)',
+    '',
+    'Usage:',
+    '    mmpu put [OPTIONS] -f FILE PATH',
+    '',
+    '{{options}}',
+    ''
+].join('\n');
 
 //-- GetMPU
 
diff --git a/test/mmpu.test.js b/test/mmpu.test.js
index e447a95..7834d00 100644
--- a/test/mmpu.test.js
+++ b/test/mmpu.test.js
@@ -45,6 +45,7 @@ var LIST = 'list';
 var PARTS = 'parts';
 var ABORT = 'abort';
 var COMMIT = 'commit';
+var MULTIPUT = 'put';
 
 // object paths
 var C_OBJ_PATH = format('/%s/stor/node-manta-test-mmpu-%s-commit',
@@ -578,8 +579,6 @@ test('mmpu list: post part upload', function (t) {
     });
 });
 
-
-
 // Commit the object, do an mget of it to verify it's the object we expect,
 // and remove it to clean up.
 test('mmpu commit C_ID C_ETAG0', function (t) {
@@ -664,6 +663,110 @@ test('mmpu commit C_ID C_ETAG0', function (t) {
     });
 });
 
+// Upload and commit in one step and then remove to clean up
+test('mmpu put C_OBJ_PATH', function (t) {
+    if (!MPU_ENABLED) {
+        console.log('WARNING: skipping test: multipart ' +
+            'upload is not enabled on this Manta deployment');
+        t.done();
+        return;
+    }
+
+    var tmpFile = '/var/tmp/node-manta-mmpu-test-tmp-file-' + process.pid;
+
+    var s = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789';
+    var largeTmpFileContents = Array(6553600).join().split(',').map(
+        function () {
+            return (s.charAt(Math.floor(Math.random() * s.length)));
+    }).join('');
+
+    function mkLargeTmpFile(_, cb) {
+        fs.writeFile(tmpFile, largeTmpFileContents, cb);
+    }
+
+    var LARGE_C_OBJ_PATH = format(
+        '/%s/stor/node-manta-test-mmpu-%s-commit-large',
+        MANTA_USER, MANTA_USER);
+
+    function multiput(_, cb) {
+        var argv = [ MMPU, MULTIPUT, '-f', tmpFile, LARGE_C_OBJ_PATH ];
+
+        forkExecWait({
+            argv: argv
+        }, function (err, info) {
+            if (err) {
+                cb(err);
+            } else {
+                cb();
+            }
+        });
+    }
+
+    function getHash(filenameToHash, cb) {
+        var hash = crypto.createHash('md5');
+        hash.setEncoding('hex');
+        var inFile = fs.createReadStream(filenameToHash);
+        inFile.on('end', function () {
+            hash.end();
+            cb(hash.read());
+        });
+        inFile.pipe(hash);
+    }
+
+    function getCommitObj(_, cb) {
+        var argv = [ MGET, '-o', tmpFile + '_dl', LARGE_C_OBJ_PATH ];
+
+        forkExecWait({
+            argv: argv
+        }, function (err, info) {
+            if (err) {
+                cb(err);
+            } else {
+                var origName = tmpFile;
+                var retrievedName = tmpFile + '_dl';
+                getHash(origName, function (origHash) {
+                    getHash(retrievedName, function (retrievedHash) {
+                        t.equal(origHash, retrievedHash);
+                        cb();
+                    });
+                });
+            }
+        });
+    }
+
+    function rmTmpFile(_, cb) {
+        fs.unlink(tmpFile, function () {
+            fs.unlink(tmpFile + '_dl', cb);
+        });
+    }
+
+    function rmCommitObj(_, cb) {
+        var argv = [ MRM, LARGE_C_OBJ_PATH ];
+
+        forkExecWait({
+            argv: argv
+        }, function (err, info) {
+            if (err) {
+                cb(err);
+            } else {
+                cb();
+            }
+        });
+    }
+
+    vasync.pipeline({
+        funcs: [
+            mkLargeTmpFile,
+            multiput,
+            getCommitObj,
+            rmCommitObj,
+            rmTmpFile
+        ]
+    }, function (err, results) {
+        t.ifError(err, err);
+        t.done();
+    });
+});
 
 // Abort the object being uploaded to A_OBJ_PATH.
 test('mmpu abort A_ID', function (t) {