137 changes: 102 additions & 35 deletions backend/internxt/internxt.go
@@ -21,6 +21,7 @@ import (
"github.com/internxt/rclone-adapter/folders"
"github.com/internxt/rclone-adapter/users"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/chunksize"
rclone_config "github.com/rclone/rclone/fs/config"
"github.com/rclone/rclone/fs/config/configmap"
"github.com/rclone/rclone/fs/config/configstruct"
@@ -30,6 +31,7 @@ import (
"github.com/rclone/rclone/fs/hash"
"github.com/rclone/rclone/lib/dircache"
"github.com/rclone/rclone/lib/encoder"
"github.com/rclone/rclone/lib/multipart"
"github.com/rclone/rclone/lib/oauthutil"
"github.com/rclone/rclone/lib/pacer"
"github.com/rclone/rclone/lib/random"
@@ -101,6 +103,21 @@ func init() {
Default: true,
Advanced: true,
Help: "Skip hash validation when downloading files.\n\nBy default, hash validation is disabled. Set this to false to enable validation.",
}, {
Name: "chunk_size",
Help: "Chunk size for multipart uploads.\n\nLarge files will be uploaded in chunks of this size.\n\nMemory usage is approximately chunk_size * upload_concurrency.",
Default: fs.SizeSuffix(30 * 1024 * 1024),
Advanced: true,
}, {
Name: "upload_concurrency",
Help: "Concurrency for multipart uploads.\n\nThis is the number of chunks of the same file that are uploaded concurrently.\n\nNote that each chunk is buffered in memory.",
Default: 4,
Advanced: true,
}, {
Name: "max_upload_parts",
Help: "Maximum number of parts in a multipart upload.\n\nThis defines the maximum number of multipart chunks to use when uploading a file.",
Default: 10000,
Advanced: true,
}, {
Name: rclone_config.ConfigEncoding,
Help: rclone_config.ConfigEncodingHelp,
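A note on how the three new options interact: each in-flight chunk is buffered in memory, so peak buffer usage is roughly chunk_size * upload_concurrency, and for streaming uploads of unknown size the largest uploadable file is roughly chunk_size * max_upload_parts (upload.go below logs a warning to this effect). The following is a minimal standalone sketch of that arithmetic using the defaults; it is illustrative only and not part of this change:

```go
package main

import (
	"fmt"

	"github.com/rclone/rclone/fs"
)

func main() {
	// Defaults from the option definitions above.
	chunkSize := fs.SizeSuffix(30 * 1024 * 1024) // chunk_size
	concurrency := int64(4)                      // upload_concurrency
	maxParts := int64(10000)                     // max_upload_parts

	// Each concurrent chunk is buffered in full, so this approximates peak memory.
	memory := fs.SizeSuffix(int64(chunkSize) * concurrency)

	// For streams of unknown length the chunk size cannot be recalculated,
	// so the file size is capped at roughly chunk_size * max_upload_parts.
	maxStreamed := fs.SizeSuffix(int64(chunkSize) * maxParts)

	// With the defaults this is about 120 MiB of buffer and a cap near 293 GiB.
	fmt.Printf("approx. memory: %v, max streamed file: %v\n", memory, maxStreamed)
}
```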
@@ -194,24 +211,28 @@ type Options struct {
TwoFA string `config:"2fa"`
Mnemonic string `config:"mnemonic"`
SkipHashValidation bool `config:"skip_hash_validation"`
ChunkSize fs.SizeSuffix `config:"chunk_size"`
UploadConcurrency int `config:"upload_concurrency"`
MaxUploadParts int `config:"max_upload_parts"`
Encoding encoder.MultiEncoder `config:"encoding"`
}

// Fs represents an Internxt remote
type Fs struct {
name string
root string
opt Options
m configmap.Mapper
dirCache *dircache.DirCache
cfg *config.Config
features *fs.Features
pacer *fs.Pacer
tokenRenewer *oauthutil.Renew
bridgeUser string
userID string
authMu sync.Mutex
authFailed bool
pendingSession *buckets.ChunkUploadSession // set by Update, consumed by OpenChunkWriter
}

// Object holds the data for a remote file object
@@ -333,6 +354,7 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, error) {

f.features = (&fs.Features{
CanHaveEmptyDirectories: true,
OpenChunkWriter: f.OpenChunkWriter,
}).Fill(ctx, f)

if ts != nil {
@@ -884,32 +906,77 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) error {
fs.Debugf(o.f, "Renamed existing file %s to backup %s.%s (UUID: %s)", remote, backupName, backupType, backupUUID)
}

size := src.Size()

var meta *buckets.CreateMetaResponse
if size >= int64(o.f.opt.ChunkSize) {
// Use multipart upload via OpenChunkWriter for large files.
// Create the session first so we can wrap the input reader with the
// encrypting reader before UploadMultipart reads from it.
uploadParts := o.f.opt.MaxUploadParts
chunkSize := chunksize.Calculator(src, size, uploadParts, o.f.opt.ChunkSize)
var session *buckets.ChunkUploadSession
err = o.f.pacer.Call(func() (bool, error) {
var err error
session, err = buckets.NewChunkUploadSession(ctx, o.f.cfg, size, int64(chunkSize))
return o.f.shouldRetry(ctx, err)
})
if err != nil {
o.restoreBackupFile(ctx, backupUUID, origName, origType)
return fmt.Errorf("failed to create upload session: %w", err)
}

// Wrap reader with SDK's encrypting reader (sequential AES-256-CTR + SHA-256 hash)
encReader := session.EncryptingReader(in)

// Store session for OpenChunkWriter to pick up
o.f.pendingSession = session

chunkWriter, uploadErr := multipart.UploadMultipart(ctx, src, encReader, multipart.UploadMultipartOptions{
Open: o.f,
OpenOptions: options,
})

o.f.pendingSession = nil

if uploadErr != nil {
if isEmptyFileLimitError(uploadErr) {
o.restoreBackupFile(ctx, backupUUID, origName, origType)
return fs.ErrorCantUploadEmptyFiles
}
o.restoreBackupFile(ctx, backupUUID, origName, origType)
return uploadErr
}
w := chunkWriter.(*internxtChunkWriter)
meta = w.meta
} else {
// Use single-part upload for small files
err = o.f.pacer.CallNoRetry(func() (bool, error) {
var err error
meta, err = buckets.UploadFileStreamAuto(ctx,
o.f.cfg,
dirID,
o.f.opt.Encoding.FromStandardName(path.Base(remote)),
in,
size,
src.ModTime(ctx),
)
return o.f.shouldRetry(ctx, err)
})

if err != nil && isEmptyFileLimitError(err) {
o.restoreBackupFile(ctx, backupUUID, origName, origType)
return fs.ErrorCantUploadEmptyFiles
}

if err != nil {
meta, err = o.recoverFromTimeoutConflict(ctx, err, remote, dirID)
}

if err != nil {
o.restoreBackupFile(ctx, backupUUID, origName, origType)
return err
}
}

// Update object metadata
186 changes: 186 additions & 0 deletions backend/internxt/upload.go
@@ -0,0 +1,186 @@
package internxt

import (
"context"
"fmt"
"io"
"path"
"sort"
"strings"
"sync"

"github.com/internxt/rclone-adapter/buckets"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/chunksize"
)

var warnStreamUpload sync.Once

// internxtChunkWriter implements fs.ChunkWriter for Internxt multipart uploads.
// All encryption is handled by the SDK's ChunkUploadSession.
type internxtChunkWriter struct {
f *Fs
remote string
src fs.ObjectInfo
session *buckets.ChunkUploadSession
completedParts []buckets.CompletedPart
partsMu sync.Mutex
size int64
dirID string
meta *buckets.CreateMetaResponse
}

// OpenChunkWriter returns the chunk size and a ChunkWriter for multipart uploads.
//
// When called from Update (via multipart.UploadMultipart), the session is
// pre-created and stored in f.pendingSession so that the encrypting reader
// can be applied to the input before UploadMultipart reads from it.
func (f *Fs) OpenChunkWriter(ctx context.Context, remote string, src fs.ObjectInfo, options ...fs.OpenOption) (info fs.ChunkWriterInfo, writer fs.ChunkWriter, err error) {
size := src.Size()

uploadParts := f.opt.MaxUploadParts
if uploadParts < 1 {
uploadParts = 1
}

chunkSize := f.opt.ChunkSize
if size < 0 {
warnStreamUpload.Do(func() {
fs.Logf(f, "Streaming uploads using chunk size %v will have maximum file size of %v",
chunkSize, fs.SizeSuffix(int64(chunkSize)*int64(uploadParts)))
})
} else {
chunkSize = chunksize.Calculator(src, size, uploadParts, chunkSize)
}

// Ensure parent directory exists
_, dirID, err := f.dirCache.FindPath(ctx, remote, true)
if err != nil {
return info, nil, fmt.Errorf("failed to find parent directory: %w", err)
}

// Use pre-created session from Update() if available, otherwise create one
session := f.pendingSession
if session == nil {
err = f.pacer.Call(func() (bool, error) {
var err error
session, err = buckets.NewChunkUploadSession(ctx, f.cfg, size, int64(chunkSize))
return f.shouldRetry(ctx, err)
})
if err != nil {
return info, nil, fmt.Errorf("failed to create upload session: %w", err)
}
}

w := &internxtChunkWriter{
f: f,
remote: remote,
src: src,
session: session,
size: size,
dirID: dirID,
}

info = fs.ChunkWriterInfo{
ChunkSize: int64(chunkSize),
Concurrency: f.opt.UploadConcurrency,
LeavePartsOnError: false,
}

return info, w, nil
}

// WriteChunk uploads the chunk identified by chunkNumber using the bytes from reader.
// The data has already been encrypted by the EncryptingReader applied
// to the input stream before UploadMultipart started reading.
func (w *internxtChunkWriter) WriteChunk(ctx context.Context, chunkNumber int, reader io.ReadSeeker) (int64, error) {
// Determine chunk size from the reader
currentPos, err := reader.Seek(0, io.SeekCurrent)
if err != nil {
return 0, fmt.Errorf("failed to get current position: %w", err)
}
end, err := reader.Seek(0, io.SeekEnd)
if err != nil {
return 0, fmt.Errorf("failed to seek to end: %w", err)
}
size := end - currentPos
if _, err := reader.Seek(currentPos, io.SeekStart); err != nil {
return 0, fmt.Errorf("failed to seek back: %w", err)
}

if size == 0 {
return 0, nil
}

var etag string
err = w.f.pacer.Call(func() (bool, error) {
// Seek back to start for retries
if _, err := reader.Seek(currentPos, io.SeekStart); err != nil {
return false, err
}
var uploadErr error
etag, uploadErr = w.session.UploadChunk(ctx, chunkNumber, reader, size)
return w.f.shouldRetry(ctx, uploadErr)
})
if err != nil {
return 0, err
}

w.partsMu.Lock()
w.completedParts = append(w.completedParts, buckets.CompletedPart{
PartNumber: chunkNumber + 1,
ETag: etag,
})
w.partsMu.Unlock()

return size, nil
}

// Close completes the multipart upload and registers the file in Internxt Drive.
func (w *internxtChunkWriter) Close(ctx context.Context) error {
// Sort parts by part number
w.partsMu.Lock()
sort.Slice(w.completedParts, func(i, j int) bool {
return w.completedParts[i].PartNumber < w.completedParts[j].PartNumber
})
parts := make([]buckets.CompletedPart, len(w.completedParts))
copy(parts, w.completedParts)
w.partsMu.Unlock()

// Finish multipart upload (SDK computes hash + calls FinishMultipartUpload)
var finishResp *buckets.FinishUploadResp
err := w.f.pacer.Call(func() (bool, error) {
var err error
finishResp, err = w.session.Finish(ctx, parts)
return w.f.shouldRetry(ctx, err)
})
if err != nil {
return fmt.Errorf("failed to finish multipart upload: %w", err)
}

// Create file metadata in Internxt Drive
baseName := w.f.opt.Encoding.FromStandardName(path.Base(w.remote))
name := strings.TrimSuffix(baseName, path.Ext(baseName))
ext := strings.TrimPrefix(path.Ext(baseName), ".")

var meta *buckets.CreateMetaResponse
err = w.f.pacer.Call(func() (bool, error) {
var err error
meta, err = buckets.CreateMetaFile(ctx, w.f.cfg,
name, w.f.cfg.Bucket, &finishResp.ID, "03-aes",
w.dirID, name, ext, w.size, w.src.ModTime(ctx))
return w.f.shouldRetry(ctx, err)
})
if err != nil {
return fmt.Errorf("failed to create file metadata: %w", err)
}
w.meta = meta

return nil
}

// Abort is called after a failed upload.
func (w *internxtChunkWriter) Abort(ctx context.Context) error {
	// Nothing to do: incomplete multipart uploads are cleaned up server-side.
return nil
}
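For readers unfamiliar with rclone's chunked-upload plumbing: the multipart helper calls OpenChunkWriter once, then WriteChunk once per chunk (up to ChunkWriterInfo.Concurrency calls in parallel, each chunk buffered in memory), and finally Close on success or Abort on failure. The sketch below is a simplified, sequential stand-in for that driver, written only to illustrate the contract this file implements; driveChunkWriterSequentially is a hypothetical helper, not part of this change or of rclone:

```go
package internxt

import (
	"bytes"
	"context"
	"errors"
	"io"

	"github.com/rclone/rclone/fs"
)

// driveChunkWriterSequentially is an illustrative, sequential stand-in for
// what multipart.UploadMultipart does with the ChunkWriter returned by
// OpenChunkWriter: split the (already encrypted) input into chunk-sized
// pieces, hand each piece to WriteChunk, then Close on success or Abort on
// failure. The real helper runs up to info.Concurrency WriteChunk calls in
// parallel; this version is single-threaded for clarity.
func driveChunkWriterSequentially(ctx context.Context, f *Fs, remote string, src fs.ObjectInfo, in io.Reader) error {
	info, writer, err := f.OpenChunkWriter(ctx, remote, src)
	if err != nil {
		return err
	}
	buf := make([]byte, info.ChunkSize)
	for chunkNumber := 0; ; chunkNumber++ {
		n, readErr := io.ReadFull(in, buf)
		if n > 0 {
			if _, err := writer.WriteChunk(ctx, chunkNumber, bytes.NewReader(buf[:n])); err != nil {
				_ = writer.Abort(ctx)
				return err
			}
		}
		if errors.Is(readErr, io.EOF) || errors.Is(readErr, io.ErrUnexpectedEOF) {
			break
		}
		if readErr != nil {
			_ = writer.Abort(ctx)
			return readErr
		}
	}
	return writer.Close(ctx)
}
```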