From 45509eafc8c0a08301bd734926f87030dadbd233 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 4 May 2024 18:57:37 +0200 Subject: [PATCH] dump: load blobs of a file from repository in parallel --- internal/dump/common.go | 97 ++++++++++++++++++++++++++++++++--------- 1 file changed, 76 insertions(+), 21 deletions(-) diff --git a/internal/dump/common.go b/internal/dump/common.go index 016328835..116762b5a 100644 --- a/internal/dump/common.go +++ b/internal/dump/common.go @@ -9,6 +9,7 @@ import ( "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/walker" + "golang.org/x/sync/errgroup" ) // A Dumper writes trees and files from a repository to a Writer @@ -16,11 +17,11 @@ import ( type Dumper struct { cache *bloblru.Cache format string - repo restic.BlobLoader + repo restic.Loader w io.Writer } -func New(format string, repo restic.BlobLoader, w io.Writer) *Dumper { +func New(format string, repo restic.Loader, w io.Writer) *Dumper { return &Dumper{ cache: bloblru.New(64 << 20), format: format, @@ -103,27 +104,81 @@ func (d *Dumper) WriteNode(ctx context.Context, node *restic.Node) error { } func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *restic.Node) error { - var ( - buf []byte - err error - ) - for _, id := range node.Content { - blob, ok := d.cache.Get(id) - if !ok { - blob, err = d.repo.LoadBlob(ctx, restic.DataBlob, id, buf) - if err != nil { - return err - } - - buf = d.cache.Add(id, blob) // Reuse evicted buffer. 
- } - - if _, err := w.Write(blob); err != nil { - return errors.Wrap(err, "Write") - } + type loadTask struct { + id restic.ID + out chan<- []byte + } + type writeTask struct { + data <-chan []byte } - return nil + loaderCh := make(chan loadTask) + // per worker: allows for one blob that is being downloaded plus one blob that is queued for writing + writerCh := make(chan writeTask, d.repo.Connections()*2) + + wg, ctx := errgroup.WithContext(ctx) + + wg.Go(func() error { + defer close(loaderCh) + defer close(writerCh) + for _, id := range node.Content { + // non-blocking blob handover to allow the loader to load the next blob + // while the previous one is still being written + ch := make(chan []byte, 1) + select { + case loaderCh <- loadTask{id: id, out: ch}: + case <-ctx.Done(): + return ctx.Err() + } + + select { + case writerCh <- writeTask{data: ch}: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + + for i := uint(0); i < d.repo.Connections(); i++ { + wg.Go(func() error { + for task := range loaderCh { + var err error + blob, ok := d.cache.Get(task.id) + if !ok { + blob, err = d.repo.LoadBlob(ctx, restic.DataBlob, task.id, nil) + if err != nil { + return err + } + + d.cache.Add(task.id, blob) + } + + select { + case task.out <- blob: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + } + + wg.Go(func() error { + for result := range writerCh { + select { + case data := <-result.data: + if _, err := w.Write(data); err != nil { + return errors.Wrap(err, "Write") + } + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + }) + + return wg.Wait() } // IsDir checks if the given node is a directory.