mirror of
https://github.com/restic/restic.git
synced 2025-03-11 22:53:23 +00:00
The SaveUnpacked and RemoveUnpacked methods now only allow modifying snapshots; the internal data types used by the repository are read-only. Repository-internal code can bypass this restriction by wrapping the repository in an `internalRepository` type. The restriction itself is implemented by a new datatype, WriteableFileType, which is used as the file-type parameter of SaveUnpacked and RemoveUnpacked; this statically ensures that code cannot bypass the access restrictions. The test changes are somewhat noisy because some tests modify repository internals and therefore need a way to bypass the restrictions. They do this by capturing an `internalRepository` or `Backend` when creating the Repository via a test helper function.
307 lines
8.7 KiB
Go
307 lines
8.7 KiB
Go
package repository
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/restic/restic/internal/backend"
|
|
"github.com/restic/restic/internal/debug"
|
|
"github.com/restic/restic/internal/errors"
|
|
"github.com/restic/restic/internal/restic"
|
|
)
|
|
|
|
type lockContext struct {
|
|
lock *restic.Lock
|
|
cancel context.CancelFunc
|
|
refreshWG sync.WaitGroup
|
|
}
|
|
|
|
// locker holds the timing parameters used to acquire a repository lock and
// to keep it alive afterwards.
type locker struct {
	// retrySleepStart is the first delay between lock attempts while the
	// repository is locked by someone else; the delay doubles per attempt.
	retrySleepStart time.Duration
	// retrySleepMax caps the delay between lock attempts.
	retrySleepMax time.Duration
	// refreshInterval is how often a held lock is refreshed.
	refreshInterval time.Duration
	// refreshabilityTimeout is how long after the last successful refresh
	// regular refreshes are given up and only a stale-lock refresh is tried.
	refreshabilityTimeout time.Duration
}
|
|
|
|
const defaultRefreshInterval = 5 * time.Minute
|
|
|
|
var lockerInst = &locker{
|
|
retrySleepStart: 5 * time.Second,
|
|
retrySleepMax: 60 * time.Second,
|
|
refreshInterval: defaultRefreshInterval,
|
|
// consider a lock refresh failed a bit before the lock actually becomes stale
|
|
// the difference allows to compensate for a small time drift between clients.
|
|
refreshabilityTimeout: restic.StaleLockTimeout - defaultRefreshInterval*3/2,
|
|
}
|
|
|
|
func Lock(ctx context.Context, repo *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
|
|
return lockerInst.Lock(ctx, repo, exclusive, retryLock, printRetry, logger)
|
|
}
|
|
|
|
// Lock wraps the ctx such that it is cancelled when the repository is unlocked
|
|
// cancelling the original context also stops the lock refresh
|
|
func (l *locker) Lock(ctx context.Context, r *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
|
|
var lock *restic.Lock
|
|
var err error
|
|
|
|
retrySleep := minDuration(l.retrySleepStart, retryLock)
|
|
retryMessagePrinted := false
|
|
retryTimeout := time.After(retryLock)
|
|
|
|
repo := &internalRepository{r}
|
|
|
|
retryLoop:
|
|
for {
|
|
lock, err = restic.NewLock(ctx, repo, exclusive)
|
|
if err != nil && restic.IsAlreadyLocked(err) {
|
|
|
|
if !retryMessagePrinted {
|
|
printRetry(fmt.Sprintf("repo already locked, waiting up to %s for the lock\n", retryLock))
|
|
retryMessagePrinted = true
|
|
}
|
|
|
|
debug.Log("repo already locked, retrying in %v", retrySleep)
|
|
retrySleepCh := time.After(retrySleep)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx, ctx.Err()
|
|
case <-retryTimeout:
|
|
debug.Log("repo already locked, timeout expired")
|
|
// Last lock attempt
|
|
lock, err = restic.NewLock(ctx, repo, exclusive)
|
|
break retryLoop
|
|
case <-retrySleepCh:
|
|
retrySleep = minDuration(retrySleep*2, l.retrySleepMax)
|
|
}
|
|
} else {
|
|
// anything else, either a successful lock or another error
|
|
break retryLoop
|
|
}
|
|
}
|
|
if restic.IsInvalidLock(err) {
|
|
return nil, ctx, errors.Fatalf("%v\n\nthe `unlock --remove-all` command can be used to remove invalid locks. Make sure that no other restic process is accessing the repository when running the command", err)
|
|
}
|
|
if err != nil {
|
|
return nil, ctx, fmt.Errorf("unable to create lock in backend: %w", err)
|
|
}
|
|
debug.Log("create lock %p (exclusive %v)", lock, exclusive)
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
lockInfo := &lockContext{
|
|
lock: lock,
|
|
cancel: cancel,
|
|
}
|
|
lockInfo.refreshWG.Add(2)
|
|
refreshChan := make(chan struct{})
|
|
forceRefreshChan := make(chan refreshLockRequest)
|
|
|
|
go l.refreshLocks(ctx, repo.be, lockInfo, refreshChan, forceRefreshChan, logger)
|
|
go l.monitorLockRefresh(ctx, lockInfo, refreshChan, forceRefreshChan, logger)
|
|
|
|
return &Unlocker{lockInfo}, ctx, nil
|
|
}
|
|
|
|
// minDuration returns the shorter of the durations a and b; on a tie it
// returns a.
func minDuration(a, b time.Duration) time.Duration {
	if b < a {
		return b
	}
	return a
}
|
|
|
|
// refreshLockRequest asks the refresh goroutine to attempt a stale-lock
// refresh. The outcome is sent back on result.
type refreshLockRequest struct {
	// result receives true if the stale lock was refreshed successfully.
	result chan bool
}
|
|
|
|
func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, lockInfo *lockContext, refreshed chan<- struct{}, forceRefresh <-chan refreshLockRequest, logger func(format string, args ...interface{})) {
|
|
debug.Log("start")
|
|
lock := lockInfo.lock
|
|
ticker := time.NewTicker(l.refreshInterval)
|
|
lastRefresh := lock.Time
|
|
|
|
defer func() {
|
|
ticker.Stop()
|
|
// ensure that the context was cancelled before removing the lock
|
|
lockInfo.cancel()
|
|
|
|
// remove the lock from the repo
|
|
debug.Log("unlocking repository with lock %v", lock)
|
|
if err := lock.Unlock(ctx); err != nil {
|
|
debug.Log("error while unlocking: %v", err)
|
|
logger("error while unlocking: %v", err)
|
|
}
|
|
|
|
lockInfo.refreshWG.Done()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
debug.Log("terminate")
|
|
return
|
|
|
|
case req := <-forceRefresh:
|
|
debug.Log("trying to refresh stale lock")
|
|
// keep on going if our current lock still exists
|
|
success := tryRefreshStaleLock(ctx, backend, lock, lockInfo.cancel, logger)
|
|
// inform refresh goroutine about forced refresh
|
|
select {
|
|
case <-ctx.Done():
|
|
case req.result <- success:
|
|
}
|
|
|
|
if success {
|
|
// update lock refresh time
|
|
lastRefresh = lock.Time
|
|
}
|
|
|
|
case <-ticker.C:
|
|
if time.Since(lastRefresh) > l.refreshabilityTimeout {
|
|
// the lock is too old, wait until the expiry monitor cancels the context
|
|
continue
|
|
}
|
|
|
|
debug.Log("refreshing locks")
|
|
err := lock.Refresh(context.TODO())
|
|
if err != nil {
|
|
logger("unable to refresh lock: %v\n", err)
|
|
} else {
|
|
lastRefresh = lock.Time
|
|
// inform monitor goroutine about successful refresh
|
|
select {
|
|
case <-ctx.Done():
|
|
case refreshed <- struct{}{}:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (l *locker) monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}, forceRefresh chan<- refreshLockRequest, logger func(format string, args ...interface{})) {
|
|
// time.Now() might use a monotonic timer which is paused during standby
|
|
// convert to unix time to ensure we compare real time values
|
|
lastRefresh := time.Now().UnixNano()
|
|
pollDuration := 1 * time.Second
|
|
if l.refreshInterval < pollDuration {
|
|
// required for TestLockFailedRefresh
|
|
pollDuration = l.refreshInterval / 5
|
|
}
|
|
// timers are paused during standby, which is a problem as the refresh timeout
|
|
// _must_ expire if the host was too long in standby. Thus fall back to periodic checks
|
|
// https://github.com/golang/go/issues/35012
|
|
ticker := time.NewTicker(pollDuration)
|
|
defer func() {
|
|
ticker.Stop()
|
|
lockInfo.cancel()
|
|
lockInfo.refreshWG.Done()
|
|
}()
|
|
|
|
var refreshStaleLockResult chan bool
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
debug.Log("terminate expiry monitoring")
|
|
return
|
|
case <-refreshed:
|
|
if refreshStaleLockResult != nil {
|
|
// ignore delayed refresh notifications while the stale lock is refreshed
|
|
continue
|
|
}
|
|
lastRefresh = time.Now().UnixNano()
|
|
case <-ticker.C:
|
|
if time.Now().UnixNano()-lastRefresh < l.refreshabilityTimeout.Nanoseconds() || refreshStaleLockResult != nil {
|
|
continue
|
|
}
|
|
|
|
debug.Log("trying to refreshStaleLock")
|
|
// keep on going if our current lock still exists
|
|
refreshReq := refreshLockRequest{
|
|
result: make(chan bool),
|
|
}
|
|
refreshStaleLockResult = refreshReq.result
|
|
|
|
// inform refresh goroutine about forced refresh
|
|
select {
|
|
case <-ctx.Done():
|
|
case forceRefresh <- refreshReq:
|
|
}
|
|
case success := <-refreshStaleLockResult:
|
|
if success {
|
|
lastRefresh = time.Now().UnixNano()
|
|
refreshStaleLockResult = nil
|
|
continue
|
|
}
|
|
|
|
logger("Fatal: failed to refresh lock in time\n")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func tryRefreshStaleLock(ctx context.Context, be backend.Backend, lock *restic.Lock, cancel context.CancelFunc, logger func(format string, args ...interface{})) bool {
|
|
freeze := backend.AsBackend[backend.FreezeBackend](be)
|
|
if freeze != nil {
|
|
debug.Log("freezing backend")
|
|
freeze.Freeze()
|
|
defer freeze.Unfreeze()
|
|
}
|
|
|
|
err := lock.RefreshStaleLock(ctx)
|
|
if err != nil {
|
|
logger("failed to refresh stale lock: %v\n", err)
|
|
// cancel context while the backend is still frozen to prevent accidental modifications
|
|
cancel()
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
type Unlocker struct {
|
|
info *lockContext
|
|
}
|
|
|
|
func (l *Unlocker) Unlock() {
|
|
l.info.cancel()
|
|
l.info.refreshWG.Wait()
|
|
}
|
|
|
|
// RemoveStaleLocks deletes all locks detected as stale from the repository.
|
|
func RemoveStaleLocks(ctx context.Context, repo *Repository) (uint, error) {
|
|
var processed uint
|
|
err := restic.ForAllLocks(ctx, repo, nil, func(id restic.ID, lock *restic.Lock, err error) error {
|
|
if err != nil {
|
|
// ignore locks that cannot be loaded
|
|
debug.Log("ignore lock %v: %v", id, err)
|
|
return nil
|
|
}
|
|
|
|
if lock.Stale() {
|
|
err = (&internalRepository{repo}).RemoveUnpacked(ctx, restic.LockFile, id)
|
|
if err == nil {
|
|
processed++
|
|
}
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
return processed, err
|
|
}
|
|
|
|
// RemoveAllLocks removes all locks forcefully.
|
|
func RemoveAllLocks(ctx context.Context, repo *Repository) (uint, error) {
|
|
var processed uint32
|
|
err := restic.ParallelList(ctx, repo, restic.LockFile, repo.Connections(), func(ctx context.Context, id restic.ID, _ int64) error {
|
|
err := (&internalRepository{repo}).RemoveUnpacked(ctx, restic.LockFile, id)
|
|
if err == nil {
|
|
atomic.AddUint32(&processed, 1)
|
|
}
|
|
return err
|
|
})
|
|
return uint(processed), err
|
|
}
|