diff --git a/changelog/unreleased/pull-3130 b/changelog/unreleased/pull-3130 new file mode 100644 index 000000000..9a71133a9 --- /dev/null +++ b/changelog/unreleased/pull-3130 @@ -0,0 +1,9 @@ +Enhancement: Parallelize reading of snapshots + +Restic used to read snapshots sequentially. For repositories containing +many snapshots this slowed down commands which have to read all snapshots. +Now the reading of snapshots is parallelized. This speeds up for example +`prune`, `backup` and other commands that search for snapshots with certain +properties or which have to find the `latest` snapshot. + +https://github.com/restic/restic/pull/3130 diff --git a/cmd/restic/cmd_debug.go b/cmd/restic/cmd_debug.go index 68213759e..0eb1c8e87 100644 --- a/cmd/restic/cmd_debug.go +++ b/cmd/restic/cmd_debug.go @@ -55,8 +55,7 @@ func prettyPrintJSON(wr io.Writer, item interface{}) error { } func debugPrintSnapshots(ctx context.Context, repo *repository.Repository, wr io.Writer) error { - return repo.List(ctx, restic.SnapshotFile, func(id restic.ID, size int64) error { - snapshot, err := restic.LoadSnapshot(ctx, repo, id) + return restic.ForAllSnapshots(ctx, repo, nil, func(id restic.ID, snapshot *restic.Snapshot, err error) error { if err != nil { return err } diff --git a/cmd/restic/cmd_prune.go b/cmd/restic/cmd_prune.go index ea33cceb6..fd051e605 100644 --- a/cmd/restic/cmd_prune.go +++ b/cmd/restic/cmd_prune.go @@ -159,19 +159,13 @@ func runPruneWithRepo(opts PruneOptions, gopts GlobalOptions, repo *repository.R Print("warning: running prune without a cache, this may be very slow!\n") } - Verbosef("loading all snapshots...\n") - snapshots, err := restic.LoadAllSnapshots(gopts.ctx, repo, ignoreSnapshots) - if err != nil { - return err - } - Verbosef("loading indexes...\n") - err = repo.LoadIndex(gopts.ctx) + err := repo.LoadIndex(gopts.ctx) if err != nil { return err } - usedBlobs, err := getUsedBlobs(gopts, repo, snapshots) + usedBlobs, err := getUsedBlobs(gopts, repo, ignoreSnapshots) if err != nil { return err } @@ -537,19 +531,34 @@ func rebuildIndexFiles(gopts GlobalOptions, repo restic.Repository, removePacks return DeleteFilesChecked(gopts, repo, obsoleteIndexes, restic.IndexFile) } -func getUsedBlobs(gopts GlobalOptions, repo restic.Repository, snapshots []*restic.Snapshot) (usedBlobs restic.BlobSet, err error) { +func getUsedBlobs(gopts GlobalOptions, repo restic.Repository, ignoreSnapshots restic.IDSet) (usedBlobs restic.BlobSet, err error) { ctx := gopts.ctx - Verbosef("finding data that is still in use for %d snapshots\n", len(snapshots)) + var snapshotTrees restic.IDs + Verbosef("loading all snapshots...\n") + err = restic.ForAllSnapshots(gopts.ctx, repo, ignoreSnapshots, + func(id restic.ID, sn *restic.Snapshot, err error) error { + debug.Log("add snapshot %v (tree %v, error %v)", id, *sn.Tree, err) + if err != nil { + return err + } + snapshotTrees = append(snapshotTrees, *sn.Tree) + return nil + }) + if err != nil { + return nil, err + } + + Verbosef("finding data that is still in use for %d snapshots\n", len(snapshotTrees)) usedBlobs = restic.NewBlobSet() - bar := newProgressMax(!gopts.Quiet, uint64(len(snapshots)), "snapshots") + bar := newProgressMax(!gopts.Quiet, uint64(len(snapshotTrees)), "snapshots") defer bar.Done() - for _, sn := range snapshots { - debug.Log("process snapshot %v", sn.ID()) + for _, tree := range snapshotTrees { + debug.Log("process tree %v", tree) - err = restic.FindUsedBlobs(ctx, repo, *sn.Tree, usedBlobs) + err = restic.FindUsedBlobs(ctx, repo, tree, usedBlobs) if err != nil { if repo.Backend().IsNotExist(err) { return nil, errors.Fatal("unable to load a tree from the repo: " + err.Error()) @@ -558,7 +567,7 @@ func getUsedBlobs(gopts GlobalOptions, repo restic.Repository, snapshots []*rest return nil, err } - debug.Log("processed snapshot %v", sn.ID()) + debug.Log("processed tree %v", tree) bar.Add(1) } return usedBlobs, nil diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 9b7ddaad4..309f932f5 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -298,86 +298,6 @@ func (e Error) Error() string { return e.Err.Error() } -func loadTreeFromSnapshot(ctx context.Context, repo restic.Repository, id restic.ID) (restic.ID, error) { - sn, err := restic.LoadSnapshot(ctx, repo, id) - if err != nil { - debug.Log("error loading snapshot %v: %v", id, err) - return restic.ID{}, err - } - - if sn.Tree == nil { - debug.Log("snapshot %v has no tree", id) - return restic.ID{}, errors.Errorf("snapshot %v has no tree", id) - } - - return *sn.Tree, nil -} - -// loadSnapshotTreeIDs loads all snapshots from backend and returns the tree IDs. -func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (restic.IDs, []error) { - var trees struct { - IDs restic.IDs - sync.Mutex - } - - var errs struct { - errs []error - sync.Mutex - } - - // track spawned goroutines using wg, create a new context which is - // cancelled as soon as an error occurs. - wg, ctx := errgroup.WithContext(ctx) - - ch := make(chan restic.ID) - - // send list of index files through ch, which is closed afterwards - wg.Go(func() error { - defer close(ch) - return repo.List(ctx, restic.SnapshotFile, func(id restic.ID, size int64) error { - select { - case <-ctx.Done(): - return nil - case ch <- id: - } - return nil - }) - }) - - // a worker receives an index ID from ch, loads the snapshot and the tree, - // and adds the result to errs and trees. - worker := func() error { - for id := range ch { - debug.Log("load snapshot %v", id) - - treeID, err := loadTreeFromSnapshot(ctx, repo, id) - if err != nil { - errs.Lock() - errs.errs = append(errs.errs, err) - errs.Unlock() - continue - } - - debug.Log("snapshot %v has tree %v", id, treeID) - trees.Lock() - trees.IDs = append(trees.IDs, treeID) - trees.Unlock() - } - return nil - } - - for i := 0; i < defaultParallelism; i++ { - wg.Go(worker) - } - - err := wg.Wait() - if err != nil { - errs.errs = append(errs.errs, err) - } - - return trees.IDs, errs.errs -} - // TreeError collects several errors that occurred while processing a tree. type TreeError struct { ID restic.ID @@ -586,6 +506,24 @@ func (c *Checker) filterTrees(ctx context.Context, backlog restic.IDs, loaderCha } } +func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (ids restic.IDs, errs []error) { + err := restic.ForAllSnapshots(ctx, repo, nil, func(id restic.ID, sn *restic.Snapshot, err error) error { + if err != nil { + errs = append(errs, err) + return nil + } + treeID := *sn.Tree + debug.Log("snapshot %v has tree %v", id, treeID) + ids = append(ids, treeID) + return nil + }) + if err != nil { + errs = append(errs, err) + } + + return ids, errs +} + // Structure checks that for all snapshots all referenced data blobs and // subtrees are available in the index. errChan is closed after all trees have // been traversed. diff --git a/internal/restic/snapshot.go b/internal/restic/snapshot.go index 86e98e234..647bf57e6 100644 --- a/internal/restic/snapshot.go +++ b/internal/restic/snapshot.go @@ -5,8 +5,11 @@ import ( "fmt" "os/user" "path/filepath" + "sync" "time" + "golang.org/x/sync/errgroup" + "github.com/restic/restic/internal/debug" ) @@ -66,27 +69,61 @@ func LoadSnapshot(ctx context.Context, repo Repository, id ID) (*Snapshot, error return sn, nil } -// LoadAllSnapshots returns a list of all snapshots in the repo. -// If a snapshot ID is in excludeIDs, it will not be included in the result. -func LoadAllSnapshots(ctx context.Context, repo Repository, excludeIDs IDSet) (snapshots []*Snapshot, err error) { - err = repo.List(ctx, SnapshotFile, func(id ID, size int64) error { - if excludeIDs.Has(id) { - return nil - } - sn, err := LoadSnapshot(ctx, repo, id) - if err != nil { - return err - } +const loadSnapshotParallelism = 5 - snapshots = append(snapshots, sn) - return nil +// ForAllSnapshots reads all snapshots in parallel and calls the +// given function. It is guaranteed that the function is not run concurrently. +// If the called function returns an error, this function is cancelled and +// also returns this error. +// If a snapshot ID is in excludeIDs, it will be ignored. +func ForAllSnapshots(ctx context.Context, repo Repository, excludeIDs IDSet, fn func(ID, *Snapshot, error) error) error { + var m sync.Mutex + + // track spawned goroutines using wg, create a new context which is + // cancelled as soon as an error occurs. + wg, ctx := errgroup.WithContext(ctx) + + ch := make(chan ID) + + // send list of snapshot files through ch, which is closed afterwards + wg.Go(func() error { + defer close(ch) + return repo.List(ctx, SnapshotFile, func(id ID, size int64) error { + if excludeIDs.Has(id) { + return nil + } + + select { + case <-ctx.Done(): + return nil + case ch <- id: + } + return nil + }) }) - if err != nil { - return nil, err + // a worker receives an snapshot ID from ch, loads the snapshot + // and runs fn with id, the snapshot and the error + worker := func() error { + for id := range ch { + debug.Log("load snapshot %v", id) + sn, err := LoadSnapshot(ctx, repo, id) + + m.Lock() + err = fn(id, sn, err) + m.Unlock() + if err != nil { + return err + } + } + return nil } - return snapshots, nil + for i := 0; i < loadSnapshotParallelism; i++ { + wg.Go(worker) + } + + return wg.Wait() } func (sn Snapshot) String() string { diff --git a/internal/restic/snapshot_find.go b/internal/restic/snapshot_find.go index 50395c814..fe9b943de 100644 --- a/internal/restic/snapshot_find.go +++ b/internal/restic/snapshot_find.go @@ -33,10 +33,9 @@ func FindLatestSnapshot(ctx context.Context, repo Repository, targets []string, found bool ) - err = repo.List(ctx, SnapshotFile, func(snapshotID ID, size int64) error { - snapshot, err := LoadSnapshot(ctx, repo, snapshotID) + err = ForAllSnapshots(ctx, repo, nil, func(id ID, snapshot *Snapshot, err error) error { if err != nil { - return errors.Errorf("Error loading snapshot %v: %v", snapshotID.Str(), err) + return errors.Errorf("Error loading snapshot %v: %v", id.Str(), err) } if snapshot.Time.Before(latest) { @@ -56,7 +55,7 @@ func FindLatestSnapshot(ctx context.Context, repo Repository, targets []string, } latest = snapshot.Time - latestID = snapshotID + latestID = id found = true return nil }) @@ -90,8 +89,7 @@ func FindSnapshot(ctx context.Context, repo Repository, s string) (ID, error) { func FindFilteredSnapshots(ctx context.Context, repo Repository, hosts []string, tags []TagList, paths []string) (Snapshots, error) { results := make(Snapshots, 0, 20) - err := repo.List(ctx, SnapshotFile, func(id ID, size int64) error { - sn, err := LoadSnapshot(ctx, repo, id) + err := ForAllSnapshots(ctx, repo, nil, func(id ID, sn *Snapshot, err error) error { if err != nil { fmt.Fprintf(os.Stderr, "could not load snapshot %v: %v\n", id.Str(), err) return nil diff --git a/internal/restic/testing_test.go b/internal/restic/testing_test.go index c3989f55f..5607dc5fa 100644 --- a/internal/restic/testing_test.go +++ b/internal/restic/testing_test.go @@ -17,6 +17,25 @@ const ( testDepth = 2 ) +// LoadAllSnapshots returns a list of all snapshots in the repo. +// If a snapshot ID is in excludeIDs, it will not be included in the result. +func loadAllSnapshots(ctx context.Context, repo restic.Repository, excludeIDs restic.IDSet) (snapshots restic.Snapshots, err error) { + err = restic.ForAllSnapshots(ctx, repo, excludeIDs, func(id restic.ID, sn *restic.Snapshot, err error) error { + if err != nil { + return err + } + + snapshots = append(snapshots, sn) + return nil + }) + + if err != nil { + return nil, err + } + + return snapshots, nil +} + func TestCreateSnapshot(t *testing.T) { repo, cleanup := repository.TestRepository(t) defer cleanup() @@ -25,7 +44,7 @@ func TestCreateSnapshot(t *testing.T) { restic.TestCreateSnapshot(t, repo, testSnapshotTime.Add(time.Duration(i)*time.Second), testDepth, 0) } - snapshots, err := restic.LoadAllSnapshots(context.TODO(), repo, restic.NewIDSet()) + snapshots, err := loadAllSnapshots(context.TODO(), repo, restic.NewIDSet()) if err != nil { t.Fatal(err) }