Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 29 additions & 41 deletions rclone/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,61 +81,49 @@ func (p *RcloneExporter) Ping(ctx context.Context) error {
return nil
}

func (p *RcloneExporter) Export(ctx context.Context, records <-chan *connectors.Record, results chan<- *connectors.Result) (ret error) {
func (p *RcloneExporter) Export(ctx context.Context, records <-chan *connectors.Record, results chan<- *connectors.Result) error {
defer close(results)
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(p.maxConcurrency)

loop:
for {
select {
case <-ctx.Done():
ret = ctx.Err()
break loop
var g errgroup.Group
g.SetLimit(p.maxConcurrency)

case record, ok := <-records:
if !ok {
break loop
}
for record := range records {
if record.Err != nil {
results <- record.Ok()
continue
}
Comment on lines +90 to +94

if record.Err != nil {
results <- record.Ok()
continue
}
if record.IsXattr || record.FileInfo.Lmode&os.ModeSymlink != 0 {
results <- record.Ok()
continue
}

if record.IsXattr || record.FileInfo.Lmode&os.ModeSymlink != 0 {
pathname := stdpath.Join(p.Root(), record.Pathname)
if record.FileInfo.Lmode.IsDir() {
if err := p.mkdir(ctx, pathname); err != nil {
results <- record.Error(err)
} else {
results <- record.Ok()
continue
}

pathname := stdpath.Join(p.Root(), record.Pathname)
if record.FileInfo.Lmode.IsDir() {
if err := p.mkdir(ctx, pathname); err != nil {
results <- record.Error(err)
} else {
results <- record.Ok()
}
continue
}

continue
g.Go(func() error {
if err := p.storeFile(ctx, pathname, record); err != nil {
results <- record.Error(err)
} else {
results <- record.Ok()
}

g.Go(func() error {
if err := p.storeFile(ctx, pathname, record); err != nil {
results <- record.Error(err)
} else {
results <- record.Ok()
}
return nil
})

}
return nil
})
}

if err := g.Wait(); err != nil && ret == nil {
ret = err
if err := g.Wait(); err != nil {
return err
}

return ret
return nil
}

func (p *RcloneExporter) mkdir(ctx context.Context, pathname string) error {
Expand Down
86 changes: 8 additions & 78 deletions rclone/importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package importer

import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -53,7 +52,6 @@ type RcloneImporter struct {
Typee string
Base string
confFile *os.File
logFile *os.File
}

// NewRcloneImporter creates a new RcloneImporter instance. It expects the location
Expand All @@ -79,13 +77,10 @@ func NewRcloneImporter(ctx context.Context, opts *connectors.Options, providerNa

librclone.Initialize()

f, _ := os.Create("/home/ptr/dev/plakar/plakar/log2.txt")

return &RcloneImporter{
Typee: typee,
Base: base,
confFile: file,
logFile: f,
}, nil
}

Expand All @@ -109,7 +104,7 @@ func (p *RcloneImporter) Import(ctx context.Context, records chan<- *connectors.
}

wg := &sync.WaitGroup{}
p.scanFolder(records, "", response, wg)
p.scanFolder(records, response, wg)

wg.Wait()
return nil
Expand All @@ -130,45 +125,13 @@ func (p *RcloneImporter) GetPathInBackup(path string) string {
return stdpath.Clean(path)
}

// generatePathComponents is a helper function that returns a slice of strings
// containing all the hierarchical components of an absolute path, starting
// from the full path down to the root.
//
// The path given as an argument must be an absolute clean path within the
// backup.
//
// Example:
//
// Input: "/path/to/dir"
// Output: []string{"/path/to/dir", "/path/to", "/path", "/"}
//
// Input: "/relative/path"
// Output: []string{"/relative/path", "/relative", "/"}
//
// Input: "/"
// Output: []string{"/"}
func generatePathComponents(path string) []string {
components := []string{}
tmp := path

for {
components = append(components, tmp)
parent := stdpath.Dir(tmp)
if parent == tmp { // Reached the root
break
}
tmp = parent
}
return components
}

func (p *RcloneImporter) scanRecursive(results chan<- *connectors.Record, path string, wg *sync.WaitGroup) {
response, _ := p.ListFolder(results, path)
p.scanFolder(results, path, response, wg)
p.scanFolder(results, response, wg)
Comment on lines 128 to +130
}

func (p *RcloneImporter) ListFolder(results chan<- *connectors.Record, path string) (Response, error) {
payload := map[string]interface{}{
payload := map[string]string{
"fs": fmt.Sprintf("%s:%s", p.Typee, p.Base),
"remote": path,
}
Comment on lines 133 to 137
Expand All @@ -195,7 +158,7 @@ func (p *RcloneImporter) ListFolder(results chan<- *connectors.Record, path stri
return response, nil
}

func (p *RcloneImporter) scanFolder(results chan<- *connectors.Record, path string, response Response, wg *sync.WaitGroup) {
func (p *RcloneImporter) scanFolder(results chan<- *connectors.Record, response Response, wg *sync.WaitGroup) {
for _, file := range response.List {
wg.Add(1)
go func() {
Expand Down Expand Up @@ -255,33 +218,6 @@ func (p *RcloneImporter) scanFolder(results chan<- *connectors.Record, path stri
}
}

func nextRandom() string {
b := make([]byte, 8)
_, err := rand.Read(b)
if err != nil {
panic(err)
}
return fmt.Sprintf("%x", b)
}

func createTempPath(originalPath string) (path string, err error) {
tmpPath := os.TempDir() + "/" + originalPath
prefix, suffix := "", ""
if i := strings.LastIndex(tmpPath, "*"); i >= 0 {
prefix, suffix = tmpPath[:i], tmpPath[i+1:]
} else {
prefix = tmpPath
}

for i := 0; i < 10000; i++ {
name := prefix + nextRandom() + suffix
if _, err := os.Stat(name); os.IsNotExist(err) {
return name, nil
}
}
return "", fmt.Errorf("failed to find a folder to create the temporary file")
}

// AutoremoveTmpFile is a wrapper around an os.File that removes the file when it's closed.
type AutoremoveTmpFile struct {
*os.File
Expand All @@ -296,12 +232,12 @@ func (p *RcloneImporter) NewReader(pathname string) (io.ReadCloser, error) {
// pathname is an absolute path within the backup. Let's convert it to a
// relative path to the base path.
relativePath := strings.TrimPrefix(pathname, p.GetPathInBackup(""))
name, err := createTempPath("plakar_temp_*")

fp, err := os.CreateTemp("", "plakar_rclone_tmp_*")
if err != nil {
return nil, err
}

fmt.Fprintf(p.logFile, "downloading and processing %s %s to %s\n", pathname, relativePath, name)
name := fp.Name()

payload := map[string]string{
"srcFs": fmt.Sprintf("%s:%s", p.Typee, p.Base),
Expand All @@ -322,16 +258,10 @@ func (p *RcloneImporter) NewReader(pathname string) (io.ReadCloser, error) {
return nil, fmt.Errorf("failed to copy file: %s", body)
}

tmpFile, err := os.Open(name)
if err != nil {
return nil, err
}

return &AutoremoveTmpFile{tmpFile}, nil
return &AutoremoveTmpFile{fp}, nil
}
Comment on lines 258 to 262

func (p *RcloneImporter) Close(ctx context.Context) error {
p.logFile.Close()
utils.DeleteTempConf(p.confFile.Name())
librclone.Finalize()
return nil
Expand Down
2 changes: 1 addition & 1 deletion rclone/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (r *RcloneStorage) deleteFile(pathname string) error {
}

func (r *RcloneStorage) listFolder(pathname string) ([]string, error) {
payload := map[string]interface{}{
payload := map[string]string{
"fs": fmt.Sprintf("%s:%s", r.Typee, r.Base),
"remote": pathname,
}
Expand Down