diff --git a/cmd/restic/cmd_find.go b/cmd/restic/cmd_find.go index 0daee7fca..5f1c07ab2 100644 --- a/cmd/restic/cmd_find.go +++ b/cmd/restic/cmd_find.go @@ -467,7 +467,7 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc // remember which packs were found in the index indexPackIDs := make(map[string]struct{}) - for pb := range f.repo.Index().Each(wctx) { + f.repo.Index().Each(wctx, func(pb restic.PackedBlob) { idStr := pb.PackID.String() // keep entry in packIDs as Each() returns individual index entries matchingID := false @@ -485,7 +485,7 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc f.blobIDs[pb.ID.String()] = struct{}{} indexPackIDs[idStr] = struct{}{} } - } + }) for id := range indexPackIDs { delete(packIDs, id) diff --git a/cmd/restic/cmd_list.go b/cmd/restic/cmd_list.go index 811b17e41..3feb8f3e7 100644 --- a/cmd/restic/cmd_list.go +++ b/cmd/restic/cmd_list.go @@ -64,9 +64,9 @@ func runList(cmd *cobra.Command, opts GlobalOptions, args []string) error { if err != nil { return err } - for blobs := range idx.Each(opts.ctx) { + idx.Each(opts.ctx, func(blobs restic.PackedBlob) { Printf("%v %v\n", blobs.Type, blobs.ID) - } + }) return nil }) default: diff --git a/cmd/restic/cmd_prune.go b/cmd/restic/cmd_prune.go index 7421bc0cc..ffc442d18 100644 --- a/cmd/restic/cmd_prune.go +++ b/cmd/restic/cmd_prune.go @@ -279,12 +279,12 @@ func planPrune(opts PruneOptions, gopts GlobalOptions, repo restic.Repository, i if len(plan.repackPacks) != 0 { // when repacking, we do not want to keep blobs which are // already contained in kept packs, so delete them from keepBlobs - for blob := range repo.Index().Each(ctx) { + repo.Index().Each(ctx, func(blob restic.PackedBlob) { if plan.removePacks.Has(blob.PackID) || plan.repackPacks.Has(blob.PackID) { - continue + return } keepBlobs.Delete(blob.BlobHandle) - } + }) } else { // keepBlobs is only needed if packs are repacked keepBlobs = nil @@ -299,7 +299,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re duplicateBlobs := make(map[restic.BlobHandle]uint8) // iterate over all blobs in index to find out which blobs are duplicates - for blob := range idx.Each(ctx) { + idx.Each(ctx, func(blob restic.PackedBlob) { bh := blob.BlobHandle size := uint64(blob.Length) switch { @@ -325,7 +325,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re stats.size.unused += size stats.blobs.unused++ } - } + }) // Check if all used blobs have been found in index if len(usedBlobs) != 0 { @@ -346,7 +346,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re } // iterate over all blobs in index to generate packInfo - for blob := range idx.Each(ctx) { + idx.Each(ctx, func(blob restic.PackedBlob) { ip := indexPack[blob.PackID] // Set blob type if not yet set @@ -376,7 +376,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re } // update indexPack indexPack[blob.PackID] = ip - } + }) // if duplicate blobs exist, those will be set to either "used" or "unused": // - mark only one occurence of duplicate blobs as used @@ -384,11 +384,11 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re // - if there are no used blobs in a pack, possibly mark duplicates as "unused" if len(duplicateBlobs) > 0 { // iterate again over all blobs in index (this is pretty cheap, all in-mem) - for blob := range idx.Each(ctx) { + idx.Each(ctx, func(blob restic.PackedBlob) { bh := blob.BlobHandle count, isDuplicate := duplicateBlobs[bh] if !isDuplicate { - continue + return } ip := indexPack[blob.PackID] @@ -412,7 +412,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re } // update indexPack indexPack[blob.PackID] = ip - } + }) } return keepBlobs, indexPack, nil diff --git a/cmd/restic/cmd_recover.go b/cmd/restic/cmd_recover.go index 9f6d2061d..d20bde036 100644 --- a/cmd/restic/cmd_recover.go +++ b/cmd/restic/cmd_recover.go @@ -66,11 +66,11 @@ func runRecover(gopts GlobalOptions) error { // tree. If it is not referenced, we have a root tree. trees := make(map[restic.ID]bool) - for blob := range repo.Index().Each(gopts.ctx) { + repo.Index().Each(gopts.ctx, func(blob restic.PackedBlob) { if blob.Type == restic.TreeBlob { trees[blob.Blob.ID] = false } - } + }) Verbosef("load %d trees\n", len(trees)) bar := newProgressMax(!gopts.Quiet, uint64(len(trees)), "trees loaded") diff --git a/cmd/restic/integration_test.go b/cmd/restic/integration_test.go index 3f1df2544..033267cc5 100644 --- a/cmd/restic/integration_test.go +++ b/cmd/restic/integration_test.go @@ -444,11 +444,11 @@ func removePacksExcept(gopts GlobalOptions, t *testing.T, keep restic.IDSet, rem rtest.OK(t, r.LoadIndex(gopts.ctx)) treePacks := restic.NewIDSet() - for pb := range r.Index().Each(context.TODO()) { + r.Index().Each(context.TODO(), func(pb restic.PackedBlob) { if pb.Type == restic.TreeBlob { treePacks.Insert(pb.PackID) } - } + }) // remove all packs containing data blobs rtest.OK(t, r.List(gopts.ctx, restic.PackFile, func(id restic.ID, size int64) error { @@ -506,11 +506,11 @@ func TestBackupTreeLoadError(t *testing.T) { rtest.OK(t, err) rtest.OK(t, r.LoadIndex(env.gopts.ctx)) treePacks := restic.NewIDSet() - for pb := range r.Index().Each(context.TODO()) { + r.Index().Each(context.TODO(), func(pb restic.PackedBlob) { if pb.Type == restic.TreeBlob { treePacks.Insert(pb.PackID) } - } + }) testRunBackup(t, filepath.Dir(env.testdata), []string{filepath.Base(env.testdata)}, opts, env.gopts) testRunCheck(t, env.gopts) diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 0e4310c95..9209c2f15 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -124,14 +124,14 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) { debug.Log("process blobs") cnt := 0 - for blob := range index.Each(ctx) { + index.Each(ctx, func(blob restic.PackedBlob) { cnt++ if _, ok := packToIndex[blob.PackID]; !ok { packToIndex[blob.PackID] = restic.NewIDSet() } packToIndex[blob.PackID].Insert(id) - } + }) debug.Log("%d blobs processed", cnt) return nil @@ -458,13 +458,13 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles) { ctx, cancel := context.WithCancel(ctx) defer cancel() - for blob := range c.repo.Index().Each(ctx) { + c.repo.Index().Each(ctx, func(blob restic.PackedBlob) { h := restic.BlobHandle{ID: blob.ID, Type: blob.Type} if !c.blobRefs.M.Has(h) { debug.Log("blob %v not referenced", h) blobs = append(blobs, h) } - } + }) return blobs } diff --git a/internal/pack/pack.go b/internal/pack/pack.go index 11be41697..34ad9d071 100644 --- a/internal/pack/pack.go +++ b/internal/pack/pack.go @@ -370,7 +370,7 @@ func CalculateHeaderSize(blobs []restic.Blob) int { func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) map[restic.ID]int64 { packSize := make(map[restic.ID]int64) - for blob := range mi.Each(ctx) { + mi.Each(ctx, func(blob restic.PackedBlob) { size, ok := packSize[blob.PackID] if !ok { size = headerSize @@ -379,7 +379,7 @@ func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) map[restic.I size += int64(blob.Length) } packSize[blob.PackID] = size + int64(CalculateEntrySize(blob.Blob)) - } + }) return packSize } diff --git a/internal/repository/index.go b/internal/repository/index.go index 22528f296..69903c872 100644 --- a/internal/repository/index.go +++ b/internal/repository/index.go @@ -217,34 +217,22 @@ func (idx *Index) AddToSupersedes(ids ...restic.ID) error { return nil } -// Each returns a channel that yields all blobs known to the index. When the -// context is cancelled, the background goroutine terminates. This blocks any +// Each passes all blobs known to the index to the callback fn. This blocks any // modification of the index. -func (idx *Index) Each(ctx context.Context) <-chan restic.PackedBlob { +func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) { idx.m.Lock() + defer idx.m.Unlock() - ch := make(chan restic.PackedBlob) - - go func() { - defer idx.m.Unlock() - defer func() { - close(ch) - }() - - for typ := range idx.byType { - m := &idx.byType[typ] - m.foreach(func(e *indexEntry) bool { - select { - case <-ctx.Done(): - return false - case ch <- idx.toPackedBlob(e, restic.BlobType(typ)): - return true - } - }) - } - }() - - return ch + for typ := range idx.byType { + m := &idx.byType[typ] + m.foreach(func(e *indexEntry) bool { + if ctx.Err() != nil { + return false + } + fn(idx.toPackedBlob(e, restic.BlobType(typ))) + return true + }) + } } type EachByPackResult struct { diff --git a/internal/repository/index_test.go b/internal/repository/index_test.go index 6bc7afca2..aeaa91a60 100644 --- a/internal/repository/index_test.go +++ b/internal/repository/index_test.go @@ -355,11 +355,11 @@ func TestIndexUnserialize(t *testing.T) { } func listPack(idx *repository.Index, id restic.ID) (pbs []restic.PackedBlob) { - for pb := range idx.Each(context.TODO()) { + idx.Each(context.TODO(), func(pb restic.PackedBlob) { if pb.PackID.Equal(id) { pbs = append(pbs, pb) } - } + }) return pbs } diff --git a/internal/repository/master_index.go b/internal/repository/master_index.go index 05d0fdce2..424e60046 100644 --- a/internal/repository/master_index.go +++ b/internal/repository/master_index.go @@ -234,30 +234,15 @@ func (mi *MasterIndex) finalizeFullIndexes() []*Index { return list } -// Each returns a channel that yields all blobs known to the index. When the -// context is cancelled, the background goroutine terminates. This blocks any -// modification of the index. -func (mi *MasterIndex) Each(ctx context.Context) <-chan restic.PackedBlob { +// Each runs fn on all blobs known to the index. When the context is cancelled, +// the index iteration return immediately. This blocks any modification of the index. +func (mi *MasterIndex) Each(ctx context.Context, fn func(restic.PackedBlob)) { mi.idxMutex.RLock() + defer mi.idxMutex.RUnlock() - ch := make(chan restic.PackedBlob) - - go func() { - defer mi.idxMutex.RUnlock() - defer close(ch) - - for _, idx := range mi.idx { - for pb := range idx.Each(ctx) { - select { - case <-ctx.Done(): - return - case ch <- pb: - } - } - } - }() - - return ch + for _, idx := range mi.idx { + idx.Each(ctx, fn) + } } // MergeFinalIndexes merges all final indexes together. @@ -450,11 +435,11 @@ func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan if len(packBlob) == 0 { continue } - for pb := range mi.Each(ctx) { + mi.Each(ctx, func(pb restic.PackedBlob) { if packs.Has(pb.PackID) && pb.PackID[0]&0xf == i { packBlob[pb.PackID] = append(packBlob[pb.PackID], pb.Blob) } - } + }) // pass on packs for packID, pbs := range packBlob { diff --git a/internal/repository/master_index_test.go b/internal/repository/master_index_test.go index 4456203ac..77cbc6178 100644 --- a/internal/repository/master_index_test.go +++ b/internal/repository/master_index_test.go @@ -163,9 +163,9 @@ func TestMasterMergeFinalIndexes(t *testing.T) { rtest.Equals(t, 1, idxCount) blobCount := 0 - for range mIdx.Each(context.TODO()) { + mIdx.Each(context.TODO(), func(pb restic.PackedBlob) { blobCount++ - } + }) rtest.Equals(t, 2, blobCount) blobs := mIdx.Lookup(bhInIdx1) @@ -195,9 +195,9 @@ func TestMasterMergeFinalIndexes(t *testing.T) { rtest.Equals(t, []restic.PackedBlob{blob2}, blobs) blobCount = 0 - for range mIdx.Each(context.TODO()) { + mIdx.Each(context.TODO(), func(pb restic.PackedBlob) { blobCount++ - } + }) rtest.Equals(t, 2, blobCount) } @@ -316,9 +316,9 @@ func BenchmarkMasterIndexEach(b *testing.B) { for i := 0; i < b.N; i++ { entries := 0 - for _ = range mIdx.Each(context.TODO()) { + mIdx.Each(context.TODO(), func(pb restic.PackedBlob) { entries++ - } + }) } } diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 625ad9b16..5fe43164f 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -595,10 +595,15 @@ func (r *Repository) LoadIndex(ctx context.Context) error { // sanity check ctx, cancel := context.WithCancel(ctx) defer cancel() - for blob := range r.idx.Each(ctx) { + + invalidIndex := false + r.idx.Each(ctx, func(blob restic.PackedBlob) { if blob.IsCompressed() { - return errors.Fatal("index uses feature not supported by repository version 1") + invalidIndex = true } + }) + if invalidIndex { + return errors.Fatal("index uses feature not supported by repository version 1") } } diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index b5b0ff92d..bd324b850 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -362,13 +362,13 @@ func testRepositoryIncrementalIndex(t *testing.T, version uint) { idx, err := loadIndex(context.TODO(), repo, id) rtest.OK(t, err) - for pb := range idx.Each(context.TODO()) { + idx.Each(context.TODO(), func(pb restic.PackedBlob) { if _, ok := packEntries[pb.PackID]; !ok { packEntries[pb.PackID] = make(map[restic.ID]struct{}) } packEntries[pb.PackID][id] = struct{}{} - } + }) return nil }) if err != nil { diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 36f5a73bf..c559b5aa3 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -83,10 +83,9 @@ type MasterIndex interface { Has(BlobHandle) bool Lookup(BlobHandle) []PackedBlob - // Each returns a channel that yields all blobs known to the index. When - // the context is cancelled, the background goroutine terminates. This - // blocks any modification of the index. - Each(ctx context.Context) <-chan PackedBlob + // Each runs fn on all blobs known to the index. When the context is cancelled, + // the index iteration return immediately. This blocks any modification of the index. + Each(ctx context.Context, fn func(PackedBlob)) ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs Save(ctx context.Context, repo SaverUnpacked, packBlacklist IDSet, extraObsolete IDs, p *progress.Counter) (obsolete IDSet, err error)