diff --git a/archiver.go b/archiver.go index c1b9a3578..08f8b9995 100644 --- a/archiver.go +++ b/archiver.go @@ -29,6 +29,7 @@ const ( type Archiver struct { s Server m *Map + c *Cache blobToken chan struct{} @@ -51,6 +52,12 @@ func NewArchiver(s Server) (*Archiver, error) { // create new map to store all blobs in arch.m = NewMap() + // init cache + arch.c, err = NewCache() + if err != nil { + return nil, err + } + // abort on all errors arch.Error = func(string, os.FileInfo, error) error { return err } // allow all files @@ -59,81 +66,40 @@ func NewArchiver(s Server) (*Archiver, error) { return arch, nil } -// Preload loads all tree objects from repository and adds all blobs that are -// still available to the map for deduplication. -func (arch *Archiver) Preload(p *Progress) error { - cache, err := NewCache() +// Preload loads all blobs for all cached snapshots. +func (arch *Archiver) Preload() error { + // list snapshots first + snapshots, err := arch.s.List(backend.Snapshot) if err != nil { return err } - p.Start() - defer p.Done() - - debug.Log("Archiver.Preload", "Start loading known blobs") - - // load all trees, in parallel - worker := func(wg *sync.WaitGroup, c <-chan backend.ID) { - for id := range c { - var tree *Tree - - // load from cache - var t Tree - rd, err := cache.Load(backend.Tree, id) - if err == nil { - debug.Log("Archiver.Preload", "tree %v cached", id.Str()) - tree = &t - dec := json.NewDecoder(rd) - err = dec.Decode(&t) - - if err != nil { - continue - } - } else { - debug.Log("Archiver.Preload", "tree %v not cached: %v", id.Str(), err) - - tree, err = LoadTree(arch.s, id) - // ignore error and advance to next tree - if err != nil { - continue - } - } - - debug.Log("Archiver.Preload", "load tree %v with %d blobs", id, tree.Map.Len()) - arch.m.Merge(tree.Map) - p.Report(Stat{Trees: 1, Blobs: uint64(tree.Map.Len())}) + // TODO: track seen tree ids, load trees that aren't in the set + for _, id := range snapshots { + // try to load snapshot blobs from cache + rd, err := arch.c.Load(backend.Snapshot, "blobs", id) + if err != nil { + debug.Log("Archiver.Preload", "blobs for snapshot %v not cached: %v", id.Str(), err) + return err } - wg.Done() + + debug.Log("Archiver.Preload", "load cached blobs for snapshot %v", id.Str()) + dec := json.NewDecoder(rd) + + m := &Map{} + err = dec.Decode(m) + if err != nil { + debug.Log("Archiver.Preload", "error loading cached blobs for snapshot %v: %v", id.Str(), err) + continue + } + + arch.m.Merge(m) + + debug.Log("Archiver.Preload", "done loading cached blobs for snapshot %v", id.Str()) } - idCh := make(chan backend.ID) - - // start workers - var wg sync.WaitGroup - for i := 0; i < maxConcurrencyPreload; i++ { - wg.Add(1) - go worker(&wg, idCh) - } - - // list ids - trees := 0 - err = arch.s.EachID(backend.Tree, func(id backend.ID) { - trees++ - - if trees%1000 == 0 { - debug.Log("Archiver.Preload", "Loaded %v trees", trees) - } - idCh <- id - }) - - close(idCh) - - // wait for workers - wg.Wait() - - debug.Log("Archiver.Preload", "Loaded %v blobs from %v trees", arch.m.Len(), trees) - - return err + debug.Log("Archiver.Preload", "Loaded %v blobs from %v snapshots", arch.m.Len(), len(snapshots)) + return nil } func (arch *Archiver) Save(t backend.Type, id backend.ID, length uint, rd io.Reader) (Blob, error) { @@ -787,7 +753,7 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, pid backend.ID) (*Sn // receive the top-level tree root := (<-resCh).(*Node) - debug.Log("Archiver.Snapshot", "root node received: %#v", root.blobs[0]) + debug.Log("Archiver.Snapshot", "root node received: %v", root.blobs[0]) sn.Tree = root.blobs[0] // save snapshot @@ -796,6 +762,26 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, pid backend.ID) (*Sn return nil, nil, err } + debug.Log("Archiver.Snapshot", "saved snapshot %v", blob.Storage.Str()) + + // cache blobs + wr, err := arch.c.Store(backend.Snapshot, "blobs", blob.Storage) + if err != nil { + debug.Log("Archiver.Snapshot", "unable to cache blobs for snapshot %v: %v", blob.Storage.Str(), err) + fmt.Fprintf(os.Stderr, "unable to cache blobs for snapshot %v: %v\n", blob.Storage.Str(), err) + return sn, blob.Storage, nil + } + + enc := json.NewEncoder(wr) + err = enc.Encode(arch.m) + if err != nil { + debug.Log("Archiver.Snapshot", "error encoding map for snapshot %v: %v", blob.Storage.Str(), err) + } else { + debug.Log("Archiver.Snapshot", "cached %d blobs for snapshot %v", arch.m.Len(), blob.Storage.Str()) + } + + wr.Close() + return sn, blob.Storage, nil } diff --git a/archiver_test.go b/archiver_test.go index 1adbb1c61..744c34ad2 100644 --- a/archiver_test.go +++ b/archiver_test.go @@ -146,7 +146,7 @@ func BenchmarkArchiveDirectory(b *testing.B) { func snapshot(t testing.TB, server restic.Server, path string) *restic.Snapshot { arch, err := restic.NewArchiver(server) ok(t, err) - ok(t, arch.Preload(nil)) + ok(t, arch.Preload()) sn, _, err := arch.Snapshot(nil, []string{path}, nil) ok(t, err) return sn @@ -226,7 +226,7 @@ func BenchmarkPreload(t *testing.B) { // create new archiver and preload arch2, err := restic.NewArchiver(server) ok(t, err) - ok(t, arch2.Preload(nil)) + ok(t, arch2.Preload()) } } diff --git a/cache.go b/cache.go index 9ecad0339..341c9446e 100644 --- a/cache.go +++ b/cache.go @@ -9,12 +9,15 @@ import ( "github.com/restic/restic/backend" ) +// for testing +var getCacheDir = GetCacheDir + type Cache struct { base string } func NewCache() (*Cache, error) { - dir, err := GetCacheDir() + dir, err := getCacheDir() if err != nil { return nil, err } @@ -22,9 +25,9 @@ func NewCache() (*Cache, error) { return &Cache{base: dir}, nil } -func (c *Cache) Has(t backend.Type, id backend.ID) (bool, error) { +func (c *Cache) Has(t backend.Type, subtype string, id backend.ID) (bool, error) { // try to open file - filename, err := c.filename(t, id) + filename, err := c.filename(t, subtype, id) if err != nil { return false, err } @@ -42,31 +45,29 @@ func (c *Cache) Has(t backend.Type, id backend.ID) (bool, error) { return true, nil } -func (c *Cache) Store(t backend.Type, id backend.ID, rd io.Reader) error { - filename, err := c.filename(t, id) +func (c *Cache) Store(t backend.Type, subtype string, id backend.ID) (io.WriteCloser, error) { + filename, err := c.filename(t, subtype, id) if err != nil { - return err + return nil, err } dirname := filepath.Dir(filename) err = os.MkdirAll(dirname, 0700) if err != nil { - return err + return nil, err } file, err := os.Create(filename) - defer file.Close() if err != nil { - return err + return nil, err } - _, err = io.Copy(file, rd) - return err + return file, nil } -func (c *Cache) Load(t backend.Type, id backend.ID) (io.ReadCloser, error) { +func (c *Cache) Load(t backend.Type, subtype string, id backend.ID) (io.ReadCloser, error) { // try to open file - filename, err := c.filename(t, id) + filename, err := c.filename(t, subtype, id) if err != nil { return nil, err } @@ -75,17 +76,17 @@ func (c *Cache) Load(t backend.Type, id backend.ID) (io.ReadCloser, error) { } // Construct file name for given Type. -func (c *Cache) filename(t backend.Type, id backend.ID) (string, error) { - cachedir, err := GetCacheDir() - if err != nil { - return "", err +func (c *Cache) filename(t backend.Type, subtype string, id backend.ID) (string, error) { + filename := id.String() + if subtype != "" { + filename += "." + subtype } switch t { case backend.Snapshot: - return filepath.Join(cachedir, "snapshots", id.String()), nil + return filepath.Join(c.base, "snapshots", filename), nil case backend.Tree: - return filepath.Join(cachedir, "trees", id.String()), nil + return filepath.Join(c.base, "trees", filename), nil } return "", fmt.Errorf("cache not supported for type %v", t) diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go index 54e23b0e6..438727295 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -91,44 +91,6 @@ func newScanProgress() *restic.Progress { return p } -func newLoadBlobsProgress(s restic.Server) (*restic.Progress, error) { - if !terminal.IsTerminal(int(os.Stdout.Fd())) { - return nil, nil - } - - trees, err := s.Count(backend.Tree) - if err != nil { - return nil, err - } - - eta := uint64(0) - tps := uint64(0) // trees per second - - p := restic.NewProgress(time.Second) - p.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) { - sec := uint64(d / time.Second) - if trees > 0 && sec > 0 && ticker { - tps = uint64(s.Trees) / sec - if tps > 0 { - eta = (uint64(trees) - s.Trees) / tps - } - } - - fmt.Printf("\x1b[2K\r[%s] %3.2f%% %d trees/s %d / %d trees, %d blobs ETA %s", - format_duration(d), - float64(s.Trees)/float64(trees)*100, - tps, - s.Trees, trees, - s.Blobs, - format_seconds(eta)) - } - p.OnDone = func(s restic.Stat, d time.Duration, ticker bool) { - fmt.Printf("\nDone in %s\n", format_duration(d)) - } - - return p, nil -} - func newArchiveProgress(todo restic.Stat) *restic.Progress { if !terminal.IsTerminal(int(os.Stdout.Fd())) { return nil @@ -218,12 +180,7 @@ func (cmd CmdBackup) Execute(args []string) error { } fmt.Printf("loading blobs\n") - pb, err := newLoadBlobsProgress(s) - if err != nil { - return err - } - - err = arch.Preload(pb) + err = arch.Preload() if err != nil { return err } diff --git a/cmd/restic/cmd_cache.go b/cmd/restic/cmd_cache.go index 40dd929da..ab8ea0a5a 100644 --- a/cmd/restic/cmd_cache.go +++ b/cmd/restic/cmd_cache.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "io" "sync" "github.com/restic/restic" @@ -49,7 +50,7 @@ func (cmd CmdCache) Execute(args []string) error { treeCh := make(chan backend.ID) worker := func(wg *sync.WaitGroup, ch chan backend.ID) { for treeID := range ch { - cached, err := cache.Has(backend.Tree, treeID) + cached, err := cache.Has(backend.Tree, "", treeID) if err != nil { fmt.Printf("tree %v cache error: %v\n", treeID.Str(), err) continue @@ -72,12 +73,18 @@ func (cmd CmdCache) Execute(args []string) error { continue } - err = cache.Store(backend.Tree, treeID, decRd) + wr, err := cache.Store(backend.Tree, "", treeID) if err != nil { fmt.Printf(" store error: %v\n", err) continue } + _, err = io.Copy(wr, decRd) + if err != nil { + fmt.Printf(" Copy error: %v\n", err) + continue + } + err = decRd.Close() if err != nil { fmt.Printf(" close error: %v\n", err) diff --git a/map.go b/map.go index 7c1dc4872..4b9cd7d87 100644 --- a/map.go +++ b/map.go @@ -140,6 +140,17 @@ func (bl *Map) Equals(other *Map) bool { return true } +// Each calls f for each blob in the Map. While Each is running, no other +// operation is possible, since a mutex is held for the whole time. +func (bl *Map) Each(f func(blob Blob)) { + bl.m.Lock() + defer bl.m.Unlock() + + for _, blob := range bl.list { + f(blob) + } +} + // Select returns a list of of blobs from the plaintext IDs given in list. func (bl *Map) Select(list backend.IDs) (Blobs, error) { bl.m.Lock()