Compare commits

...

15 Commits

Author SHA1 Message Date
Aneesh N d68a8efd5c
Merge 672f6cd776 into ffe5439149 2024-05-01 10:25:03 -07:00
Michael Eischer ffe5439149
Merge pull request #4605 from MichaelEischer/better-restorer-error-handling
Rework repository.StreamPacks & better restorer error handling
2024-05-01 16:37:41 +02:00
Michael Eischer 676f0dc60d add changelog 2024-05-01 16:28:57 +02:00
Michael Eischer 1e57057953
Merge pull request #4789 from restic/dependabot/go_modules/github.com/Azure/azure-sdk-for-go/sdk/storage/azblob-1.3.2
build(deps): bump github.com/Azure/azure-sdk-for-go/sdk/storage/azblob from 1.3.1 to 1.3.2
2024-05-01 10:45:47 +00:00
Michael Eischer 1ba0af6993
Merge pull request #4787 from restic/dependabot/go_modules/github.com/klauspost/compress-1.17.8
build(deps): bump github.com/klauspost/compress from 1.17.7 to 1.17.8
2024-05-01 10:44:33 +00:00
Michael Eischer ffc41ae62a
Merge pull request #4786 from restic/dependabot/go_modules/golang.org/x/net-0.24.0
build(deps): bump golang.org/x/net from 0.23.0 to 0.24.0
2024-05-01 10:41:26 +00:00
Michael Eischer 4832c2fbfa
Merge pull request #4790 from restic/dependabot/github_actions/golangci/golangci-lint-action-5
build(deps): bump golangci/golangci-lint-action from 4 to 5
2024-05-01 10:37:37 +00:00
dependabot[bot] 30609ae6b2
build(deps): bump golangci/golangci-lint-action from 4 to 5
Bumps [golangci/golangci-lint-action](https://github.com/golangci/golangci-lint-action) from 4 to 5.
- [Release notes](https://github.com/golangci/golangci-lint-action/releases)
- [Commits](https://github.com/golangci/golangci-lint-action/compare/v4...v5)

---
updated-dependencies:
- dependency-name: golangci/golangci-lint-action
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-05-01 01:45:43 +00:00
dependabot[bot] 502e5867a5
build(deps): bump github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
Bumps [github.com/Azure/azure-sdk-for-go/sdk/storage/azblob](https://github.com/Azure/azure-sdk-for-go) from 1.3.1 to 1.3.2.
- [Release notes](https://github.com/Azure/azure-sdk-for-go/releases)
- [Changelog](https://github.com/Azure/azure-sdk-for-go/blob/main/documentation/release.md)
- [Commits](https://github.com/Azure/azure-sdk-for-go/compare/sdk/azcore/v1.3.1...sdk/storage/azblob/v1.3.2)

---
updated-dependencies:
- dependency-name: github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-05-01 01:02:39 +00:00
dependabot[bot] 18a6d6b408
build(deps): bump github.com/klauspost/compress from 1.17.7 to 1.17.8
Bumps [github.com/klauspost/compress](https://github.com/klauspost/compress) from 1.17.7 to 1.17.8.
- [Release notes](https://github.com/klauspost/compress/releases)
- [Changelog](https://github.com/klauspost/compress/blob/master/.goreleaser.yml)
- [Commits](https://github.com/klauspost/compress/compare/v1.17.7...v1.17.8)

---
updated-dependencies:
- dependency-name: github.com/klauspost/compress
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-05-01 01:02:22 +00:00
dependabot[bot] 3bb88e8307
build(deps): bump golang.org/x/net from 0.23.0 to 0.24.0
Bumps [golang.org/x/net](https://github.com/golang/net) from 0.23.0 to 0.24.0.
- [Commits](https://github.com/golang/net/compare/v0.23.0...v0.24.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-05-01 01:02:11 +00:00
Michael Eischer 20d8eed400 repository: streamPack: separate requests for gap larger than 1MB
With most cloud providers, traffic is much more expensive than API
calls. Thus slightly bias streamPack towards a bit more API calls in
exchange for slightly less traffic.
2024-04-22 21:21:23 +02:00
Michael Eischer cf700d8794 repository: streamPack: reuse zstd decoder 2024-04-22 21:21:23 +02:00
Michael Eischer 666a0b0bdb repository: streamPack: replace streaming with chunked download
Due to the interface of streamPack, we cannot guarantee that operations
progress fast enough that the underlying connections remains open. This
introduces partial failures which massively complicate the error
handling.

Switch to a simpler approach that retrieves the pack in chunks of 32MB.
If a blob is larger than this limit, then it is downloaded separately.

To avoid multiple copies in memory, an auxiliary interface
`discardReader` is introduced that allows directly accessing the
downloaded byte slices, while still supporting the streaming used by the
`check` command.
2024-04-22 21:21:23 +02:00
Michael Eischer 621012dac0 repository: Add blob loading fallback to LoadBlobsFromPack
Try to retrieve individual blobs via LoadBlob if streaming did not work.
2024-04-21 21:35:55 +02:00
8 changed files with 270 additions and 104 deletions

View File

@ -261,7 +261,7 @@ jobs:
uses: actions/checkout@v4
- name: golangci-lint
uses: golangci/golangci-lint-action@v4
uses: golangci/golangci-lint-action@v5
with:
# Required: the version of golangci-lint is required and must be specified without patch version: we always use the latest patch version.
version: v1.57.1

View File

@ -0,0 +1,8 @@
Enhancement: Improve reliability of backend operations
Restic now downloads pack files in large chunks instead of using a streaming
download. This prevents failures due to interrupted streams. The `restore`
command now also retries downloading individual blobs that cannot be retrieved.
https://github.com/restic/restic/issues/4627
https://github.com/restic/restic/pull/4605

14
go.mod
View File

@ -2,9 +2,9 @@ module github.com/restic/restic
require (
cloud.google.com/go/storage v1.40.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.10.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2
github.com/Backblaze/blazer v0.6.1
github.com/anacrolix/fuse v0.2.0
github.com/cenkalti/backoff/v4 v4.2.1
@ -13,7 +13,7 @@ require (
github.com/go-ole/go-ole v1.3.0
github.com/google/go-cmp v0.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/klauspost/compress v1.17.7
github.com/klauspost/compress v1.17.8
github.com/minio/minio-go/v7 v7.0.66
github.com/minio/sha256-simd v1.0.1
github.com/ncw/swift/v2 v2.0.2
@ -26,12 +26,12 @@ require (
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
go.uber.org/automaxprocs v1.5.3
golang.org/x/crypto v0.21.0
golang.org/x/net v0.23.0
golang.org/x/crypto v0.22.0
golang.org/x/net v0.24.0
golang.org/x/oauth2 v0.18.0
golang.org/x/sync v0.6.0
golang.org/x/sys v0.18.0
golang.org/x/term v0.18.0
golang.org/x/sys v0.19.0
golang.org/x/term v0.19.0
golang.org/x/text v0.14.0
golang.org/x/time v0.5.0
google.golang.org/api v0.170.0

28
go.sum
View File

@ -9,15 +9,15 @@ cloud.google.com/go/iam v1.1.7 h1:z4VHOhwKLF/+UYXAJDFwGtNF0b6gjsW1Pk9Ml0U/IoM=
cloud.google.com/go/iam v1.1.7/go.mod h1:J4PMPg8TtyurAUvSmPj8FF3EDgY1SPRZxcUGrn7WXGA=
cloud.google.com/go/storage v1.40.0 h1:VEpDQV5CJxFmJ6ueWNsKxcr1QAYOXEgxDa+sBbJahPw=
cloud.google.com/go/storage v1.40.0/go.mod h1:Rrj7/hKlG87BLqDJYtwR0fbPld8uJPbQ2ucUMY7Ir0g=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.10.0 h1:n1DH8TPV4qqPTje2RcUBYwtrTWlabVp4n46+74X2pn4=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.10.0/go.mod h1:HDcZnuGbiyppErN6lB+idp4CKhjbc8gwjto6OPpyggM=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 h1:E+OJmp2tPvt1W+amx48v1eqbjDYsgN+RzP4q16yV5eM=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1/go.mod h1:a6xsAQUZg+VsS3TJ05SRp524Hs4pZ/AeFSr5ENf0Yjo=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 h1:sO0/P7g68FrryJzljemN+6GTssUXdANk6aJ7T1ZxnsQ=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1/go.mod h1:h8hyGFDsU5HMivxiS2iYFZsgDbU9OnnJ163x5UGVKYo=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 h1:LqbJ/WzJUwBf8UiaSzgX7aMclParm9/5Vgp+TY51uBQ=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2/go.mod h1:yInRyqWXAuaPrgI7p70+lDDgh3mlBohis29jGMISnmc=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0 h1:AifHbc4mg0x9zW52WOpKbsHaDKuRhlI7TVl47thgQ70=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.1 h1:fXPMAmuh0gDuRDey0atC8cXBuKIlqCzCkL8sm1n9Ov0=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.1/go.mod h1:SUZc9YRRHfx2+FAQKNDGrssXehqLpxmwRv2mC/5ntj4=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 h1:YUUxeiOWgdAQE3pXt2H7QXzZs0q8UBjgRbl56qo8GYM=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2/go.mod h1:dmXQgZuiSubAecswZE+Sm8jkvEa7kQgTPVRvwL/nd0E=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 h1:DzHpqpoJVaCgOUdVHxE8QB52S6NiVdDQvGlny1qvPqA=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
github.com/Backblaze/blazer v0.6.1 h1:xC9HyC7OcxRzzmtfRiikIEvq4HZYWjU6caFwX2EXw1s=
@ -114,8 +114,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
@ -206,8 +206,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
@ -227,8 +227,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.18.0 h1:09qnuIAgzdx1XplqJvW6CQqMCtGZykZWcXzPMPUusvI=
golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8=
@ -255,14 +255,14 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/term v0.19.0 h1:+ThwsDv+tYfnJFhF4L8jITxu1tdTWRTZpdsWgEgjL6Q=
golang.org/x/term v0.19.0/go.mod h1:2CuTdWZ7KHSQwUzKva0cbMg6q2DMI3Mmxp+gKJbskEk=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=

View File

@ -567,7 +567,7 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
hrd := hashing.NewReader(rd, sha256.New())
bufRd.Reset(hrd)
it := repository.NewPackBlobIterator(id, bufRd, 0, blobs, r.Key(), dec)
it := repository.NewPackBlobIterator(id, newBufReader(bufRd), 0, blobs, r.Key(), dec)
for {
val, err := it.Next()
if err == repository.ErrPackEOF {
@ -653,11 +653,41 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r
return nil
}
type bufReader struct {
rd *bufio.Reader
buf []byte
}
func newBufReader(rd *bufio.Reader) *bufReader {
return &bufReader{
rd: rd,
}
}
func (b *bufReader) Discard(n int) (discarded int, err error) {
return b.rd.Discard(n)
}
func (b *bufReader) ReadFull(n int) (buf []byte, err error) {
if cap(b.buf) < n {
b.buf = make([]byte, n)
}
b.buf = b.buf[:n]
_, err = io.ReadFull(b.rd, b.buf)
if err != nil {
return nil, err
}
return b.buf, nil
}
// ReadData loads all data from the repository and checks the integrity.
func (c *Checker) ReadData(ctx context.Context, errChan chan<- error) {
c.ReadPacks(ctx, c.packs, nil, errChan)
}
const maxStreamBufferSize = 4 * 1024 * 1024
// ReadPacks loads data from specified packs and checks the integrity.
func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *progress.Counter, errChan chan<- error) {
defer close(errChan)
@ -675,9 +705,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p
// run workers
for i := 0; i < workerCount; i++ {
g.Go(func() error {
// create a buffer that is large enough to be reused by repository.StreamPack
// this ensures that we can read the pack header later on
bufRd := bufio.NewReaderSize(nil, repository.MaxStreamBufferSize)
bufRd := bufio.NewReaderSize(nil, maxStreamBufferSize)
dec, err := zstd.NewReader(nil)
if err != nil {
panic(dec)

View File

@ -79,13 +79,8 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
for t := range downloadQueue {
err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
if err != nil {
var ierr error
// check whether we can get a valid copy somewhere else
buf, ierr = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
if ierr != nil {
// no luck, return the original error
return err
}
// a required blob couldn't be retrieved
return err
}
keepMutex.Lock()

View File

@ -1,7 +1,6 @@
package repository
import (
"bufio"
"bytes"
"context"
"fmt"
@ -12,7 +11,6 @@ import (
"sort"
"sync"
"github.com/cenkalti/backoff/v4"
"github.com/klauspost/compress/zstd"
"github.com/restic/chunker"
"github.com/restic/restic/internal/backend"
@ -29,8 +27,6 @@ import (
"golang.org/x/sync/errgroup"
)
const MaxStreamBufferSize = 4 * 1024 * 1024
const MinPackSize = 4 * 1024 * 1024
const DefaultPackSize = 16 * 1024 * 1024
const MaxPackSize = 128 * 1024 * 1024
@ -966,19 +962,21 @@ func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte
}
type backendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error
type loadBlobFn func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error)
// Skip sections with more than 4MB unused blobs
const maxUnusedRange = 4 * 1024 * 1024
// Skip sections with more than 1MB unused blobs
const maxUnusedRange = 1 * 1024 * 1024
// LoadBlobsFromPack loads the listed blobs from the specified pack file. The plaintext blob is passed to
// the handleBlobFn callback or an error if decryption failed or the blob hash does not match.
// handleBlobFn is called at most once for each blob. If the callback returns an error,
// then LoadBlobsFromPack will abort and not retry it.
// then LoadBlobsFromPack will abort and not retry it. The buf passed to the callback is only valid within
// this specific call. The callback must not keep a reference to buf.
func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
return streamPack(ctx, r.Backend().Load, r.key, packID, blobs, handleBlobFn)
return streamPack(ctx, r.Backend().Load, r.LoadBlob, r.getZstdDecoder(), r.key, packID, blobs, handleBlobFn)
}
func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
func streamPack(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
if len(blobs) == 0 {
// nothing to do
return nil
@ -990,14 +988,29 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
lowerIdx := 0
lastPos := blobs[0].Offset
const maxChunkSize = 2 * DefaultPackSize
for i := 0; i < len(blobs); i++ {
if blobs[i].Offset < lastPos {
// don't wait for streamPackPart to fail
return errors.Errorf("overlapping blobs in pack %v", packID)
}
chunkSizeAfter := (blobs[i].Offset + blobs[i].Length) - blobs[lowerIdx].Offset
split := false
// split if the chunk would become larger than maxChunkSize. Oversized chunks are
// handled by the requirement that the chunk contains at least one blob (i > lowerIdx)
if i > lowerIdx && chunkSizeAfter >= maxChunkSize {
split = true
}
// skip too large gaps as a new request is typically much cheaper than data transfers
if blobs[i].Offset-lastPos > maxUnusedRange {
split = true
}
if split {
// load everything up to the skipped file section
err := streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:i], handleBlobFn)
err := streamPackPart(ctx, beLoad, loadBlobFn, dec, key, packID, blobs[lowerIdx:i], handleBlobFn)
if err != nil {
return err
}
@ -1006,10 +1019,10 @@ func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, pack
lastPos = blobs[i].Offset + blobs[i].Length
}
// load remainder
return streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:], handleBlobFn)
return streamPackPart(ctx, beLoad, loadBlobFn, dec, key, packID, blobs[lowerIdx:], handleBlobFn)
}
func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
func streamPackPart(ctx context.Context, beLoad backendLoadFn, loadBlobFn loadBlobFn, dec *zstd.Decoder, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error {
h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: false}
dataStart := blobs[0].Offset
@ -1017,57 +1030,108 @@ func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key,
debug.Log("streaming pack %v (%d to %d bytes), blobs: %v", packID, dataStart, dataEnd, len(blobs))
dec, err := zstd.NewReader(nil)
if err != nil {
panic(dec)
}
defer dec.Close()
ctx, cancel := context.WithCancel(ctx)
// stream blobs in pack
err = beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
// prevent callbacks after cancellation
if ctx.Err() != nil {
return ctx.Err()
}
bufferSize := int(dataEnd - dataStart)
if bufferSize > MaxStreamBufferSize {
bufferSize = MaxStreamBufferSize
}
bufRd := bufio.NewReaderSize(rd, bufferSize)
it := NewPackBlobIterator(packID, bufRd, dataStart, blobs, key, dec)
for {
val, err := it.Next()
if err == ErrPackEOF {
break
} else if err != nil {
return err
}
err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
if err != nil {
cancel()
return backoff.Permanent(err)
}
// ensure that each blob is only passed once to handleBlobFn
blobs = blobs[1:]
}
return nil
data := make([]byte, int(dataEnd-dataStart))
err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error {
_, cerr := io.ReadFull(rd, data)
return cerr
})
// prevent callbacks after cancellation
if ctx.Err() != nil {
return ctx.Err()
}
if err != nil {
// the context is only still valid if handleBlobFn never returned an error
if loadBlobFn != nil {
// check whether we can get the remaining blobs somewhere else
for _, entry := range blobs {
buf, ierr := loadBlobFn(ctx, entry.Type, entry.ID, nil)
err = handleBlobFn(entry.BlobHandle, buf, ierr)
if err != nil {
break
}
}
}
return errors.Wrap(err, "StreamPack")
}
it := NewPackBlobIterator(packID, newByteReader(data), dataStart, blobs, key, dec)
for {
val, err := it.Next()
if err == ErrPackEOF {
break
} else if err != nil {
return err
}
if val.Err != nil && loadBlobFn != nil {
var ierr error
// check whether we can get a valid copy somewhere else
buf, ierr := loadBlobFn(ctx, val.Handle.Type, val.Handle.ID, nil)
if ierr == nil {
// success
val.Plaintext = buf
val.Err = nil
}
}
err = handleBlobFn(val.Handle, val.Plaintext, val.Err)
if err != nil {
return err
}
// ensure that each blob is only passed once to handleBlobFn
blobs = blobs[1:]
}
return errors.Wrap(err, "StreamPack")
}
// discardReader allows the PackBlobIterator to perform zero copy
// reads if the underlying data source is a byte slice.
type discardReader interface {
Discard(n int) (discarded int, err error)
// ReadFull reads the next n bytes into a byte slice. The caller must not
// retain a reference to the byte. Modifications are only allowed within
// the boundaries of the returned slice.
ReadFull(n int) (buf []byte, err error)
}
type byteReader struct {
buf []byte
}
func newByteReader(buf []byte) *byteReader {
return &byteReader{
buf: buf,
}
}
func (b *byteReader) Discard(n int) (discarded int, err error) {
if len(b.buf) < n {
return 0, io.ErrUnexpectedEOF
}
b.buf = b.buf[n:]
return n, nil
}
func (b *byteReader) ReadFull(n int) (buf []byte, err error) {
if len(b.buf) < n {
return nil, io.ErrUnexpectedEOF
}
buf = b.buf[:n]
b.buf = b.buf[n:]
return buf, nil
}
type PackBlobIterator struct {
packID restic.ID
rd *bufio.Reader
rd discardReader
currentOffset uint
blobs []restic.Blob
key *crypto.Key
dec *zstd.Decoder
buf []byte
decode []byte
}
@ -1079,7 +1143,7 @@ type PackBlobValue struct {
var ErrPackEOF = errors.New("reached EOF of pack file")
func NewPackBlobIterator(packID restic.ID, rd *bufio.Reader, currentOffset uint,
func NewPackBlobIterator(packID restic.ID, rd discardReader, currentOffset uint,
blobs []restic.Blob, key *crypto.Key, dec *zstd.Decoder) *PackBlobIterator {
return &PackBlobIterator{
packID: packID,
@ -1114,21 +1178,12 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
debug.Log(" process blob %v, skipped %d, %v", h, skipBytes, entry)
if uint(cap(b.buf)) < entry.Length {
b.buf = make([]byte, entry.Length)
}
b.buf = b.buf[:entry.Length]
n, err := io.ReadFull(b.rd, b.buf)
buf, err := b.rd.ReadFull(int(entry.Length))
if err != nil {
debug.Log(" read error %v", err)
return PackBlobValue{}, fmt.Errorf("readFull: %w", err)
}
if n != len(b.buf) {
return PackBlobValue{}, fmt.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
h, b.packID.Str(), len(b.buf), n)
}
b.currentOffset = entry.Offset + entry.Length
if int(entry.Length) <= b.key.NonceSize() {
@ -1137,7 +1192,7 @@ func (b *PackBlobIterator) Next() (PackBlobValue, error) {
}
// decryption errors are likely permanent, give the caller a chance to skip them
nonce, ciphertext := b.buf[:b.key.NonceSize()], b.buf[b.key.NonceSize():]
nonce, ciphertext := buf[:b.key.NonceSize()], buf[b.key.NonceSize():]
plaintext, err := b.key.Open(ciphertext[:0], nonce, ciphertext, nil)
if err != nil {
err = fmt.Errorf("decrypting blob %v from %v failed: %w", h, b.packID.Str(), err)

View File

@ -146,14 +146,14 @@ func TestStreamPack(t *testing.T) {
}
func testStreamPack(t *testing.T, version uint) {
// always use the same key for deterministic output
const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`
var key crypto.Key
err := json.Unmarshal([]byte(jsonKey), &key)
dec, err := zstd.NewReader(nil)
if err != nil {
t.Fatal(err)
panic(dec)
}
defer dec.Close()
// always use the same key for deterministic output
key := testKey(t)
blobSizes := []int{
5522811,
@ -276,7 +276,7 @@ func testStreamPack(t *testing.T, version uint) {
loadCalls = 0
shortFirstLoad = test.shortFirstLoad
err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
err := streamPack(ctx, load, nil, dec, &key, restic.ID{}, test.blobs, handleBlob)
if err != nil {
t.Fatal(err)
}
@ -339,7 +339,7 @@ func testStreamPack(t *testing.T, version uint) {
return err
}
err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob)
err := streamPack(ctx, load, nil, dec, &key, restic.ID{}, test.blobs, handleBlob)
if err == nil {
t.Fatalf("wanted error %v, got nil", test.err)
}
@ -449,3 +449,83 @@ func TestUnpackedVerification(t *testing.T) {
}
}
}
func testKey(t *testing.T) crypto.Key {
const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}`
var key crypto.Key
err := json.Unmarshal([]byte(jsonKey), &key)
if err != nil {
t.Fatal(err)
}
return key
}
func TestStreamPackFallback(t *testing.T) {
dec, err := zstd.NewReader(nil)
if err != nil {
panic(dec)
}
defer dec.Close()
test := func(t *testing.T, failLoad bool) {
key := testKey(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
plaintext := rtest.Random(800, 42)
blobID := restic.Hash(plaintext)
blobs := []restic.Blob{
{
Length: uint(crypto.CiphertextLength(len(plaintext))),
Offset: 0,
BlobHandle: restic.BlobHandle{
ID: blobID,
Type: restic.DataBlob,
},
},
}
var loadPack backendLoadFn
if failLoad {
loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
return errors.New("load error")
}
} else {
loadPack = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
// just return an empty array to provoke an error
data := make([]byte, length)
return fn(bytes.NewReader(data))
}
}
loadBlob := func(ctx context.Context, t restic.BlobType, id restic.ID, buf []byte) ([]byte, error) {
if id == blobID {
return plaintext, nil
}
return nil, errors.New("unknown blob")
}
blobOK := false
handleBlob := func(blob restic.BlobHandle, buf []byte, err error) error {
rtest.OK(t, err)
rtest.Equals(t, blobID, blob.ID)
rtest.Equals(t, plaintext, buf)
blobOK = true
return err
}
err := streamPack(ctx, loadPack, loadBlob, dec, &key, restic.ID{}, blobs, handleBlob)
rtest.OK(t, err)
rtest.Assert(t, blobOK, "blob failed to load")
}
t.Run("corrupted blob", func(t *testing.T) {
test(t, false)
})
// test fallback for failed pack loading
t.Run("failed load", func(t *testing.T) {
test(t, true)
})
}