diff --git a/rclone/exporter/exporter.go b/rclone/exporter/exporter.go index 2c47dc4..ae48454 100644 --- a/rclone/exporter/exporter.go +++ b/rclone/exporter/exporter.go @@ -81,61 +81,49 @@ func (p *RcloneExporter) Ping(ctx context.Context) error { return nil } -func (p *RcloneExporter) Export(ctx context.Context, records <-chan *connectors.Record, results chan<- *connectors.Result) (ret error) { +func (p *RcloneExporter) Export(ctx context.Context, records <-chan *connectors.Record, results chan<- *connectors.Result) error { defer close(results) - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(p.maxConcurrency) -loop: - for { - select { - case <-ctx.Done(): - ret = ctx.Err() - break loop + var g errgroup.Group + g.SetLimit(p.maxConcurrency) - case record, ok := <-records: - if !ok { - break loop - } + for record := range records { + if record.Err != nil { + results <- record.Ok() + continue + } - if record.Err != nil { - results <- record.Ok() - continue - } + if record.IsXattr || record.FileInfo.Lmode&os.ModeSymlink != 0 { + results <- record.Ok() + continue + } - if record.IsXattr || record.FileInfo.Lmode&os.ModeSymlink != 0 { + pathname := stdpath.Join(p.Root(), record.Pathname) + if record.FileInfo.Lmode.IsDir() { + if err := p.mkdir(ctx, pathname); err != nil { + results <- record.Error(err) + } else { results <- record.Ok() - continue } - pathname := stdpath.Join(p.Root(), record.Pathname) - if record.FileInfo.Lmode.IsDir() { - if err := p.mkdir(ctx, pathname); err != nil { - results <- record.Error(err) - } else { - results <- record.Ok() - } + continue + } - continue + g.Go(func() error { + if err := p.storeFile(ctx, pathname, record); err != nil { + results <- record.Error(err) + } else { + results <- record.Ok() } - - g.Go(func() error { - if err := p.storeFile(ctx, pathname, record); err != nil { - results <- record.Error(err) - } else { - results <- record.Ok() - } - return nil - }) - - } + return nil + }) } - if err := g.Wait(); err != nil && ret == nil { - ret = err + if err := g.Wait(); err != nil { + return err } - return ret + return nil } func (p *RcloneExporter) mkdir(ctx context.Context, pathname string) error { diff --git a/rclone/importer/importer.go b/rclone/importer/importer.go index c6d53ee..ecfbad7 100644 --- a/rclone/importer/importer.go +++ b/rclone/importer/importer.go @@ -2,7 +2,6 @@ package importer import ( "context" - "crypto/rand" "encoding/json" "fmt" "io" @@ -53,7 +52,6 @@ type RcloneImporter struct { Typee string Base string confFile *os.File - logFile *os.File } // NewRcloneImporter creates a new RcloneImporter instance. It expects the location @@ -79,13 +77,10 @@ func NewRcloneImporter(ctx context.Context, opts *connectors.Options, providerNa librclone.Initialize() - f, _ := os.Create("/home/ptr/dev/plakar/plakar/log2.txt") - return &RcloneImporter{ Typee: typee, Base: base, confFile: file, - logFile: f, }, nil } @@ -109,7 +104,7 @@ func (p *RcloneImporter) Import(ctx context.Context, records chan<- *connectors. } wg := &sync.WaitGroup{} - p.scanFolder(records, "", response, wg) + p.scanFolder(records, response, wg) wg.Wait() return nil @@ -130,45 +125,13 @@ func (p *RcloneImporter) GetPathInBackup(path string) string { return stdpath.Clean(path) } -// generatePathComponents is a helper function that returns a slice of strings -// containing all the hierarchical components of an absolute path, starting -// from the full path down to the root. -// -// The path given as an argument must be an absolute clean path within the -// backup. -// -// Example: -// -// Input: "/path/to/dir" -// Output: []string{"/path/to/dir", "/path/to", "/path", "/"} -// -// Input: "/relative/path" -// Output: []string{"/relative/path", "/relative", "/"} -// -// Input: "/" -// Output: []string{"/"} -func generatePathComponents(path string) []string { - components := []string{} - tmp := path - - for { - components = append(components, tmp) - parent := stdpath.Dir(tmp) - if parent == tmp { // Reached the root - break - } - tmp = parent - } - return components -} - func (p *RcloneImporter) scanRecursive(results chan<- *connectors.Record, path string, wg *sync.WaitGroup) { response, _ := p.ListFolder(results, path) - p.scanFolder(results, path, response, wg) + p.scanFolder(results, response, wg) } func (p *RcloneImporter) ListFolder(results chan<- *connectors.Record, path string) (Response, error) { - payload := map[string]interface{}{ + payload := map[string]string{ "fs": fmt.Sprintf("%s:%s", p.Typee, p.Base), "remote": path, } @@ -195,7 +158,7 @@ func (p *RcloneImporter) ListFolder(results chan<- *connectors.Record, path stri return response, nil } -func (p *RcloneImporter) scanFolder(results chan<- *connectors.Record, path string, response Response, wg *sync.WaitGroup) { +func (p *RcloneImporter) scanFolder(results chan<- *connectors.Record, response Response, wg *sync.WaitGroup) { for _, file := range response.List { wg.Add(1) go func() { @@ -255,33 +218,6 @@ func (p *RcloneImporter) scanFolder(results chan<- *connectors.Record, path stri } } -func nextRandom() string { - b := make([]byte, 8) - _, err := rand.Read(b) - if err != nil { - panic(err) - } - return fmt.Sprintf("%x", b) -} - -func createTempPath(originalPath string) (path string, err error) { - tmpPath := os.TempDir() + "/" + originalPath - prefix, suffix := "", "" - if i := strings.LastIndex(tmpPath, "*"); i >= 0 { - prefix, suffix = tmpPath[:i], tmpPath[i+1:] - } else { - prefix = tmpPath - } - - for i := 0; i < 10000; i++ { - name := prefix + nextRandom() + suffix - if _, err := os.Stat(name); os.IsNotExist(err) { - return name, nil - } - } - return "", fmt.Errorf("failed to find a folder to create the temporary file") -} - // AutoremoveTmpFile is a wrapper around an os.File that removes the file when it's closed. type AutoremoveTmpFile struct { *os.File @@ -296,12 +232,12 @@ func (p *RcloneImporter) NewReader(pathname string) (io.ReadCloser, error) { // pathname is an absolute path within the backup. Let's convert it to a // relative path to the base path. relativePath := strings.TrimPrefix(pathname, p.GetPathInBackup("")) - name, err := createTempPath("plakar_temp_*") + + fp, err := os.CreateTemp("", "plakar_rclone_tmp_*") if err != nil { return nil, err } - - fmt.Fprintf(p.logFile, "downloading and processing %s %s to %s\n", pathname, relativePath, name) + name := fp.Name() payload := map[string]string{ "srcFs": fmt.Sprintf("%s:%s", p.Typee, p.Base), @@ -322,16 +258,10 @@ func (p *RcloneImporter) NewReader(pathname string) (io.ReadCloser, error) { return nil, fmt.Errorf("failed to copy file: %s", body) } - tmpFile, err := os.Open(name) - if err != nil { - return nil, err - } - - return &AutoremoveTmpFile{tmpFile}, nil + return &AutoremoveTmpFile{fp}, nil } func (p *RcloneImporter) Close(ctx context.Context) error { - p.logFile.Close() utils.DeleteTempConf(p.confFile.Name()) librclone.Finalize() return nil diff --git a/rclone/storage/storage.go b/rclone/storage/storage.go index 2a2b24c..8464c0d 100644 --- a/rclone/storage/storage.go +++ b/rclone/storage/storage.go @@ -170,7 +170,7 @@ func (r *RcloneStorage) deleteFile(pathname string) error { } func (r *RcloneStorage) listFolder(pathname string) ([]string, error) { - payload := map[string]interface{}{ + payload := map[string]string{ "fs": fmt.Sprintf("%s:%s", r.Typee, r.Base), "remote": pathname, }