restic/pipe/pipe.go

195 lines
3.7 KiB
Go

package pipe
import (
"errors"
"fmt"
"os"
"path/filepath"
"sort"
"github.com/restic/restic/debug"
)
type Entry struct {
Path string
Info os.FileInfo
Error error
Result chan<- interface{}
}
type Dir struct {
Path string
Error error
Info os.FileInfo
Entries [](<-chan interface{})
Result chan<- interface{}
}
// readDirNames reads the directory named by dirname and returns
// a sorted list of directory entries.
// taken from filepath/path.go
func readDirNames(dirname string) ([]string, error) {
f, err := os.Open(dirname)
if err != nil {
return nil, err
}
names, err := f.Readdirnames(-1)
f.Close()
if err != nil {
return nil, err
}
sort.Strings(names)
return names, nil
}
func isDir(fi os.FileInfo) bool {
return fi.IsDir()
}
func isFile(fi os.FileInfo) bool {
return fi.Mode()&(os.ModeType|os.ModeCharDevice) == 0
}
var errCancelled = errors.New("walk cancelled")
func walk(path string, done chan struct{}, jobs chan<- interface{}, res chan<- interface{}) error {
info, err := os.Lstat(path)
if err != nil {
return err
}
if !info.IsDir() {
select {
case jobs <- Entry{Info: info, Path: path, Result: res}:
case <-done:
return errCancelled
}
return nil
}
names, err := readDirNames(path)
if err != nil {
return err
}
entries := make([]<-chan interface{}, 0, len(names))
for _, name := range names {
subpath := filepath.Join(path, name)
ch := make(chan interface{}, 1)
entries = append(entries, ch)
fi, err := os.Lstat(subpath)
if err != nil {
select {
case jobs <- Entry{Info: fi, Error: err, Result: ch}:
case <-done:
return errCancelled
}
continue
}
if isDir(fi) {
err = walk(subpath, done, jobs, ch)
if err != nil {
return err
}
} else {
select {
case jobs <- Entry{Info: fi, Path: subpath, Result: ch}:
case <-done:
return errCancelled
}
}
}
select {
case jobs <- Dir{Path: path, Info: info, Entries: entries, Result: res}:
case <-done:
return errCancelled
}
return nil
}
// Walk sends a Job for each file and directory it finds below the paths. When
// the channel done is closed, processing stops.
func Walk(paths []string, done chan struct{}, jobs chan<- interface{}) (<-chan interface{}, error) {
resCh := make(chan interface{}, 1)
defer func() {
close(resCh)
close(jobs)
debug.Log("pipe.Walk", "output channel closed")
}()
entries := make([]<-chan interface{}, 0, len(paths))
for _, path := range paths {
debug.Log("pipe.Walk", "start walker for %v", path)
ch := make(chan interface{}, 1)
entries = append(entries, ch)
err := walk(path, done, jobs, ch)
if err != nil {
return nil, err
}
debug.Log("pipe.Walk", "walker for %v done", path)
}
resCh <- Dir{Entries: entries}
return resCh, nil
}
// Split feeds all elements read from inChan to dirChan and entChan.
func Split(inChan <-chan interface{}, dirChan chan<- Dir, entChan chan<- Entry) {
debug.Log("pipe.Split", "start")
defer debug.Log("pipe.Split", "done")
inCh := inChan
dirCh := dirChan
entCh := entChan
var (
dir Dir
ent Entry
)
// deactivate sending until we received at least one job
dirCh = nil
entCh = nil
for {
select {
case job, ok := <-inCh:
if !ok {
// channel is closed
return
}
if job == nil {
panic("nil job received")
}
// disable receiving until the current job has been sent
inCh = nil
switch j := job.(type) {
case Dir:
dir = j
dirCh = dirChan
case Entry:
ent = j
entCh = entChan
default:
panic(fmt.Sprintf("unknown job type %v", j))
}
case dirCh <- dir:
// disable sending, re-enable receiving
dirCh = nil
inCh = inChan
case entCh <- ent:
// disable sending, re-enable receiving
entCh = nil
inCh = inChan
}
}
}