From 4cefd456bb3fb77adcc5bb68e16f42b23239c132 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Tue, 23 Feb 2016 23:48:55 +0100 Subject: [PATCH] Refactor rebuild-index code This code reads all pack headers from all packs and rebuilds the index from scratch. Afterwards, all indexes are removed. This is needed because in #434 the command `optimize` produced a broken index that did not contain a blob any more. Running `rebuild-index` should fix this. --- src/cmds/restic/cmd_dump.go | 4 +- src/cmds/restic/cmd_rebuild_index.go | 195 +++++++++------------------ 2 files changed, 68 insertions(+), 131 deletions(-) diff --git a/src/cmds/restic/cmd_dump.go b/src/cmds/restic/cmd_dump.go index 56b010693..72a9d85b8 100644 --- a/src/cmds/restic/cmd_dump.go +++ b/src/cmds/restic/cmd_dump.go @@ -101,7 +101,7 @@ func printTrees(repo *repository.Repository, wr io.Writer) error { return nil } -const numWorkers = 10 +const dumpPackWorkers = 10 // Pack is the struct used in printPacks. type Pack struct { @@ -138,7 +138,7 @@ func printPacks(repo *repository.Repository, wr io.Writer) error { jobCh := make(chan worker.Job) resCh := make(chan worker.Job) - wp := worker.New(numWorkers, f, jobCh, resCh) + wp := worker.New(dumpPackWorkers, f, jobCh, resCh) go func() { for name := range repo.Backend().List(backend.Data, done) { diff --git a/src/cmds/restic/cmd_rebuild_index.go b/src/cmds/restic/cmd_rebuild_index.go index cab8bbc46..88ad618ab 100644 --- a/src/cmds/restic/cmd_rebuild_index.go +++ b/src/cmds/restic/cmd_rebuild_index.go @@ -1,13 +1,13 @@ package main import ( - "bytes" "fmt" - + "os" "restic/backend" "restic/debug" "restic/pack" "restic/repository" + "restic/worker" ) type CmdRebuildIndex struct { @@ -26,164 +26,101 @@ func init() { } } -func (cmd CmdRebuildIndex) storeIndex(index *repository.Index) (*repository.Index, error) { - debug.Log("RebuildIndex.RebuildIndex", "saving index") - - cmd.global.Printf(" saving new index\n") - id, err := repository.SaveIndex(cmd.repo, index) - if err != nil { - debug.Log("RebuildIndex.RebuildIndex", "error saving index: %v", err) - return nil, err - } - - debug.Log("RebuildIndex.RebuildIndex", "index saved as %v", id.Str()) - index = repository.NewIndex() - - return index, nil -} - -func (cmd CmdRebuildIndex) RebuildIndex() error { - debug.Log("RebuildIndex.RebuildIndex", "start") +const rebuildIndexWorkers = 10 +func loadBlobsFromPacks(repo *repository.Repository) (packs map[backend.ID][]pack.Blob) { done := make(chan struct{}) defer close(done) - indexIDs := backend.NewIDSet() - for id := range cmd.repo.List(backend.Index, done) { - indexIDs.Insert(id) - } + f := func(job worker.Job, done <-chan struct{}) (interface{}, error) { + id := job.Data.(backend.ID) - cmd.global.Printf("rebuilding index from %d indexes\n", len(indexIDs)) + h := backend.Handle{Type: backend.Data, Name: id.String()} + rd := backend.NewReadSeeker(repo.Backend(), h) - debug.Log("RebuildIndex.RebuildIndex", "found %v indexes", len(indexIDs)) - - combinedIndex := repository.NewIndex() - packsDone := backend.NewIDSet() - - type Blob struct { - id backend.ID - tpe pack.BlobType - } - blobsDone := make(map[Blob]struct{}) - - i := 0 - for indexID := range indexIDs { - cmd.global.Printf(" loading index %v\n", i) - - debug.Log("RebuildIndex.RebuildIndex", "load index %v", indexID.Str()) - idx, err := repository.LoadIndex(cmd.repo, indexID) + unpacker, err := pack.NewUnpacker(repo.Key(), rd) if err != nil { - return err + return nil, err } - debug.Log("RebuildIndex.RebuildIndex", "adding blobs from index %v", indexID.Str()) - - for packedBlob := range idx.Each(done) { - packsDone.Insert(packedBlob.PackID) - b := Blob{ - id: packedBlob.ID, - tpe: packedBlob.Type, - } - if _, ok := blobsDone[b]; ok { - continue - } - - blobsDone[b] = struct{}{} - combinedIndex.Store(packedBlob) - } - - combinedIndex.AddToSupersedes(indexID) - - if repository.IndexFull(combinedIndex) { - combinedIndex, err = cmd.storeIndex(combinedIndex) - if err != nil { - return err - } - } - - i++ + return unpacker.Entries, nil } - var err error - if combinedIndex.Length() > 0 { - combinedIndex, err = cmd.storeIndex(combinedIndex) - if err != nil { - return err + jobCh := make(chan worker.Job) + resCh := make(chan worker.Job) + wp := worker.New(rebuildIndexWorkers, f, jobCh, resCh) + + go func() { + for id := range repo.List(backend.Data, done) { + jobCh <- worker.Job{Data: id} } - } + close(jobCh) + }() - cmd.global.Printf("removing %d old indexes\n", len(indexIDs)) - for id := range indexIDs { - debug.Log("RebuildIndex.RebuildIndex", "remove index %v", id.Str()) + packs = make(map[backend.ID][]pack.Blob) + for job := range resCh { + id := job.Data.(backend.ID) - err := cmd.repo.Backend().Remove(backend.Index, id.String()) - if err != nil { - debug.Log("RebuildIndex.RebuildIndex", "error removing index %v: %v", id.Str(), err) - return err - } - } - - cmd.global.Printf("checking for additional packs\n") - newPacks := 0 - var buf []byte - for packID := range cmd.repo.List(backend.Data, done) { - if packsDone.Has(packID) { + if job.Error != nil { + fmt.Fprintf(os.Stderr, "error for pack %v: %v\n", id, job.Error) continue } - debug.Log("RebuildIndex.RebuildIndex", "pack %v not indexed", packID.Str()) - newPacks++ + entries := job.Result.([]pack.Blob) + packs[id] = entries + } - var err error + wp.Wait() - h := backend.Handle{Type: backend.Data, Name: packID.String()} - buf, err = backend.LoadAll(cmd.repo.Backend(), h, buf) - if err != nil { - debug.Log("RebuildIndex.RebuildIndex", "error while loading pack %v", packID.Str()) - return fmt.Errorf("error while loading pack %v: %v", packID.Str(), err) - } + return packs +} - hash := backend.Hash(buf) - if !hash.Equal(packID) { - debug.Log("RebuildIndex.RebuildIndex", "Pack ID does not match, want %v, got %v", packID.Str(), hash.Str()) - return fmt.Errorf("Pack ID does not match, want %v, got %v", packID.Str(), hash.Str()) - } +func listIndexIDs(repo *repository.Repository) (list backend.IDs) { + done := make(chan struct{}) + for id := range repo.List(backend.Index, done) { + list = append(list, id) + } - up, err := pack.NewUnpacker(cmd.repo.Key(), bytes.NewReader(buf)) - if err != nil { - debug.Log("RebuildIndex.RebuildIndex", "error while unpacking pack %v", packID.Str()) - return err - } + return list +} - for _, blob := range up.Entries { - debug.Log("RebuildIndex.RebuildIndex", "pack %v: blob %v", packID.Str(), blob) - combinedIndex.Store(repository.PackedBlob{ - Type: blob.Type, - ID: blob.ID, +func (cmd CmdRebuildIndex) RebuildIndex() error { + debug.Log("RebuildIndex.RebuildIndex", "start rebuilding index") + + packs := loadBlobsFromPacks(cmd.repo) + cmd.global.Verbosef("loaded blobs from %d packs\n", len(packs)) + + idx := repository.NewIndex() + for packID, entries := range packs { + for _, entry := range entries { + pb := repository.PackedBlob{ + ID: entry.ID, + Type: entry.Type, + Length: entry.Length, + Offset: entry.Offset, PackID: packID, - Offset: blob.Offset, - Length: blob.Length, - }) - } - - if repository.IndexFull(combinedIndex) { - combinedIndex, err = cmd.storeIndex(combinedIndex) - if err != nil { - return err } + idx.Store(pb) } } - if combinedIndex.Length() > 0 { - combinedIndex, err = cmd.storeIndex(combinedIndex) + oldIndexes := listIndexIDs(cmd.repo) + idx.AddToSupersedes(oldIndexes...) + cmd.global.Printf(" saving new index\n") + id, err := repository.SaveIndex(cmd.repo, idx) + if err != nil { + debug.Log("RebuildIndex.RebuildIndex", "error saving index: %v", err) + return err + } + debug.Log("RebuildIndex.RebuildIndex", "new index saved as %v", id.Str()) + + for _, indexID := range oldIndexes { + err := cmd.repo.Backend().Remove(backend.Index, indexID.String()) if err != nil { - return err + cmd.global.Warnf("unable to remove index %v: %v\n", indexID.Str(), err) } } - cmd.global.Printf("added %d packs to the index\n", newPacks) - - debug.Log("RebuildIndex.RebuildIndex", "done") return nil }