restic/internal/repository/lock.go
Michael Eischer 99e105eeb6 repository: restrict SaveUnpacked and RemoveUnpacked
Those methods now only allow modifying snapshots. Internal data types
used by the repository are now read-only. The repository-internal code
can bypass the restrictions by wrapping the repository in an
`internalRepository` type.

The restriction itself is implemented using a new data type, WriteableFileType,
in the SaveUnpacked and RemoveUnpacked methods. This statically ensures that
code cannot bypass the access restrictions.

The test changes are somewhat noisy, as some tests modify repository internals
and therefore need a way to bypass the access restrictions. This works by
capturing an `internalRepository` or `Backend` via a test helper function when
creating the Repository.
2025-01-13 22:39:57 +01:00
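
To illustrate the approach described above, here is a minimal, self-contained sketch of the pattern: a separate parameter type statically restricts which file types the public write methods accept, while an internal wrapper bypasses the restriction. All names and signatures in this sketch are simplified placeholders and do not match restic's actual API.

package main

import "fmt"

// FileType enumerates all repository file types (simplified placeholder).
type FileType int

const (
	SnapshotFile FileType = iota
	IndexFile
	LockFile
)

// WriteableFileType is a distinct type, so only values of this type can be
// passed to the public write methods; plain FileType values are rejected at
// compile time.
type WriteableFileType FileType

const WriteableSnapshotFile = WriteableFileType(SnapshotFile)

type Repository struct{}

// saveUnpacked is the unrestricted, repository-internal implementation.
func (r *Repository) saveUnpacked(t FileType, data []byte) {
	fmt.Printf("saved file type %d (%d bytes)\n", t, len(data))
}

// SaveUnpacked is the public method: it only accepts WriteableFileType.
func (r *Repository) SaveUnpacked(t WriteableFileType, data []byte) {
	r.saveUnpacked(FileType(t), data)
}

// internalRepository re-exposes the unrestricted call for repository-internal code.
type internalRepository struct {
	r *Repository
}

func (r *internalRepository) SaveUnpacked(t FileType, data []byte) {
	r.r.saveUnpacked(t, data)
}

func main() {
	repo := &Repository{}
	repo.SaveUnpacked(WriteableSnapshotFile, []byte("snapshot"))
	// repo.SaveUnpacked(IndexFile, nil) // does not compile: FileType is not WriteableFileType

	internal := &internalRepository{repo}
	internal.SaveUnpacked(IndexFile, []byte("index")) // internal code may write any file type
}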


package repository

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/restic/restic/internal/backend"
	"github.com/restic/restic/internal/debug"
	"github.com/restic/restic/internal/errors"
	"github.com/restic/restic/internal/restic"
)
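
// lockContext bundles a lock with the cancel function and wait group used to
// coordinate its refresh goroutines.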
type lockContext struct {
	lock      *restic.Lock
	cancel    context.CancelFunc
	refreshWG sync.WaitGroup
}
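
// locker holds the timing parameters used for acquiring and refreshing
// repository locks.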
type locker struct {
	retrySleepStart       time.Duration
	retrySleepMax         time.Duration
	refreshInterval       time.Duration
	refreshabilityTimeout time.Duration
}

const defaultRefreshInterval = 5 * time.Minute

var lockerInst = &locker{
	retrySleepStart: 5 * time.Second,
	retrySleepMax:   60 * time.Second,
	refreshInterval: defaultRefreshInterval,
	// consider a lock refresh failed a bit before the lock actually becomes stale;
	// the difference allows compensating for a small time drift between clients.
	refreshabilityTimeout: restic.StaleLockTimeout - defaultRefreshInterval*3/2,
}
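
// Lock acquires a shared or exclusive lock on the repository using the
// package-level locker configuration and returns an Unlocker that must be used
// to release it again. A minimal usage sketch (error handling elided, the
// surrounding setup is assumed to exist elsewhere):
//
//	unlock, wrappedCtx, err := repository.Lock(ctx, repo, false, time.Minute,
//		func(msg string) { fmt.Print(msg) }, log.Printf)
//	if err != nil {
//		return err
//	}
//	defer unlock.Unlock()
//	// use wrappedCtx for further repository operations; it is cancelled once
//	// the lock can no longer be refreshed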
func Lock(ctx context.Context, repo *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
	return lockerInst.Lock(ctx, repo, exclusive, retryLock, printRetry, logger)
}

// Lock wraps the ctx such that it is cancelled when the repository is unlocked.
// Cancelling the original context also stops the lock refresh.
func (l *locker) Lock(ctx context.Context, r *Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
	var lock *restic.Lock
	var err error

	retrySleep := minDuration(l.retrySleepStart, retryLock)
	retryMessagePrinted := false
	retryTimeout := time.After(retryLock)

	repo := &internalRepository{r}

retryLoop:
	for {
		lock, err = restic.NewLock(ctx, repo, exclusive)
		if err != nil && restic.IsAlreadyLocked(err) {
			if !retryMessagePrinted {
				printRetry(fmt.Sprintf("repo already locked, waiting up to %s for the lock\n", retryLock))
				retryMessagePrinted = true
			}

			debug.Log("repo already locked, retrying in %v", retrySleep)
			retrySleepCh := time.After(retrySleep)

			select {
			case <-ctx.Done():
				return nil, ctx, ctx.Err()
			case <-retryTimeout:
				debug.Log("repo already locked, timeout expired")
				// Last lock attempt
				lock, err = restic.NewLock(ctx, repo, exclusive)
				break retryLoop
			case <-retrySleepCh:
				retrySleep = minDuration(retrySleep*2, l.retrySleepMax)
			}
		} else {
			// anything else, either a successful lock or another error
			break retryLoop
		}
	}

	if restic.IsInvalidLock(err) {
		return nil, ctx, errors.Fatalf("%v\n\nthe `unlock --remove-all` command can be used to remove invalid locks. Make sure that no other restic process is accessing the repository when running the command", err)
	}
	if err != nil {
		return nil, ctx, fmt.Errorf("unable to create lock in backend: %w", err)
	}
	debug.Log("create lock %p (exclusive %v)", lock, exclusive)

	ctx, cancel := context.WithCancel(ctx)
	lockInfo := &lockContext{
		lock:   lock,
		cancel: cancel,
	}
	lockInfo.refreshWG.Add(2)
	refreshChan := make(chan struct{})
	forceRefreshChan := make(chan refreshLockRequest)

	go l.refreshLocks(ctx, repo.be, lockInfo, refreshChan, forceRefreshChan, logger)
	go l.monitorLockRefresh(ctx, lockInfo, refreshChan, forceRefreshChan, logger)

	return &Unlocker{lockInfo}, ctx, nil
}
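
// minDuration returns the smaller of the two durations.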
func minDuration(a, b time.Duration) time.Duration {
	if a <= b {
		return a
	}
	return b
}
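
// refreshLockRequest asks the refresh goroutine to try refreshing a stale
// lock; the result channel reports whether the refresh succeeded.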
type refreshLockRequest struct {
	result chan bool
}
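
// refreshLocks periodically refreshes the lock until ctx is cancelled and then
// removes the lock from the repository. A refresh of a stale lock can be
// forced via the forceRefresh channel; successful periodic refreshes are
// reported on the refreshed channel.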
func (l *locker) refreshLocks(ctx context.Context, backend backend.Backend, lockInfo *lockContext, refreshed chan<- struct{}, forceRefresh <-chan refreshLockRequest, logger func(format string, args ...interface{})) {
	debug.Log("start")
	lock := lockInfo.lock
	ticker := time.NewTicker(l.refreshInterval)
	lastRefresh := lock.Time

	defer func() {
		ticker.Stop()

		// ensure that the context was cancelled before removing the lock
		lockInfo.cancel()

		// remove the lock from the repo
		debug.Log("unlocking repository with lock %v", lock)
		if err := lock.Unlock(ctx); err != nil {
			debug.Log("error while unlocking: %v", err)
			logger("error while unlocking: %v", err)
		}

		lockInfo.refreshWG.Done()
	}()

	for {
		select {
		case <-ctx.Done():
			debug.Log("terminate")
			return

		case req := <-forceRefresh:
			debug.Log("trying to refresh stale lock")
			// keep on going if our current lock still exists
			success := tryRefreshStaleLock(ctx, backend, lock, lockInfo.cancel, logger)
			// report the result of the forced refresh to the monitor goroutine
			select {
			case <-ctx.Done():
			case req.result <- success:
			}

			if success {
				// update lock refresh time
				lastRefresh = lock.Time
			}

		case <-ticker.C:
			if time.Since(lastRefresh) > l.refreshabilityTimeout {
				// the lock is too old, wait until the expiry monitor cancels the context
				continue
			}

			debug.Log("refreshing locks")
			err := lock.Refresh(context.TODO())
			if err != nil {
				logger("unable to refresh lock: %v\n", err)
			} else {
				lastRefresh = lock.Time
				// inform monitor goroutine about successful refresh
				select {
				case <-ctx.Done():
				case refreshed <- struct{}{}:
				}
			}
		}
	}
}
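
// monitorLockRefresh watches for successful lock refreshes. If the lock has
// not been refreshed within refreshabilityTimeout, it requests a forced
// refresh of the stale lock and, should that fail, cancels the lock context.
// It polls with a ticker because timers are paused while the host is in
// standby.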
func (l *locker) monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}, forceRefresh chan<- refreshLockRequest, logger func(format string, args ...interface{})) {
	// time.Now() might use a monotonic timer which is paused during standby
	// convert to unix time to ensure we compare real time values
	lastRefresh := time.Now().UnixNano()
	pollDuration := 1 * time.Second
	if l.refreshInterval < pollDuration {
		// required for TestLockFailedRefresh
		pollDuration = l.refreshInterval / 5
	}
	// timers are paused during standby, which is a problem as the refresh timeout
	// _must_ expire if the host was too long in standby. Thus fall back to periodic checks
	// https://github.com/golang/go/issues/35012
	ticker := time.NewTicker(pollDuration)
	defer func() {
		ticker.Stop()
		lockInfo.cancel()
		lockInfo.refreshWG.Done()
	}()

	var refreshStaleLockResult chan bool

	for {
		select {
		case <-ctx.Done():
			debug.Log("terminate expiry monitoring")
			return
		case <-refreshed:
			if refreshStaleLockResult != nil {
				// ignore delayed refresh notifications while the stale lock is refreshed
				continue
			}
			lastRefresh = time.Now().UnixNano()
		case <-ticker.C:
			if time.Now().UnixNano()-lastRefresh < l.refreshabilityTimeout.Nanoseconds() || refreshStaleLockResult != nil {
				continue
			}

			debug.Log("trying to refreshStaleLock")
			// keep on going if our current lock still exists
			refreshReq := refreshLockRequest{
				result: make(chan bool),
			}
			refreshStaleLockResult = refreshReq.result

			// inform refresh goroutine about forced refresh
			select {
			case <-ctx.Done():
			case forceRefresh <- refreshReq:
			}
		case success := <-refreshStaleLockResult:
			if success {
				lastRefresh = time.Now().UnixNano()
				refreshStaleLockResult = nil
				continue
			}

			logger("Fatal: failed to refresh lock in time\n")
			return
		}
	}
}
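
// tryRefreshStaleLock attempts to refresh a lock that has become stale. If the
// backend supports freezing, it is frozen for the duration of the attempt; on
// failure the lock context is cancelled while the backend is still frozen to
// prevent accidental modifications.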
func tryRefreshStaleLock(ctx context.Context, be backend.Backend, lock *restic.Lock, cancel context.CancelFunc, logger func(format string, args ...interface{})) bool {
	freeze := backend.AsBackend[backend.FreezeBackend](be)
	if freeze != nil {
		debug.Log("freezing backend")
		freeze.Freeze()
		defer freeze.Unfreeze()
	}

	err := lock.RefreshStaleLock(ctx)
	if err != nil {
		logger("failed to refresh stale lock: %v\n", err)
		// cancel context while the backend is still frozen to prevent accidental modifications
		cancel()
		return false
	}

	return true
}
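
// Unlocker releases a repository lock. Unlock cancels the lock context and
// waits until both refresh goroutines have finished and the lock file has been
// removed from the repository.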
type Unlocker struct {
	info *lockContext
}

func (l *Unlocker) Unlock() {
	l.info.cancel()
	l.info.refreshWG.Wait()
}

// RemoveStaleLocks deletes all locks detected as stale from the repository.
func RemoveStaleLocks(ctx context.Context, repo *Repository) (uint, error) {
	var processed uint
	err := restic.ForAllLocks(ctx, repo, nil, func(id restic.ID, lock *restic.Lock, err error) error {
		if err != nil {
			// ignore locks that cannot be loaded
			debug.Log("ignore lock %v: %v", id, err)
			return nil
		}

		if lock.Stale() {
			err = (&internalRepository{repo}).RemoveUnpacked(ctx, restic.LockFile, id)
			if err == nil {
				processed++
			}
			return err
		}

		return nil
	})
	return processed, err
}

// RemoveAllLocks removes all locks forcefully.
func RemoveAllLocks(ctx context.Context, repo *Repository) (uint, error) {
	var processed uint32
	err := restic.ParallelList(ctx, repo, restic.LockFile, repo.Connections(), func(ctx context.Context, id restic.ID, _ int64) error {
		err := (&internalRepository{repo}).RemoveUnpacked(ctx, restic.LockFile, id)
		if err == nil {
			atomic.AddUint32(&processed, 1)
		}
		return err
	})
	return uint(processed), err
}