diff --git a/backend/internxt/internxt.go b/backend/internxt/internxt.go index a221abe323012..e8bffc3a28e1c 100644 --- a/backend/internxt/internxt.go +++ b/backend/internxt/internxt.go @@ -21,6 +21,7 @@ import ( "github.com/internxt/rclone-adapter/folders" "github.com/internxt/rclone-adapter/users" "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/chunksize" rclone_config "github.com/rclone/rclone/fs/config" "github.com/rclone/rclone/fs/config/configmap" "github.com/rclone/rclone/fs/config/configstruct" @@ -30,6 +31,7 @@ import ( "github.com/rclone/rclone/fs/hash" "github.com/rclone/rclone/lib/dircache" "github.com/rclone/rclone/lib/encoder" + "github.com/rclone/rclone/lib/multipart" "github.com/rclone/rclone/lib/oauthutil" "github.com/rclone/rclone/lib/pacer" "github.com/rclone/rclone/lib/random" @@ -101,6 +103,21 @@ func init() { Default: true, Advanced: true, Help: "Skip hash validation when downloading files.\n\nBy default, hash validation is disabled. Set this to false to enable validation.", + }, { + Name: "chunk_size", + Help: "Chunk size for multipart uploads.\n\nLarge files will be uploaded in chunks of this size.\n\nMemory usage is approximately chunk_size * upload_concurrency.", + Default: fs.SizeSuffix(30 * 1024 * 1024), + Advanced: true, + }, { + Name: "upload_concurrency", + Help: "Concurrency for multipart uploads.\n\nThis is the number of chunks of the same file that are uploaded concurrently.\n\nNote that each chunk is buffered in memory.", + Default: 4, + Advanced: true, + }, { + Name: "max_upload_parts", + Help: "Maximum number of parts in a multipart upload.\n\nThis defines the maximum number of multipart chunks to use when uploading a file.", + Default: 10000, + Advanced: true, }, { Name: rclone_config.ConfigEncoding, Help: rclone_config.ConfigEncodingHelp, @@ -194,24 +211,28 @@ type Options struct { TwoFA string `config:"2fa"` Mnemonic string `config:"mnemonic"` SkipHashValidation bool `config:"skip_hash_validation"` + ChunkSize 
fs.SizeSuffix `config:"chunk_size"` + UploadConcurrency int `config:"upload_concurrency"` + MaxUploadParts int `config:"max_upload_parts"` Encoding encoder.MultiEncoder `config:"encoding"` } // Fs represents an Internxt remote type Fs struct { - name string - root string - opt Options - m configmap.Mapper - dirCache *dircache.DirCache - cfg *config.Config - features *fs.Features - pacer *fs.Pacer - tokenRenewer *oauthutil.Renew - bridgeUser string - userID string - authMu sync.Mutex - authFailed bool + name string + root string + opt Options + m configmap.Mapper + dirCache *dircache.DirCache + cfg *config.Config + features *fs.Features + pacer *fs.Pacer + tokenRenewer *oauthutil.Renew + bridgeUser string + userID string + authMu sync.Mutex + authFailed bool + pendingSession *buckets.ChunkUploadSession // set by Update, consumed by OpenChunkWriter } // Object holds the data for a remote file object @@ -333,6 +354,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e f.features = (&fs.Features{ CanHaveEmptyDirectories: true, + OpenChunkWriter: f.OpenChunkWriter, }).Fill(ctx, f) if ts != nil { @@ -884,32 +906,77 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op fs.Debugf(o.f, "Renamed existing file %s to backup %s.%s (UUID: %s)", remote, backupName, backupType, backupUUID) } + size := src.Size() + var meta *buckets.CreateMetaResponse - err = o.f.pacer.CallNoRetry(func() (bool, error) { - var err error - meta, err = buckets.UploadFileStreamAuto(ctx, - o.f.cfg, - dirID, - o.f.opt.Encoding.FromStandardName(path.Base(remote)), - in, - src.Size(), - src.ModTime(ctx), - ) - return o.f.shouldRetry(ctx, err) - }) + if size >= int64(o.f.opt.ChunkSize) { + // Use multipart upload via OpenChunkWriter for large files. + // Create the session first so we can wrap in with the encrypting reader + // before UploadMultipart reads from it. 
+ uploadParts := o.f.opt.MaxUploadParts + chunkSize := chunksize.Calculator(src, size, uploadParts, o.f.opt.ChunkSize) + var session *buckets.ChunkUploadSession + err = o.f.pacer.Call(func() (bool, error) { + var err error + session, err = buckets.NewChunkUploadSession(ctx, o.f.cfg, size, int64(chunkSize)) + return o.f.shouldRetry(ctx, err) + }) + if err != nil { + o.restoreBackupFile(ctx, backupUUID, origName, origType) + return fmt.Errorf("failed to create upload session: %w", err) + } - if err != nil && isEmptyFileLimitError(err) { - o.restoreBackupFile(ctx, backupUUID, origName, origType) - return fs.ErrorCantUploadEmptyFiles - } + // Wrap reader with SDK's encrypting reader (sequential AES-256-CTR + SHA-256 hash) + encReader := session.EncryptingReader(in) - if err != nil { - meta, err = o.recoverFromTimeoutConflict(ctx, err, remote, dirID) - } + // Store session for OpenChunkWriter to pick up + o.f.pendingSession = session - if err != nil { - o.restoreBackupFile(ctx, backupUUID, origName, origType) - return err + chunkWriter, uploadErr := multipart.UploadMultipart(ctx, src, encReader, multipart.UploadMultipartOptions{ + Open: o.f, + OpenOptions: options, + }) + + o.f.pendingSession = nil + + if uploadErr != nil { + if isEmptyFileLimitError(uploadErr) { + o.restoreBackupFile(ctx, backupUUID, origName, origType) + return fs.ErrorCantUploadEmptyFiles + } + o.restoreBackupFile(ctx, backupUUID, origName, origType) + return uploadErr + } + w := chunkWriter.(*internxtChunkWriter) + meta = w.meta + } else { + // Use single-part upload for small files + err = o.f.pacer.CallNoRetry(func() (bool, error) { + var err error + meta, err = buckets.UploadFileStreamAuto(ctx, + o.f.cfg, + dirID, + o.f.opt.Encoding.FromStandardName(path.Base(remote)), + in, + size, + src.ModTime(ctx), + ) + return o.f.shouldRetry(ctx, err) + }) + + if err != nil && isEmptyFileLimitError(err) { + o.restoreBackupFile(ctx, backupUUID, origName, origType) + return fs.ErrorCantUploadEmptyFiles + } + 
+ if err != nil { + meta, err = o.recoverFromTimeoutConflict(ctx, err, remote, dirID) + } + + if err != nil { + o.restoreBackupFile(ctx, backupUUID, origName, origType) + return err + } } // Update object metadata diff --git a/backend/internxt/upload.go b/backend/internxt/upload.go new file mode 100644 index 0000000000000..445cbc203b671 --- /dev/null +++ b/backend/internxt/upload.go @@ -0,0 +1,186 @@ +package internxt + +import ( + "context" + "fmt" + "io" + "path" + "sort" + "strings" + "sync" + + "github.com/internxt/rclone-adapter/buckets" + "github.com/rclone/rclone/fs" + "github.com/rclone/rclone/fs/chunksize" +) + +var warnStreamUpload sync.Once + +// internxtChunkWriter implements fs.ChunkWriter for Internxt multipart uploads. +// All encryption is handled by the SDK's ChunkUploadSession. +type internxtChunkWriter struct { + f *Fs + remote string + src fs.ObjectInfo + session *buckets.ChunkUploadSession + completedParts []buckets.CompletedPart + partsMu sync.Mutex + size int64 + dirID string + meta *buckets.CreateMetaResponse +} + +// OpenChunkWriter returns the chunk size and a ChunkWriter for multipart uploads. +// +// When called from Update (via multipart.UploadMultipart), the session is +// pre-created and stored in f.pendingSession so that the encrypting reader +// can be applied to the input before UploadMultipart reads from it. 
+//
+// The returned info carries the effective chunk size and the configured
+// upload concurrency. An error is returned if chunk_size is not positive,
+// if the parent directory cannot be found or created, or if a new upload
+// session cannot be created.
+func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
+	size := src.Size()
+
+	// Clamp the part limit so the size calculations below never see zero
+	// or a negative number of parts.
+	uploadParts := f.opt.MaxUploadParts
+	if uploadParts < 1 {
+		uploadParts = 1
+	}
+
+	// chunk_size comes straight from user configuration. A non-positive
+	// value cannot describe a multipart upload, and chunksize.Calculator
+	// uses it as a divisor, so reject it here instead of panicking later.
+	chunkSize := f.opt.ChunkSize
+	if chunkSize <= 0 {
+		return info, nil, fmt.Errorf("invalid chunk_size %d: must be greater than zero", int64(chunkSize))
+	}
+
+	if size < 0 {
+		// Unknown size: keep the configured chunk size and warn once per
+		// process about the resulting upper bound on file size.
+		warnStreamUpload.Do(func() {
+			fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
+				chunkSize, fs.SizeSuffix(int64(chunkSize)*int64(uploadParts)))
+		})
+	} else {
+		// Known size: let the calculator adjust the chunk size so the
+		// upload fits within uploadParts parts.
+		chunkSize = chunksize.Calculator(src, size, uploadParts, chunkSize)
+	}
+
+	// Ensure parent directory exists
+	_, dirID, err := f.dirCache.FindPath(ctx, remote, true)
+	if err != nil {
+		return info, nil, fmt.Errorf("failed to find parent directory: %w", err)
+	}
+
+	// Use pre-created session from Update() if available, otherwise create one.
+	//
+	// NOTE(review): f.pendingSession is a plain field on the shared Fs and is
+	// read here without synchronisation. If two uploads run at the same time,
+	// this call may pick up a session that Update created for a different
+	// file - confirm callers are serialised, or pass the session per call.
+	session := f.pendingSession
+	if session == nil {
+		err = f.pacer.Call(func() (bool, error) {
+			var err error
+			session, err = buckets.NewChunkUploadSession(ctx, f.cfg, size, int64(chunkSize))
+			return f.shouldRetry(ctx, err)
+		})
+		if err != nil {
+			return info, nil, fmt.Errorf("failed to create upload session: %w", err)
+		}
+	}
+
+	w := &internxtChunkWriter{
+		f:       f,
+		remote:  remote,
+		src:     src,
+		session: session,
+		size:    size,
+		dirID:   dirID,
+	}
+
+	info = fs.ChunkWriterInfo{
+		ChunkSize:         int64(chunkSize),
+		Concurrency:       f.opt.UploadConcurrency,
+		LeavePartsOnError: false,
+	}
+
+	return info, w, nil
+}
+
+// WriteChunk uploads chunk number with reader bytes.
+// The data has already been encrypted by the EncryptingReader applied
+// to the input stream before UploadMultipart started reading.
+//
+// chunkNumber is zero-based; the part recorded for completion is
+// chunkNumber+1. It returns the number of bytes in the chunk on success, or
+// 0 and an error if the chunk number is negative, if seeking the reader
+// fails, or if the upload fails. An empty chunk is skipped and no part is
+// recorded for it.
+func (w *internxtChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) {
+	// A negative chunk number would be recorded as a part number below 1.
+	if chunkNumber < 0 {
+		return 0, fmt.Errorf("invalid chunk number provided: %d", chunkNumber)
+	}
+
+	// Determine chunk size from the reader: distance from the current
+	// position to the end, then restore the position.
+	currentPos, err := reader.Seek(0, io.SeekCurrent)
+	if err != nil {
+		return 0, fmt.Errorf("failed to get current position: %w", err)
+	}
+	end, err := reader.Seek(0, io.SeekEnd)
+	if err != nil {
+		return 0, fmt.Errorf("failed to seek to end: %w", err)
+	}
+	size := end - currentPos
+	if _, err := reader.Seek(currentPos, io.SeekStart); err != nil {
+		return 0, fmt.Errorf("failed to seek back: %w", err)
+	}
+
+	// Nothing to upload for an empty chunk.
+	if size == 0 {
+		return 0, nil
+	}
+
+	var etag string
+	err = w.f.pacer.Call(func() (bool, error) {
+		// Seek back to start for retries
+		if _, err := reader.Seek(currentPos, io.SeekStart); err != nil {
+			return false, err
+		}
+		var uploadErr error
+		etag, uploadErr = w.session.UploadChunk(ctx, chunkNumber, reader, size)
+		return w.f.shouldRetry(ctx, uploadErr)
+	})
+	if err != nil {
+		return 0, err
+	}
+
+	// WriteChunk may be called concurrently, so guard the shared parts list.
+	w.partsMu.Lock()
+	w.completedParts = append(w.completedParts, buckets.CompletedPart{
+		PartNumber: chunkNumber + 1,
+		ETag:       etag,
+	})
+	w.partsMu.Unlock()
+
+	return size, nil
+}
+
+// Close completes the multipart upload and registers the file in Internxt Drive.
+//
+// The recorded parts are sorted by part number and passed to the session's
+// Finish; the file metadata is then created in the parent directory and
+// stored in w.meta. It returns an error if either step fails or if Finish
+// returns no response.
+func (w *internxtChunkWriter) Close(ctx context.Context) error {
+	// Sort parts by part number and take a private copy under the lock.
+	w.partsMu.Lock()
+	sort.Slice(w.completedParts, func(i, j int) bool {
+		return w.completedParts[i].PartNumber < w.completedParts[j].PartNumber
+	})
+	parts := make([]buckets.CompletedPart, len(w.completedParts))
+	copy(parts, w.completedParts)
+	w.partsMu.Unlock()
+
+	// Finish multipart upload (SDK computes hash + calls FinishMultipartUpload)
+	var finishResp *buckets.FinishUploadResp
+	err := w.f.pacer.Call(func() (bool, error) {
+		var err error
+		finishResp, err = w.session.Finish(ctx, parts)
+		return w.f.shouldRetry(ctx, err)
+	})
+	if err != nil {
+		return fmt.Errorf("failed to finish multipart upload: %w", err)
+	}
+	// finishResp.ID is dereferenced below; fail cleanly rather than panic
+	// if the SDK returned no response alongside a nil error.
+	if finishResp == nil {
+		return fmt.Errorf("failed to finish multipart upload: empty response")
+	}
+
+	// Create file metadata in Internxt Drive: split the encoded base name
+	// into its name and extension (without the leading dot).
+	baseName := w.f.opt.Encoding.FromStandardName(path.Base(w.remote))
+	name := strings.TrimSuffix(baseName, path.Ext(baseName))
+	ext := strings.TrimPrefix(path.Ext(baseName), ".")
+
+	// NOTE(review): w.size is src.Size() as captured by OpenChunkWriter, which
+	// accepts unknown (negative) sizes for streaming uploads. In that case a
+	// negative size is sent here - confirm whether the byte count should be
+	// accumulated in WriteChunk instead.
+	var meta *buckets.CreateMetaResponse
+	err = w.f.pacer.Call(func() (bool, error) {
+		var err error
+		meta, err = buckets.CreateMetaFile(ctx, w.f.cfg,
+			name, w.f.cfg.Bucket, &finishResp.ID, "03-aes",
+			w.dirID, name, ext, w.size, w.src.ModTime(ctx))
+		return w.f.shouldRetry(ctx, err)
+	})
+	if err != nil {
+		return fmt.Errorf("failed to create file metadata: %w", err)
+	}
+	w.meta = meta
+
+	return nil
+}
+
+// Abort cleans up after a failed upload.
+//
+// It performs no work and always returns nil.
+func (w *internxtChunkWriter) Abort(ctx context.Context) error {
+	// Multipart uploads on Internxt are cleaned up server-side.
+	return nil
+}