restic/internal/repository/repack.go

package repository

import (
	"context"
	"sync"

	"github.com/restic/restic/internal/debug"
	"github.com/restic/restic/internal/errors"
	"github.com/restic/restic/internal/restic"
	"github.com/restic/restic/internal/ui/progress"

	"golang.org/x/sync/errgroup"
)

// Repack takes a list of packs together with a list of blobs contained in
// these packs. Each pack is loaded and the blobs listed in keepBlobs are
// saved into a new pack. Returned is the list of obsolete packs, which can
// then be removed.
//
// The map keepBlobs is modified by Repack; it is used to keep track of which
// blobs have been processed.
func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
	debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs))
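
	// Repacking within the same repository downloads and uploads pack data
	// at the same time, which needs at least two simultaneous backend
	// connections.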
	if repo == dstRepo && dstRepo.Backend().Connections() < 2 {
		return nil, errors.Fatal("repack step requires a backend connection limit of at least two")
	}
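
	// The pack uploader and the repack work run in the same errgroup: if
	// either fails, the shared context is canceled and both stop.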
	wg, wgCtx := errgroup.WithContext(ctx)
	dstRepo.StartPackUploader(wgCtx, wg)
	wg.Go(func() error {
		var err error
		obsoletePacks, err = repack(wgCtx, repo, dstRepo, packs, keepBlobs, p)
		return err
	})

	if err := wg.Wait(); err != nil {
		return nil, err
	}
	return obsoletePacks, nil
}

func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
	wg, wgCtx := errgroup.WithContext(ctx)
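
	// keepBlobs is read by the pack lister below and modified by the
	// download workers; keepMutex serializes all accesses to it.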
	var keepMutex sync.Mutex
	downloadQueue := make(chan restic.PackBlobs)
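
	// A single goroutine lists the packs from the index and feeds the blobs
	// worth keeping from each pack to the download workers.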
	wg.Go(func() error {
		defer close(downloadQueue)
		for pbs := range repo.Index().ListPacks(wgCtx, packs) {
			var packBlobs []restic.Blob
			keepMutex.Lock()
			// filter out unnecessary blobs
			for _, entry := range pbs.Blobs {
				h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
				if keepBlobs.Has(h) {
					packBlobs = append(packBlobs, entry)
				}
			}
			keepMutex.Unlock()

			select {
			case downloadQueue <- restic.PackBlobs{PackID: pbs.PackID, Blobs: packBlobs}:
			case <-wgCtx.Done():
				return wgCtx.Err()
			}
		}
		return nil
	})
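
	// Each worker processes one pack at a time: it re-checks which of the
	// pack's blobs are still needed and copies those into the destination
	// repository.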
	worker := func() error {
		for t := range downloadQueue {
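			// StreamPack loads the required parts of the pack file and
			// invokes the callback for each of the requested blobs.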
			err := StreamPack(wgCtx, repo.Backend().Load, repo.Key(), t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
				if err != nil {
					var ierr error
					// check whether we can get a valid copy somewhere else
					buf, ierr = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
					if ierr != nil {
						// no luck, return the original error
						return err
					}
				}

				keepMutex.Lock()
				// recheck whether some other worker was faster
				shouldKeep := keepBlobs.Has(blob)
				if shouldKeep {
					keepBlobs.Delete(blob)
				}
				keepMutex.Unlock()

				if !shouldKeep {
					return nil
				}

				// We do want to save already saved blobs! The final true
				// argument makes SaveBlob store the blob even if it is
				// already present in the destination index.
				_, _, _, err = dstRepo.SaveBlob(wgCtx, blob.Type, buf, blob.ID, true)
				if err != nil {
					return err
				}

				debug.Log(" saved blob %v", blob.ID)
				return nil
			})
			if err != nil {
				return err
			}
			p.Add(1)
		}
		return nil
	}

	// As packs are streamed, the concurrency is limited by IO; reduce the
	// worker count by one to ensure that uploading is always possible.
	repackWorkerCount := int(repo.Connections() - 1)
	for i := 0; i < repackWorkerCount; i++ {
		wg.Go(worker)
	}

	if err := wg.Wait(); err != nil {
		return nil, err
	}
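
	// Flush the destination repository so that any pack files that are still
	// being assembled are uploaded as well.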
	if err := dstRepo.Flush(ctx); err != nil {
		return nil, err
	}
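
	// All blobs to keep have been copied, so every input pack is obsolete now.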
	return packs, nil
}