mirror of
https://github.com/restic/restic.git
synced 2024-12-23 08:16:36 +00:00
67e2ba0d40
This allows the Lock function to access the backend, even once the Backend method is removed from the interface.
274 lines
7.7 KiB
Go
274 lines
7.7 KiB
Go
package repository
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/restic/restic/internal/backend"
|
|
"github.com/restic/restic/internal/debug"
|
|
"github.com/restic/restic/internal/errors"
|
|
"github.com/restic/restic/internal/restic"
|
|
)
|
|
|
|
type lockContext struct {
|
|
lock *restic.Lock
|
|
cancel context.CancelFunc
|
|
refreshWG sync.WaitGroup
|
|
}
|
|
|
|
type locker struct {
|
|
retrySleepStart time.Duration
|
|
retrySleepMax time.Duration
|
|
refreshInterval time.Duration
|
|
refreshabilityTimeout time.Duration
|
|
}
|
|
|
|
const defaultRefreshInterval = 5 * time.Minute
|
|
|
|
var lockerInst = &locker{
|
|
retrySleepStart: 5 * time.Second,
|
|
retrySleepMax: 60 * time.Second,
|
|
refreshInterval: defaultRefreshInterval,
|
|
// consider a lock refresh failed a bit before the lock actually becomes stale
|
|
// the difference allows to compensate for a small time drift between clients.
|
|
refreshabilityTimeout: restic.StaleLockTimeout - defaultRefreshInterval*3/2,
|
|
}
|
|
|
|
func Lock(ctx context.Context, repo *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
|
|
return lockerInst.Lock(ctx, repo, exclusive, retryLock, printRetry, logger)
|
|
}
|
|
|
|
// Lock wraps the ctx such that it is cancelled when the repository is unlocked
|
|
// cancelling the original context also stops the lock refresh
|
|
func (l *locker) Lock(ctx context.Context, repo *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
|
|
|
|
lockFn := restic.NewLock
|
|
if exclusive {
|
|
lockFn = restic.NewExclusiveLock
|
|
}
|
|
|
|
var lock *restic.Lock
|
|
var err error
|
|
|
|
retrySleep := minDuration(l.retrySleepStart, retryLock)
|
|
retryMessagePrinted := false
|
|
retryTimeout := time.After(retryLock)
|
|
|
|
retryLoop:
|
|
for {
|
|
lock, err = lockFn(ctx, repo)
|
|
if err != nil && restic.IsAlreadyLocked(err) {
|
|
|
|
if !retryMessagePrinted {
|
|
printRetry(fmt.Sprintf("repo already locked, waiting up to %s for the lock\n", retryLock))
|
|
retryMessagePrinted = true
|
|
}
|
|
|
|
debug.Log("repo already locked, retrying in %v", retrySleep)
|
|
retrySleepCh := time.After(retrySleep)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx, ctx.Err()
|
|
case <-retryTimeout:
|
|
debug.Log("repo already locked, timeout expired")
|
|
// Last lock attempt
|
|
lock, err = lockFn(ctx, repo)
|
|
break retryLoop
|
|
case <-retrySleepCh:
|
|
retrySleep = minDuration(retrySleep*2, l.retrySleepMax)
|
|
}
|
|
} else {
|
|
// anything else, either a successful lock or another error
|
|
break retryLoop
|
|
}
|
|
}
|
|
if restic.IsInvalidLock(err) {
|
|
return nil, ctx, errors.Fatalf("%v\n\nthe `unlock --remove-all` command can be used to remove invalid locks. Make sure that no other restic process is accessing the repository when running the command", err)
|
|
}
|
|
if err != nil {
|
|
return nil, ctx, fmt.Errorf("unable to create lock in backend: %w", err)
|
|
}
|
|
debug.Log("create lock %p (exclusive %v)", lock, exclusive)
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
lockInfo := &lockContext{
|
|
lock: lock,
|
|
cancel: cancel,
|
|
}
|
|
lockInfo.refreshWG.Add(2)
|
|
refreshChan := make(chan struct{})
|
|
forceRefreshChan := make(chan refreshLockRequest)
|
|
|
|
go l.refreshLocks(ctx, repo.be, lockInfo, refreshChan, forceRefreshChan, logger)
|
|
go l.monitorLockRefresh(ctx, lockInfo, refreshChan, forceRefreshChan, logger)
|
|
|
|
return &Unlocker{lockInfo}, ctx, nil
|
|
}
|
|
|
|
func minDuration(a, b time.Duration) time.Duration {
|
|
if a <= b {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
|
|
type refreshLockRequest struct {
|
|
result chan bool
|
|
}
|
|
|
|
func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, lockInfo *lockContext, refreshed chan<- struct{}, forceRefresh <-chan refreshLockRequest, logger func(format string, args ...interface{})) {
|
|
debug.Log("start")
|
|
lock := lockInfo.lock
|
|
ticker := time.NewTicker(l.refreshInterval)
|
|
lastRefresh := lock.Time
|
|
|
|
defer func() {
|
|
ticker.Stop()
|
|
// ensure that the context was cancelled before removing the lock
|
|
lockInfo.cancel()
|
|
|
|
// remove the lock from the repo
|
|
debug.Log("unlocking repository with lock %v", lock)
|
|
if err := lock.Unlock(); err != nil {
|
|
debug.Log("error while unlocking: %v", err)
|
|
logger("error while unlocking: %v", err)
|
|
}
|
|
|
|
lockInfo.refreshWG.Done()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
debug.Log("terminate")
|
|
return
|
|
|
|
case req := <-forceRefresh:
|
|
debug.Log("trying to refresh stale lock")
|
|
// keep on going if our current lock still exists
|
|
success := tryRefreshStaleLock(ctx, backend, lock, lockInfo.cancel, logger)
|
|
// inform refresh goroutine about forced refresh
|
|
select {
|
|
case <-ctx.Done():
|
|
case req.result <- success:
|
|
}
|
|
|
|
if success {
|
|
// update lock refresh time
|
|
lastRefresh = lock.Time
|
|
}
|
|
|
|
case <-ticker.C:
|
|
if time.Since(lastRefresh) > l.refreshabilityTimeout {
|
|
// the lock is too old, wait until the expiry monitor cancels the context
|
|
continue
|
|
}
|
|
|
|
debug.Log("refreshing locks")
|
|
err := lock.Refresh(context.TODO())
|
|
if err != nil {
|
|
logger("unable to refresh lock: %v\n", err)
|
|
} else {
|
|
lastRefresh = lock.Time
|
|
// inform monitor goroutine about successful refresh
|
|
select {
|
|
case <-ctx.Done():
|
|
case refreshed <- struct{}{}:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (l *locker) monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}, forceRefresh chan<- refreshLockRequest, logger func(format string, args ...interface{})) {
|
|
// time.Now() might use a monotonic timer which is paused during standby
|
|
// convert to unix time to ensure we compare real time values
|
|
lastRefresh := time.Now().UnixNano()
|
|
pollDuration := 1 * time.Second
|
|
if l.refreshInterval < pollDuration {
|
|
// required for TestLockFailedRefresh
|
|
pollDuration = l.refreshInterval / 5
|
|
}
|
|
// timers are paused during standby, which is a problem as the refresh timeout
|
|
// _must_ expire if the host was too long in standby. Thus fall back to periodic checks
|
|
// https://github.com/golang/go/issues/35012
|
|
ticker := time.NewTicker(pollDuration)
|
|
defer func() {
|
|
ticker.Stop()
|
|
lockInfo.cancel()
|
|
lockInfo.refreshWG.Done()
|
|
}()
|
|
|
|
var refreshStaleLockResult chan bool
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
debug.Log("terminate expiry monitoring")
|
|
return
|
|
case <-refreshed:
|
|
if refreshStaleLockResult != nil {
|
|
// ignore delayed refresh notifications while the stale lock is refreshed
|
|
continue
|
|
}
|
|
lastRefresh = time.Now().UnixNano()
|
|
case <-ticker.C:
|
|
if time.Now().UnixNano()-lastRefresh < l.refreshabilityTimeout.Nanoseconds() || refreshStaleLockResult != nil {
|
|
continue
|
|
}
|
|
|
|
debug.Log("trying to refreshStaleLock")
|
|
// keep on going if our current lock still exists
|
|
refreshReq := refreshLockRequest{
|
|
result: make(chan bool),
|
|
}
|
|
refreshStaleLockResult = refreshReq.result
|
|
|
|
// inform refresh goroutine about forced refresh
|
|
select {
|
|
case <-ctx.Done():
|
|
case forceRefresh <- refreshReq:
|
|
}
|
|
case success := <-refreshStaleLockResult:
|
|
if success {
|
|
lastRefresh = time.Now().UnixNano()
|
|
refreshStaleLockResult = nil
|
|
continue
|
|
}
|
|
|
|
logger("Fatal: failed to refresh lock in time\n")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func tryRefreshStaleLock(ctx context.Context, be backend.Backend, lock *restic.Lock, cancel context.CancelFunc, logger func(format string, args ...interface{})) bool {
|
|
freeze := backend.AsBackend[backend.FreezeBackend](be)
|
|
if freeze != nil {
|
|
debug.Log("freezing backend")
|
|
freeze.Freeze()
|
|
defer freeze.Unfreeze()
|
|
}
|
|
|
|
err := lock.RefreshStaleLock(ctx)
|
|
if err != nil {
|
|
logger("failed to refresh stale lock: %v\n", err)
|
|
// cancel context while the backend is still frozen to prevent accidental modifications
|
|
cancel()
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
type Unlocker struct {
|
|
info *lockContext
|
|
}
|
|
|
|
func (l *Unlocker) Unlock() {
|
|
l.info.cancel()
|
|
l.info.refreshWG.Wait()
|
|
}
|