From 0caad1e890bd016cfac2e76a39c0de1840520579 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Nov 2020 01:41:22 +0100 Subject: [PATCH] copy: parallelize tree walk --- cmd/restic/cmd_copy.go | 107 +++++++++++++++++++++-------------------- 1 file changed, 55 insertions(+), 52 deletions(-) diff --git a/cmd/restic/cmd_copy.go b/cmd/restic/cmd_copy.go index 216dab836..37cf45ec3 100644 --- a/cmd/restic/cmd_copy.go +++ b/cmd/restic/cmd_copy.go @@ -6,6 +6,7 @@ import ( "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" "github.com/spf13/cobra" ) @@ -184,57 +185,59 @@ type treeCloner struct { buf []byte } -func (t *treeCloner) copyTree(ctx context.Context, treeID restic.ID) error { - // We have already processed this tree - if t.visitedTrees.Has(treeID) { +func (t *treeCloner) copyTree(ctx context.Context, rootTreeID restic.ID) error { + wg, ctx := errgroup.WithContext(ctx) + + treeStream := restic.StreamTrees(ctx, wg, t.srcRepo, restic.IDs{rootTreeID}, func(treeID restic.ID) bool { + visited := t.visitedTrees.Has(treeID) + t.visitedTrees.Insert(treeID) + return visited + }) + + wg.Go(func() error { + for tree := range treeStream { + if tree.Error != nil { + return fmt.Errorf("LoadTree(%v) returned error %v", tree.ID.Str(), tree.Error) + } + + // Do we already have this tree blob? + if !t.dstRepo.Index().Has(restic.BlobHandle{ID: tree.ID, Type: restic.TreeBlob}) { + newTreeID, err := t.dstRepo.SaveTree(ctx, tree.Tree) + if err != nil { + return fmt.Errorf("SaveTree(%v) returned error %v", tree.ID.Str(), err) + } + // Assurance only. + if newTreeID != tree.ID { + return fmt.Errorf("SaveTree(%v) returned unexpected id %s", tree.ID.Str(), newTreeID.Str()) + } + } + + // TODO: parallelize blob down/upload + + for _, entry := range tree.Nodes { + // Recursion into directories is handled by StreamTrees + // Copy the blobs for this file. + for _, blobID := range entry.Content { + // Do we already have this data blob? + if t.dstRepo.Index().Has(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) { + continue + } + debug.Log("Copying blob %s\n", blobID.Str()) + var err error + t.buf, err = t.srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, t.buf) + if err != nil { + return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err) + } + + _, _, err = t.dstRepo.SaveBlob(ctx, restic.DataBlob, t.buf, blobID, false) + if err != nil { + return fmt.Errorf("SaveBlob(%v) returned error %v", blobID, err) + } + } + } + + } return nil - } - - tree, err := t.srcRepo.LoadTree(ctx, treeID) - if err != nil { - return fmt.Errorf("LoadTree(%v) returned error %v", treeID.Str(), err) - } - t.visitedTrees.Insert(treeID) - - // Do we already have this tree blob? - if !t.dstRepo.Index().Has(restic.BlobHandle{ID: treeID, Type: restic.TreeBlob}) { - newTreeID, err := t.dstRepo.SaveTree(ctx, tree) - if err != nil { - return fmt.Errorf("SaveTree(%v) returned error %v", treeID.Str(), err) - } - // Assurance only. - if newTreeID != treeID { - return fmt.Errorf("SaveTree(%v) returned unexpected id %s", treeID.Str(), newTreeID.Str()) - } - } - - // TODO: parellize this stuff, likely only needed inside a tree. - - for _, entry := range tree.Nodes { - // If it is a directory, recurse - if entry.Type == "dir" && entry.Subtree != nil { - if err := t.copyTree(ctx, *entry.Subtree); err != nil { - return err - } - } - // Copy the blobs for this file. - for _, blobID := range entry.Content { - // Do we already have this data blob? - if t.dstRepo.Index().Has(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) { - continue - } - debug.Log("Copying blob %s\n", blobID.Str()) - t.buf, err = t.srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, t.buf) - if err != nil { - return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err) - } - - _, _, err = t.dstRepo.SaveBlob(ctx, restic.DataBlob, t.buf, blobID, false) - if err != nil { - return fmt.Errorf("SaveBlob(%v) returned error %v", blobID, err) - } - } - } - - return nil + }) + return wg.Wait() }