
Merge pull request #3977 from greatroar/progress

ui/backup: Replace channels with a mutex
Merged by Michael Eischer on 2022-10-29 21:33:04 +02:00 (commit 3b0bb02a68, committed via GitHub).
3 changed files with 196 additions and 175 deletions
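
The change replaces the channel fan-in that fed a single updater goroutine with a mutex guarding shared counters, which the periodic reporter reads under the same lock. A minimal, self-contained sketch of that pattern (names here are illustrative only, not restic's types):

// Sketch: instead of sending increments over buffered channels to one
// goroutine, workers mutate shared counters under a mutex and a ticker
// goroutine snapshots them under the same lock.
package main

import (
	"fmt"
	"sync"
	"time"
)

type counters struct {
	mu    sync.Mutex
	files uint64
	bytes uint64
}

// addFile may be called from any worker goroutine.
func (c *counters) addFile(size uint64) {
	c.mu.Lock()
	c.files++
	c.bytes += size
	c.mu.Unlock()
}

// snapshot copies the current values while holding the lock.
func (c *counters) snapshot() (files, bytes uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.files, c.bytes
}

func main() {
	var c counters
	done := make(chan struct{})

	// Periodic reporter, analogous to Progress.Run after this change.
	go func() {
		t := time.NewTicker(10 * time.Millisecond)
		defer t.Stop()
		for {
			select {
			case <-done:
				return
			case <-t.C:
				f, b := c.snapshot()
				fmt.Printf("files=%d bytes=%d\n", f, b)
			}
		}
	}()

	for i := 0; i < 5; i++ {
		c.addFile(1024)
		time.Sleep(5 * time.Millisecond)
	}
	close(done)
}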


@@ -467,11 +467,11 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
 	} else {
 		progressPrinter = backup.NewTextProgress(term, gopts.verbosity)
 	}
-	progressReporter := backup.NewProgress(progressPrinter)
+	progressReporter := backup.NewProgress(progressPrinter,
+		calculateProgressInterval(!gopts.Quiet, gopts.JSON))
 	if opts.DryRun {
 		repo.SetDryRun()
-		progressReporter.SetDryRun()
 	}
 
 	// use the terminal for stdout/stderr
@@ -481,12 +481,10 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
 	}()
 	gopts.stdout, gopts.stderr = progressPrinter.Stdout(), progressPrinter.Stderr()
 
-	progressReporter.SetMinUpdatePause(calculateProgressInterval(!gopts.Quiet, gopts.JSON))
-
 	wg, wgCtx := errgroup.WithContext(ctx)
 	cancelCtx, cancel := context.WithCancel(wgCtx)
 	defer cancel()
-	wg.Go(func() error { return progressReporter.Run(cancelCtx) })
+	wg.Go(func() error { progressReporter.Run(cancelCtx); return nil })
 
 	if !gopts.JSON {
 		progressPrinter.V("lock repository")
@@ -588,7 +586,7 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
 	sc := archiver.NewScanner(targetFS)
 	sc.SelectByName = selectByNameFilter
 	sc.Select = selectFilter
-	sc.Error = progressReporter.ScannerError
+	sc.Error = progressPrinter.ScannerError
 	sc.Result = progressReporter.ReportTotal
 
 	if !gopts.JSON {
@@ -643,7 +641,7 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
 	}
 
 	// Report finished execution
-	progressReporter.Finish(id)
+	progressReporter.Finish(id, opts.DryRun)
 	if !gopts.JSON && !opts.DryRun {
 		progressPrinter.P("snapshot %s saved\n", id.Str())
 	}
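
Since Run no longer returns an error, the call site wraps it and supplies the nil itself. A standalone sketch of that errgroup wiring (the run function below is a stand-in, not restic code; only errgroup is a real dependency):

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// run stands in for the new Progress.Run: it blocks until its context is
// cancelled and returns nothing.
func run(ctx context.Context) {
	<-ctx.Done()
}

func main() {
	wg, wgCtx := errgroup.WithContext(context.Background())
	cancelCtx, cancel := context.WithCancel(wgCtx)

	// The void function is wrapped so it satisfies errgroup's func() error.
	wg.Go(func() error { run(cancelCtx); return nil })

	// Simulate the rest of the command finishing, then stop the reporter.
	wg.Go(func() error {
		time.Sleep(10 * time.Millisecond)
		cancel()
		return nil
	})

	if err := wg.Wait(); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println("done")
}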


@@ -11,6 +11,8 @@ import (
 	"github.com/restic/restic/internal/ui/signals"
 )
 
+// A ProgressPrinter can print various progress messages.
+// It must be safe to call its methods from concurrent goroutines.
 type ProgressPrinter interface {
 	Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64)
 	Error(item string, err error) error
@@ -24,23 +26,15 @@ type ProgressPrinter interface {
 	Stdout() io.WriteCloser
 	Stderr() io.WriteCloser
 
-	E(msg string, args ...interface{})
 	P(msg string, args ...interface{})
 	V(msg string, args ...interface{})
-	VV(msg string, args ...interface{})
 }
 
 type Counter struct {
 	Files, Dirs, Bytes uint64
 }
 
-type fileWorkerMessage struct {
-	filename string
-	done     bool
-}
-
 type Summary struct {
-	sync.Mutex
 	Files, Dirs struct {
 		New       uint
 		Changed   uint
@@ -52,37 +46,30 @@ type Summary struct {
 // Progress reports progress for the `backup` command.
 type Progress struct {
-	MinUpdatePause time.Duration
+	mu sync.Mutex
 
-	start time.Time
-	dry   bool
+	interval time.Duration
+	start    time.Time
 
-	totalCh     chan Counter
-	processedCh chan Counter
-	errCh       chan struct{}
-	workerCh    chan fileWorkerMessage
-	closed      chan struct{}
+	scanStarted, scanFinished bool
 
-	summary *Summary
+	currentFiles     map[string]struct{}
+	processed, total Counter
+	errors           uint
+
+	closed chan struct{}
+
+	summary Summary
 	printer ProgressPrinter
 }
 
-func NewProgress(printer ProgressPrinter) *Progress {
+func NewProgress(printer ProgressPrinter, interval time.Duration) *Progress {
 	return &Progress{
-		// limit to 60fps by default
-		MinUpdatePause: time.Second / 60,
-		start:          time.Now(),
+		interval: interval,
+		start:    time.Now(),
 
-		// use buffered channels for the information used to update the status
-		// the shutdown of the `Run()` method is somewhat racy, but won't affect
-		// the final backup statistics
-		totalCh:     make(chan Counter, 100),
-		processedCh: make(chan Counter, 100),
-		errCh:       make(chan struct{}),
-		workerCh:    make(chan fileWorkerMessage, 100),
-		closed:      make(chan struct{}),
-		summary:     &Summary{},
+		currentFiles: make(map[string]struct{}),
+		closed:       make(chan struct{}),
 
 		printer: printer,
 	}
@@ -90,111 +77,83 @@ func NewProgress(printer ProgressPrinter) *Progress {
 // Run regularly updates the status lines. It should be called in a separate
 // goroutine.
-func (p *Progress) Run(ctx context.Context) error {
+func (p *Progress) Run(ctx context.Context) {
-	var (
-		lastUpdate       time.Time
-		total, processed Counter
-		errors           uint
-		started          bool
-		currentFiles     = make(map[string]struct{})
-		secondsRemaining uint64
-	)
-
-	t := time.NewTicker(time.Second)
-	signalsCh := signals.GetProgressChannel()
-	defer t.Stop()
 	defer close(p.closed)
 	// Reset status when finished
 	defer p.printer.Reset()
 
+	var tick <-chan time.Time
+	if p.interval != 0 {
+		t := time.NewTicker(p.interval)
+		defer t.Stop()
+		tick = t.C
+	}
+
+	signalsCh := signals.GetProgressChannel()
+
 	for {
-		forceUpdate := false
+		var now time.Time
 		select {
 		case <-ctx.Done():
-			return nil
+			return
-		case t, ok := <-p.totalCh:
-			if ok {
-				total = t
-				started = true
-			} else {
-				// scan has finished
-				p.totalCh = nil
-			}
-		case s := <-p.processedCh:
-			processed.Files += s.Files
-			processed.Dirs += s.Dirs
-			processed.Bytes += s.Bytes
-			started = true
-		case <-p.errCh:
-			errors++
-			started = true
-		case m := <-p.workerCh:
-			if m.done {
-				delete(currentFiles, m.filename)
-			} else {
-				currentFiles[m.filename] = struct{}{}
-			}
-		case <-t.C:
-			if !started {
-				continue
-			}
-			if p.totalCh == nil {
-				secs := float64(time.Since(p.start) / time.Second)
-				todo := float64(total.Bytes - processed.Bytes)
-				secondsRemaining = uint64(secs / float64(processed.Bytes) * todo)
-			}
+		case now = <-tick:
 		case <-signalsCh:
-			forceUpdate = true
+			now = time.Now()
 		}
 
-		// limit update frequency
-		if !forceUpdate && (time.Since(lastUpdate) < p.MinUpdatePause || p.MinUpdatePause == 0) {
+		p.mu.Lock()
+		if !p.scanStarted {
+			p.mu.Unlock()
 			continue
 		}
-		lastUpdate = time.Now()
 
-		p.printer.Update(total, processed, errors, currentFiles, p.start, secondsRemaining)
+		var secondsRemaining uint64
+		if p.scanFinished {
+			secs := float64(now.Sub(p.start) / time.Second)
+			todo := float64(p.total.Bytes - p.processed.Bytes)
+			secondsRemaining = uint64(secs / float64(p.processed.Bytes) * todo)
+		}
+
+		p.printer.Update(p.total, p.processed, p.errors, p.currentFiles, p.start, secondsRemaining)
+		p.mu.Unlock()
 	}
 }
 
-// ScannerError is the error callback function for the scanner, it prints the
-// error in verbose mode and returns nil.
-func (p *Progress) ScannerError(item string, err error) error {
-	return p.printer.ScannerError(item, err)
-}
-
 // Error is the error callback function for the archiver, it prints the error and returns nil.
 func (p *Progress) Error(item string, err error) error {
-	cbErr := p.printer.Error(item, err)
-
-	select {
-	case p.errCh <- struct{}{}:
-	case <-p.closed:
-	}
-	return cbErr
+	p.mu.Lock()
+	p.errors++
+	p.scanStarted = true
+	p.mu.Unlock()
+
+	return p.printer.Error(item, err)
 }
 
 // StartFile is called when a file is being processed by a worker.
 func (p *Progress) StartFile(filename string) {
-	select {
-	case p.workerCh <- fileWorkerMessage{filename: filename}:
-	case <-p.closed:
-	}
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.currentFiles[filename] = struct{}{}
+}
+
+func (p *Progress) addProcessed(c Counter) {
+	p.processed.Files += c.Files
+	p.processed.Dirs += c.Dirs
+	p.processed.Bytes += c.Bytes
+	p.scanStarted = true
 }
 
 // CompleteBlob is called for all saved blobs for files.
 func (p *Progress) CompleteBlob(bytes uint64) {
-	select {
-	case p.processedCh <- Counter{Bytes: bytes}:
-	case <-p.closed:
-	}
+	p.mu.Lock()
+	p.addProcessed(Counter{Bytes: bytes})
+	p.mu.Unlock()
 }
 
 // CompleteItem is the status callback function for the archiver when a
 // file/dir has been saved successfully.
 func (p *Progress) CompleteItem(item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
-	p.summary.Lock()
+	p.mu.Lock()
 	p.summary.ItemStats.Add(s)
 
 	// for the last item "/", current is nil
@@ -202,110 +161,87 @@ func (p *Progress) CompleteItem(item string, previous, current *restic.Node, s a
 		p.summary.ProcessedBytes += current.Size
 	}
-	p.summary.Unlock()
+	p.mu.Unlock()
 
 	if current == nil {
 		// error occurred, tell the status display to remove the line
-		select {
-		case p.workerCh <- fileWorkerMessage{filename: item, done: true}:
-		case <-p.closed:
-		}
+		p.mu.Lock()
+		delete(p.currentFiles, item)
+		p.mu.Unlock()
 		return
 	}
 
 	switch current.Type {
-	case "file":
-		select {
-		case p.processedCh <- Counter{Files: 1}:
-		case <-p.closed:
-		}
-		select {
-		case p.workerCh <- fileWorkerMessage{filename: item, done: true}:
-		case <-p.closed:
-		}
 	case "dir":
-		select {
-		case p.processedCh <- Counter{Dirs: 1}:
-		case <-p.closed:
-		}
-	}
-
-	if current.Type == "dir" {
-		if previous == nil {
+		p.mu.Lock()
+		p.addProcessed(Counter{Dirs: 1})
+		p.mu.Unlock()
+
+		switch {
+		case previous == nil:
 			p.printer.CompleteItem("dir new", item, previous, current, s, d)
-			p.summary.Lock()
+			p.mu.Lock()
 			p.summary.Dirs.New++
-			p.summary.Unlock()
-			return
-		}
+			p.mu.Unlock()
 
-		if previous.Equals(*current) {
+		case previous.Equals(*current):
 			p.printer.CompleteItem("dir unchanged", item, previous, current, s, d)
-			p.summary.Lock()
+			p.mu.Lock()
 			p.summary.Dirs.Unchanged++
-			p.summary.Unlock()
-		} else {
+			p.mu.Unlock()
+
+		default:
 			p.printer.CompleteItem("dir modified", item, previous, current, s, d)
-			p.summary.Lock()
+			p.mu.Lock()
 			p.summary.Dirs.Changed++
-			p.summary.Unlock()
+			p.mu.Unlock()
 		}
-	} else if current.Type == "file" {
-		select {
-		case p.workerCh <- fileWorkerMessage{done: true, filename: item}:
-		case <-p.closed:
-		}
 
-		if previous == nil {
+	case "file":
+		p.mu.Lock()
+		p.addProcessed(Counter{Files: 1})
+		delete(p.currentFiles, item)
+		p.mu.Unlock()
+
+		switch {
+		case previous == nil:
 			p.printer.CompleteItem("file new", item, previous, current, s, d)
-			p.summary.Lock()
+			p.mu.Lock()
 			p.summary.Files.New++
-			p.summary.Unlock()
-			return
-		}
+			p.mu.Unlock()
 
-		if previous.Equals(*current) {
+		case previous.Equals(*current):
 			p.printer.CompleteItem("file unchanged", item, previous, current, s, d)
-			p.summary.Lock()
+			p.mu.Lock()
 			p.summary.Files.Unchanged++
-			p.summary.Unlock()
-		} else {
+			p.mu.Unlock()
+
+		default:
 			p.printer.CompleteItem("file modified", item, previous, current, s, d)
-			p.summary.Lock()
+			p.mu.Lock()
 			p.summary.Files.Changed++
-			p.summary.Unlock()
+			p.mu.Unlock()
 		}
 	}
 }
 
 // ReportTotal sets the total stats up to now
 func (p *Progress) ReportTotal(item string, s archiver.ScanStats) {
-	select {
-	case p.totalCh <- Counter{Files: uint64(s.Files), Dirs: uint64(s.Dirs), Bytes: s.Bytes}:
-	case <-p.closed:
-	}
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	p.total = Counter{Files: uint64(s.Files), Dirs: uint64(s.Dirs), Bytes: s.Bytes}
 
 	if item == "" {
 		p.printer.ReportTotal(item, p.start, s)
-		close(p.totalCh)
+		p.scanFinished = true
 		return
 	}
 }
 
 // Finish prints the finishing messages.
-func (p *Progress) Finish(snapshotID restic.ID) {
+func (p *Progress) Finish(snapshotID restic.ID, dryrun bool) {
 	// wait for the status update goroutine to shut down
 	<-p.closed
-	p.printer.Finish(snapshotID, p.start, p.summary, p.dry)
-}
-
-// SetMinUpdatePause sets b.MinUpdatePause.
-func (p *Progress) SetMinUpdatePause(d time.Duration) {
-	p.MinUpdatePause = d
-}
-
-// SetDryRun marks the backup as a "dry run".
-func (p *Progress) SetDryRun() {
-	p.dry = true
+	p.printer.Finish(snapshotID, p.start, &p.summary, dryrun)
 }
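
The remaining-time estimate in the reworked Run is a plain linear extrapolation: seconds elapsed so far, divided by bytes processed, scaled by the bytes still to do. A worked sketch of that arithmetic (the zero-divisor guard is added for this sketch and is not part of the diff above):

package main

import (
	"fmt"
	"time"
)

// secondsRemaining mirrors the estimate computed in Progress.Run.
func secondsRemaining(elapsed time.Duration, processedBytes, totalBytes uint64) uint64 {
	if processedBytes == 0 {
		return 0 // guard added for this sketch only
	}
	secs := float64(elapsed / time.Second)
	todo := float64(totalBytes - processedBytes)
	return uint64(secs / float64(processedBytes) * todo)
}

func main() {
	// After 2 minutes, 1 GiB of a 4 GiB backup is done: roughly 6 minutes left.
	fmt.Println(secondsRemaining(2*time.Minute, 1<<30, 4<<30)) // 360
}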


@@ -0,0 +1,87 @@
package backup

import (
	"context"
	"io"
	"sync"
	"testing"
	"time"

	"github.com/restic/restic/internal/archiver"
	"github.com/restic/restic/internal/restic"
)

type mockPrinter struct {
	sync.Mutex
	dirUnchanged, fileNew bool
	id                    restic.ID
}

func (p *mockPrinter) Update(total, processed Counter, errors uint, currentFiles map[string]struct{}, start time.Time, secs uint64) {
}

func (p *mockPrinter) Error(item string, err error) error        { return err }
func (p *mockPrinter) ScannerError(item string, err error) error { return err }

func (p *mockPrinter) CompleteItem(messageType string, item string, previous, current *restic.Node, s archiver.ItemStats, d time.Duration) {
	p.Lock()
	defer p.Unlock()

	switch messageType {
	case "dir unchanged":
		p.dirUnchanged = true
	case "file new":
		p.fileNew = true
	}
}

func (p *mockPrinter) ReportTotal(_ string, _ time.Time, _ archiver.ScanStats) {}

func (p *mockPrinter) Finish(id restic.ID, _ time.Time, summary *Summary, dryRun bool) {
	p.Lock()
	defer p.Unlock()

	_ = *summary // Should not be nil.
	p.id = id
}

func (p *mockPrinter) Reset() {}

func (p *mockPrinter) Stdout() io.WriteCloser { return nil }
func (p *mockPrinter) Stderr() io.WriteCloser { return nil }

func (p *mockPrinter) P(msg string, args ...interface{}) {}
func (p *mockPrinter) V(msg string, args ...interface{}) {}

func TestProgress(t *testing.T) {
	t.Parallel()

	prnt := &mockPrinter{}
	prog := NewProgress(prnt, time.Millisecond)

	ctx, cancel := context.WithCancel(context.Background())
	go prog.Run(ctx)

	prog.StartFile("foo")
	prog.CompleteBlob(1024)

	// "dir unchanged"
	node := restic.Node{Type: "dir"}
	prog.CompleteItem("foo", &node, &node, archiver.ItemStats{}, 0)
	// "file new"
	node.Type = "file"
	prog.CompleteItem("foo", nil, &node, archiver.ItemStats{}, 0)

	time.Sleep(10 * time.Millisecond)
	cancel()
	id := restic.NewRandomID()
	prog.Finish(id, false)

	if !prnt.dirUnchanged {
		t.Error(`"dir unchanged" event not seen`)
	}
	if !prnt.fileNew {
		t.Error(`"file new" event not seen`)
	}
	if prnt.id != id {
		t.Errorf("id not stored (has %v)", prnt.id)
	}
}