From e19c87fa7da9951b24bf106da165fd343de8deda Mon Sep 17 00:00:00 2001
From: Florian Weingarten
Date: Sat, 2 May 2015 01:05:49 -0400
Subject: [PATCH] Break Archiver.SaveFile() method down into smaller ones

---
 archiver.go | 173 ++++++++++++++++++++++++++++++++---------------------
 1 file changed, 104 insertions(+), 69 deletions(-)

diff --git a/archiver.go b/archiver.go
index f1d82b082..dd1b1b11e 100644
--- a/archiver.go
+++ b/archiver.go
@@ -12,6 +12,7 @@ import (
 
 	"github.com/juju/arrar"
 	"github.com/restic/restic/backend"
+	"github.com/restic/restic/chunker"
 	"github.com/restic/restic/debug"
 	"github.com/restic/restic/pack"
 	"github.com/restic/restic/pipe"
@@ -88,92 +89,80 @@ func (arch *Archiver) SaveTreeJSON(item interface{}) (backend.ID, error) {
 	return arch.s.SaveJSON(pack.Tree, item)
 }
 
-// SaveFile stores the content of the file on the backend as a Blob by calling
-// Save for each chunk.
-func (arch *Archiver) SaveFile(p *Progress, node *Node) error {
-	file, err := node.OpenForReading()
-	defer file.Close()
-	if err != nil {
-		return err
-	}
-
-	// check file again, since it could have disappeared by now
+// reloadFileIfChanged stats the open file and compares its modification time
+// with the one recorded in node. If they are equal, node is returned as is.
+// Otherwise the change is reported via arch.Error; if that returns an error
+// it is passed on, else a new Node built from the current file info is
+// returned.
+func (arch *Archiver) reloadFileIfChanged(node *Node, file *os.File) (*Node, error) {
 	fi, err := file.Stat()
 	if err != nil {
-		return err
+		return nil, err
 	}
 
-	if fi.ModTime() != node.ModTime {
-		e2 := arch.Error(node.path, fi, errors.New("file was updated, using new version"))
-
-		if e2 == nil {
-			n, err := NodeFromFileInfo(node.path, fi)
-			if err != nil {
-				debug.Log("Archiver.SaveFile", "NodeFromFileInfo returned error for %v: %v", node.path, err)
-				return err
-			}
-
-			*node = *n
-		}
+	if fi.ModTime() == node.ModTime {
+		return node, nil
 	}
 
-	type result struct {
-		id    backend.ID
-		bytes uint64
+	err = arch.Error(node.path, fi, errors.New("file has changed"))
+	if err != nil {
+		return nil, err
 	}
 
-	// store all chunks
-	chnker := GetChunker("archiver.SaveFile")
-	chnker.Reset(file, arch.s.ChunkerPolynomial())
-	chans := [](<-chan result){}
-	defer FreeChunker("archiver.SaveFile", chnker)
-
-	chunks := 0
-
-	for {
-		chunk, err := chnker.Next()
-		if err == io.EOF {
-			break
-		}
-
-		if err != nil {
-			return arrar.Annotate(err, "SaveFile() chunker.Next()")
-		}
-
-		chunks++
-
-		// acquire token, start goroutine to save chunk
-		token := <-arch.blobToken
-		resCh := make(chan result, 1)
-
-		go func(ch chan<- result) {
-			err := arch.Save(pack.Data, chunk.Digest, chunk.Length, chunk.Reader(file))
-			// TODO handle error
-			if err != nil {
-				panic(err)
-			}
-
-			p.Report(Stat{Bytes: uint64(chunk.Length)})
-			arch.blobToken <- token
-			ch <- result{id: backend.ID(chunk.Digest), bytes: uint64(chunk.Length)}
-		}(resCh)
-
-		chans = append(chans, resCh)
+	n, err := NodeFromFileInfo(node.path, fi)
+	if err != nil {
+		debug.Log("Archiver.SaveFile", "NodeFromFileInfo returned error for %v: %v", node.path, err)
+		return nil, err
 	}
 
-	results := []result{}
-	for _, ch := range chans {
+	return n, nil
+}
+
+// saveResult holds the ID and size in bytes of one saved chunk.
+type saveResult struct {
+	id    backend.ID
+	bytes uint64
+}
+
+// saveChunk saves a single chunk as a data blob, reports its size to p, puts
+// token back into arch.blobToken and sends the result on resultChannel. It
+// panics if saving fails.
+func (arch *Archiver) saveChunk(chunk *chunker.Chunk, p *Progress, token struct{}, file *os.File, resultChannel chan<- saveResult) {
+	err := arch.Save(pack.Data, chunk.Digest, chunk.Length, chunk.Reader(file))
+	// TODO handle error
+	if err != nil {
+		panic(err)
+	}
+
+	p.Report(Stat{Bytes: uint64(chunk.Length)})
+	arch.blobToken <- token
+	resultChannel <- saveResult{id: backend.ID(chunk.Digest), bytes: uint64(chunk.Length)}
+}
+
+// waitForResults receives one result from each channel, in order, and returns
+// them as a slice.
+func waitForResults(resultChannels [](<-chan saveResult)) ([]saveResult, error) {
+	results := []saveResult{}
+
+	for _, ch := range resultChannels {
 		results = append(results, <-ch)
 	}
 
-	if len(results) != chunks {
-		return fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", chunks, len(results))
+	if len(results) != len(resultChannels) {
+		return nil, fmt.Errorf("chunker returned %v chunks, but only %v blobs saved", len(resultChannels), len(results))
 	}
 
-	var bytes uint64
+	return results, nil
+}
 
-	node.Content = make([]backend.ID, len(results))
+// updateNodeContent sets node.Content to the IDs of results, in order, and
+// sums up their sizes.
+func updateNodeContent(node *Node, results []saveResult) error {
 	debug.Log("Archiver.Save", "checking size for file %s", node.path)
+
+	var bytes uint64
+	node.Content = make([]backend.ID, len(results))
+
 	for i, b := range results {
 		node.Content[i] = b.id
 		bytes += b.bytes
@@ -190,6 +179,52 @@ func (arch *Archiver) SaveFile(p *Progress, node *Node) error {
 	return nil
 }
 
+// SaveFile stores the content of the file on the backend as a Blob by calling
+// Save for each chunk.
+func (arch *Archiver) SaveFile(p *Progress, node *Node) error {
+	file, err := node.OpenForReading()
+	if err != nil {
+		return err
+	}
+	defer file.Close()
+
+	// reloadFileIfChanged returns a fresh Node when the file was modified.
+	// Copy it into the caller's node (as the pre-refactoring code did) so the
+	// refreshed metadata and the Content set below reach the caller.
+	n, err := arch.reloadFileIfChanged(node, file)
+	if err != nil {
+		return err
+	}
+	*node = *n
+
+	chnker := GetChunker("archiver.SaveFile")
+	chnker.Reset(file, arch.s.ChunkerPolynomial())
+	resultChannels := [](<-chan saveResult){}
+	defer FreeChunker("archiver.SaveFile", chnker)
+
+	for {
+		chunk, err := chnker.Next()
+		if err == io.EOF {
+			break
+		}
+
+		if err != nil {
+			return arrar.Annotate(err, "SaveFile() chunker.Next()")
+		}
+
+		resCh := make(chan saveResult, 1)
+		go arch.saveChunk(chunk, p, <-arch.blobToken, file, resCh)
+		resultChannels = append(resultChannels, resCh)
+	}
+
+	results, err := waitForResults(resultChannels)
+	if err != nil {
+		return err
+	}
+
+	return updateNodeContent(node, results)
+}
+
 func (arch *Archiver) saveTree(p *Progress, t *Tree) (backend.ID, error) {
 	debug.Log("Archiver.saveTree", "saveTree(%v)\n", t)
 	var wg sync.WaitGroup