1
0
Fork 0
mirror of https://github.com/restic/restic.git synced 2025-01-18 13:31:08 +00:00

Merge pull request #328 from restic/improve-backup-speed

Load trees from repository in parallel
This commit is contained in:
Alexander Neumann 2015-10-29 20:03:29 +01:00
commit 6ffd7da4d7
8 changed files with 1503 additions and 28 deletions

View file

@ -10,6 +10,7 @@ import (
"github.com/restic/restic" "github.com/restic/restic"
"github.com/restic/restic/backend" "github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/filter" "github.com/restic/restic/filter"
"github.com/restic/restic/repository" "github.com/restic/restic/repository"
"golang.org/x/crypto/ssh/terminal" "golang.org/x/crypto/ssh/terminal"
@ -300,6 +301,10 @@ func (cmd CmdBackup) Execute(args []string) error {
cmd.global.Warnf("error for exclude pattern: %v", err) cmd.global.Warnf("error for exclude pattern: %v", err)
} }
if matched {
debug.Log("backup.Execute", "path %q excluded by a filter", item)
}
return !matched return !matched
} }

View file

@ -12,11 +12,18 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
) )
type process struct {
tag string
goroutine int
}
var opts struct { var opts struct {
logger *log.Logger logger *log.Logger
tags map[string]bool tags map[string]bool
last map[process]time.Time
m sync.Mutex m sync.Mutex
} }
@ -65,6 +72,7 @@ func initDebugLogger() {
func initDebugTags() { func initDebugTags() {
opts.tags = make(map[string]bool) opts.tags = make(map[string]bool)
opts.last = make(map[process]time.Time)
// defaults // defaults
opts.tags["break"] = true opts.tags["break"] = true
@ -113,14 +121,12 @@ func goroutineNum() int {
} }
// taken from https://github.com/VividCortex/trace // taken from https://github.com/VividCortex/trace
func getPosition() string { func getPosition(goroutine int) string {
_, file, line, ok := runtime.Caller(2) _, file, line, ok := runtime.Caller(2)
if !ok { if !ok {
return "" return ""
} }
goroutine := goroutineNum()
return fmt.Sprintf("%3d %s:%d", goroutine, filepath.Base(file), line) return fmt.Sprintf("%3d %s:%d", goroutine, filepath.Base(file), line)
} }
@ -130,6 +136,15 @@ func Log(tag string, f string, args ...interface{}) {
opts.m.Lock() opts.m.Lock()
defer opts.m.Unlock() defer opts.m.Unlock()
goroutine := goroutineNum()
last, ok := opts.last[process{tag, goroutine}]
if !ok {
last = time.Now()
}
current := time.Now()
opts.last[process{tag, goroutine}] = current
if f[len(f)-1] != '\n' { if f[len(f)-1] != '\n' {
f += "\n" f += "\n"
} }
@ -138,8 +153,8 @@ func Log(tag string, f string, args ...interface{}) {
maxTagLen = len(tag) maxTagLen = len(tag)
} }
formatStringTag := "[%" + strconv.FormatInt(int64(maxTagLen), 10) + "s]" formatStringTag := "%2.3f [%" + strconv.FormatInt(int64(maxTagLen), 10) + "s]"
formatString := fmt.Sprintf(formatStringTag+" %s %s", tag, getPosition(), f) formatString := fmt.Sprintf(formatStringTag+" %s %s", current.Sub(last).Seconds(), tag, getPosition(goroutine), f)
dbgprint := func() { dbgprint := func() {
fmt.Fprintf(os.Stderr, formatString, args...) fmt.Fprintf(os.Stderr, formatString, args...)

View file

@ -175,6 +175,28 @@ func TestLoadJSONPack(t *testing.T) {
OK(t, err) OK(t, err)
} }
func BenchmarkLoadJSONPack(t *testing.B) {
repo := SetupRepo()
defer TeardownRepo(repo)
if BenchArchiveDirectory == "" {
t.Skip("benchdir not set, skipping")
}
// archive a few files
sn := SnapshotDir(t, repo, BenchArchiveDirectory, nil)
OK(t, repo.Flush())
tree := restic.NewTree()
t.ResetTimer()
for i := 0; i < t.N; i++ {
err := repo.LoadJSONPack(pack.Tree, *sn.Tree, &tree)
OK(t, err)
}
}
func TestLoadJSONUnpacked(t *testing.T) { func TestLoadJSONUnpacked(t *testing.T) {
repo := SetupRepo() repo := SetupRepo()
defer TeardownRepo(repo) defer TeardownRepo(repo)

Binary file not shown.

BIN
testdata/walktree-test-repo.tar.gz vendored Normal file

Binary file not shown.

View file

@ -8,7 +8,6 @@ import (
"github.com/restic/restic/backend" "github.com/restic/restic/backend"
"github.com/restic/restic/debug" "github.com/restic/restic/debug"
"github.com/restic/restic/pack" "github.com/restic/restic/pack"
"github.com/restic/restic/repository"
) )
type Tree struct { type Tree struct {
@ -30,7 +29,11 @@ func (t Tree) String() string {
return fmt.Sprintf("Tree<%d nodes>", len(t.Nodes)) return fmt.Sprintf("Tree<%d nodes>", len(t.Nodes))
} }
func LoadTree(repo *repository.Repository, id backend.ID) (*Tree, error) { type TreeLoader interface {
LoadJSONPack(pack.BlobType, backend.ID, interface{}) error
}
func LoadTree(repo TreeLoader, id backend.ID) (*Tree, error) {
tree := &Tree{} tree := &Tree{}
err := repo.LoadJSONPack(pack.Tree, id, tree) err := repo.LoadJSONPack(pack.Tree, id, tree)
if err != nil { if err != nil {

165
walk.go
View file

@ -2,12 +2,14 @@ package restic
import ( import (
"path/filepath" "path/filepath"
"sync"
"github.com/restic/restic/backend" "github.com/restic/restic/backend"
"github.com/restic/restic/debug" "github.com/restic/restic/debug"
"github.com/restic/restic/repository" "github.com/restic/restic/pack"
) )
// WalkTreeJob is a job sent from the tree walker.
type WalkTreeJob struct { type WalkTreeJob struct {
Path string Path string
Error error Error error
@ -16,47 +18,168 @@ type WalkTreeJob struct {
Tree *Tree Tree *Tree
} }
func walkTree(repo *repository.Repository, path string, treeID backend.ID, done chan struct{}, jobCh chan<- WalkTreeJob) { // TreeWalker traverses a tree in the repository depth-first and sends a job
debug.Log("walkTree", "start on %q (%v)", path, treeID.Str()) // for each item (file or dir) that it encounters.
type TreeWalker struct {
ch chan<- loadTreeJob
out chan<- WalkTreeJob
}
t, err := LoadTree(repo, treeID) // NewTreeWalker uses ch to load trees from the repository and sends jobs to
if err != nil { // out.
func NewTreeWalker(ch chan<- loadTreeJob, out chan<- WalkTreeJob) *TreeWalker {
return &TreeWalker{ch: ch, out: out}
}
// Walk starts walking the tree given by id. When the channel done is closed,
// processing stops.
func (tw *TreeWalker) Walk(path string, id backend.ID, done chan struct{}) {
debug.Log("TreeWalker.Walk", "starting on tree %v for %v", id.Str(), path)
defer debug.Log("TreeWalker.Walk", "done walking tree %v for %v", id.Str(), path)
resCh := make(chan loadTreeResult, 1)
tw.ch <- loadTreeJob{
id: id,
res: resCh,
}
res := <-resCh
if res.err != nil {
select { select {
case jobCh <- WalkTreeJob{Path: path, Error: err}: case tw.out <- WalkTreeJob{Path: path, Error: res.err}:
case <-done: case <-done:
return return
} }
return return
} }
for _, node := range t.Nodes { tw.walk(path, res.tree, done)
p := filepath.Join(path, node.Name)
select {
case tw.out <- WalkTreeJob{Path: path, Tree: res.tree}:
case <-done:
return
}
}
func (tw *TreeWalker) walk(path string, tree *Tree, done chan struct{}) {
debug.Log("TreeWalker.walk", "start on %q", path)
defer debug.Log("TreeWalker.walk", "done for %q", path)
// load all subtrees in parallel
results := make([]<-chan loadTreeResult, len(tree.Nodes))
for i, node := range tree.Nodes {
if node.Type == "dir" { if node.Type == "dir" {
walkTree(repo, p, *node.Subtree, done, jobCh) resCh := make(chan loadTreeResult, 1)
tw.ch <- loadTreeJob{
id: *node.Subtree,
res: resCh,
}
results[i] = resCh
}
}
for i, node := range tree.Nodes {
p := filepath.Join(path, node.Name)
var job WalkTreeJob
if node.Type == "dir" {
if results[i] == nil {
panic("result chan should not be nil")
}
res := <-results[i]
tw.walk(p, res.tree, done)
job = WalkTreeJob{Path: p, Tree: res.tree, Error: res.err}
} else { } else {
job = WalkTreeJob{Path: p, Node: node}
}
select {
case tw.out <- job:
case <-done:
return
}
}
}
type loadTreeResult struct {
tree *Tree
err error
}
type loadTreeJob struct {
id backend.ID
res chan<- loadTreeResult
}
type treeLoader func(backend.ID) (*Tree, error)
func loadTreeWorker(wg *sync.WaitGroup, in <-chan loadTreeJob, load treeLoader, done <-chan struct{}) {
debug.Log("loadTreeWorker", "start")
defer debug.Log("loadTreeWorker", "exit")
defer wg.Done()
for {
select {
case <-done:
debug.Log("loadTreeWorker", "done channel closed")
return
case job, ok := <-in:
if !ok {
debug.Log("loadTreeWorker", "input channel closed, exiting")
return
}
debug.Log("loadTreeWorker", "received job to load tree %v", job.id.Str())
tree, err := load(job.id)
debug.Log("loadTreeWorker", "tree %v loaded, error %v", job.id.Str(), err)
select { select {
case jobCh <- WalkTreeJob{Path: p, Node: node}: case job.res <- loadTreeResult{tree, err}:
debug.Log("loadTreeWorker", "job result sent")
case <-done: case <-done:
debug.Log("loadTreeWorker", "done channel closed before result could be sent")
return return
} }
} }
} }
select {
case jobCh <- WalkTreeJob{Path: path, Tree: t}:
case <-done:
return
}
debug.Log("walkTree", "done for %q (%v)", path, treeID.Str())
} }
const loadTreeWorkers = 10
// WalkTree walks the tree specified by id recursively and sends a job for each // WalkTree walks the tree specified by id recursively and sends a job for each
// file and directory it finds. When the channel done is closed, processing // file and directory it finds. When the channel done is closed, processing
// stops. // stops.
func WalkTree(repo *repository.Repository, id backend.ID, done chan struct{}, jobCh chan<- WalkTreeJob) { func WalkTree(repo TreeLoader, id backend.ID, done chan struct{}, jobCh chan<- WalkTreeJob) {
debug.Log("WalkTree", "start on %v", id.Str()) debug.Log("WalkTree", "start on %v, start workers", id.Str())
walkTree(repo, "", id, done, jobCh)
load := func(id backend.ID) (*Tree, error) {
tree := &Tree{}
err := repo.LoadJSONPack(pack.Tree, id, tree)
if err != nil {
return nil, err
}
return tree, nil
}
ch := make(chan loadTreeJob)
var wg sync.WaitGroup
for i := 0; i < loadTreeWorkers; i++ {
wg.Add(1)
go loadTreeWorker(&wg, ch, load, done)
}
tw := NewTreeWalker(ch, jobCh)
tw.Walk("", id, done)
close(jobCh) close(jobCh)
close(ch)
wg.Wait()
debug.Log("WalkTree", "done") debug.Log("WalkTree", "done")
} }

File diff suppressed because it is too large Load diff