mirror of
https://github.com/restic/restic.git
synced 2024-12-22 07:43:03 +00:00
dump: Simplify writeNode and use fewer goroutines
This changes Dumper.writeNode to spawn loader goroutines as needed instead of as a pool. The code is shorter, fewer goroutines are spawned for small files, and crash dumps (also for unrelated errors) should be smaller.
This commit is contained in:
parent
efec1a5e96
commit
8c7a6daa47
1 changed file with 32 additions and 58 deletions
|
@ -6,7 +6,6 @@ import (
|
||||||
"path"
|
"path"
|
||||||
|
|
||||||
"github.com/restic/restic/internal/bloblru"
|
"github.com/restic/restic/internal/bloblru"
|
||||||
"github.com/restic/restic/internal/errors"
|
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
"github.com/restic/restic/internal/walker"
|
"github.com/restic/restic/internal/walker"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
@ -104,75 +103,50 @@ func (d *Dumper) WriteNode(ctx context.Context, node *restic.Node) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *restic.Node) error {
|
func (d *Dumper) writeNode(ctx context.Context, w io.Writer, node *restic.Node) error {
|
||||||
type loadTask struct {
|
|
||||||
id restic.ID
|
|
||||||
out chan<- []byte
|
|
||||||
}
|
|
||||||
type writeTask struct {
|
|
||||||
data <-chan []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
loaderCh := make(chan loadTask)
|
|
||||||
// per worker: allows for one blob that gets downloaded + one blob that's queued for writing
|
|
||||||
writerCh := make(chan writeTask, d.repo.Connections()*2)
|
|
||||||
|
|
||||||
wg, ctx := errgroup.WithContext(ctx)
|
wg, ctx := errgroup.WithContext(ctx)
|
||||||
|
limit := d.repo.Connections() - 1 // See below for the -1.
|
||||||
|
blobs := make(chan (<-chan []byte), limit)
|
||||||
|
|
||||||
wg.Go(func() error {
|
wg.Go(func() error {
|
||||||
defer close(loaderCh)
|
for ch := range blobs {
|
||||||
defer close(writerCh)
|
|
||||||
for _, id := range node.Content {
|
|
||||||
// non-blocking blob handover to allow the loader to load the next blob
|
|
||||||
// while the old one is still written
|
|
||||||
ch := make(chan []byte, 1)
|
|
||||||
select {
|
select {
|
||||||
case loaderCh <- loadTask{id: id, out: ch}:
|
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
}
|
case blob := <-ch:
|
||||||
|
if _, err := w.Write(blob); err != nil {
|
||||||
select {
|
|
||||||
case writerCh <- writeTask{data: ch}:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
for i := uint(0); i < d.repo.Connections(); i++ {
|
|
||||||
wg.Go(func() error {
|
|
||||||
for task := range loaderCh {
|
|
||||||
blob, err := d.cache.GetOrCompute(task.id, func() ([]byte, error) {
|
|
||||||
return d.repo.LoadBlob(ctx, restic.DataBlob, task.id, nil)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
|
||||||
case task.out <- blob:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
wg.Go(func() error {
|
|
||||||
for result := range writerCh {
|
|
||||||
select {
|
|
||||||
case data := <-result.data:
|
|
||||||
if _, err := w.Write(data); err != nil {
|
|
||||||
return errors.Wrap(err, "Write")
|
|
||||||
}
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Start short-lived goroutines to load blobs.
|
||||||
|
// There will be at most 1+cap(blobs) goroutines calling LoadBlob at any moment.
|
||||||
|
loop:
|
||||||
|
for _, id := range node.Content {
|
||||||
|
// This needs to be buffered, so that loaders can quit
|
||||||
|
// without waiting for the writer.
|
||||||
|
ch := make(chan []byte, 1)
|
||||||
|
|
||||||
|
wg.Go(func() error {
|
||||||
|
blob, err := d.cache.GetOrCompute(id, func() ([]byte, error) {
|
||||||
|
return d.repo.LoadBlob(ctx, restic.DataBlob, id, nil)
|
||||||
|
})
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
ch <- blob
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
|
||||||
|
select {
|
||||||
|
case blobs <- ch:
|
||||||
|
case <-ctx.Done():
|
||||||
|
break loop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
close(blobs)
|
||||||
return wg.Wait()
|
return wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue