1
0
Fork 0
mirror of https://github.com/restic/restic.git synced 2024-12-23 00:07:25 +00:00

Merge pull request #3593 from DarkKirb/parallelize-restic-copy

Parallelize blob upload/download for restic copy
This commit is contained in:
MichaelEischer 2021-12-29 22:31:54 +01:00 committed by GitHub
commit 2c3e5d943d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 42 additions and 18 deletions

View file

@ -0,0 +1,9 @@
Enhancement: Improve restic copy performance by parallelizing IO
Restic copy previously only used a single thread for copying blobs between
repositories, which resulted in limited performance when copying small blobs
to/from a high latency backend (i.e. any remote backend, especially b2).
Copying will now use 8 parallel threads to increase the throughput of the copy
operation.
https://github.com/restic/restic/pull/3593

View file

@ -176,9 +176,12 @@ func similarSnapshots(sna *restic.Snapshot, snb *restic.Snapshot) bool {
return true return true
} }
const numCopyWorkers = 8
func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository, func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository,
visitedTrees restic.IDSet, rootTreeID restic.ID) error { visitedTrees restic.IDSet, rootTreeID restic.ID) error {
idChan := make(chan restic.ID)
wg, ctx := errgroup.WithContext(ctx) wg, ctx := errgroup.WithContext(ctx)
treeStream := restic.StreamTrees(ctx, wg, srcRepo, restic.IDs{rootTreeID}, func(treeID restic.ID) bool { treeStream := restic.StreamTrees(ctx, wg, srcRepo, restic.IDs{rootTreeID}, func(treeID restic.ID) bool {
@ -188,9 +191,9 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep
}, nil) }, nil)
wg.Go(func() error { wg.Go(func() error {
defer close(idChan)
// reused buffer // reused buffer
var buf []byte var buf []byte
for tree := range treeStream { for tree := range treeStream {
if tree.Error != nil { if tree.Error != nil {
return fmt.Errorf("LoadTree(%v) returned error %v", tree.ID.Str(), tree.Error) return fmt.Errorf("LoadTree(%v) returned error %v", tree.ID.Str(), tree.Error)
@ -211,32 +214,44 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep
} }
} }
// TODO: parallelize blob down/upload
for _, entry := range tree.Nodes { for _, entry := range tree.Nodes {
// Recursion into directories is handled by StreamTrees // Recursion into directories is handled by StreamTrees
// Copy the blobs for this file. // Copy the blobs for this file.
for _, blobID := range entry.Content { for _, blobID := range entry.Content {
// Do we already have this data blob? select {
if dstRepo.Index().Has(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) { case idChan <- blobID:
continue case <-ctx.Done():
} return ctx.Err()
debug.Log("Copying blob %s\n", blobID.Str())
var err error
buf, err = srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, buf)
if err != nil {
return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err)
}
_, _, err = dstRepo.SaveBlob(ctx, restic.DataBlob, buf, blobID, false)
if err != nil {
return fmt.Errorf("SaveBlob(%v) returned error %v", blobID, err)
} }
} }
} }
} }
return nil return nil
}) })
for i := 0; i < numCopyWorkers; i++ {
wg.Go(func() error {
// reused buffer
var buf []byte
for blobID := range idChan {
// Do we already have this data blob?
if dstRepo.Index().Has(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) {
continue
}
debug.Log("Copying blob %s\n", blobID.Str())
var err error
buf, err = srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, buf)
if err != nil {
return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err)
}
_, _, err = dstRepo.SaveBlob(ctx, restic.DataBlob, buf, blobID, false)
if err != nil {
return fmt.Errorf("SaveBlob(%v) returned error %v", blobID, err)
}
}
return nil
})
}
return wg.Wait() return wg.Wait()
} }