copy: parallelize tree walk

This commit is contained in:
Michael Eischer 2020-11-07 01:41:22 +01:00 committed by Alexander Neumann
parent f2a1b125cb
commit 0caad1e890
1 changed files with 55 additions and 52 deletions

View File

@ -6,6 +6,7 @@ import (
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
"golang.org/x/sync/errgroup"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -184,39 +185,37 @@ type treeCloner struct {
buf []byte buf []byte
} }
func (t *treeCloner) copyTree(ctx context.Context, treeID restic.ID) error { func (t *treeCloner) copyTree(ctx context.Context, rootTreeID restic.ID) error {
// We have already processed this tree wg, ctx := errgroup.WithContext(ctx)
if t.visitedTrees.Has(treeID) {
return nil
}
tree, err := t.srcRepo.LoadTree(ctx, treeID) treeStream := restic.StreamTrees(ctx, wg, t.srcRepo, restic.IDs{rootTreeID}, func(treeID restic.ID) bool {
if err != nil { visited := t.visitedTrees.Has(treeID)
return fmt.Errorf("LoadTree(%v) returned error %v", treeID.Str(), err)
}
t.visitedTrees.Insert(treeID) t.visitedTrees.Insert(treeID)
return visited
})
wg.Go(func() error {
for tree := range treeStream {
if tree.Error != nil {
return fmt.Errorf("LoadTree(%v) returned error %v", tree.ID.Str(), tree.Error)
}
// Do we already have this tree blob? // Do we already have this tree blob?
if !t.dstRepo.Index().Has(restic.BlobHandle{ID: treeID, Type: restic.TreeBlob}) { if !t.dstRepo.Index().Has(restic.BlobHandle{ID: tree.ID, Type: restic.TreeBlob}) {
newTreeID, err := t.dstRepo.SaveTree(ctx, tree) newTreeID, err := t.dstRepo.SaveTree(ctx, tree.Tree)
if err != nil { if err != nil {
return fmt.Errorf("SaveTree(%v) returned error %v", treeID.Str(), err) return fmt.Errorf("SaveTree(%v) returned error %v", tree.ID.Str(), err)
} }
// Assurance only. // Assurance only.
if newTreeID != treeID { if newTreeID != tree.ID {
return fmt.Errorf("SaveTree(%v) returned unexpected id %s", treeID.Str(), newTreeID.Str()) return fmt.Errorf("SaveTree(%v) returned unexpected id %s", tree.ID.Str(), newTreeID.Str())
} }
} }
// TODO: parellize this stuff, likely only needed inside a tree. // TODO: parallelize blob down/upload
for _, entry := range tree.Nodes { for _, entry := range tree.Nodes {
// If it is a directory, recurse // Recursion into directories is handled by StreamTrees
if entry.Type == "dir" && entry.Subtree != nil {
if err := t.copyTree(ctx, *entry.Subtree); err != nil {
return err
}
}
// Copy the blobs for this file. // Copy the blobs for this file.
for _, blobID := range entry.Content { for _, blobID := range entry.Content {
// Do we already have this data blob? // Do we already have this data blob?
@ -224,6 +223,7 @@ func (t *treeCloner) copyTree(ctx context.Context, treeID restic.ID) error {
continue continue
} }
debug.Log("Copying blob %s\n", blobID.Str()) debug.Log("Copying blob %s\n", blobID.Str())
var err error
t.buf, err = t.srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, t.buf) t.buf, err = t.srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, t.buf)
if err != nil { if err != nil {
return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err) return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err)
@ -236,5 +236,8 @@ func (t *treeCloner) copyTree(ctx context.Context, treeID restic.ID) error {
} }
} }
}
return nil return nil
})
return wg.Wait()
} }