diff --git a/changelog/unreleased/pull-3955 b/changelog/unreleased/pull-3955
new file mode 100644
index 000000000..93ec957b2
--- /dev/null
+++ b/changelog/unreleased/pull-3955
@@ -0,0 +1,9 @@
+Enhancement: Improve backup performance for small files
+
+When backing up small files restic was slower than it could be. In particular
+this affected backups using maximum compression.
+
+This has been fixed by reworking the internal parallelism of the backup
+command.
+
+https://github.com/restic/restic/issues/3955
diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index dda86c11e..37f1a378d 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -68,6 +68,9 @@ type Archiver struct {
 	// be in the snapshot after saving. s contains some statistics about this
 	// particular file/dir.
 	//
+	// Once reading a file has completed successfully (but not saving it yet),
+	// CompleteItem will be called with current == nil.
+	//
 	// CompleteItem may be called asynchronously from several different
 	// goroutines!
 	CompleteItem func(item string, previous, current *restic.Node, s ItemStats, d time.Duration)
@@ -431,6 +434,8 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
 		// Save will close the file, we don't need to do that
 		fn = arch.fileSaver.Save(ctx, snPath, target, file, fi, func() {
 			arch.StartFile(snPath)
+		}, func() {
+			arch.CompleteItem(snPath, nil, nil, ItemStats{}, 0)
 		}, func(node *restic.Node, stats ItemStats) {
 			arch.CompleteItem(snPath, previous, node, stats, time.Since(start))
 		})
diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go
index 86b5386be..60859857e 100644
--- a/internal/archiver/archiver_test.go
+++ b/internal/archiver/archiver_test.go
@@ -53,6 +53,8 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
 	}
 
 	var (
+		completeReadingCallback bool
+
 		completeCallbackNode  *restic.Node
 		completeCallbackStats ItemStats
 		completeCallback      bool
@@ -60,6 +62,13 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
 		startCallback bool
 	)
 
+	completeReading := func() {
+		completeReadingCallback = true
+		if completeCallback {
+			t.Error("callbacks called in wrong order")
+		}
+	}
+
 	complete := func(node *restic.Node, stats ItemStats) {
 		completeCallback = true
 		completeCallbackNode = node
@@ -80,7 +89,7 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
 		t.Fatal(err)
 	}
 
-	res := arch.fileSaver.Save(ctx, "/", filename, file, fi, start, complete)
+	res := arch.fileSaver.Save(ctx, "/", filename, file, fi, start, completeReading, complete)
 
 	fnr := res.take(ctx)
 	if fnr.err != nil {
@@ -101,6 +110,10 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
 		t.Errorf("start callback did not happen")
 	}
 
+	if !completeReadingCallback {
+		t.Errorf("completeReading callback did not happen")
+	}
+
 	if !completeCallback {
 		t.Errorf("complete callback did not happen")
 	}
diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go
index b2b5e59bb..ae4879ff4 100644
--- a/internal/archiver/blob_saver.go
+++ b/internal/archiver/blob_saver.go
@@ -43,51 +43,18 @@ func (s *BlobSaver) TriggerShutdown() {
 
 // Save stores a blob in the repo. It checks the index and the known blobs
 // before saving anything. It takes ownership of the buffer passed in.
-func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
-	ch := make(chan SaveBlobResponse, 1)
+func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)) {
 	select {
-	case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}:
+	case s.ch <- saveBlobJob{BlobType: t, buf: buf, cb: cb}:
 	case <-ctx.Done():
 		debug.Log("not sending job, context is cancelled")
-		close(ch)
-		return FutureBlob{ch: ch}
 	}
-
-	return FutureBlob{ch: ch}
-}
-
-// FutureBlob is returned by SaveBlob and will return the data once it has been processed.
-type FutureBlob struct {
-	ch <-chan SaveBlobResponse
-}
-
-func (s *FutureBlob) Poll() *SaveBlobResponse {
-	select {
-	case res, ok := <-s.ch:
-		if ok {
-			return &res
-		}
-	default:
-	}
-	return nil
-}
-
-// Take blocks until the result is available or the context is cancelled.
-func (s *FutureBlob) Take(ctx context.Context) SaveBlobResponse {
-	select {
-	case res, ok := <-s.ch:
-		if ok {
-			return res
-		}
-	case <-ctx.Done():
-	}
-	return SaveBlobResponse{}
-}
 }
 
 type saveBlobJob struct {
 	restic.BlobType
 	buf *Buffer
-	ch  chan<- SaveBlobResponse
+	cb  func(res SaveBlobResponse)
 }
 
 type SaveBlobResponse struct {
@@ -128,11 +95,9 @@ func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
 		res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
 		if err != nil {
 			debug.Log("saveBlob returned error, exiting: %v", err)
-			close(job.ch)
 			return err
 		}
-		job.ch <- res
-		close(job.ch)
+		job.cb(res)
 		job.buf.Release()
 	}
 }
diff --git a/internal/archiver/blob_saver_test.go b/internal/archiver/blob_saver_test.go
index b25478b26..caa1e7c39 100644
--- a/internal/archiver/blob_saver_test.go
+++ b/internal/archiver/blob_saver_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"runtime"
+	"sync"
 	"sync/atomic"
 	"testing"
 
@@ -45,16 +46,22 @@ func TestBlobSaver(t *testing.T) {
 
 	b := NewBlobSaver(ctx, wg, saver, uint(runtime.NumCPU()))
 
-	var results []FutureBlob
+	var wait sync.WaitGroup
+	var results []SaveBlobResponse
 
+	wait.Add(20)
 	for i := 0; i < 20; i++ {
 		buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
-		fb := b.Save(ctx, restic.DataBlob, buf)
-		results = append(results, fb)
+		idx := i
+		results = append(results, SaveBlobResponse{})
+		b.Save(ctx, restic.DataBlob, buf, func(res SaveBlobResponse) {
+			results[idx] = res
+			wait.Done()
+		})
 	}
 
-	for i, blob := range results {
-		sbr := blob.Take(ctx)
+	wait.Wait()
+	for i, sbr := range results {
 		if sbr.known {
 			t.Errorf("blob %v is known, that should not be the case", i)
 		}
@@ -94,7 +101,7 @@ func TestBlobSaverError(t *testing.T) {
 
 	for i := 0; i < test.blobs; i++ {
 		buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
-		b.Save(ctx, restic.DataBlob, buf)
+		b.Save(ctx, restic.DataBlob, buf, func(res SaveBlobResponse) {})
 	}
 
 	b.TriggerShutdown()
diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go
index fc008945c..c7303b929 100644
--- a/internal/archiver/file_saver.go
+++ b/internal/archiver/file_saver.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"io"
 	"os"
+	"sync"
 
 	"github.com/restic/chunker"
 	"github.com/restic/restic/internal/debug"
@@ -14,7 +15,7 @@ import (
 )
 
 // SaveBlobFn saves a blob to a repo.
-type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob
+type SaveBlobFn func(context.Context, restic.BlobType, *Buffer, func(res SaveBlobResponse))
 
 // FileSaver concurrently saves incoming files to the repo.
 type FileSaver struct {
@@ -66,17 +67,21 @@ func (s *FileSaver) TriggerShutdown() {
 type CompleteFunc func(*restic.Node, ItemStats)
 
 // Save stores the file f and returns the data once it has been completed. The
-// file is closed by Save.
+// file is closed by Save. completeReading is only called if the file was read
+// successfully. complete is always called. If completeReading is called, then
+// this will always happen before calling complete.
-func (s *FileSaver) Save(ctx context.Context, snPath string, target string, file fs.File, fi os.FileInfo, start func(), complete CompleteFunc) FutureNode {
+func (s *FileSaver) Save(ctx context.Context, snPath string, target string, file fs.File, fi os.FileInfo, start func(), completeReading func(), complete CompleteFunc) FutureNode {
 	fn, ch := newFutureNode()
 	job := saveFileJob{
-		snPath:   snPath,
-		target:   target,
-		file:     file,
-		fi:       fi,
-		start:    start,
-		complete: complete,
-		ch:       ch,
+		snPath: snPath,
+		target: target,
+		file:   file,
+		fi:     fi,
+		ch:     ch,
+
+		start:           start,
+		completeReading: completeReading,
+		complete:        complete,
 	}
 
 	select {
@@ -91,56 +96,79 @@ func (s *FileSaver) Save(ctx context.Context, snPath string, target string, file
 }
 
 type saveFileJob struct {
-	snPath   string
-	target   string
-	file     fs.File
-	fi       os.FileInfo
-	ch       chan<- futureNodeResult
-	complete CompleteFunc
-	start    func()
+	snPath string
+	target string
+	file   fs.File
+	fi     os.FileInfo
+	ch     chan<- futureNodeResult
+
+	start           func()
+	completeReading func()
+	complete        CompleteFunc
 }
 
 // saveFile stores the file f in the repo, then closes it.
-func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func()) futureNodeResult {
+func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, fi os.FileInfo, start func(), finishReading func(), finish func(res futureNodeResult)) {
 	start()
 
-	stats := ItemStats{}
 	fnr := futureNodeResult{
 		snPath: snPath,
 		target: target,
 	}
+	var lock sync.Mutex
+	remaining := 0
+	isCompleted := false
+
+	completeBlob := func() {
+		lock.Lock()
+		defer lock.Unlock()
+
+		remaining--
+		if remaining == 0 && fnr.err == nil {
+			if isCompleted {
+				panic("completed twice")
+			}
+			isCompleted = true
+			finish(fnr)
+		}
+	}
+	completeError := func(err error) {
+		lock.Lock()
+		defer lock.Unlock()
+
+		if fnr.err == nil {
+			if isCompleted {
+				panic("completed twice")
+			}
+			isCompleted = true
+			fnr.err = err
+			fnr.node = nil
+			fnr.stats = ItemStats{}
+			finish(fnr)
+		}
+	}
 
 	debug.Log("%v", snPath)
 	node, err := s.NodeFromFileInfo(snPath, f.Name(), fi)
 	if err != nil {
 		_ = f.Close()
-		fnr.err = err
-		return fnr
+		completeError(err)
+		return
 	}
 
 	if node.Type != "file" {
 		_ = f.Close()
-		fnr.err = errors.Errorf("node type %q is wrong", node.Type)
-		return fnr
+		completeError(errors.Errorf("node type %q is wrong", node.Type))
+		return
 	}
 
 	// reuse the chunker
 	chnker.Reset(f, s.pol)
 
-	var results []FutureBlob
-	complete := func(sbr SaveBlobResponse) {
-		if !sbr.known {
-			stats.DataBlobs++
-			stats.DataSize += uint64(sbr.length)
-			stats.DataSizeInRepo += uint64(sbr.sizeInRepo)
-		}
-
-		node.Content = append(node.Content, sbr.id)
-	}
-
 	node.Content = []restic.ID{}
-	var size uint64
+	node.Size = 0
+	var idx int
 	for {
 		buf := s.saveFilePool.Get()
 		chunk, err := chnker.Next(buf.Data)
@@ -150,62 +178,63 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
 		}
 
 		buf.Data = chunk.Data
-
-		size += uint64(chunk.Length)
+		node.Size += uint64(chunk.Length)
 
 		if err != nil {
 			_ = f.Close()
-			fnr.err = err
-			return fnr
+			completeError(err)
+			return
 		}
+		// test if the context has been cancelled, return the error
+		if ctx.Err() != nil {
+			_ = f.Close()
+			completeError(ctx.Err())
+			return
+		}
+
+		// add a place to store the saveBlob result
+		pos := idx
+		node.Content = append(node.Content, restic.ID{})
+
+		s.saveBlob(ctx, restic.DataBlob, buf, func(sbr SaveBlobResponse) {
+			lock.Lock()
+			if !sbr.known {
+				fnr.stats.DataBlobs++
+				fnr.stats.DataSize += uint64(sbr.length)
+				fnr.stats.DataSizeInRepo += uint64(sbr.sizeInRepo)
+			}
+
+			node.Content[pos] = sbr.id
+			lock.Unlock()
+
+			completeBlob()
+		})
+		idx++
 
 		// test if the context has been cancelled, return the error
 		if ctx.Err() != nil {
 			_ = f.Close()
-			fnr.err = ctx.Err()
-			return fnr
-		}
-
-		res := s.saveBlob(ctx, restic.DataBlob, buf)
-		results = append(results, res)
-
-		// test if the context has been cancelled, return the error
-		if ctx.Err() != nil {
-			_ = f.Close()
-			fnr.err = ctx.Err()
-			return fnr
+			completeError(ctx.Err())
+			return
 		}
 
 		s.CompleteBlob(uint64(len(chunk.Data)))
-
-		// collect already completed blobs
-		for len(results) > 0 {
-			sbr := results[0].Poll()
-			if sbr == nil {
-				break
-			}
-			results[0] = FutureBlob{}
-			results = results[1:]
-			complete(*sbr)
-		}
 	}
 
 	err = f.Close()
 	if err != nil {
-		fnr.err = err
-		return fnr
+		completeError(err)
+		return
 	}
 
-	for i, res := range results {
-		results[i] = FutureBlob{}
-		sbr := res.Take(ctx)
-		complete(sbr)
-	}
-
-	node.Size = size
 	fnr.node = node
-	fnr.stats = stats
-	return fnr
+	lock.Lock()
+	// require one additional completeFuture() call to ensure that the future only completes
+	// after reaching the end of this method
+	remaining += idx + 1
+	lock.Unlock()
+	finishReading()
+	completeBlob()
 }
 
 func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
@@ -224,11 +253,16 @@ func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
 			}
 		}
 
-		res := s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start)
-		if job.complete != nil {
-			job.complete(res.node, res.stats)
-		}
-		job.ch <- res
-		close(job.ch)
+		s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.fi, job.start, func() {
+			if job.completeReading != nil {
+				job.completeReading()
+			}
+		}, func(res futureNodeResult) {
+			if job.complete != nil {
+				job.complete(res.node, res.stats)
+			}
+			job.ch <- res
+			close(job.ch)
+		})
 	}
 }
diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go
index a311216c7..e32250496 100644
--- a/internal/archiver/file_saver_test.go
+++ b/internal/archiver/file_saver_test.go
@@ -34,10 +34,8 @@ func createTestFiles(t testing.TB, num int) (files []string, cleanup func()) {
 func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Context, *errgroup.Group) {
 	wg, ctx := errgroup.WithContext(ctx)
 
-	saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob {
-		ch := make(chan SaveBlobResponse)
-		close(ch)
-		return FutureBlob{ch: ch}
+	saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer, cb func(SaveBlobResponse)) {
+		cb(SaveBlobResponse{})
 	}
 
 	workers := uint(runtime.NumCPU())
@@ -62,6 +60,7 @@ func TestFileSaver(t *testing.T) {
 	defer cleanup()
 
 	startFn := func() {}
+	completeReadingFn := func() {}
 	completeFn := func(*restic.Node, ItemStats) {}
 
 	testFs := fs.Local{}
@@ -80,7 +79,7 @@ func TestFileSaver(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		ff := s.Save(ctx, filename, filename, f, fi, startFn, completeFn)
+		ff := s.Save(ctx, filename, filename, f, fi, startFn, completeReadingFn, completeFn)
 		results = append(results, ff)
 	}
 
diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go
index 180f987cd..d25781b03 100644
--- a/internal/archiver/tree_saver.go
+++ b/internal/archiver/tree_saver.go
@@ -11,7 +11,7 @@ import (
 
 // TreeSaver concurrently saves incoming trees to the repo.
 type TreeSaver struct {
-	saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob
+	saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse))
 	errFn    ErrorFunc
 
 	ch chan<- saveTreeJob
@@ -19,7 +19,7 @@ type TreeSaver struct {
 
 // NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
 // started, it is stopped when ctx is cancelled.
-func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob, errFn ErrorFunc) *TreeSaver {
+func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveBlob func(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)), errFn ErrorFunc) *TreeSaver {
 	ch := make(chan saveTreeJob)
 
 	s := &TreeSaver{
@@ -124,21 +124,24 @@ func (s *TreeSaver) save(ctx context.Context, job *saveTreeJob) (*restic.Node, I
 	}
 
 	b := &Buffer{Data: buf}
-	res := s.saveBlob(ctx, restic.TreeBlob, b)
+	ch := make(chan SaveBlobResponse, 1)
+	s.saveBlob(ctx, restic.TreeBlob, b, func(res SaveBlobResponse) {
+		ch <- res
+	})
 
-	sbr := res.Take(ctx)
-	if !sbr.known {
-		stats.TreeBlobs++
-		stats.TreeSize += uint64(sbr.length)
-		stats.TreeSizeInRepo += uint64(sbr.sizeInRepo)
-	}
-	// The context was canceled in the meantime, id might be invalid
-	if ctx.Err() != nil {
+	select {
+	case sbr := <-ch:
+		if !sbr.known {
+			stats.TreeBlobs++
+			stats.TreeSize += uint64(sbr.length)
+			stats.TreeSizeInRepo += uint64(sbr.sizeInRepo)
+		}
+
+		node.Subtree = &sbr.id
+		return node, stats, nil
+	case <-ctx.Done():
 		return nil, stats, ctx.Err()
 	}
-
-	node.Subtree = &sbr.id
-	return node, stats, nil
 }
 
 func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error {
diff --git a/internal/archiver/tree_saver_test.go b/internal/archiver/tree_saver_test.go
index ab55a2742..7cc53346c 100644
--- a/internal/archiver/tree_saver_test.go
+++ b/internal/archiver/tree_saver_test.go
@@ -12,15 +12,13 @@ import (
 	"golang.org/x/sync/errgroup"
 )
 
-func treeSaveHelper(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
-	ch := make(chan SaveBlobResponse, 1)
-	ch <- SaveBlobResponse{
+func treeSaveHelper(ctx context.Context, t restic.BlobType, buf *Buffer, cb func(res SaveBlobResponse)) {
+	cb(SaveBlobResponse{
 		id:         restic.NewRandomID(),
 		known:      false,
 		length:     len(buf.Data),
 		sizeInRepo: len(buf.Data),
-	}
-	return FutureBlob{ch: ch}
+	})
 }
 
 func setupTreeSaver() (context.Context, context.CancelFunc, *TreeSaver, func() error) {
diff --git a/internal/ui/termstatus/status.go b/internal/ui/termstatus/status.go
index 88c4f898e..fdc7e14f6 100644
--- a/internal/ui/termstatus/status.go
+++ b/internal/ui/termstatus/status.go
@@ -25,6 +25,7 @@ type Terminal struct {
 	msg             chan message
 	status          chan status
 	canUpdateStatus bool
+	lastStatusLen   int
 
 	// will be closed when the goroutine which runs Run() terminates, so it'll
 	// yield a default value immediately
@@ -154,6 +155,18 @@ func (t *Terminal) run(ctx context.Context) {
 }
 
 func (t *Terminal) writeStatus(status []string) {
+	statusLen := len(status)
+	status = append([]string{}, status...)
+	for i := len(status); i < t.lastStatusLen; i++ {
+		// clear no longer used status lines
+		status = append(status, "")
+		if i > 0 {
+			// all lines except the last one must have a line break
+			status[i-1] = status[i-1] + "\n"
+		}
+	}
+	t.lastStatusLen = statusLen
+
 	for _, line := range status {
 		t.clearCurrentLine(t.wr, t.fd)
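
Reviewer note, not part of the patch: the change above replaces the FutureBlob Take/Poll pattern with completion callbacks. BlobSaver.Save now accepts a callback, and saveFile finishes a file through a mutex-guarded remaining counter once every outstanding blob callback has fired. The following is a minimal, self-contained Go sketch of that counter pattern. Every name in it (saveBlob, saveFile, chunks, and so on) is invented for illustration and is not restic's API; it only mirrors the completeBlob/remaining logic from internal/archiver/file_saver.go above.

// Illustrative sketch only (not restic code); all identifiers here are made up.
package main

import (
	"fmt"
	"sync"
)

// saveBlob simulates an asynchronous blob store: the work happens in a
// goroutine and cb is invoked with the stored size once it is done.
func saveBlob(data []byte, cb func(size int)) {
	go func() {
		cb(len(data))
	}()
}

// saveFile submits all chunks and returns immediately; finish runs exactly
// once, after the last blob callback has fired.
func saveFile(chunks [][]byte, finish func(totalBytes int)) {
	var (
		mu        sync.Mutex
		remaining int
		total     int
	)

	// completeBlob mirrors the completeBlob closure in the patch: each
	// finished blob decrements the counter; reaching zero completes the file.
	completeBlob := func() {
		mu.Lock()
		defer mu.Unlock()
		remaining--
		if remaining == 0 {
			finish(total)
		}
	}

	pending := 0
	for _, chunk := range chunks {
		pending++
		saveBlob(chunk, func(size int) {
			mu.Lock()
			total += size
			mu.Unlock()
			completeBlob()
		})
	}

	// Adding pending+1 and then calling completeBlob once means the counter
	// can only reach zero after all blob callbacks and this end-of-submission
	// call have run, the same trick as `remaining += idx + 1` in the patch.
	mu.Lock()
	remaining += pending + 1
	mu.Unlock()
	completeBlob()
}

func main() {
	done := make(chan int, 1)
	saveFile([][]byte{[]byte("foo"), []byte("barbaz")}, func(total int) { done <- total })
	fmt.Println("total bytes stored:", <-done) // 9
}

The extra pending+1 token plays the same role as remaining += idx + 1 in the patch: the file-level callback cannot fire while chunks are still being submitted, which is what lets the new code drop the per-blob result channels and the blocking Take/Poll collection loop.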