1
0
Fork 0
mirror of https://github.com/restic/restic.git synced 2024-12-23 16:26:11 +00:00

check: Cleanup tree loading and switch to use errgroup

The helper methods are now wired up in the Structure method.
This commit is contained in:
Michael Eischer 2020-11-07 00:23:45 +01:00 committed by Alexander Neumann
parent a4689eb3b9
commit 1d7bb01a6b

View file

@ -316,72 +316,24 @@ type treeJob struct {
// loadTreeWorker loads trees from repo and sends them to out. // loadTreeWorker loads trees from repo and sends them to out.
func loadTreeWorker(ctx context.Context, repo restic.Repository, func loadTreeWorker(ctx context.Context, repo restic.Repository,
in <-chan restic.ID, out chan<- treeJob, in <-chan restic.ID, out chan<- treeJob) {
wg *sync.WaitGroup) {
defer func() { for treeID := range in {
debug.Log("exiting") tree, err := repo.LoadTree(ctx, treeID)
wg.Done() debug.Log("load tree %v (%v) returned err: %v", tree, treeID, err)
}() job := treeJob{ID: treeID, error: err, Tree: tree}
var (
inCh = in
outCh = out
job treeJob
)
outCh = nil
for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case out <- job:
case treeID, ok := <-inCh:
if !ok {
return
}
debug.Log("load tree %v", treeID)
tree, err := repo.LoadTree(ctx, treeID)
debug.Log("load tree %v (%v) returned err: %v", tree, treeID, err)
job = treeJob{ID: treeID, error: err, Tree: tree}
outCh = out
inCh = nil
case outCh <- job:
debug.Log("sent tree %v", job.ID)
outCh = nil
inCh = in
} }
} }
} }
// checkTreeWorker checks the trees received and sends out errors to errChan. // checkTreeWorker checks the trees received and sends out errors to errChan.
func (c *Checker) checkTreeWorker(ctx context.Context, in <-chan treeJob, out chan<- error, wg *sync.WaitGroup) { func (c *Checker) checkTreeWorker(ctx context.Context, in <-chan treeJob, out chan<- error) {
defer func() { for job := range in {
debug.Log("exiting")
wg.Done()
}()
var (
inCh = in
outCh = out
treeError TreeError
)
outCh = nil
for {
select {
case <-ctx.Done():
debug.Log("done channel closed, exiting")
return
case job, ok := <-inCh:
if !ok {
debug.Log("input channel closed, exiting")
return
}
debug.Log("check tree %v (tree %v, err %v)", job.ID, job.Tree, job.error) debug.Log("check tree %v (tree %v, err %v)", job.ID, job.Tree, job.error)
var errs []error var errs []error
@ -391,40 +343,29 @@ func (c *Checker) checkTreeWorker(ctx context.Context, in <-chan treeJob, out ch
errs = c.checkTree(job.ID, job.Tree) errs = c.checkTree(job.ID, job.Tree)
} }
if len(errs) > 0 { if len(errs) == 0 {
debug.Log("checked tree %v: %v errors", job.ID, len(errs)) continue
treeError = TreeError{ID: job.ID, Errors: errs}
outCh = out
inCh = nil
} }
treeError := TreeError{ID: job.ID, Errors: errs}
case outCh <- treeError: select {
case <-ctx.Done():
return
case out <- treeError:
debug.Log("tree %v: sent %d errors", treeError.ID, len(treeError.Errors)) debug.Log("tree %v: sent %d errors", treeError.ID, len(treeError.Errors))
outCh = nil
inCh = in
} }
} }
} }
func (c *Checker) filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- restic.ID, in <-chan treeJob, out chan<- treeJob) { func (c *Checker) filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- restic.ID, in <-chan treeJob, out chan<- treeJob) {
defer func() {
debug.Log("closing output channels")
close(loaderChan)
close(out)
}()
var ( var (
inCh = in inCh = in
outCh = out outCh chan<- treeJob
loadCh = loaderChan loadCh chan<- restic.ID
job treeJob job treeJob
nextTreeID restic.ID nextTreeID restic.ID
outstandingLoadTreeJobs = 0 outstandingLoadTreeJobs = 0
) )
outCh = nil
loadCh = nil
for { for {
if loadCh == nil && len(backlog) > 0 { if loadCh == nil && len(backlog) > 0 {
// process last added ids first, that is traverse the tree in depth-first order // process last added ids first, that is traverse the tree in depth-first order
@ -528,8 +469,6 @@ func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (ids resti
// subtrees are available in the index. errChan is closed after all trees have // subtrees are available in the index. errChan is closed after all trees have
// been traversed. // been traversed.
func (c *Checker) Structure(ctx context.Context, errChan chan<- error) { func (c *Checker) Structure(ctx context.Context, errChan chan<- error) {
defer close(errChan)
trees, errs := loadSnapshotTreeIDs(ctx, c.repo) trees, errs := loadSnapshotTreeIDs(ctx, c.repo)
debug.Log("need to check %d trees from snapshots, %d errs returned", len(trees), len(errs)) debug.Log("need to check %d trees from snapshots, %d errs returned", len(trees), len(errs))
@ -541,18 +480,42 @@ func (c *Checker) Structure(ctx context.Context, errChan chan<- error) {
} }
} }
treeIDChan := make(chan restic.ID) loaderChan := make(chan restic.ID)
treeJobChan1 := make(chan treeJob) loadedTreeChan := make(chan treeJob)
treeJobChan2 := make(chan treeJob) treeStream := make(chan treeJob)
wg, ctx := errgroup.WithContext(ctx)
var loadTreeWg sync.WaitGroup
var wg sync.WaitGroup
for i := 0; i < defaultParallelism; i++ { for i := 0; i < defaultParallelism; i++ {
wg.Add(2) loadTreeWg.Add(1)
go loadTreeWorker(ctx, c.repo, treeIDChan, treeJobChan1, &wg) wg.Go(func() error {
go c.checkTreeWorker(ctx, treeJobChan2, errChan, &wg) defer loadTreeWg.Done()
loadTreeWorker(ctx, c.repo, loaderChan, loadedTreeChan)
return nil
})
}
// close once all loadTreeWorkers have completed
wg.Go(func() error {
loadTreeWg.Wait()
close(loadedTreeChan)
return nil
})
defer close(errChan)
for i := 0; i < defaultParallelism; i++ {
wg.Go(func() error {
c.checkTreeWorker(ctx, treeStream, errChan)
return nil
})
} }
c.filterTrees(ctx, trees, treeIDChan, treeJobChan1, treeJobChan2) wg.Go(func() error {
defer close(loaderChan)
defer close(treeStream)
c.filterTrees(ctx, trees, loaderChan, loadedTreeChan, treeStream)
return nil
})
wg.Wait() wg.Wait()
} }