Merge branch 'refactor-stats'

This commit is contained in:
Alexander Neumann 2014-11-23 13:52:25 +01:00
commit f7b5c00fdb
2 changed files with 128 additions and 38 deletions

View File

@ -6,6 +6,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
"time"
"github.com/fd0/khepri/backend" "github.com/fd0/khepri/backend"
"github.com/fd0/khepri/chunker" "github.com/fd0/khepri/chunker"
@ -15,6 +16,8 @@ import (
const ( const (
maxConcurrentFiles = 32 maxConcurrentFiles = 32
maxConcurrentBlobs = 32 maxConcurrentBlobs = 32
statTimeout = 20 * time.Millisecond
) )
type Archiver struct { type Archiver struct {
@ -32,10 +35,11 @@ type Archiver struct {
Error func(dir string, fi os.FileInfo, err error) error Error func(dir string, fi os.FileInfo, err error) error
Filter func(item string, fi os.FileInfo) bool Filter func(item string, fi os.FileInfo) bool
ScannerUpdate func(stats Stats) ScannerStats chan Stats
SaveUpdate func(stats Stats) SaveStats chan Stats
sum sync.Mutex // for SaveUpdate statsMutex sync.Mutex
updateStats Stats
} }
type Stats struct { type Stats struct {
@ -45,6 +49,13 @@ type Stats struct {
Bytes uint64 Bytes uint64
} }
func (s *Stats) Add(other Stats) {
s.Bytes += other.Bytes
s.Directories += other.Directories
s.Files += other.Files
s.Other += other.Other
}
func NewArchiver(be backend.Server, key *Key) (*Archiver, error) { func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
var err error var err error
arch := &Archiver{ arch := &Archiver{
@ -67,8 +78,6 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
arch.Error = func(string, os.FileInfo, error) error { return err } arch.Error = func(string, os.FileInfo, error) error { return err }
// allow all files // allow all files
arch.Filter = func(string, os.FileInfo) bool { return true } arch.Filter = func(string, os.FileInfo) bool { return true }
// do nothing
arch.ScannerUpdate = func(Stats) {}
arch.bl = NewBlobList() arch.bl = NewBlobList()
arch.ch, err = NewContentHandler(be, key) arch.ch, err = NewContentHandler(be, key)
@ -85,11 +94,31 @@ func NewArchiver(be backend.Server, key *Key) (*Archiver, error) {
return arch, nil return arch, nil
} }
func (arch *Archiver) saveUpdate(stats Stats) { func (arch *Archiver) update(ch chan Stats, stats Stats) {
if arch.SaveUpdate != nil { if ch == nil {
arch.sum.Lock() return
defer arch.sum.Unlock() }
arch.SaveUpdate(stats)
// load old stats from global state
arch.statsMutex.Lock()
stats.Add(arch.updateStats)
arch.updateStats = Stats{}
arch.statsMutex.Unlock()
// try to send stats through the channel, with a timeout
timeout := time.After(statTimeout)
select {
case ch <- stats:
break
case _ = <-timeout:
// save cumulated stats to global state
arch.statsMutex.Lock()
arch.updateStats.Add(stats)
arch.statsMutex.Unlock()
break
} }
} }
@ -140,7 +169,7 @@ func (arch *Archiver) SaveFile(node *Node) error {
return err return err
} }
arch.saveUpdate(Stats{Bytes: blob.Size}) arch.update(arch.SaveStats, Stats{Bytes: blob.Size})
blobs = Blobs{blob} blobs = Blobs{blob}
} else { } else {
@ -169,7 +198,7 @@ func (arch *Archiver) SaveFile(node *Node) error {
panic(err) panic(err)
} }
arch.saveUpdate(Stats{Bytes: blob.Size}) arch.update(arch.SaveStats, Stats{Bytes: blob.Size})
arch.blobToken <- token arch.blobToken <- token
ch <- blob ch <- blob
}(resCh) }(resCh)
@ -240,12 +269,15 @@ func (arch *Archiver) loadTree(dir string) (*Tree, error) {
} }
} }
arch.ScannerUpdate(arch.Stats) arch.update(arch.ScannerStats, arch.Stats)
return &tree, nil return &tree, nil
} }
func (arch *Archiver) LoadTree(path string) (*Tree, error) { func (arch *Archiver) LoadTree(path string) (*Tree, error) {
// reset global stats
arch.updateStats = Stats{}
fi, err := os.Lstat(path) fi, err := os.Lstat(path)
if err != nil { if err != nil {
return nil, arrar.Annotatef(err, "Lstat(%q)", path) return nil, arrar.Annotatef(err, "Lstat(%q)", path)
@ -259,7 +291,7 @@ func (arch *Archiver) LoadTree(path string) (*Tree, error) {
if node.Type != "dir" { if node.Type != "dir" {
arch.Stats.Files = 1 arch.Stats.Files = 1
arch.Stats.Bytes = node.Size arch.Stats.Bytes = node.Size
arch.ScannerUpdate(arch.Stats) arch.update(arch.ScannerStats, arch.Stats)
return &Tree{node}, nil return &Tree{node}, nil
} }
@ -269,7 +301,7 @@ func (arch *Archiver) LoadTree(path string) (*Tree, error) {
return nil, arrar.Annotate(err, "loadTree()") return nil, arrar.Annotate(err, "loadTree()")
} }
arch.ScannerUpdate(arch.Stats) arch.update(arch.ScannerStats, arch.Stats)
return &Tree{node}, nil return &Tree{node}, nil
} }
@ -284,7 +316,7 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
return Blob{}, err return Blob{}, err
} }
node.Subtree = b.ID node.Subtree = b.ID
arch.saveUpdate(Stats{Directories: 1}) arch.update(arch.SaveStats, Stats{Directories: 1})
} else if node.Type == "file" && len(node.Content) == 0 { } else if node.Type == "file" && len(node.Content) == 0 {
// start goroutine // start goroutine
wg.Add(1) wg.Add(1)
@ -299,10 +331,10 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
// TODO: handle error // TODO: handle error
arch.SaveFile(n) arch.SaveFile(n)
arch.saveUpdate(Stats{Files: 1}) arch.update(arch.SaveStats, Stats{Files: 1})
}(node) }(node)
} else { } else {
arch.saveUpdate(Stats{Other: 1}) arch.update(arch.SaveStats, Stats{Other: 1})
} }
} }
@ -317,6 +349,9 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) {
} }
func (arch *Archiver) Snapshot(dir string, t *Tree) (*Snapshot, backend.ID, error) { func (arch *Archiver) Snapshot(dir string, t *Tree) (*Snapshot, backend.ID, error) {
// reset global stats
arch.updateStats = Stats{}
sn := NewSnapshot(dir) sn := NewSnapshot(dir)
blob, err := arch.saveTree(t) blob, err := arch.saveTree(t)

View File

@ -17,18 +17,30 @@ func format_bytes(c uint64) string {
switch { switch {
case c > 1<<40: case c > 1<<40:
return fmt.Sprintf("%.3f TiB", b/(1<<40)) return fmt.Sprintf("%.3fTiB", b/(1<<40))
case c > 1<<30: case c > 1<<30:
return fmt.Sprintf("%.3f GiB", b/(1<<30)) return fmt.Sprintf("%.3fGiB", b/(1<<30))
case c > 1<<20: case c > 1<<20:
return fmt.Sprintf("%.3f MiB", b/(1<<20)) return fmt.Sprintf("%.3fMiB", b/(1<<20))
case c > 1<<10: case c > 1<<10:
return fmt.Sprintf("%.3f KiB", b/(1<<10)) return fmt.Sprintf("%.3fKiB", b/(1<<10))
default: default:
return fmt.Sprintf("%d B", c) return fmt.Sprintf("%dB", c)
} }
} }
func format_duration(sec uint64) string {
hours := sec / 3600
sec -= hours * 3600
min := sec / 60
sec -= min * 60
if hours > 0 {
return fmt.Sprintf("%d:%02d:%02d", hours, min, sec)
}
return fmt.Sprintf("%d:%02d", min, sec)
}
func print_tree2(indent int, t *khepri.Tree) { func print_tree2(indent int, t *khepri.Tree) {
for _, node := range *t { for _, node := range *t {
if node.Tree != nil { if node.Tree != nil {
@ -60,11 +72,18 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error {
fmt.Printf("scanning %s\n", target) fmt.Printf("scanning %s\n", target)
if terminal.IsTerminal(int(os.Stdout.Fd())) { if terminal.IsTerminal(int(os.Stdout.Fd())) {
arch.ScannerUpdate = func(stats khepri.Stats) { ch := make(chan khepri.Stats, 20)
fmt.Printf("\r%6d directories, %6d files, %14s", stats.Directories, stats.Files, format_bytes(stats.Bytes)) arch.ScannerStats = ch
}
go func(ch <-chan khepri.Stats) {
for stats := range ch {
fmt.Printf("\r%6d directories, %6d files, %14s", stats.Directories, stats.Files, format_bytes(stats.Bytes))
}
}(ch)
} }
fmt.Printf("done\n")
// TODO: add filter // TODO: add filter
// arch.Filter = func(dir string, fi os.FileInfo) bool { // arch.Filter = func(dir string, fi os.FileInfo) bool {
// return true // return true
@ -79,27 +98,63 @@ func commandBackup(be backend.Server, key *khepri.Key, args []string) error {
fmt.Printf("\r%6d directories, %6d files, %14s\n", arch.Stats.Directories, arch.Stats.Files, format_bytes(arch.Stats.Bytes)) fmt.Printf("\r%6d directories, %6d files, %14s\n", arch.Stats.Directories, arch.Stats.Files, format_bytes(arch.Stats.Bytes))
stats := khepri.Stats{} stats := khepri.Stats{}
start := time.Now()
if terminal.IsTerminal(int(os.Stdout.Fd())) { if terminal.IsTerminal(int(os.Stdout.Fd())) {
arch.SaveUpdate = func(s khepri.Stats) { ch := make(chan khepri.Stats, 20)
stats.Files += s.Files arch.SaveStats = ch
stats.Directories += s.Directories
stats.Other += s.Other
stats.Bytes += s.Bytes
fmt.Printf("\r%3.2f%% %d/%d directories, %d/%d files, %s/%s", ticker := time.NewTicker(time.Second)
float64(stats.Bytes)/float64(arch.Stats.Bytes)*100, var eta, bps uint64
stats.Directories, arch.Stats.Directories,
stats.Files, arch.Stats.Files, go func(ch <-chan khepri.Stats) {
format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes))
} status := func(d time.Duration) {
fmt.Printf("\r[%s] %3.2f%% %s/s %s / %s ETA %s",
format_duration(uint64(d/time.Second)),
float64(stats.Bytes)/float64(arch.Stats.Bytes)*100,
format_bytes(bps),
format_bytes(stats.Bytes), format_bytes(arch.Stats.Bytes),
format_duration(eta))
}
defer ticker.Stop()
for {
select {
case s, ok := <-ch:
if !ok {
return
}
stats.Files += s.Files
stats.Directories += s.Directories
stats.Other += s.Other
stats.Bytes += s.Bytes
status(time.Since(start))
case <-ticker.C:
d := time.Since(start)
bps = stats.Bytes * uint64(time.Second) / uint64(d)
if bps > 0 {
eta = (arch.Stats.Bytes - stats.Bytes) / bps
}
status(d)
}
}
}(ch)
} }
start := time.Now()
sn, id, err := arch.Snapshot(target, t) sn, id, err := arch.Snapshot(target, t)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "error: %v\n", err) fmt.Fprintf(os.Stderr, "error: %v\n", err)
} }
if terminal.IsTerminal(int(os.Stdout.Fd())) {
// close channels so that the goroutines terminate
close(arch.SaveStats)
close(arch.ScannerStats)
}
fmt.Printf("\nsnapshot %s saved: %v\n", id, sn) fmt.Printf("\nsnapshot %s saved: %v\n", id, sn)
duration := time.Now().Sub(start) duration := time.Now().Sub(start)
fmt.Printf("duration: %s, %.2fMiB/s\n", duration, float64(arch.Stats.Bytes)/float64(duration/time.Second)/(1<<20)) fmt.Printf("duration: %s, %.2fMiB/s\n", duration, float64(arch.Stats.Bytes)/float64(duration/time.Second)/(1<<20))