archiver: reduce memory usage for large files

FutureBlob now uses a Take() method as a more memory-efficient way to
retrieve the futures result. In addition, futures are now collected
while saving the file. As only a limited number of blobs can be queued
for uploading, for a large file nearly all FutureBlobs already have
their result ready, such that the FutureBlob object just consumes
memory.
This commit is contained in:
Michael Eischer 2022-05-22 15:14:25 +02:00
parent b817681a11
commit 4a10ebed15
5 changed files with 64 additions and 61 deletions

View File

@ -184,17 +184,17 @@ func (arch *Archiver) saveTree(ctx context.Context, t *restic.TreeJSONBuilder) (
b := &Buffer{Data: buf}
res := arch.blobSaver.Save(ctx, restic.TreeBlob, b)
res.Wait(ctx)
if !res.Known() {
sbr := res.Take(ctx)
if !sbr.known {
s.TreeBlobs++
s.TreeSize += uint64(res.Length())
s.TreeSizeInRepo += uint64(res.SizeInRepo())
s.TreeSize += uint64(sbr.length)
s.TreeSizeInRepo += uint64(sbr.sizeInRepo)
}
// The context was canceled in the meantime, res.ID() might be invalid
// The context was canceled in the meantime, id might be invalid
if ctx.Err() != nil {
return restic.ID{}, s, ctx.Err()
}
return res.ID(), s, nil
return sbr.id, s, nil
}
// nodeFromFileInfo returns the restic node from an os.FileInfo.

View File

@ -44,9 +44,7 @@ func (s *BlobSaver) TriggerShutdown() {
// Save stores a blob in the repo. It checks the index and the known blobs
// before saving anything. It takes ownership of the buffer passed in.
func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
// buf might be freed once the job was submitted, thus calculate the length now
length := len(buf.Data)
ch := make(chan saveBlobResponse, 1)
ch := make(chan SaveBlobResponse, 1)
select {
case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}:
case <-ctx.Done():
@ -55,72 +53,62 @@ func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) Fu
return FutureBlob{ch: ch}
}
return FutureBlob{ch: ch, length: length}
return FutureBlob{ch: ch}
}
// FutureBlob is returned by SaveBlob and will return the data once it has been processed.
type FutureBlob struct {
ch <-chan saveBlobResponse
length int
res saveBlobResponse
ch <-chan SaveBlobResponse
}
// Wait blocks until the result is available or the context is cancelled.
func (s *FutureBlob) Wait(ctx context.Context) {
func (s *FutureBlob) Poll() *SaveBlobResponse {
select {
case <-ctx.Done():
return
case res, ok := <-s.ch:
if ok {
s.res = res
return &res
}
default:
}
return nil
}
// ID returns the ID of the blob after it has been saved.
func (s *FutureBlob) ID() restic.ID {
return s.res.id
// Take blocks until the result is available or the context is cancelled.
func (s *FutureBlob) Take(ctx context.Context) SaveBlobResponse {
select {
case res, ok := <-s.ch:
if ok {
return res
}
// Known returns whether or not the blob was already known.
func (s *FutureBlob) Known() bool {
return s.res.known
case <-ctx.Done():
}
// Length returns the raw length of the blob.
func (s *FutureBlob) Length() int {
return s.length
}
// SizeInRepo returns the number of bytes added to the repo (including
// compression and crypto overhead).
func (s *FutureBlob) SizeInRepo() int {
return s.res.size
return SaveBlobResponse{}
}
type saveBlobJob struct {
restic.BlobType
buf *Buffer
ch chan<- saveBlobResponse
ch chan<- SaveBlobResponse
}
type saveBlobResponse struct {
type SaveBlobResponse struct {
id restic.ID
length int
sizeInRepo int
known bool
size int
}
func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (saveBlobResponse, error) {
id, known, size, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false)
func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte) (SaveBlobResponse, error) {
id, known, sizeInRepo, err := s.repo.SaveBlob(ctx, t, buf, restic.ID{}, false)
if err != nil {
return saveBlobResponse{}, err
return SaveBlobResponse{}, err
}
return saveBlobResponse{
return SaveBlobResponse{
id: id,
length: len(buf),
sizeInRepo: sizeInRepo,
known: known,
size: size,
}, nil
}

View File

@ -54,8 +54,8 @@ func TestBlobSaver(t *testing.T) {
}
for i, blob := range results {
blob.Wait(ctx)
if blob.Known() {
sbr := blob.Take(ctx)
if sbr.known {
t.Errorf("blob %v is known, that should not be the case", i)
}
}

View File

@ -129,6 +129,15 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
chnker.Reset(f, s.pol)
var results []FutureBlob
complete := func(sbr SaveBlobResponse) {
if !sbr.known {
stats.DataBlobs++
stats.DataSize += uint64(sbr.length)
stats.DataSizeInRepo += uint64(sbr.sizeInRepo)
}
node.Content = append(node.Content, sbr.id)
}
node.Content = []restic.ID{}
var size uint64
@ -168,6 +177,17 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
}
s.CompleteBlob(f.Name(), uint64(len(chunk.Data)))
// collect already completed blobs
for len(results) > 0 {
sbr := results[0].Poll()
if sbr == nil {
break
}
results[0] = FutureBlob{}
results = results[1:]
complete(*sbr)
}
}
err = f.Close()
@ -176,15 +196,10 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
return fnr
}
for _, res := range results {
res.Wait(ctx)
if !res.Known() {
stats.DataBlobs++
stats.DataSize += uint64(res.Length())
stats.DataSizeInRepo += uint64(res.SizeInRepo())
}
node.Content = append(node.Content, res.ID())
for i, res := range results {
results[i] = FutureBlob{}
sbr := res.Take(ctx)
complete(sbr)
}
node.Size = size

View File

@ -34,7 +34,7 @@ func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Cont
wg, ctx := errgroup.WithContext(ctx)
saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob {
ch := make(chan saveBlobResponse)
ch := make(chan SaveBlobResponse)
close(ch)
return FutureBlob{ch: ch}
}