1
0
Fork 0
mirror of https://github.com/restic/restic.git synced 2024-12-22 15:57:07 +00:00
restic/internal/archiver/file_saver.go
Michael Eischer 48dbefc37e fs / archiver: convert to handle based interface
The actual implementation still relies on file paths, but with the
abstraction layer in place, an FS implementation can ensure atomic file
accesses in the future.
2024-11-16 12:56:23 +01:00

274 lines
6 KiB
Go

package archiver
import (
"context"
"fmt"
"io"
"sync"
"github.com/restic/chunker"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/restic"
"golang.org/x/sync/errgroup"
)
// saveBlobFn saves a blob to a repo.
type saveBlobFn func(context.Context, restic.BlobType, *buffer, string, func(res saveBlobResponse))
// fileSaver concurrently saves incoming files to the repo.
type fileSaver struct {
saveFilePool *bufferPool
saveBlob saveBlobFn
pol chunker.Pol
ch chan<- saveFileJob
CompleteBlob func(bytes uint64)
NodeFromFileInfo func(snPath, filename string, meta ToNoder, ignoreXattrListError bool) (*restic.Node, error)
}
// newFileSaver returns a new file saver. A worker pool with fileWorkers is
// started, it is stopped when ctx is cancelled.
func newFileSaver(ctx context.Context, wg *errgroup.Group, save saveBlobFn, pol chunker.Pol, fileWorkers, blobWorkers uint) *fileSaver {
ch := make(chan saveFileJob)
debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
poolSize := fileWorkers + blobWorkers
s := &fileSaver{
saveBlob: save,
saveFilePool: newBufferPool(int(poolSize), chunker.MaxSize),
pol: pol,
ch: ch,
CompleteBlob: func(uint64) {},
}
for i := uint(0); i < fileWorkers; i++ {
wg.Go(func() error {
s.worker(ctx, ch)
return nil
})
}
return s
}
func (s *fileSaver) TriggerShutdown() {
close(s.ch)
}
// fileCompleteFunc is called when the file has been saved.
type fileCompleteFunc func(*restic.Node, ItemStats)
// Save stores the file f and returns the data once it has been completed. The
// file is closed by Save. completeReading is only called if the file was read
// successfully. complete is always called. If completeReading is called, then
// this will always happen before calling complete.
func (s *fileSaver) Save(ctx context.Context, snPath string, target string, file fs.File, start func(), completeReading func(), complete fileCompleteFunc) futureNode {
fn, ch := newFutureNode()
job := saveFileJob{
snPath: snPath,
target: target,
file: file,
ch: ch,
start: start,
completeReading: completeReading,
complete: complete,
}
select {
case s.ch <- job:
case <-ctx.Done():
debug.Log("not sending job, context is cancelled: %v", ctx.Err())
_ = file.Close()
close(ch)
}
return fn
}
type saveFileJob struct {
snPath string
target string
file fs.File
ch chan<- futureNodeResult
start func()
completeReading func()
complete fileCompleteFunc
}
// saveFile stores the file f in the repo, then closes it.
func (s *fileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPath string, target string, f fs.File, start func(), finishReading func(), finish func(res futureNodeResult)) {
start()
fnr := futureNodeResult{
snPath: snPath,
target: target,
}
var lock sync.Mutex
remaining := 0
isCompleted := false
completeBlob := func() {
lock.Lock()
defer lock.Unlock()
remaining--
if remaining == 0 && fnr.err == nil {
if isCompleted {
panic("completed twice")
}
for _, id := range fnr.node.Content {
if id.IsNull() {
panic("completed file with null ID")
}
}
isCompleted = true
finish(fnr)
}
}
completeError := func(err error) {
lock.Lock()
defer lock.Unlock()
if fnr.err == nil {
if isCompleted {
panic("completed twice")
}
isCompleted = true
fnr.err = fmt.Errorf("failed to save %v: %w", target, err)
fnr.node = nil
fnr.stats = ItemStats{}
finish(fnr)
}
}
debug.Log("%v", snPath)
node, err := s.NodeFromFileInfo(snPath, target, f, false)
if err != nil {
_ = f.Close()
completeError(err)
return
}
if node.Type != restic.NodeTypeFile {
_ = f.Close()
completeError(errors.Errorf("node type %q is wrong", node.Type))
return
}
// reuse the chunker
chnker.Reset(f, s.pol)
node.Content = []restic.ID{}
node.Size = 0
var idx int
for {
buf := s.saveFilePool.Get()
chunk, err := chnker.Next(buf.Data)
if err == io.EOF {
buf.Release()
break
}
buf.Data = chunk.Data
node.Size += uint64(chunk.Length)
if err != nil {
_ = f.Close()
completeError(err)
return
}
// test if the context has been cancelled, return the error
if ctx.Err() != nil {
_ = f.Close()
completeError(ctx.Err())
return
}
// add a place to store the saveBlob result
pos := idx
lock.Lock()
node.Content = append(node.Content, restic.ID{})
lock.Unlock()
s.saveBlob(ctx, restic.DataBlob, buf, target, func(sbr saveBlobResponse) {
lock.Lock()
if !sbr.known {
fnr.stats.DataBlobs++
fnr.stats.DataSize += uint64(sbr.length)
fnr.stats.DataSizeInRepo += uint64(sbr.sizeInRepo)
}
node.Content[pos] = sbr.id
lock.Unlock()
completeBlob()
})
idx++
// test if the context has been cancelled, return the error
if ctx.Err() != nil {
_ = f.Close()
completeError(ctx.Err())
return
}
s.CompleteBlob(uint64(len(chunk.Data)))
}
err = f.Close()
if err != nil {
completeError(err)
return
}
fnr.node = node
lock.Lock()
// require one additional completeFuture() call to ensure that the future only completes
// after reaching the end of this method
remaining += idx + 1
lock.Unlock()
finishReading()
completeBlob()
}
func (s *fileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
// a worker has one chunker which is reused for each file (because it contains a rather large buffer)
chnker := chunker.New(nil, s.pol)
for {
var job saveFileJob
var ok bool
select {
case <-ctx.Done():
return
case job, ok = <-jobs:
if !ok {
return
}
}
s.saveFile(ctx, chnker, job.snPath, job.target, job.file, job.start, func() {
if job.completeReading != nil {
job.completeReading()
}
}, func(res futureNodeResult) {
if job.complete != nil {
job.complete(res.node, res.stats)
}
job.ch <- res
close(job.ch)
})
}
}