restic/pipe/pipe_test.go

391 lines
7.0 KiB
Go
Raw Normal View History

2015-02-15 11:57:09 +00:00
package pipe_test
import (
"os"
"path/filepath"
"sync"
"testing"
"time"
"github.com/restic/restic/pipe"
2015-04-09 19:15:48 +00:00
. "github.com/restic/restic/test"
2015-02-15 11:57:09 +00:00
)
// isFile reports whether fi describes a regular file: its mode must carry
// none of the os.ModeType bits and must not be a character device.
func isFile(fi os.FileInfo) bool {
	mode := fi.Mode()
	if mode&os.ModeType != 0 {
		return false
	}
	return mode&os.ModeCharDevice == 0
}
// stats holds the counters gathered while traversing a directory tree. Values
// of this type are compared with == by the tests.
type stats struct {
	dirs  int // number of directories counted
	files int // number of entries counted as files
}
func statPath(path string) (stats, error) {
var s stats
// count files and directories with filepath.Walk()
2015-06-28 11:15:35 +00:00
err := filepath.Walk(TestWalkerPath, func(p string, fi os.FileInfo, err error) error {
2015-02-15 11:57:09 +00:00
if fi == nil {
return err
}
if fi.IsDir() {
s.dirs++
2015-02-15 13:44:54 +00:00
} else {
2015-02-15 11:57:09 +00:00
s.files++
}
return err
})
return s, err
}
2015-06-28 11:15:35 +00:00
// maxWorkers bounds the loops that start worker goroutines in the tests and
// the benchmark in this file.
const maxWorkers = 100
2015-03-02 13:48:47 +00:00
func TestPipelineWalkerWithSplit(t *testing.T) {
2015-06-28 11:15:35 +00:00
if TestWalkerPath == "" {
2015-03-02 13:48:47 +00:00
t.Skipf("walkerpath not set, skipping TestPipelineWalker")
2015-02-15 11:57:09 +00:00
}
2015-06-28 11:15:35 +00:00
before, err := statPath(TestWalkerPath)
2015-04-09 19:15:48 +00:00
OK(t, err)
2015-02-15 11:57:09 +00:00
2015-06-28 11:15:35 +00:00
t.Logf("walking path %s with %d dirs, %d files", TestWalkerPath,
2015-02-15 11:57:09 +00:00
before.dirs, before.files)
// account for top level dir
before.dirs++
2015-02-15 11:57:09 +00:00
after := stats{}
m := sync.Mutex{}
2015-02-15 13:44:54 +00:00
worker := func(wg *sync.WaitGroup, done <-chan struct{}, entCh <-chan pipe.Entry, dirCh <-chan pipe.Dir) {
2015-02-15 11:57:09 +00:00
defer wg.Done()
for {
select {
case e, ok := <-entCh:
if !ok {
// channel is closed
return
}
m.Lock()
after.files++
m.Unlock()
2015-03-07 10:53:32 +00:00
e.Result() <- true
2015-02-15 11:57:09 +00:00
case dir, ok := <-dirCh:
if !ok {
// channel is closed
return
}
// wait for all content
for _, ch := range dir.Entries {
<-ch
}
m.Lock()
after.dirs++
m.Unlock()
2015-03-07 10:53:32 +00:00
dir.Result() <- true
2015-02-15 11:57:09 +00:00
case <-done:
// pipeline was cancelled
return
}
}
}
2015-02-15 13:44:54 +00:00
var wg sync.WaitGroup
2015-02-15 11:57:09 +00:00
done := make(chan struct{})
entCh := make(chan pipe.Entry)
dirCh := make(chan pipe.Dir)
2015-06-28 11:15:35 +00:00
for i := 0; i < maxWorkers; i++ {
2015-02-15 11:57:09 +00:00
wg.Add(1)
2015-02-15 13:44:54 +00:00
go worker(&wg, done, entCh, dirCh)
2015-02-15 11:57:09 +00:00
}
2015-03-07 10:53:32 +00:00
jobs := make(chan pipe.Job, 200)
2015-03-02 13:48:47 +00:00
wg.Add(1)
go func() {
pipe.Split(jobs, dirCh, entCh)
close(entCh)
close(dirCh)
wg.Done()
}()
2015-03-07 10:53:32 +00:00
resCh := make(chan pipe.Result, 1)
2015-06-28 11:15:35 +00:00
err = pipe.Walk([]string{TestWalkerPath}, done, jobs, resCh)
2015-04-09 19:15:48 +00:00
OK(t, err)
2015-03-02 13:48:47 +00:00
// wait for all workers to terminate
wg.Wait()
// wait for top-level blob
<-resCh
2015-06-28 11:15:35 +00:00
t.Logf("walked path %s with %d dirs, %d files", TestWalkerPath,
2015-03-02 13:48:47 +00:00
after.dirs, after.files)
2015-04-09 19:15:48 +00:00
Assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
2015-03-02 13:48:47 +00:00
}
func TestPipelineWalker(t *testing.T) {
2015-06-28 11:15:35 +00:00
if TestWalkerPath == "" {
2015-03-02 13:48:47 +00:00
t.Skipf("walkerpath not set, skipping TestPipelineWalker")
}
2015-06-28 11:15:35 +00:00
before, err := statPath(TestWalkerPath)
2015-04-09 19:15:48 +00:00
OK(t, err)
2015-03-02 13:48:47 +00:00
2015-06-28 11:15:35 +00:00
t.Logf("walking path %s with %d dirs, %d files", TestWalkerPath,
2015-03-02 13:48:47 +00:00
before.dirs, before.files)
// account for top level dir
before.dirs++
2015-03-02 13:48:47 +00:00
after := stats{}
m := sync.Mutex{}
2015-03-07 10:53:32 +00:00
worker := func(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan pipe.Job) {
2015-03-02 13:48:47 +00:00
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if !ok {
// channel is closed
return
}
2015-04-09 19:15:48 +00:00
Assert(t, job != nil, "job is nil")
2015-03-02 13:48:47 +00:00
switch j := job.(type) {
case pipe.Dir:
// wait for all content
for _, ch := range j.Entries {
<-ch
}
m.Lock()
after.dirs++
m.Unlock()
2015-03-07 10:53:32 +00:00
j.Result() <- true
2015-03-02 13:48:47 +00:00
case pipe.Entry:
m.Lock()
after.files++
m.Unlock()
2015-03-07 10:53:32 +00:00
j.Result() <- true
2015-03-02 13:48:47 +00:00
}
case <-done:
// pipeline was cancelled
return
}
}
}
var wg sync.WaitGroup
done := make(chan struct{})
2015-03-07 10:53:32 +00:00
jobs := make(chan pipe.Job)
2015-03-02 13:48:47 +00:00
2015-06-28 11:15:35 +00:00
for i := 0; i < maxWorkers; i++ {
2015-03-02 13:48:47 +00:00
wg.Add(1)
go worker(&wg, done, jobs)
}
2015-03-07 10:53:32 +00:00
resCh := make(chan pipe.Result, 1)
2015-06-28 11:15:35 +00:00
err = pipe.Walk([]string{TestWalkerPath}, done, jobs, resCh)
2015-04-09 19:15:48 +00:00
OK(t, err)
2015-02-15 11:57:09 +00:00
// wait for all workers to terminate
wg.Wait()
// wait for top-level blob
<-resCh
2015-06-28 11:15:35 +00:00
t.Logf("walked path %s with %d dirs, %d files", TestWalkerPath,
2015-02-15 11:57:09 +00:00
after.dirs, after.files)
2015-04-09 19:15:48 +00:00
Assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
2015-02-15 11:57:09 +00:00
}
func BenchmarkPipelineWalker(b *testing.B) {
2015-06-28 11:15:35 +00:00
if TestWalkerPath == "" {
2015-03-02 13:48:47 +00:00
b.Skipf("walkerpath not set, skipping BenchPipelineWalker")
2015-02-15 11:57:09 +00:00
}
var max time.Duration
m := sync.Mutex{}
2015-02-15 13:44:54 +00:00
fileWorker := func(wg *sync.WaitGroup, done <-chan struct{}, ch <-chan pipe.Entry) {
2015-02-15 11:57:09 +00:00
defer wg.Done()
for {
select {
2015-02-15 13:44:54 +00:00
case e, ok := <-ch:
2015-02-15 11:57:09 +00:00
if !ok {
// channel is closed
return
}
// simulate backup
2015-02-15 13:44:54 +00:00
//time.Sleep(10 * time.Millisecond)
2015-02-15 11:57:09 +00:00
2015-03-07 10:53:32 +00:00
e.Result() <- true
2015-02-15 13:44:54 +00:00
case <-done:
// pipeline was cancelled
return
}
}
}
2015-02-15 11:57:09 +00:00
2015-02-15 13:44:54 +00:00
dirWorker := func(wg *sync.WaitGroup, done <-chan struct{}, ch <-chan pipe.Dir) {
defer wg.Done()
for {
select {
case dir, ok := <-ch:
2015-02-15 11:57:09 +00:00
if !ok {
// channel is closed
return
}
start := time.Now()
// wait for all content
for _, ch := range dir.Entries {
<-ch
}
d := time.Since(start)
m.Lock()
if d > max {
max = d
}
m.Unlock()
2015-03-07 10:53:32 +00:00
dir.Result() <- true
2015-02-15 11:57:09 +00:00
case <-done:
// pipeline was cancelled
return
}
}
}
for i := 0; i < b.N; i++ {
2015-02-15 13:44:54 +00:00
max = 0
2015-02-15 11:57:09 +00:00
done := make(chan struct{})
2015-02-15 13:44:54 +00:00
entCh := make(chan pipe.Entry, 200)
dirCh := make(chan pipe.Dir, 200)
2015-02-15 11:57:09 +00:00
var wg sync.WaitGroup
2015-06-28 11:15:35 +00:00
b.Logf("starting %d workers", maxWorkers)
for i := 0; i < maxWorkers; i++ {
2015-02-15 13:44:54 +00:00
wg.Add(2)
go dirWorker(&wg, done, dirCh)
go fileWorker(&wg, done, entCh)
2015-02-15 11:57:09 +00:00
}
2015-03-07 10:53:32 +00:00
jobs := make(chan pipe.Job, 200)
2015-03-02 13:48:47 +00:00
wg.Add(1)
go func() {
pipe.Split(jobs, dirCh, entCh)
close(entCh)
close(dirCh)
wg.Done()
}()
2015-03-07 10:53:32 +00:00
resCh := make(chan pipe.Result, 1)
2015-06-28 11:15:35 +00:00
err := pipe.Walk([]string{TestWalkerPath}, done, jobs, resCh)
2015-04-09 19:15:48 +00:00
OK(b, err)
2015-02-15 11:57:09 +00:00
// wait for all workers to terminate
wg.Wait()
// wait for final result
<-resCh
2015-02-15 13:44:54 +00:00
b.Logf("max duration for a dir: %v", max)
}
2015-02-15 11:57:09 +00:00
}
func TestPipelineWalkerMultiple(t *testing.T) {
2015-06-28 11:15:35 +00:00
if TestWalkerPath == "" {
t.Skipf("walkerpath not set, skipping TestPipelineWalker")
}
2015-06-28 11:15:35 +00:00
paths, err := filepath.Glob(filepath.Join(TestWalkerPath, "*"))
2015-06-28 11:15:35 +00:00
before, err := statPath(TestWalkerPath)
2015-04-09 19:15:48 +00:00
OK(t, err)
t.Logf("walking paths %v with %d dirs, %d files", paths,
before.dirs, before.files)
after := stats{}
m := sync.Mutex{}
worker := func(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan pipe.Job) {
defer wg.Done()
for {
select {
case job, ok := <-jobs:
if !ok {
// channel is closed
return
}
2015-04-09 19:15:48 +00:00
Assert(t, job != nil, "job is nil")
switch j := job.(type) {
case pipe.Dir:
// wait for all content
for _, ch := range j.Entries {
<-ch
}
m.Lock()
after.dirs++
m.Unlock()
j.Result() <- true
case pipe.Entry:
m.Lock()
after.files++
m.Unlock()
j.Result() <- true
}
case <-done:
// pipeline was cancelled
return
}
}
}
var wg sync.WaitGroup
done := make(chan struct{})
jobs := make(chan pipe.Job)
2015-06-28 11:15:35 +00:00
for i := 0; i < maxWorkers; i++ {
wg.Add(1)
go worker(&wg, done, jobs)
}
resCh := make(chan pipe.Result, 1)
err = pipe.Walk(paths, done, jobs, resCh)
2015-04-09 19:15:48 +00:00
OK(t, err)
// wait for all workers to terminate
wg.Wait()
// wait for top-level blob
<-resCh
t.Logf("walked %d paths with %d dirs, %d files", len(paths), after.dirs, after.files)
2015-04-09 19:15:48 +00:00
Assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
}