From b4de902596060b0fad0e374c101ab27c3587bcda Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 7 Oct 2022 20:23:38 +0200 Subject: [PATCH 1/4] archiver: Asynchronously complete FutureFile After reading and chunking all data in a file, the FutureFile still has to wait until the FutureBlobs are completed. This was done synchronously which results in blocking the file saver and prevents the next file from being read. By replacing the FutureBlob with a callback, it becomes possible to complete the FutureFile asynchronously. --- internal/archiver/blob_saver.go | 43 +------- internal/archiver/blob_saver_test.go | 19 ++-- internal/archiver/file_saver.go | 149 ++++++++++++++++----------- internal/archiver/file_saver_test.go | 6 +- internal/archiver/tree_saver.go | 31 +++--- internal/archiver/tree_saver_test.go | 8 +- 6 files changed, 125 insertions(+), 131 deletions(-) diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go index b2b5e59bb..ae4879ff4 100644 --- a/internal/archiver/blob_saver.go +++ b/internal/archiver/blob_saver.go @@ -43,51 +43,18 @@ func (s *BlobSaver) TriggerShutdown() { // Save stores a blob in the repo. It checks the index and the known blobs // before saving anything. It takes ownership of the buffer passed in. -func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob { - ch := make(chan SaveBlobResponse, 1) +func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)) { select { - case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}: + case s.ch <- saveBlobJob{BlobType: t, buf: buf, cb: cb}: case <-ctx.Done(): debug.Log("not sending job, context is cancelled") - close(ch) - return FutureBlob{ch: ch} } - - return FutureBlob{ch: ch} -} - -// FutureBlob is returned by SaveBlob and will return the data once it has been processed. -type FutureBlob struct { - ch <-chan SaveBlobResponse -} - -func (s *FutureBlob) Poll() *SaveBlobResponse { - select { - case res, ok := <-s.ch: - if ok { - return &res - } - default: - } - return nil -} - -// Take blocks until the result is available or the context is cancelled. 
-func (s *FutureBlob) Take(ctx context.Context) SaveBlobResponse { - select { - case res, ok := <-s.ch: - if ok { - return res - } - case <-ctx.Done(): - } - return SaveBlobResponse{} } type saveBlobJob struct { restic.BlobType buf *Buffer - ch chan<- SaveBlobResponse + cb func(res SaveBlobResponse) } type SaveBlobResponse struct { @@ -128,11 +95,9 @@ func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error { res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data) if err != nil { debug.Log("saveBlob returned error, exiting: %v", err) - close(job.ch) return err } - job.ch <- res - close(job.ch) + job.cb(res) job.buf.Release() } } diff --git a/internal/archiver/blob_saver_test.go b/internal/archiver/blob_saver_test.go index b25478b26..caa1e7c39 100644 --- a/internal/archiver/blob_saver_test.go +++ b/internal/archiver/blob_saver_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "runtime" + "sync" "sync/atomic" "testing" @@ -45,16 +46,22 @@ func TestBlobSaver(t *testing.T) { b := NewBlobSaver(ctx, wg, saver, uint(runtime.NumCPU())) - var results []FutureBlob + var wait sync.WaitGroup + var results []SaveBlobResponse + wait.Add(20) for i := 0; i < 20; i++ { buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))} - fb := b.Save(ctx, restic.DataBlob, buf) - results = append(results, fb) + idx := i + results = append(results, SaveBlobResponse{}) + b.Save(ctx, restic.DataBlob, buf, func(res SaveBlobResponse) { + results[idx] = res + wait.Done() + }) } - for i, blob := range results { - sbr := blob.Take(ctx) + wait.Wait() + for i, sbr := range results { if sbr.known { t.Errorf("blob %v is known, that should not be the case", i) } @@ -94,7 +101,7 @@ func TestBlobSaverError(t *testing.T) { for i := 0; i < test.blobs; i++ { buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))} - b.Save(ctx, restic.DataBlob, buf) + b.Save(ctx, restic.DataBlob, buf, func(res SaveBlobResponse) {}) } b.TriggerShutdown() diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go index fc008945c..1c9352ef2 100644 --- a/internal/archiver/file_saver.go +++ b/internal/archiver/file_saver.go @@ -4,6 +4,7 @@ import ( "context" "io" "os" + "sync" "github.com/restic/chunker" "github.com/restic/restic/internal/debug" @@ -14,7 +15,7 @@ import ( ) // SaveBlobFn saves a blob to a repo. -type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob +type SaveBlobFn func(context.Context, restic.BlobType, *Buffer, func(res SaveBlobResponse)) // FileSaver concurrently saves incoming files to the repo. type FileSaver struct { @@ -101,46 +102,67 @@ type saveFileJob struct { } // saveFile stores the file f in the repo, then closes it. 
-func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func()) futureNodeResult { +func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func(), finish func(res futureNodeResult)) { start() - stats := ItemStats{} fnr := futureNodeResult{ snPath: snPath, target: target, } + var lock sync.Mutex + remaining := 0 + isCompleted := false + + completeBlob := func() { + lock.Lock() + defer lock.Unlock() + + remaining-- + if remaining == 0 && fnr.err == nil { + if isCompleted { + panic("completed twice") + } + isCompleted = true + finish(fnr) + } + } + completeError := func(err error) { + lock.Lock() + defer lock.Unlock() + + if fnr.err == nil { + if isCompleted { + panic("completed twice") + } + isCompleted = true + fnr.err = err + fnr.node = nil + fnr.stats = ItemStats{} + finish(fnr) + } + } debug.Log("%v", snPath) node, err := s.NodeFromFileInfo(snPath, f.Name(), fi) if err != nil { _ = f.Close() - fnr.err = err - return fnr + completeError(err) + return } if node.Type != "file" { _ = f.Close() - fnr.err = errors.Errorf("node type %q is wrong", node.Type) - return fnr + completeError(errors.Errorf("node type %q is wrong", node.Type)) + return } // reuse the chunker chnker.Reset(f, s.pol) - var results []FutureBlob - complete := func(sbr SaveBlobResponse) { - if !sbr.known { - stats.DataBlobs++ - stats.DataSize += uint64(sbr.length) - stats.DataSizeInRepo += uint64(sbr.sizeInRepo) - } - - node.Content = append(node.Content, sbr.id) - } - node.Content = []restic.ID{} - var size uint64 + node.Size = 0 + var idx int for { buf := s.saveFilePool.Get() chunk, err := chnker.Next(buf.Data) @@ -150,62 +172,62 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat } buf.Data = chunk.Data - - size += uint64(chunk.Length) + node.Size += uint64(chunk.Length) if err != nil { _ = f.Close() - fnr.err = err - return fnr + completeError(err) + return } + // test if the context has been cancelled, return the error + if ctx.Err() != nil { + _ = f.Close() + completeError(ctx.Err()) + return + } + + // add a place to store the saveBlob result + pos := idx + node.Content = append(node.Content, restic.ID{}) + + s.saveBlob(ctx, restic.DataBlob, buf, func(sbr SaveBlobResponse) { + lock.Lock() + if !sbr.known { + fnr.stats.DataBlobs++ + fnr.stats.DataSize += uint64(sbr.length) + fnr.stats.DataSizeInRepo += uint64(sbr.sizeInRepo) + } + + node.Content[pos] = sbr.id + lock.Unlock() + + completeBlob() + }) + idx++ // test if the context has been cancelled, return the error if ctx.Err() != nil { _ = f.Close() - fnr.err = ctx.Err() - return fnr - } - - res := s.saveBlob(ctx, restic.DataBlob, buf) - results = append(results, res) - - // test if the context has been cancelled, return the error - if ctx.Err() != nil { - _ = f.Close() - fnr.err = ctx.Err() - return fnr + completeError(ctx.Err()) + return } s.CompleteBlob(uint64(len(chunk.Data))) - - // collect already completed blobs - for len(results) > 0 { - sbr := results[0].Poll() - if sbr == nil { - break - } - results[0] = FutureBlob{} - results = results[1:] - complete(*sbr) - } } err = f.Close() if err != nil { - fnr.err = err - return fnr + completeError(err) + return } - for i, res := range results { - results[i] = FutureBlob{} - sbr := res.Take(ctx) - complete(sbr) - } - - node.Size = size fnr.node = node - fnr.stats = stats - return fnr + lock.Lock() + // require one 
additional completeFuture() call to ensure that the future only completes + // after reaching the end of this method + remaining += idx + 1 + lock.Unlock() + completeBlob() } func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) { @@ -224,11 +246,12 @@ func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) { } } - res := s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start) - if job.complete != nil { - job.complete(res.node, res.stats) - } - job.ch <- res - close(job.ch) + s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start, func(res futureNodeResult) { + if job.complete != nil { + job.complete(res.node, res.stats) + } + job.ch <- res + close(job.ch) + }) } } diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go index a311216c7..dde4356fc 100644 --- a/internal/archiver/file_saver_test.go +++ b/internal/archiver/file_saver_test.go @@ -34,10 +34,8 @@ func createTestFiles(t testing.TB, num int) (files []string, cleanup func()) { func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Context, *errgroup.Group) { wg, ctx := errgroup.WithContext(ctx) - saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob { - ch := make(chan SaveBlobResponse) - close(ch) - return FutureBlob{ch: ch} + saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer, cb func(SaveBlobResponse)) { + cb(SaveBlobResponse{}) } workers := uint(runtime.NumCPU()) diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go index 180f987cd..d25781b03 100644 --- a/internal/archiver/tree_saver.go +++ b/internal/archiver/tree_saver.go @@ -11,7 +11,7 @@ import ( // TreeSaver concurrently saves incoming trees to the repo. type TreeSaver struct { - saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob + saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)) errFn ErrorFunc ch chan<- saveTreeJob @@ -19,7 +19,7 @@ type TreeSaver struct { // NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is // started, it is stopped when ctx is cancelled. 
-func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob, errFn ErrorFunc) *TreeSaver { +func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)), errFn ErrorFunc) *TreeSaver { ch := make(chan saveTreeJob) s := &TreeSaver{ @@ -124,21 +124,24 @@ func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, I } b := &Buffer{Data: buf} - res := s.saveBlob(ctx, restic.TreeBlob, b) + ch := make(chan SaveBlobResponse, 1) + s.saveBlob(ctx, restic.TreeBlob, b, func(res SaveBlobResponse) { + ch <- res + }) - sbr := res.Take(ctx) - if !sbr.known { - stats.TreeBlobs++ - stats.TreeSize += uint64(sbr.length) - stats.TreeSizeInRepo += uint64(sbr.sizeInRepo) - } - // The context was canceled in the meantime, id might be invalid - if ctx.Err() != nil { + select { + case sbr := <-ch: + if !sbr.known { + stats.TreeBlobs++ + stats.TreeSize += uint64(sbr.length) + stats.TreeSizeInRepo += uint64(sbr.sizeInRepo) + } + + node.Subtree = &sbr.id + return node, stats, nil + case <-ctx.Done(): return nil, stats, ctx.Err() } - - node.Subtree = &sbr.id - return node, stats, nil } func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error { diff --git a/internal/archiver/tree_saver_test.go b/internal/archiver/tree_saver_test.go index ab55a2742..7cc53346c 100644 --- a/internal/archiver/tree_saver_test.go +++ b/internal/archiver/tree_saver_test.go @@ -12,15 +12,13 @@ import ( "golang.org/x/sync/errgroup" ) -func treeSaveHelper(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob { - ch := make(chan SaveBlobResponse, 1) - ch <- SaveBlobResponse{ +func treeSaveHelper(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)) { + cb(SaveBlobResponse{ id: restic.NewRandomID(), known: false, length: len(buf.Data), sizeInRepo: len(buf.Data), - } - return FutureBlob{ch: ch} + }) } func setupTreeSaver() (context.Context, context.CancelFunc, *TreeSaver, func() error) { From b52a8ff05c82e82914d931ae11d5909eb0712bf6 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 7 Oct 2022 20:40:39 +0200 Subject: [PATCH 2/4] ui: Properly clear lines no longer used for status Previously, the old status text remained until it was overwritten. --- internal/ui/termstatus/status.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/internal/ui/termstatus/status.go b/internal/ui/termstatus/status.go index 88c4f898e..fdc7e14f6 100644 --- a/internal/ui/termstatus/status.go +++ b/internal/ui/termstatus/status.go @@ -25,6 +25,7 @@ type Terminal struct { msg chan message status chan status canUpdateStatus bool + lastStatusLen int // will be closed when the goroutine which runs Run() terminates, so it'll // yield a default value immediately @@ -154,6 +155,18 @@ func (t *Terminal) run(ctx context.Context) { } func (t *Terminal) writeStatus(status []string) { + statusLen := len(status) + status = append([]string{}, status...) 
+ for i := len(status); i < t.lastStatusLen; i++ { + // clear no longer used status lines + status = append(status, "") + if i > 0 { + // all lines except the last one must have a line break + status[i-1] = status[i-1] + "\n" + } + } + t.lastStatusLen = statusLen + for _, line := range status { t.clearCurrentLine(t.wr, t.fd) From a571fc4aa1d4762e9d3565b45b4f7591331216ff Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 21 Oct 2022 22:02:41 +0200 Subject: [PATCH 3/4] add changelog for faster backups with small files --- changelog/unreleased/pull-3955 | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 changelog/unreleased/pull-3955 diff --git a/changelog/unreleased/pull-3955 b/changelog/unreleased/pull-3955 new file mode 100644 index 000000000..93ec957b2 --- /dev/null +++ b/changelog/unreleased/pull-3955 @@ -0,0 +1,9 @@ +Enhancement: Improve backup performance for small files + +When backing up small files restic was slower than it could be. In particular +this affected backups using maximum compression. + +This has been fixed by reworking the internal parallelism of the backup +command. + +https://github.com/restic/restic/issues/3955 From c0f34af9db0701104be9120601fe680f4039eb50 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 22 Oct 2022 12:05:49 +0200 Subject: [PATCH 4/4] backup: hide files from status which are read completely but not saved As the FileSaver is asynchronously waiting for all blobs of a file to be stored, the number of active files is higher than the number of files from which restic is reading concurrently. Thus to not confuse users, only display files in the status from which restic is currently reading. --- internal/archiver/archiver.go | 5 +++ internal/archiver/archiver_test.go | 15 ++++++++- internal/archiver/file_saver.go | 47 +++++++++++++++++----------- internal/archiver/file_saver_test.go | 3 +- 4 files changed, 50 insertions(+), 20 deletions(-) diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index dda86c11e..37f1a378d 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -68,6 +68,9 @@ type Archiver struct { // be in the snapshot after saving. s contains some statistics about this // particular file/dir. // + // Once reading a file has completed successfully (but not saving it yet), + // CompleteItem will be called with current == nil. + // // CompleteItem may be called asynchronously from several different // goroutines! 
CompleteItem func(item string, previous, current *restic.Node, s ItemStats, d time.Duration) @@ -431,6 +434,8 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous // Save will close the file, we don't need to do that fn = arch.fileSaver.Save(ctx, snPath, target, file, fi, func() { arch.StartFile(snPath) + }, func() { + arch.CompleteItem(snPath, nil, nil, ItemStats{}, 0) }, func(node *restic.Node, stats ItemStats) { arch.CompleteItem(snPath, previous, node, stats, time.Since(start)) }) diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index 86b5386be..60859857e 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -53,6 +53,8 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem } var ( + completeReadingCallback bool + completeCallbackNode *restic.Node completeCallbackStats ItemStats completeCallback bool @@ -60,6 +62,13 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem startCallback bool ) + completeReading := func() { + completeReadingCallback = true + if completeCallback { + t.Error("callbacks called in wrong order") + } + } + complete := func(node *restic.Node, stats ItemStats) { completeCallback = true completeCallbackNode = node @@ -80,7 +89,7 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem t.Fatal(err) } - res := arch.fileSaver.Save(ctx, "/", filename, file, fi, start, complete) + res := arch.fileSaver.Save(ctx, "/", filename, file, fi, start, completeReading, complete) fnr := res.take(ctx) if fnr.err != nil { @@ -101,6 +110,10 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem t.Errorf("start callback did not happen") } + if !completeReadingCallback { + t.Errorf("completeReading callback did not happen") + } + if !completeCallback { t.Errorf("complete callback did not happen") } diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go index 1c9352ef2..c7303b929 100644 --- a/internal/archiver/file_saver.go +++ b/internal/archiver/file_saver.go @@ -67,17 +67,21 @@ func (s *FileSaver) TriggerShutdown() { type CompleteFunc func(*restic.Node, ItemStats) // Save stores the file f and returns the data once it has been completed. The -// file is closed by Save. -func (s *FileSaver) Save(ctx context.Context, snPath string, target string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureNode { +// file is closed by Save. completeReading is only called if the file was read +// successfully. complete is always called. If completeReading is called, then +// this will always happen before calling complete. 
+func (s *FileSaver) Save(ctx context.Context, snPath string, target string, file fs.File, fi os.FileInfo, start func(), completeReading func(), complete CompleteFunc) FutureNode { fn, ch := newFutureNode() job := saveFileJob{ - snPath: snPath, - target: target, - file: file, - fi: fi, - start: start, - complete: complete, - ch: ch, + snPath: snPath, + target: target, + file: file, + fi: fi, + ch: ch, + + start: start, + completeReading: completeReading, + complete: complete, } select { @@ -92,17 +96,19 @@ func (s *FileSaver) Save(ctx context.Context, snPath string, target string, file } type saveFileJob struct { - snPath string - target string - file fs.File - fi os.FileInfo - ch chan<- futureNodeResult - complete CompleteFunc - start func() + snPath string + target string + file fs.File + fi os.FileInfo + ch chan<- futureNodeResult + + start func() + completeReading func() + complete CompleteFunc } // saveFile stores the file f in the repo, then closes it. -func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func(), finish func(res futureNodeResult)) { +func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func(), finishReading func(), finish func(res futureNodeResult)) { start() fnr := futureNodeResult{ @@ -227,6 +233,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat // after reaching the end of this method remaining += idx + 1 lock.Unlock() + finishReading() completeBlob() } @@ -246,7 +253,11 @@ func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) { } } - s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start, func(res futureNodeResult) { + s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start, func() { + if job.completeReading != nil { + job.completeReading() + } + }, func(res futureNodeResult) { if job.complete != nil { job.complete(res.node, res.stats) } diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go index dde4356fc..e32250496 100644 --- a/internal/archiver/file_saver_test.go +++ b/internal/archiver/file_saver_test.go @@ -60,6 +60,7 @@ func TestFileSaver(t *testing.T) { defer cleanup() startFn := func() {} + completeReadingFn := func() {} completeFn := func(*restic.Node, ItemStats) {} testFs := fs.Local{} @@ -78,7 +79,7 @@ func TestFileSaver(t *testing.T) { t.Fatal(err) } - ff := s.Save(ctx, filename, filename, f, fi, startFn, completeFn) + ff := s.Save(ctx, filename, filename, f, fi, startFn, completeReadingFn, completeFn) results = append(results, ff) }
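
Note on the saveFile rework in PATCH 1/4: the completion-counting logic (remaining, completeBlob, completeError) can be hard to follow in diff form. The standalone Go sketch below reproduces just that pattern under simplified assumptions — the names processFile and saveChunkAsync are invented for illustration, error handling and the double-completion panic guard from the patch are omitted, and only the ordering trick of counting one extra pending completion for "reading finished" is kept.

// Sketch (not part of the patch): completion counting as used by FileSaver.saveFile,
// reduced to a self-contained program with hypothetical names.
package main

import (
	"fmt"
	"sync"
	"time"
)

// saveChunkAsync stands in for BlobSaver.Save: it accepts a chunk and invokes
// the callback once the chunk has been "stored".
func saveChunkAsync(chunk string, cb func(result string)) {
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate storage latency
		cb("stored:" + chunk)
	}()
}

// processFile schedules storage of all chunks and calls finish once every
// callback has fired. It returns as soon as the scheduling loop is done, so a
// caller could start reading the next file immediately.
func processFile(chunks []string, finish func(results []string)) {
	var (
		mu        sync.Mutex
		remaining int
		results   = make([]string, len(chunks))
	)

	completeOne := func() {
		mu.Lock()
		defer mu.Unlock()
		remaining--
		if remaining == 0 {
			finish(results)
		}
	}

	for i, c := range chunks {
		pos := i // reserve a slot so results keep chunk order
		saveChunkAsync(c, func(res string) {
			mu.Lock()
			results[pos] = res
			mu.Unlock()
			completeOne()
		})
	}

	// Count one extra pending completion for "reading finished". remaining may
	// have gone negative if callbacks already fired; after this adjustment it is
	// always at least 1, so finish can only run after this point.
	mu.Lock()
	remaining += len(chunks) + 1
	mu.Unlock()
	completeOne()
}

func main() {
	done := make(chan struct{})
	processFile([]string{"a", "b", "c"}, func(results []string) {
		fmt.Println(results)
		close(done)
	})
	<-done
}

The extra +1 added to remaining mirrors the "remaining += idx + 1" line in the patch: it guarantees that finish (and, in the real code, the FutureFile) completes only after the chunking loop has returned, even when every storage callback fires before the loop is finished.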