Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 259 additions & 1 deletion bin/mmpu
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ var fs = require('fs');
var path = require('path');
var strsplit = require('strsplit');
var util = require('util');
var vasync = require('vasync');

var bunyan = require('bunyan');
var cmdln = require('cmdln');
Expand All @@ -29,6 +30,7 @@ var LOG = bunyan.createLogger({
stream: process.stderr
});

var DEFAULT_PART_SIZE = 5242880;

///-- Helpers

Expand Down Expand Up @@ -272,7 +274,6 @@ MMpu.prototype.do_upload = function do_upload(subcmd, opts, args, cb) {
console.log(res.headers.etag);
cb();
}

client.close();
});
}
Expand Down Expand Up @@ -477,6 +478,263 @@ MMpu.prototype.do_commit.help = [
''
].join('\n');

//-- multi-Put
MMpu.prototype.do_put = function do_put(subcmd, opts, args, cb) {
if (!assertOpts(this, subcmd, opts, cb)) {
return;
}
if (args.length < 1) {
cb(new Error('Object path must be specified'));
return;
} else if (args.length > 1) {
this.do_help('help', {}, [subcmd], cb);
return;
}

var objectPath = args[0];
var client = createClient(opts);
var headers = {};
(opts.header || []).forEach(function (h) {
if (h.indexOf(':') === -1) {
cb(new Error('Header must be of the form "[header]: value"'));
return;
}
var tmp = strsplit(h, ':', 2);
headers[tmp[0]] = tmp[1].trim();
});

var filename = opts.file;
if (!filename) {
cb(new Error('Need a file to send'));
}
var partSize = opts.chunksize || DEFAULT_PART_SIZE;
if (partSize < DEFAULT_PART_SIZE) {
cb(new Error('Invalid part size'));
}

function calculateMd5(ctx, callback) {
var f_opts = {
start: 0,
end: ctx.opts.size - 1
};
var fstream = fs.createReadStream(ctx.filename, f_opts);
var hash = crypto.createHash('md5');
fstream.on('data', hash.update.bind(hash));
fstream.once('end', function () {
ctx.calculatedMd5 = hash.digest('base64');
callback();
});
}

function splitFile(ctx, callback) {
var offset = 0;
fs.stat(ctx.filename, function (err, stats) {
if (!stats.isFile()) {
client.close();
callback(new Error(ctx.filename + ' is not a file'));
return;
}
var size = stats.size;
ctx.opts.size = size;
while (offset < size) {
var chunkLength = (offset + partSize < size) ?
(offset + partSize) : size;
ctx.parts.push({
start: offset,
end: chunkLength - 1
});
offset += partSize;
}
if (ctx.opts.md5) {
calculateMd5(ctx, callback);
} else {
callback();
}
});
}

function initUpload(ctx, callback) {
opts = ctx.opts;
var createUploadOpts = {
headers: headers,
account: opts.account,
copies: opts.copies,
size: opts.size // populated by splitFile().
};

var drawProgBar = !opts.quiet && Boolean(process.stderr.isTTY);
if (opts.progress || drawProgBar) {
ctx.progbar = new manta.ProgressBar({
filename: ctx.filename,
size: opts.size,
nosize: false,
devtty: opts.progress
});
}

if (opts.md5) {
assert(ctx.calculatedMd5);
createUploadOpts.md5 = ctx.calculatedMd5;
}

client.createUpload(ctx.objectPath, createUploadOpts,
function (err, obj) {
if (err) {
// If we failed in create, there isn't anything to clean up.
callback(err);
} else {
ctx.multiput_obj = obj.id;
callback();
}
});
}

function logUploadWarning(index, objectId) {
console.log('Warning: upload of part %d for object %s failed',
index, objectId);
}

function doUploads(ctx, callback) {
var funcs = [];
ctx.parts.forEach(function (h, i) {
var partStream = fs.createReadStream(filename, h);
var upload_part = function (_, partCallback) {
ctx.client.uploadPart(partStream, ctx.multiput_obj, i, ctx.opts,
function (err, res) {
if (err) {
logUploadWarning(i, ctx.multiput_obj);
partCallback(err);
return;
}
ctx.etags.push(res.headers.etag);
if (ctx.progbar) {
ctx.progbar.advance(h.end - h.start);
}
partCallback();
});
};
funcs.push(upload_part);
});
vasync.pipeline({
'funcs': funcs
}, function (err, results) {
if (err) {
console.log('failed: ', err);
callback(err);
}
callback();
});
}

function commitUpload(ctx, callback) {
// commit
var commitUploadOpts = {
account: opts.account
};

client.commitUpload(ctx.multiput_obj, ctx.etags, commitUploadOpts,
function (err) {
if (ctx.progbar) {
ctx.progbar.end();
}
if (err) {
// abort ?
console.log('Warning: mpu commit failed for object %s.',
ctx.multiput_obj);
callback(err);
} else {
callback();
}
});
}

var pipelineCtx = {
opts: opts,
filename: filename,
objectPath: objectPath,
parts: [],
etags: [],
multiput_obj: null,
client: client
};

// Do our operations in order.
vasync.pipeline({
'arg' : pipelineCtx,
'funcs' : [
splitFile,
initUpload,
doUploads,
commitUpload
]
}, function (err, results) {
if (err) {
console.log('failed: ', err);
}

client.close();
cb(err);
});
};


MMpu.prototype.do_put.options = manta.DEFAULT_CLI_OPTIONS.concat([
{
group: 'mmpu put options'
},
{
names: ['copies', 'c'],
type: 'positiveInteger',
default: 2,
help: 'number of copies to make',
helpArg: 'COPIES'
},
{
names: ['file', 'f'],
type: 'string',
help: 'local file to upload',
helpArg: 'FILE',
completionType: 'file'
},
{
names: ['partsize', 'S'],
type: 'positiveInteger',
help: 'object part size',
helpArg: 'PARTSIZE'
},
{
names: ['header', 'H'],
type: 'arrayOfString',
help: 'HTTP headers to include',
helpArg: 'HEADER'
},
{
names: ['md5', 'm'],
type: 'bool',
help: 'Calculated md sum of the object to be uploaded. The ' +
'server will validate this md5 on commit, and ' +
'will reject the commit if it does not match.',
helpArg: 'MD5'
}
]);

MMpu.prototype.do_put.help = [
'Given a local file and a path in Manta, upload the file to Manta as',
'a multipart upload.',
'',
'The arguments to this command are first the object path, and second',
'(optionally) the input file.',
'',
'Additionally, mmpu put accepts most of the same options as accepted',
'by mmpu create (the --md5 option differs: it matches the mput',
'semantics.)',
'',
'Usage:',
' mmpu put [OPTIONS] PATH [FILENAME]',
'',
'{{options}}',
''
].join('\n');

//-- GetMPU

Expand Down
Loading