From b856e9489a6e9256653aff2dc17d6254a96e2e7e Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 7 Jan 2024 12:00:32 +0100 Subject: [PATCH] restore: split downloadPack into smaller methods --- internal/restorer/filerestorer.go | 122 ++++++++++++++++-------------- 1 file changed, 65 insertions(+), 57 deletions(-) diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index 1fc74c7f0..7621e5ebb 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -197,12 +197,13 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error { return wg.Wait() } -func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { +type blobToFileOffsetsMapping map[restic.ID]struct { + files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file +} +func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { // calculate blob->[]files->[]offsets mappings - blobs := make(map[restic.ID]struct { - files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file - }) + blobs := make(blobToFileOffsetsMapping) var blobList []restic.Blob for file := range pack.files { addBlob := func(blob restic.Blob, fileOffset int64) { @@ -239,60 +240,9 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { } } - sanitizeError := func(file *fileInfo, err error) error { - if err != nil { - err = r.Error(file.location, err) - } - return err - } - // track already processed blobs for precise error reporting processedBlobs := restic.NewBlobSet() - err := repository.StreamPack(ctx, r.packLoader, r.key, pack.id, blobList, func(h restic.BlobHandle, blobData []byte, err error) error { - processedBlobs.Insert(h) - blob := blobs[h.ID] - if err != nil { - for file := range blob.files { - if errFile := sanitizeError(file, err); errFile != nil { - return errFile - } - } - return nil - } - for file, offsets := range blob.files { - for _, offset := range offsets { - writeToFile := func() error { - // this looks overly complicated and needs explanation - // two competing requirements: - // - must create the file once and only once - // - should allow concurrent writes to the file - // so write the first blob while holding file lock - // write other blobs after releasing the lock - createSize := int64(-1) - file.lock.Lock() - if file.inProgress { - file.lock.Unlock() - } else { - defer file.lock.Unlock() - file.inProgress = true - createSize = file.size - } - writeErr := r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize, file.sparse) - - if r.progress != nil { - r.progress.AddProgress(file.location, uint64(len(blobData)), uint64(file.size)) - } - - return writeErr - } - err := sanitizeError(file, writeToFile()) - if err != nil { - return err - } - } - } - return nil - }) + err := r.downloadBlobs(ctx, pack.id, blobList, blobs, processedBlobs) if err != nil { // only report error for not yet processed blobs @@ -308,7 +258,7 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { } for file := range affectedFiles { - if errFile := sanitizeError(file, err); errFile != nil { + if errFile := r.sanitizeError(file, err); errFile != nil { return errFile } } @@ -316,3 +266,61 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { return nil } + +func (r *fileRestorer) sanitizeError(file *fileInfo, err error) error { + if err != nil { + err = r.Error(file.location, err) + } + return err +} + +func (r *fileRestorer) downloadBlobs(ctx context.Context, packID restic.ID, blobList []restic.Blob, + blobs blobToFileOffsetsMapping, processedBlobs restic.BlobSet) error { + + return repository.StreamPack(ctx, r.packLoader, r.key, packID, blobList, + func(h restic.BlobHandle, blobData []byte, err error) error { + processedBlobs.Insert(h) + blob := blobs[h.ID] + if err != nil { + for file := range blob.files { + if errFile := r.sanitizeError(file, err); errFile != nil { + return errFile + } + } + return nil + } + for file, offsets := range blob.files { + for _, offset := range offsets { + writeToFile := func() error { + // this looks overly complicated and needs explanation + // two competing requirements: + // - must create the file once and only once + // - should allow concurrent writes to the file + // so write the first blob while holding file lock + // write other blobs after releasing the lock + createSize := int64(-1) + file.lock.Lock() + if file.inProgress { + file.lock.Unlock() + } else { + defer file.lock.Unlock() + file.inProgress = true + createSize = file.size + } + writeErr := r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize, file.sparse) + + if r.progress != nil { + r.progress.AddProgress(file.location, uint64(len(blobData)), uint64(file.size)) + } + + return writeErr + } + err := r.sanitizeError(file, writeToFile()) + if err != nil { + return err + } + } + } + return nil + }) +}