From 4bb7f2f2edaa53f837829183900877223eb603d3 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Tue, 17 Feb 2015 22:39:44 +0100 Subject: [PATCH] Add Preloader for blobs --- archiver.go | 39 +++++++++++++++++++++++++++++ archiver_test.go | 54 +++++++++++++++++++++++++++++++++++++++- cmd/restic/cmd_backup.go | 6 +++++ 3 files changed, 98 insertions(+), 1 deletion(-) diff --git a/archiver.go b/archiver.go index 67e76d9fb..70e39e69d 100644 --- a/archiver.go +++ b/archiver.go @@ -59,6 +59,45 @@ func NewArchiver(s Server, p *Progress) (*Archiver, error) { return arch, nil } +// Preload loads all tree objects from repository and adds all blobs that are +// still available to the map for deduplication. +func (arch *Archiver) Preload() error { + // load all trees, in parallel + worker := func(wg *sync.WaitGroup, c <-chan backend.ID) { + for id := range c { + tree, err := LoadTree(arch.s, id) + // ignore error and advance to next tree + if err != nil { + return + } + + arch.m.Merge(tree.Map) + } + wg.Done() + } + + idCh := make(chan backend.ID) + + // start workers + var wg sync.WaitGroup + for i := 0; i < maxConcurrency; i++ { + wg.Add(1) + go worker(&wg, idCh) + } + + // list ids + err := arch.s.EachID(backend.Tree, func(id backend.ID) { + idCh <- id + }) + + close(idCh) + + // wait for workers + wg.Wait() + + return err +} + func (arch *Archiver) Save(t backend.Type, id backend.ID, length uint, rd io.Reader) (Blob, error) { debug.Log("Archiver.Save", "Save(%v, %v)\n", t, id.Str()) diff --git a/archiver_test.go b/archiver_test.go index c867f903b..abaa75bdb 100644 --- a/archiver_test.go +++ b/archiver_test.go @@ -8,10 +8,11 @@ import ( "testing" "github.com/restic/restic" + "github.com/restic/restic/backend" "github.com/restic/restic/chunker" ) -var benchArchiveDirectory = flag.String("test.benchdir", "", "benchmark archiving a real directory") +var benchArchiveDirectory = flag.String("test.benchdir", ".", "benchmark archiving a real directory (default: .)") func get_random(seed, count int) []byte { buf := make([]byte, count) @@ -141,3 +142,54 @@ func BenchmarkArchiveDirectory(b *testing.B) { b.Logf("snapshot archived as %v", id) } + +func snapshot(t *testing.T, server restic.Server, path string) *restic.Snapshot { + arch, err := restic.NewArchiver(server, nil) + ok(t, err) + ok(t, arch.Preload()) + sn, _, err := arch.Snapshot(path, nil) + ok(t, err) + return sn +} + +func countBlobs(t *testing.T, server restic.Server) int { + blobs := 0 + err := server.EachID(backend.Tree, func(id backend.ID) { + tree, err := restic.LoadTree(server, id) + ok(t, err) + + blobs += tree.Map.Len() + }) + ok(t, err) + + return blobs +} + +func TestArchiverPreload(t *testing.T) { + be := setupBackend(t) + defer teardownBackend(t, be) + key := setupKey(t, be, "geheim") + server := restic.NewServerWithKey(be, key) + + // archive a few files + sn := snapshot(t, server, *benchArchiveDirectory) + t.Logf("archived snapshot %v", sn.ID) + + // get archive stats + blobsBefore := countBlobs(t, server) + t.Logf("found %v blobs", blobsBefore) + + // archive the same files again + sn2 := snapshot(t, server, *benchArchiveDirectory) + t.Logf("archived snapshot %v", sn2.ID) + + // get archive stats + blobsAfter := countBlobs(t, server) + t.Logf("found %v blobs", blobsAfter) + + // if there are more than 10% more blobs, something is wrong + if blobsAfter > (blobsBefore + blobsBefore/10) { + t.Fatalf("TestArchiverPreload: too many blobs in repository: before %d, after %d, threshhold %d", + blobsBefore, blobsAfter, (blobsBefore + blobsBefore/10)) + } +} diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go index e238a8bf2..50d19daa2 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -193,6 +193,12 @@ func (cmd CmdBackup) Execute(args []string) error { return nil } + fmt.Printf("loading blobs\n") + err = arch.Preload() + if err != nil { + return err + } + _, id, err := arch.Snapshot(target, parentSnapshotID) if err != nil { return err