Merge pull request #3082 from aawsome/check-sizes

Check: check sizes of packs from index and pack header
MichaelEischer, 2020-11-14 22:37:42 +01:00 (committed by GitHub)
commit 333c5a19d4
3 changed files with 91 additions and 43 deletions


@@ -1,9 +1,17 @@
-Enhancement: Check now checks index when reading packs
+Enhancement: Check now has more checks for consistency of index and pack files

-Restic used to only verfiy the pack file content when calling `check --read-data`
+Restic used to only verify the pack file content when calling `check --read-data` or
 `check --read-data-subset` but did not check if the blobs within the pack are
 correctly contained in the index.
-This check is now added and may give an "Blob ID is not contained in index" error.
+This check is now added and may give a "Blob ID is not contained in index or position
+is incorrect" error.
+Also a new test is added which compares pack file sizes computed from the index and the
+pack header with the actual file size. This test is able to detect truncated pack files.

 If the index is not correct, it can be rebuilt by using the `rebuild-index` command.
+
+Having added these tests, `restic check` is now able to detect non-existing blobs which
+are wrongly referenced in the index. This situation could have led to missing data.
+
 https://github.com/restic/restic/pull/3048
+https://github.com/restic/restic/pull/3082
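
The size check described above reduces to a single invariant: a pack file's on-disk size must equal a fixed header overhead plus the packed size of every blob the index attributes to that pack. Below is a standalone Go sketch of the arithmetic; the constants and helper names (headerOverhead, perBlobOverhead, packedSizeOfBlob) are simplified stand-ins for restic's pack.HeaderSize and pack.PackedSizeOfBlob, not the real on-disk values.

package main

import "fmt"

// Simplified stand-ins for pack.HeaderSize and the per-entry overhead
// hidden inside pack.PackedSizeOfBlob; the real values differ.
const (
	headerOverhead  = 52 // assumed fixed pack header footprint
	perBlobOverhead = 37 // assumed per-blob header entry size
)

// packedSizeOfBlob returns the bytes a blob of the given length
// occupies inside a pack file: its data plus its header entry.
func packedSizeOfBlob(length uint) int64 {
	return int64(length) + perBlobOverhead
}

func main() {
	// Blob lengths that the index attributes to a single pack file.
	blobLengths := []uint{1024, 2048, 512}

	expected := int64(headerOverhead)
	for _, l := range blobLengths {
		expected += packedSizeOfBlob(l)
	}

	// Size reported by the backend; smaller than expected means the
	// pack file was truncated.
	actual := int64(3000)
	if actual != expected {
		fmt.Printf("unexpected file size: got %d, expected %d\n", actual, expected)
	}
}

A file shorter than the computed value is exactly the truncation case the changelog mentions.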


@@ -234,12 +234,12 @@ func runCheck(opts CheckOptions, gopts GlobalOptions, args []string) error {
 	}

 	doReadData := func(bucket, totalBuckets uint) {
-		packs := restic.IDSet{}
-		for pack := range chkr.GetPacks() {
+		packs := make(map[restic.ID]int64)
+		for pack, size := range chkr.GetPacks() {
 			// If we ever check more than the first byte
 			// of pack, update totalBucketsMax.
 			if (uint(pack[0]) % totalBuckets) == (bucket - 1) {
-				packs.Insert(pack)
+				packs[pack] = size
 			}
 		}
 		packCount := uint64(len(packs))


@@ -22,7 +22,7 @@ import (
 // A Checker only tests for internal errors within the data structures of the
 // repository (e.g. missing blobs), and needs a valid Repository to work on.
 type Checker struct {
-	packs    restic.IDSet
+	packs    map[restic.ID]int64
 	blobRefs struct {
 		sync.Mutex
 		// see flags below
@@ -44,7 +44,7 @@ const (
 // New returns a new checker which runs on repo.
 func New(repo restic.Repository) *Checker {
 	c := &Checker{
-		packs:       restic.NewIDSet(),
+		packs:       make(map[restic.ID]int64),
 		masterIndex: repository.NewMasterIndex(),
 		repo:        repo,
 	}
@@ -82,7 +82,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
 	// track spawned goroutines using wg, create a new context which is
 	// cancelled as soon as an error occurs.
-	wg, ctx := errgroup.WithContext(ctx)
+	wg, wgCtx := errgroup.WithContext(ctx)

 	type FileInfo struct {
 		restic.ID
@@ -101,9 +101,9 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
 	// send list of index files through ch, which is closed afterwards
 	wg.Go(func() error {
 		defer close(ch)
-		return c.repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error {
+		return c.repo.List(wgCtx, restic.IndexFile, func(id restic.ID, size int64) error {
 			select {
-			case <-ctx.Done():
+			case <-wgCtx.Done():
 				return nil
 			case ch <- FileInfo{id, size}:
 			}
@@ -120,7 +120,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
 			var idx *repository.Index
 			oldFormat := false

-			buf, err = c.repo.LoadAndDecrypt(ctx, buf[:0], restic.IndexFile, fi.ID)
+			buf, err = c.repo.LoadAndDecrypt(wgCtx, buf[:0], restic.IndexFile, fi.ID)
 			if err == nil {
 				idx, oldFormat, err = repository.DecodeIndex(buf, fi.ID)
 			}
@@ -134,7 +134,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
 			select {
 			case resultCh <- Result{idx, fi.ID, err}:
-			case <-ctx.Done():
+			case <-wgCtx.Done():
 			}
 		}
 		return nil
@@ -161,8 +161,7 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
 		debug.Log("process blobs")
 		cnt := 0
-		for blob := range res.Index.Each(ctx) {
-			c.packs.Insert(blob.PackID)
+		for blob := range res.Index.Each(wgCtx) {
 			h := restic.BlobHandle{ID: blob.ID, Type: blob.Type}
 			c.blobRefs.M[h] = blobStatusExists
 			cnt++
@@ -183,6 +182,18 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
 		errs = append(errs, err)
 	}

+	// Merge index before computing pack sizes, as this needs removed duplicates
+	c.masterIndex.MergeFinalIndexes()
+
+	// compute pack size using index entries
+	for blob := range c.masterIndex.Each(ctx) {
+		size, ok := c.packs[blob.PackID]
+		if !ok {
+			size = pack.HeaderSize
+		}
+		c.packs[blob.PackID] = size + int64(pack.PackedSizeOfBlob(blob.Length))
+	}
+
 	debug.Log("checking for duplicate packs")
 	for packID := range c.packs {
 		debug.Log("  check pack %v: contained in %d indexes", packID, len(packToIndex[packID]))
@@ -194,8 +205,6 @@ func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
 		}
 	}

-	c.masterIndex.MergeFinalIndexes()
-
 	err = c.repo.SetIndex(c.masterIndex)
 	if err != nil {
 		debug.Log("SetIndex returned error: %v", err)
@@ -235,10 +244,10 @@ func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
 	debug.Log("checking for %d packs", len(c.packs))

 	debug.Log("listing repository packs")
-	repoPacks := restic.NewIDSet()
+	repoPacks := make(map[restic.ID]int64)

 	err := c.repo.List(ctx, restic.PackFile, func(id restic.ID, size int64) error {
-		repoPacks.Insert(id)
+		repoPacks[id] = size
 		return nil
 	})
@@ -246,23 +255,39 @@ func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
 		errChan <- err
 	}

+	for id, size := range c.packs {
+		reposize, ok := repoPacks[id]
+		// remove from repoPacks so we can find orphaned packs
+		delete(repoPacks, id)
+
+		// missing: present in c.packs but not in the repo
+		if !ok {
+			select {
+			case <-ctx.Done():
+				return
+			case errChan <- PackError{ID: id, Err: errors.New("does not exist")}:
+			}
+			continue
+		}
+
+		// size not matching: present in c.packs and in the repo, but sizes do not match
+		if size != reposize {
+			select {
+			case <-ctx.Done():
+				return
+			case errChan <- PackError{ID: id, Err: errors.Errorf("unexpected file size: got %d, expected %d", reposize, size)}:
+			}
+		}
+	}
+
 	// orphaned: present in the repo but not in c.packs
-	for orphanID := range repoPacks.Sub(c.packs) {
+	for orphanID := range repoPacks {
 		select {
 		case <-ctx.Done():
 			return
 		case errChan <- PackError{ID: orphanID, Orphaned: true, Err: errors.New("not referenced in any index")}:
 		}
 	}
-
-	// missing: present in c.packs but not in the repo
-	for missingID := range c.packs.Sub(repoPacks) {
-		select {
-		case <-ctx.Done():
-			return
-		case errChan <- PackError{ID: missingID, Err: errors.New("does not exist")}:
-		}
-	}
 }

 // Error is an error that occurred while checking a repository.
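
The rewritten loop above is a common map-diff idiom: walk the expected set, consume each match out of the actual set, and whatever is left in the actual set afterwards is orphaned. A self-contained sketch of the same pattern, with illustrative IDs and sizes in place of restic's types:

package main

import "fmt"

func main() {
	// expected: pack sizes computed from the index (like c.packs).
	expected := map[string]int64{"a": 10, "b": 20, "c": 30}
	// actual: pack sizes listed from the backend (like repoPacks).
	actual := map[string]int64{"a": 10, "b": 25, "d": 40}

	for id, want := range expected {
		got, ok := actual[id]
		delete(actual, id) // consume matches so leftovers are orphans
		switch {
		case !ok:
			fmt.Printf("pack %s: does not exist\n", id)
		case got != want:
			fmt.Printf("pack %s: unexpected file size: got %d, expected %d\n", id, got, want)
		}
	}
	// Anything still in actual was never referenced by the index.
	for id := range actual {
		fmt.Printf("pack %s: not referenced in any index\n", id)
	}
}
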
@@ -695,16 +720,16 @@ func (c *Checker) CountPacks() uint64 {
 }

 // GetPacks returns IDSet of packs in the repository
-func (c *Checker) GetPacks() restic.IDSet {
+func (c *Checker) GetPacks() map[restic.ID]int64 {
 	return c.packs
 }

 // checkPack reads a pack and checks the integrity of all blobs.
-func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
+func checkPack(ctx context.Context, r restic.Repository, id restic.ID, size int64) error {
 	debug.Log("checking pack %v", id)
 	h := restic.Handle{Type: restic.PackFile, Name: id.String()}

-	packfile, hash, size, err := repository.DownloadAndHash(ctx, r.Backend(), h)
+	packfile, hash, realSize, err := repository.DownloadAndHash(ctx, r.Backend(), h)
 	if err != nil {
 		return errors.Wrap(err, "checkPack")
 	}
@@ -721,6 +746,11 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
 		return errors.Errorf("Pack ID does not match, want %v, got %v", id.Str(), hash.Str())
 	}

+	if realSize != size {
+		debug.Log("Pack size does not match, want %v, got %v", size, realSize)
+		return errors.Errorf("Pack size does not match, want %v, got %v", size, realSize)
+	}
+
 	blobs, err := pack.List(r.Key(), packfile, size)
 	if err != nil {
 		return err
@@ -728,8 +758,10 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
 	var errs []error
 	var buf []byte
+	sizeFromBlobs := int64(pack.HeaderSize) // pack size computed only from blob information
 	idx := r.Index()
 	for i, blob := range blobs {
+		sizeFromBlobs += int64(pack.PackedSizeOfBlob(blob.Length))
 		debug.Log("  check blob %d: %v", i, blob)

 		buf = buf[:cap(buf)]
@@ -765,20 +797,25 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
 			continue
 		}

-		// Check if blob is contained in index
+		// Check if blob is contained in index and position is correct
 		idxHas := false
 		for _, pb := range idx.Lookup(blob.ID, blob.Type) {
-			if pb.PackID == id {
+			if pb.PackID == id && pb.Offset == blob.Offset && pb.Length == blob.Length {
 				idxHas = true
 				break
 			}
 		}
 		if !idxHas {
-			errs = append(errs, errors.Errorf("Blob ID %v is not contained in index", blob.ID.Str()))
+			errs = append(errs, errors.Errorf("Blob %v is not contained in index or position is incorrect", blob.ID.Str()))
 			continue
 		}
 	}

+	if sizeFromBlobs != size {
+		debug.Log("Pack size does not match, want %v, got %v", size, sizeFromBlobs)
+		errs = append(errs, errors.Errorf("Pack size does not match, want %v, got %v", size, sizeFromBlobs))
+	}
+
 	if len(errs) > 0 {
 		return errors.Errorf("pack %v contains %v errors: %v", id.Str(), len(errs), errs)
 	}
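
Note that the index lookup above no longer accepts any entry that merely names the right pack: the offset and length recorded in the index must also match what the pack header says. A minimal sketch of that comparison, with a hypothetical indexEntry type standing in for the results of idx.Lookup:

package main

import "fmt"

// indexEntry is a hypothetical stand-in for the values returned by
// idx.Lookup: where the index claims a blob lives.
type indexEntry struct {
	packID string
	offset uint
	length uint
}

func main() {
	// Entries the index returns for one blob ID.
	lookup := []indexEntry{
		{packID: "p1", offset: 0, length: 100},
		{packID: "p2", offset: 300, length: 100},
	}

	// Position of the blob according to the pack header being checked.
	packID, offset, length := "p2", uint(200), uint(100)

	idxHas := false
	for _, pb := range lookup {
		if pb.packID == packID && pb.offset == offset && pb.length == length {
			idxHas = true
			break
		}
	}
	if !idxHas {
		// Matches pack p2 by ID, but the offset disagrees (300 vs 200),
		// so the blob is reported as incorrectly indexed.
		fmt.Println("blob is not contained in index or position is incorrect")
	}
}
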
@@ -792,29 +829,32 @@ func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
 }

 // ReadPacks loads data from specified packs and checks the integrity.
-func (c *Checker) ReadPacks(ctx context.Context, packs restic.IDSet, p *progress.Counter, errChan chan<- error) {
+func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
 	defer close(errChan)

 	g, ctx := errgroup.WithContext(ctx)
-	ch := make(chan restic.ID)
+	type packsize struct {
+		id   restic.ID
+		size int64
+	}
+	ch := make(chan packsize)

 	// run workers
 	for i := 0; i < defaultParallelism; i++ {
 		g.Go(func() error {
 			for {
-				var id restic.ID
+				var ps packsize
 				var ok bool

 				select {
 				case <-ctx.Done():
 					return nil
-				case id, ok = <-ch:
+				case ps, ok = <-ch:
 					if !ok {
 						return nil
 					}
 				}
-
-				err := checkPack(ctx, c.repo, id)
+				err := checkPack(ctx, c.repo, ps.id, ps.size)
 				p.Add(1)
 				if err == nil {
 					continue
@@ -830,9 +870,9 @@ func (c *Checker) ReadPacks(ctx context.Context, packs restic.IDSet, p *progress
 	}

 	// push packs to ch
-	for pack := range packs {
+	for pack, size := range packs {
 		select {
-		case ch <- pack:
+		case ch <- packsize{id: pack, size: size}:
 		case <-ctx.Done():
 		}
 	}
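
For context, ReadPacks distributes work through an errgroup-managed worker pool fed by an unbuffered channel; this PR only widens the work item from a bare ID to an (ID, size) pair. A trimmed, self-contained rendering of that pattern follows; workItem, checkOne, and the worker count are stand-ins for packsize, checkPack, and defaultParallelism, and the channel is drained with range/close instead of the select used in the diff.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

type workItem struct {
	id   string
	size int64
}

// checkOne stands in for checkPack: verify a single pack file.
func checkOne(ctx context.Context, w workItem) error {
	fmt.Printf("checking %s (%d bytes)\n", w.id, w.size)
	return nil
}

func main() {
	packs := map[string]int64{"pack1": 4096, "pack2": 8192}

	g, ctx := errgroup.WithContext(context.Background())
	ch := make(chan workItem)

	const workers = 5 // plays the role of defaultParallelism
	for i := 0; i < workers; i++ {
		g.Go(func() error {
			for w := range ch {
				if err := checkOne(ctx, w); err != nil {
					return err
				}
			}
			return nil
		})
	}

	// push work items, bailing out early if a worker failed
	for id, size := range packs {
		select {
		case ch <- workItem{id: id, size: size}:
		case <-ctx.Done():
		}
	}
	close(ch)

	if err := g.Wait(); err != nil {
		fmt.Println("check failed:", err)
	}
}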