mirror of
https://github.com/restic/restic.git
synced 2025-01-03 05:35:43 +00:00
cache: Ensure failed downloads are retried
This fixes #1833, which consists of two different bugs: * The `defer` in `cacheFile()` may remove a channel from the `inProgress` map although it is not responsible for downloading the file * If the download fails, goroutines waiting for the file to be cached assumed that the file was there, there was no way to signal the error.
This commit is contained in:
parent
48aab8bd65
commit
fc5439a37a
2 changed files with 72 additions and 23 deletions
35
internal/cache/backend.go
vendored
35
internal/cache/backend.go
vendored
|
@ -91,14 +91,6 @@ var autoCacheFiles = map[restic.FileType]bool{
|
|||
|
||||
func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error {
|
||||
finish := make(chan struct{})
|
||||
defer func() {
|
||||
close(finish)
|
||||
|
||||
// remove the finish channel from the map
|
||||
b.inProgressMutex.Lock()
|
||||
delete(b.inProgress, h)
|
||||
b.inProgressMutex.Unlock()
|
||||
}()
|
||||
|
||||
b.inProgressMutex.Lock()
|
||||
other, alreadyDownloading := b.inProgress[h]
|
||||
|
@ -120,10 +112,17 @@ func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error {
|
|||
if err != nil {
|
||||
// try to remove from the cache, ignore errors
|
||||
_ = b.Cache.Remove(h)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
// signal other waiting goroutines that the file may now be cached
|
||||
close(finish)
|
||||
|
||||
// remove the finish channel from the map
|
||||
b.inProgressMutex.Lock()
|
||||
delete(b.inProgress, h)
|
||||
b.inProgressMutex.Unlock()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// loadFromCacheOrDelegate will try to load the file from the cache, and fall
|
||||
|
@ -131,12 +130,13 @@ func (b *Backend) cacheFile(ctx context.Context, h restic.Handle) error {
|
|||
func (b *Backend) loadFromCacheOrDelegate(ctx context.Context, h restic.Handle, length int, offset int64, consumer func(rd io.Reader) error) error {
|
||||
rd, err := b.Cache.Load(h, length, offset)
|
||||
if err != nil {
|
||||
debug.Log("error caching %v: %v, falling back to backend", h, err)
|
||||
return b.Backend.Load(ctx, h, length, offset, consumer)
|
||||
}
|
||||
|
||||
err = consumer(rd)
|
||||
if err != nil {
|
||||
rd.Close() // ignore secondary errors
|
||||
_ = rd.Close() // ignore secondary errors
|
||||
return err
|
||||
}
|
||||
return rd.Close()
|
||||
|
@ -193,19 +193,8 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
|
|||
|
||||
debug.Log("auto-store %v in the cache", h)
|
||||
err := b.cacheFile(ctx, h)
|
||||
|
||||
if err == nil {
|
||||
// load the cached version
|
||||
rd, err := b.Cache.Load(h, 0, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = consumer(rd)
|
||||
if err != nil {
|
||||
rd.Close() // ignore secondary errors
|
||||
return err
|
||||
}
|
||||
return rd.Close()
|
||||
return b.loadFromCacheOrDelegate(ctx, h, length, offset, consumer)
|
||||
}
|
||||
|
||||
debug.Log("error caching %v: %v, falling back to backend", h, err)
|
||||
|
|
60
internal/cache/backend_test.go
vendored
60
internal/cache/backend_test.go
vendored
|
@ -3,9 +3,13 @@ package cache
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/backend/mem"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
|
@ -112,3 +116,59 @@ func TestBackend(t *testing.T) {
|
|||
t.Errorf("removed file still in cache after stat")
|
||||
}
|
||||
}
|
||||
|
||||
type loadErrorBackend struct {
|
||||
restic.Backend
|
||||
loadError error
|
||||
}
|
||||
|
||||
func (be loadErrorBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
return be.loadError
|
||||
}
|
||||
|
||||
func TestErrorBackend(t *testing.T) {
|
||||
be := mem.New()
|
||||
|
||||
c, cleanup := TestNewCache(t)
|
||||
defer cleanup()
|
||||
|
||||
h, data := randomData(5234142)
|
||||
|
||||
// save directly in backend
|
||||
save(t, be, h, data)
|
||||
|
||||
testErr := errors.New("test error")
|
||||
errBackend := loadErrorBackend{
|
||||
Backend: be,
|
||||
loadError: testErr,
|
||||
}
|
||||
|
||||
loadTest := func(wg *sync.WaitGroup, be restic.Backend) {
|
||||
defer wg.Done()
|
||||
|
||||
buf, err := backend.LoadAll(context.TODO(), be, h)
|
||||
if err == testErr {
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
return
|
||||
}
|
||||
|
||||
if !bytes.Equal(buf, data) {
|
||||
t.Errorf("data does not match")
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
|
||||
wrappedBE := c.Wrap(errBackend)
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 5; i++ {
|
||||
wg.Add(1)
|
||||
go loadTest(&wg, wrappedBE)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue