From b373f164fe3a57074942e2e5e60e456afe7cf2a2 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 20 Sep 2020 00:45:11 +0200 Subject: [PATCH 1/6] prune: Parallelize repack command --- changelog/unreleased/pull-2941 | 8 ++ internal/repository/repack.go | 210 +++++++++++++++++++++++---------- 2 files changed, 153 insertions(+), 65 deletions(-) create mode 100644 changelog/unreleased/pull-2941 diff --git a/changelog/unreleased/pull-2941 b/changelog/unreleased/pull-2941 new file mode 100644 index 000000000..4a73c31ba --- /dev/null +++ b/changelog/unreleased/pull-2941 @@ -0,0 +1,8 @@ +Enhancement: Speed up repacking step of prune command + +The repack step of the prune command, which moves still used file parts into +new pack files such that the old ones can be garbage collected later on, now +processes multiple pack files in parallel. This is especially beneficial for +high latency backends or when using a fast network connection. + +https://github.com/restic/restic/pull/2941 diff --git a/internal/repository/repack.go b/internal/repository/repack.go index 1a579d00b..3d0651f2c 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -2,14 +2,19 @@ package repository import ( "context" + "os" + "sync" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/fs" "github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/restic" + "golang.org/x/sync/errgroup" ) +const numRepackWorkers = 8 + // Repack takes a list of packs together with a list of blobs contained in // these packs. Each pack is loaded and the blobs listed in keepBlobs is saved // into a new pack. Returned is the list of obsolete packs which can then @@ -22,91 +27,166 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs)) - for packID := range packs { - // load the complete pack into a temp file - h := restic.Handle{Type: restic.PackFile, Name: packID.String()} + wg, ctx := errgroup.WithContext(ctx) - tempfile, hash, packLength, err := DownloadAndHash(ctx, repo.Backend(), h) - if err != nil { - return nil, errors.Wrap(err, "Repack") - } - - debug.Log("pack %v loaded (%d bytes), hash %v", packID, packLength, hash) - - if !packID.Equal(hash) { - return nil, errors.Errorf("hash does not match id: want %v, got %v", packID, hash) - } - - _, err = tempfile.Seek(0, 0) - if err != nil { - return nil, errors.Wrap(err, "Seek") - } - - blobs, err := pack.List(repo.Key(), tempfile, packLength) - if err != nil { - return nil, err - } - - debug.Log("processing pack %v, blobs: %v", packID, len(blobs)) - var buf []byte - for _, entry := range blobs { - h := restic.BlobHandle{ID: entry.ID, Type: entry.Type} - if !keepBlobs.Has(h) { - continue + downloadQueue := make(chan restic.ID) + wg.Go(func() error { + defer close(downloadQueue) + for packID := range packs { + select { + case downloadQueue <- packID: + case <-ctx.Done(): + return ctx.Err() } + } + return nil + }) - debug.Log(" process blob %v", h) + type repackJob struct { + tempfile *os.File + hash restic.ID + packLength int64 + } + processQueue := make(chan repackJob) + // used to close processQueue once all downloaders have finished + var downloadWG sync.WaitGroup - if uint(cap(buf)) < entry.Length { - buf = make([]byte, entry.Length) - } - buf = buf[:entry.Length] + downloader := func() error { + defer downloadWG.Done() + for packID := range downloadQueue { + // load the 
complete pack into a temp file + h := restic.Handle{Type: restic.PackFile, Name: packID.String()} - n, err := tempfile.ReadAt(buf, int64(entry.Offset)) + tempfile, hash, packLength, err := DownloadAndHash(ctx, repo.Backend(), h) if err != nil { - return nil, errors.Wrap(err, "ReadAt") + return errors.Wrap(err, "Repack") } - if n != len(buf) { - return nil, errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v", - h, tempfile.Name(), len(buf), n) + debug.Log("pack %v loaded (%d bytes), hash %v", packID, packLength, hash) + + if !packID.Equal(hash) { + return errors.Errorf("hash does not match id: want %v, got %v", packID, hash) } - nonce, ciphertext := buf[:repo.Key().NonceSize()], buf[repo.Key().NonceSize():] - plaintext, err := repo.Key().Open(ciphertext[:0], nonce, ciphertext, nil) + select { + case processQueue <- repackJob{tempfile, hash, packLength}: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil + } + + downloadWG.Add(numRepackWorkers) + for i := 0; i < numRepackWorkers; i++ { + wg.Go(downloader) + } + wg.Go(func() error { + downloadWG.Wait() + close(processQueue) + return nil + }) + + var keepMutex sync.Mutex + worker := func() error { + for job := range processQueue { + tempfile, packID, packLength := job.tempfile, job.hash, job.packLength + + _, err = tempfile.Seek(0, 0) if err != nil { - return nil, err + return errors.Wrap(err, "Seek") } - id := restic.Hash(plaintext) - if !id.Equal(entry.ID) { - debug.Log("read blob %v/%v from %v: wrong data returned, hash is %v", - h.Type, h.ID, tempfile.Name(), id) - return nil, errors.Errorf("read blob %v from %v: wrong data returned, hash is %v", - h, tempfile.Name(), id) - } - - // We do want to save already saved blobs! - _, _, err = repo.SaveBlob(ctx, entry.Type, plaintext, entry.ID, true) + blobs, err := pack.List(repo.Key(), tempfile, packLength) if err != nil { - return nil, err + return err } - debug.Log(" saved blob %v", entry.ID) + debug.Log("processing pack %v, blobs: %v", packID, len(blobs)) + var buf []byte + for _, entry := range blobs { + h := restic.BlobHandle{ID: entry.ID, Type: entry.Type} - keepBlobs.Delete(h) - } + keepMutex.Lock() + shouldKeep := keepBlobs.Has(h) + keepMutex.Unlock() - if err = tempfile.Close(); err != nil { - return nil, errors.Wrap(err, "Close") - } + if !shouldKeep { + continue + } - if err = fs.RemoveIfExists(tempfile.Name()); err != nil { - return nil, errors.Wrap(err, "Remove") - } - if p != nil { - p.Report(restic.Stat{Blobs: 1}) + debug.Log(" process blob %v", h) + + if uint(cap(buf)) < entry.Length { + buf = make([]byte, entry.Length) + } + buf = buf[:entry.Length] + + n, err := tempfile.ReadAt(buf, int64(entry.Offset)) + if err != nil { + return errors.Wrap(err, "ReadAt") + } + + if n != len(buf) { + return errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v", + h, tempfile.Name(), len(buf), n) + } + + nonce, ciphertext := buf[:repo.Key().NonceSize()], buf[repo.Key().NonceSize():] + plaintext, err := repo.Key().Open(ciphertext[:0], nonce, ciphertext, nil) + if err != nil { + return err + } + + id := restic.Hash(plaintext) + if !id.Equal(entry.ID) { + debug.Log("read blob %v/%v from %v: wrong data returned, hash is %v", + h.Type, h.ID, tempfile.Name(), id) + return errors.Errorf("read blob %v from %v: wrong data returned, hash is %v", + h, tempfile.Name(), id) + } + + keepMutex.Lock() + // recheck whether some other worker was faster + shouldKeep = keepBlobs.Has(h) + if shouldKeep { + keepBlobs.Delete(h) + } + keepMutex.Unlock() + + if 
!shouldKeep { + continue + } + + // We do want to save already saved blobs! + _, _, err = repo.SaveBlob(ctx, entry.Type, plaintext, entry.ID, true) + if err != nil { + return err + } + + debug.Log(" saved blob %v", entry.ID) + } + + if err = tempfile.Close(); err != nil { + return errors.Wrap(err, "Close") + } + + if err = fs.RemoveIfExists(tempfile.Name()); err != nil { + return errors.Wrap(err, "Remove") + } + if p != nil { + p.Report(restic.Stat{Blobs: 1}) + } } + return nil + } + + for i := 0; i < numRepackWorkers; i++ { + wg.Go(worker) + } + + if err := wg.Wait(); err != nil { + return nil, err } if err := repo.Flush(ctx); err != nil { From ee0112ab3b1603c83e92f3fce924ec4c8f90f411 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Mon, 2 Nov 2020 12:53:45 +0100 Subject: [PATCH 2/6] Clarify message about expected error --- internal/repository/repack_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index 519037b0d..526fa98ea 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -277,5 +277,5 @@ func TestRepackWrongBlob(t *testing.T) { if err == nil { t.Fatal("expected repack to fail but got no error") } - t.Log(err) + t.Logf("found expected error: %v", err) } From 7def2d8ea7e2d71a49057e362ff1f2771425b3e0 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Mon, 2 Nov 2020 12:55:34 +0100 Subject: [PATCH 3/6] Use a non-constant seed --- internal/repository/repack_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index 526fa98ea..87470d815 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -4,6 +4,7 @@ import ( "context" "math/rand" "testing" + "time" "github.com/restic/restic/internal/index" "github.com/restic/restic/internal/repository" @@ -195,7 +196,7 @@ func TestRepack(t *testing.T) { repo, cleanup := repository.TestRepository(t) defer cleanup() - seed := rand.Int63() + seed := time.Now().UnixNano() rand.Seed(seed) t.Logf("rand seed is %v", seed) @@ -262,7 +263,7 @@ func TestRepackWrongBlob(t *testing.T) { repo, cleanup := repository.TestRepository(t) defer cleanup() - seed := rand.Int63() + seed := time.Now().UnixNano() rand.Seed(seed) t.Logf("rand seed is %v", seed) From a4507610a06f3acdc8949d1e19c2531dcac0b392 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Mon, 2 Nov 2020 12:59:38 +0100 Subject: [PATCH 4/6] Fix typo --- internal/repository/repository.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 5a3d38fec..77299c277 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -758,7 +758,7 @@ type Loader interface { // DownloadAndHash is all-in-one helper to download content of the file at h to a temporary filesystem location // and calculate ID of the contents. Returned (temporary) file is positioned at the beginning of the file; -// it is reponsibility of the caller to close and delete the file. +// it is the reponsibility of the caller to close and delete the file. 
func DownloadAndHash(ctx context.Context, be Loader, h restic.Handle) (tmpfile *os.File, hash restic.ID, size int64, err error) { tmpfile, err = fs.TempFile("", "restic-temp-") if err != nil { From 866a52ad4e9bbb9542bdfc1fcd8eb1e27f9f2e9b Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Mon, 2 Nov 2020 13:15:05 +0100 Subject: [PATCH 5/6] Remove unneeded seek The file returned from DownloadAndHash() is already seeked to the start of the file. --- internal/repository/repack.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/repository/repack.go b/internal/repository/repack.go index 3d0651f2c..f1f091222 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -92,11 +92,6 @@ func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, kee for job := range processQueue { tempfile, packID, packLength := job.tempfile, job.hash, job.packLength - _, err = tempfile.Seek(0, 0) - if err != nil { - return errors.Wrap(err, "Seek") - } - blobs, err := pack.List(repo.Key(), tempfile, packLength) if err != nil { return err From ae5302c7a885d052e627b9ad06c85bffa657cc88 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Thu, 5 Nov 2020 10:33:38 +0100 Subject: [PATCH 6/6] Add comment that keepBlobs is modified --- internal/repository/repack.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/repository/repack.go b/internal/repository/repack.go index f1f091222..7b7ba8141 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -19,6 +19,9 @@ const numRepackWorkers = 8 // these packs. Each pack is loaded and the blobs listed in keepBlobs is saved // into a new pack. Returned is the list of obsolete packs which can then // be removed. +// +// The map keepBlobs is modified by Repack, it is used to keep track of which +// blobs have been processed. func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *restic.Progress) (obsoletePacks restic.IDSet, err error) { if p != nil { p.Start()
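
The concurrency pattern introduced in PATCH 1/6 splits the former sequential repack loop into three stages: a producer feeds pack IDs into a download queue, a fixed pool of downloaders (numRepackWorkers = 8) fetches each pack into a temp file and hands it to a process queue, and a pool of workers decrypts and re-saves the kept blobs. The process queue is closed only after a sync.WaitGroup confirms that every downloader has returned, and all goroutines run under a shared errgroup so the first error cancels the context and unwinds the pipeline. The following self-contained sketch shows the same errgroup/WaitGroup wiring in isolation; it is not part of the patch, and the names (run, job, the integer IDs, the stand-in Println) are illustrative placeholders, assuming only golang.org/x/sync/errgroup.

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

const numWorkers = 8

// run wires up the same pipeline shape as the parallelized Repack:
// one producer feeds IDs into a queue, several downloaders turn each
// ID into a job, and several workers consume the jobs. Closing the
// job queue is coordinated with a sync.WaitGroup so it happens only
// after every downloader has returned.
func run(ctx context.Context, ids []int) error {
	wg, ctx := errgroup.WithContext(ctx)

	downloadQueue := make(chan int)
	wg.Go(func() error {
		defer close(downloadQueue)
		for _, id := range ids {
			select {
			case downloadQueue <- id:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	type job struct{ id int }
	processQueue := make(chan job)

	// used to close processQueue once all downloaders have finished
	var downloadWG sync.WaitGroup
	downloader := func() error {
		defer downloadWG.Done()
		for id := range downloadQueue {
			// stand-in for downloading the pack to a temp file
			j := job{id: id}
			select {
			case processQueue <- j:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	}

	downloadWG.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		wg.Go(downloader)
	}
	wg.Go(func() error {
		downloadWG.Wait()
		close(processQueue)
		return nil
	})

	for i := 0; i < numWorkers; i++ {
		wg.Go(func() error {
			for j := range processQueue {
				// stand-in for decrypting and re-saving the kept blobs
				fmt.Println("processed", j.id)
			}
			return nil
		})
	}

	return wg.Wait()
}

func main() {
	if err := run(context.Background(), []int{1, 2, 3, 4, 5}); err != nil {
		fmt.Println("error:", err)
	}
}

As in the patch, any state shared between workers (keepBlobs in Repack) needs its own lock: a worker checks membership under keepMutex before decrypting, then re-checks and deletes under the same mutex before calling SaveBlob, so a blob that appears in several repacked packs is uploaded only once even when two workers race on it.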