1
0
Fork 0
mirror of https://github.com/restic/restic.git synced 2024-12-26 01:37:12 +00:00

repository: cancel streampack context after error

This commit is contained in:
Michael Eischer 2021-09-05 12:20:07 +02:00
parent 47554a3428
commit bba8ba7a5b

View file

@ -11,7 +11,7 @@ import (
"sort" "sort"
"sync" "sync"
"github.com/cenkalti/backoff" "github.com/cenkalti/backoff/v4"
"github.com/restic/chunker" "github.com/restic/chunker"
"github.com/restic/restic/internal/backend/dryrun" "github.com/restic/restic/internal/backend/dryrun"
"github.com/restic/restic/internal/cache" "github.com/restic/restic/internal/cache"
@ -746,6 +746,10 @@ type Loader interface {
type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error
// StreamPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
// the handleBlobFn callback or an error if decryption failed or the blob hash does not match. In
// case of download errors handleBlobFn might be called multiple times for the same blob. If the
// callback returns an error, then StreamPack will abort and not retry it.
func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
if len(blobs) == 0 { if len(blobs) == 0 {
// nothing to do // nothing to do
@ -762,8 +766,13 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack
debug.Log("streaming pack %v (%d to %d bytes), blobs: %v", packID, dataStart, dataEnd, len(blobs)) debug.Log("streaming pack %v (%d to %d bytes), blobs: %v", packID, dataStart, dataEnd, len(blobs))
ctx, cancel := context.WithCancel(ctx)
// stream blobs in pack // stream blobs in pack
err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error { err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
// prevent callbacks after cancelation
if ctx.Err() != nil {
return ctx.Err()
}
bufferSize := int(dataEnd - dataStart) bufferSize := int(dataEnd - dataStart)
if bufferSize > MaxStreamBufferSize { if bufferSize > MaxStreamBufferSize {
bufferSize = MaxStreamBufferSize bufferSize = MaxStreamBufferSize
@ -823,6 +832,7 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack
err = handleBlobFn(entry.BlobHandle, plaintext, err) err = handleBlobFn(entry.BlobHandle, plaintext, err)
if err != nil { if err != nil {
cancel()
return backoff.Permanent(err) return backoff.Permanent(err)
} }
} }