mirror of
https://github.com/restic/restic.git
synced 2024-12-24 00:37:28 +00:00
Merge pull request #358 from episource/iss358_pack_not_referened_add_test
Closes #365 Closes #358
This commit is contained in:
commit
34c1056efc
3 changed files with 156 additions and 16 deletions
107
archiver_test.go
107
archiver_test.go
|
@ -5,12 +5,15 @@ import (
|
|||
"crypto/sha256"
|
||||
"io"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/restic/chunker"
|
||||
"github.com/restic/restic"
|
||||
"github.com/restic/restic/backend"
|
||||
"github.com/restic/restic/checker"
|
||||
"github.com/restic/restic/crypto"
|
||||
"github.com/restic/restic/pack"
|
||||
"github.com/restic/restic/repository"
|
||||
. "github.com/restic/restic/test"
|
||||
)
|
||||
|
||||
|
@ -21,6 +24,11 @@ type Rdr interface {
|
|||
io.ReaderAt
|
||||
}
|
||||
|
||||
type chunkedData struct {
|
||||
buf []byte
|
||||
chunks []*chunker.Chunk
|
||||
}
|
||||
|
||||
func benchmarkChunkEncrypt(b testing.TB, buf, buf2 []byte, rd Rdr, key *crypto.Key) {
|
||||
rd.Seek(0, 0)
|
||||
ch := chunker.New(rd, testPol, sha256.New())
|
||||
|
@ -233,3 +241,102 @@ func BenchmarkLoadTree(t *testing.B) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Saves several identical chunks concurrently and later checks that there are no
|
||||
// unreferenced packs in the repository. See also #292 and #358.
|
||||
func TestParallelSaveWithDuplication(t *testing.T) {
|
||||
for seed := 0; seed < 10; seed++ {
|
||||
testParallelSaveWithDuplication(t, seed)
|
||||
}
|
||||
}
|
||||
|
||||
func testParallelSaveWithDuplication(t *testing.T, seed int) {
|
||||
repo := SetupRepo()
|
||||
defer TeardownRepo(repo)
|
||||
|
||||
dataSizeMb := 128
|
||||
duplication := 7
|
||||
|
||||
arch := restic.NewArchiver(repo)
|
||||
data, chunks := getRandomData(seed, dataSizeMb*1024*1024)
|
||||
reader := bytes.NewReader(data)
|
||||
|
||||
errChannels := [](<-chan error){}
|
||||
|
||||
// interweaved processing of subsequent chunks
|
||||
maxParallel := 2*duplication - 1
|
||||
barrier := make(chan struct{}, maxParallel)
|
||||
|
||||
for _, c := range chunks {
|
||||
for dupIdx := 0; dupIdx < duplication; dupIdx++ {
|
||||
errChan := make(chan error)
|
||||
errChannels = append(errChannels, errChan)
|
||||
|
||||
go func(reader *bytes.Reader, c *chunker.Chunk, errChan chan<- error) {
|
||||
barrier <- struct{}{}
|
||||
|
||||
hash := c.Digest
|
||||
id := backend.ID{}
|
||||
copy(id[:], hash)
|
||||
|
||||
time.Sleep(time.Duration(hash[0]))
|
||||
err := arch.Save(pack.Data, id, c.Length, c.Reader(reader))
|
||||
<-barrier
|
||||
errChan <- err
|
||||
}(reader, c, errChan)
|
||||
}
|
||||
}
|
||||
|
||||
for _, errChan := range errChannels {
|
||||
OK(t, <-errChan)
|
||||
}
|
||||
|
||||
OK(t, repo.Flush())
|
||||
OK(t, repo.SaveIndex())
|
||||
|
||||
chkr := createAndInitChecker(t, repo)
|
||||
assertNoUnreferencedPacks(t, chkr)
|
||||
}
|
||||
|
||||
func getRandomData(seed int, size int) ([]byte, []*chunker.Chunk) {
|
||||
buf := Random(seed, size)
|
||||
chunks := []*chunker.Chunk{}
|
||||
chunker := chunker.New(bytes.NewReader(buf), testPol, sha256.New())
|
||||
|
||||
for {
|
||||
c, err := chunker.Next()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
chunks = append(chunks, c)
|
||||
}
|
||||
|
||||
return buf, chunks
|
||||
}
|
||||
|
||||
func createAndInitChecker(t *testing.T, repo *repository.Repository) *checker.Checker {
|
||||
chkr := checker.New(repo)
|
||||
|
||||
hints, errs := chkr.LoadIndex()
|
||||
if len(errs) > 0 {
|
||||
t.Fatalf("expected no errors, got %v: %v", len(errs), errs)
|
||||
}
|
||||
|
||||
if len(hints) > 0 {
|
||||
t.Errorf("expected no hints, got %v: %v", len(hints), hints)
|
||||
}
|
||||
|
||||
return chkr
|
||||
}
|
||||
|
||||
func assertNoUnreferencedPacks(t *testing.T, chkr *checker.Checker) {
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
|
||||
errChan := make(chan error)
|
||||
go chkr.Packs(errChan, done)
|
||||
|
||||
for err := range errChan {
|
||||
OK(t, err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -157,27 +157,47 @@ func (mi *MasterIndex) Current() *Index {
|
|||
// AddInFlight add the given ID to the list of in-flight IDs. An error is
|
||||
// returned when the ID is already in the list.
|
||||
func (mi *MasterIndex) AddInFlight(id backend.ID) error {
|
||||
mi.inFlight.Lock()
|
||||
defer mi.inFlight.Unlock()
|
||||
// The index + inFlight store must be searched for a matching id in one
|
||||
// atomic operation. This requires locking the inFlight store and the
|
||||
// index together!
|
||||
mi.inFlight.Lock()
|
||||
defer mi.inFlight.Unlock()
|
||||
|
||||
debug.Log("MasterIndex.AddInFlight", "adding %v", id)
|
||||
if mi.inFlight.Has(id) {
|
||||
return fmt.Errorf("%v is already in flight", id)
|
||||
}
|
||||
// Note: mi.Has read locks the index again.
|
||||
mi.idxMutex.RLock()
|
||||
defer mi.idxMutex.RUnlock()
|
||||
|
||||
mi.inFlight.Insert(id)
|
||||
return nil
|
||||
debug.Log("MasterIndex.AddInFlight", "adding %v", id)
|
||||
if mi.inFlight.Has(id) {
|
||||
return fmt.Errorf("%v is already in flight", id)
|
||||
}
|
||||
if mi.Has(id) {
|
||||
return fmt.Errorf("%v is already indexed (fully processed)", id)
|
||||
}
|
||||
|
||||
mi.inFlight.Insert(id)
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsInFlight returns true iff the id is contained in the list of in-flight IDs.
|
||||
func (mi *MasterIndex) IsInFlight(id backend.ID) bool {
|
||||
mi.inFlight.RLock()
|
||||
defer mi.inFlight.RUnlock()
|
||||
// The index + inFlight store must be searched for a matching id in one
|
||||
// atomic operation. This requires locking the inFlight store and the
|
||||
// index together!
|
||||
mi.inFlight.RLock()
|
||||
defer mi.inFlight.RUnlock()
|
||||
|
||||
inFlight := mi.inFlight.Has(id)
|
||||
debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight)
|
||||
// Note: mi.Has read locks the index again.
|
||||
mi.idxMutex.RLock()
|
||||
defer mi.idxMutex.RUnlock()
|
||||
|
||||
return inFlight
|
||||
inFlight := mi.inFlight.Has(id)
|
||||
debug.Log("MasterIndex.IsInFlight", "testing whether %v is in flight: %v", id.Str(), inFlight)
|
||||
|
||||
indexed := mi.Has(id)
|
||||
debug.Log("MasterIndex.IsInFlight", "testing whether %v is indexed (fully processed): %v", id.Str(), indexed)
|
||||
|
||||
return inFlight
|
||||
}
|
||||
|
||||
// RemoveFromInFlight deletes the given ID from the liste of in-flight IDs.
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package test_helper
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/bzip2"
|
||||
"compress/gzip"
|
||||
"crypto/rand"
|
||||
|
@ -86,10 +85,24 @@ func Random(seed, count int) []byte {
|
|||
return buf
|
||||
}
|
||||
|
||||
type rndReader struct {
|
||||
src *mrand.Rand
|
||||
}
|
||||
|
||||
func (r *rndReader) Read(p []byte) (int, error) {
|
||||
fmt.Printf("Read(%v)\n", len(p))
|
||||
for i := range p {
|
||||
p[i] = byte(r.src.Uint32())
|
||||
}
|
||||
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// RandomReader returns a reader that returns size bytes of pseudo-random data
|
||||
// derived from the seed.
|
||||
func RandomReader(seed, size int) *bytes.Reader {
|
||||
return bytes.NewReader(Random(seed, size))
|
||||
func RandomReader(seed, size int) io.Reader {
|
||||
r := &rndReader{src: mrand.New(mrand.NewSource(int64(seed)))}
|
||||
return io.LimitReader(r, int64(size))
|
||||
}
|
||||
|
||||
// GenRandom returns a []byte filled with up to 1000 random bytes.
|
||||
|
|
Loading…
Reference in a new issue