mirror of https://github.com/restic/restic.git
repository: remove global list of locks
This commit is contained in:
parent
cbb5f89252
commit
e8df50fa3c
|
@ -2,16 +2,10 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/restic/restic/internal/repository"
|
"github.com/restic/restic/internal/repository"
|
||||||
"github.com/restic/restic/internal/restic"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var globalLocks struct {
|
|
||||||
sync.Once
|
|
||||||
}
|
|
||||||
|
|
||||||
func internalOpenWithLocked(ctx context.Context, gopts GlobalOptions, dryRun bool, exclusive bool) (context.Context, *repository.Repository, func(), error) {
|
func internalOpenWithLocked(ctx context.Context, gopts GlobalOptions, dryRun bool, exclusive bool) (context.Context, *repository.Repository, func(), error) {
|
||||||
repo, err := OpenRepository(ctx, gopts)
|
repo, err := OpenRepository(ctx, gopts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -20,22 +14,22 @@ func internalOpenWithLocked(ctx context.Context, gopts GlobalOptions, dryRun boo
|
||||||
|
|
||||||
unlock := func() {}
|
unlock := func() {}
|
||||||
if !dryRun {
|
if !dryRun {
|
||||||
var lock *restic.Lock
|
var lock *repository.Unlocker
|
||||||
|
|
||||||
// make sure that a repository is unlocked properly and after cancel() was
|
|
||||||
// called by the cleanup handler in global.go
|
|
||||||
globalLocks.Do(func() {
|
|
||||||
AddCleanupHandler(repository.UnlockAll)
|
|
||||||
})
|
|
||||||
|
|
||||||
lock, ctx, err = repository.Lock(ctx, repo, exclusive, gopts.RetryLock, func(msg string) {
|
lock, ctx, err = repository.Lock(ctx, repo, exclusive, gopts.RetryLock, func(msg string) {
|
||||||
if !gopts.JSON {
|
if !gopts.JSON {
|
||||||
Verbosef("%s", msg)
|
Verbosef("%s", msg)
|
||||||
}
|
}
|
||||||
}, Warnf)
|
}, Warnf)
|
||||||
unlock = func() {
|
|
||||||
repository.Unlock(lock)
|
unlock = lock.Unlock
|
||||||
}
|
// make sure that a repository is unlocked properly and after cancel() was
|
||||||
|
// called by the cleanup handler in global.go
|
||||||
|
AddCleanupHandler(func(code int) (int, error) {
|
||||||
|
lock.Unlock()
|
||||||
|
return code, nil
|
||||||
|
})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,11 +18,6 @@ type lockContext struct {
|
||||||
refreshWG sync.WaitGroup
|
refreshWG sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
var globalLocks struct {
|
|
||||||
locks map[*restic.Lock]*lockContext
|
|
||||||
sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
retrySleepStart = 5 * time.Second
|
retrySleepStart = 5 * time.Second
|
||||||
retrySleepMax = 60 * time.Second
|
retrySleepMax = 60 * time.Second
|
||||||
|
@ -37,7 +32,7 @@ func minDuration(a, b time.Duration) time.Duration {
|
||||||
|
|
||||||
// Lock wraps the ctx such that it is cancelled when the repository is unlocked
|
// Lock wraps the ctx such that it is cancelled when the repository is unlocked
|
||||||
// cancelling the original context also stops the lock refresh
|
// cancelling the original context also stops the lock refresh
|
||||||
func Lock(ctx context.Context, repo restic.Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*restic.Lock, context.Context, error) {
|
func Lock(ctx context.Context, repo restic.Repository, exclusive bool, retryLock time.Duration, printRetry func(msg string), logger func(format string, args ...interface{})) (*Unlocker, context.Context, error) {
|
||||||
|
|
||||||
lockFn := restic.NewLock
|
lockFn := restic.NewLock
|
||||||
if exclusive {
|
if exclusive {
|
||||||
|
@ -97,13 +92,10 @@ retryLoop:
|
||||||
refreshChan := make(chan struct{})
|
refreshChan := make(chan struct{})
|
||||||
forceRefreshChan := make(chan refreshLockRequest)
|
forceRefreshChan := make(chan refreshLockRequest)
|
||||||
|
|
||||||
globalLocks.Lock()
|
|
||||||
globalLocks.locks[lock] = lockInfo
|
|
||||||
go refreshLocks(ctx, repo.Backend(), lockInfo, refreshChan, forceRefreshChan, logger)
|
go refreshLocks(ctx, repo.Backend(), lockInfo, refreshChan, forceRefreshChan, logger)
|
||||||
go monitorLockRefresh(ctx, lockInfo, refreshChan, forceRefreshChan, logger)
|
go monitorLockRefresh(ctx, lockInfo, refreshChan, forceRefreshChan, logger)
|
||||||
globalLocks.Unlock()
|
|
||||||
|
|
||||||
return lock, ctx, err
|
return &Unlocker{lockInfo}, ctx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var refreshInterval = 5 * time.Minute
|
var refreshInterval = 5 * time.Minute
|
||||||
|
@ -261,41 +253,11 @@ func tryRefreshStaleLock(ctx context.Context, be backend.Backend, lock *restic.L
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func Unlock(lock *restic.Lock) {
|
type Unlocker struct {
|
||||||
if lock == nil {
|
info *lockContext
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
globalLocks.Lock()
|
func (l *Unlocker) Unlock() {
|
||||||
lockInfo, exists := globalLocks.locks[lock]
|
l.info.cancel()
|
||||||
delete(globalLocks.locks, lock)
|
l.info.refreshWG.Wait()
|
||||||
globalLocks.Unlock()
|
|
||||||
|
|
||||||
if !exists {
|
|
||||||
debug.Log("unable to find lock %v in the global list of locks, ignoring", lock)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
lockInfo.cancel()
|
|
||||||
lockInfo.refreshWG.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func UnlockAll(code int) (int, error) {
|
|
||||||
globalLocks.Lock()
|
|
||||||
locks := globalLocks.locks
|
|
||||||
debug.Log("unlocking %d locks", len(globalLocks.locks))
|
|
||||||
for _, lockInfo := range globalLocks.locks {
|
|
||||||
lockInfo.cancel()
|
|
||||||
}
|
|
||||||
globalLocks.locks = make(map[*restic.Lock]*lockContext)
|
|
||||||
globalLocks.Unlock()
|
|
||||||
|
|
||||||
for _, lockInfo := range locks {
|
|
||||||
lockInfo.refreshWG.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
return code, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
globalLocks.locks = make(map[*restic.Lock]*lockContext)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,11 +37,11 @@ func openLockTestRepo(t *testing.T, wrapper backendWrapper) restic.Repository {
|
||||||
return repo
|
return repo
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkedLockRepo(ctx context.Context, t *testing.T, repo restic.Repository, retryLock time.Duration) (*restic.Lock, context.Context) {
|
func checkedLockRepo(ctx context.Context, t *testing.T, repo restic.Repository, retryLock time.Duration) (*Unlocker, context.Context) {
|
||||||
lock, wrappedCtx, err := Lock(ctx, repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
|
lock, wrappedCtx, err := Lock(ctx, repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
|
||||||
test.OK(t, err)
|
test.OK(t, err)
|
||||||
test.OK(t, wrappedCtx.Err())
|
test.OK(t, wrappedCtx.Err())
|
||||||
if lock.Stale() {
|
if lock.info.lock.Stale() {
|
||||||
t.Fatal("lock returned stale lock")
|
t.Fatal("lock returned stale lock")
|
||||||
}
|
}
|
||||||
return lock, wrappedCtx
|
return lock, wrappedCtx
|
||||||
|
@ -51,7 +51,7 @@ func TestLock(t *testing.T) {
|
||||||
repo := openLockTestRepo(t, nil)
|
repo := openLockTestRepo(t, nil)
|
||||||
|
|
||||||
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, 0)
|
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, 0)
|
||||||
Unlock(lock)
|
lock.Unlock()
|
||||||
if wrappedCtx.Err() == nil {
|
if wrappedCtx.Err() == nil {
|
||||||
t.Fatal("unlock did not cancel context")
|
t.Fatal("unlock did not cancel context")
|
||||||
}
|
}
|
||||||
|
@ -69,21 +69,7 @@ func TestLockCancel(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unlock should not crash
|
// Unlock should not crash
|
||||||
Unlock(lock)
|
lock.Unlock()
|
||||||
}
|
|
||||||
|
|
||||||
func TestLockUnlockAll(t *testing.T) {
|
|
||||||
repo := openLockTestRepo(t, nil)
|
|
||||||
|
|
||||||
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, 0)
|
|
||||||
_, err := UnlockAll(0)
|
|
||||||
test.OK(t, err)
|
|
||||||
if wrappedCtx.Err() == nil {
|
|
||||||
t.Fatal("canceled parent context did not cancel context")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Unlock should not crash
|
|
||||||
Unlock(lock)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLockConflict(t *testing.T) {
|
func TestLockConflict(t *testing.T) {
|
||||||
|
@ -94,7 +80,7 @@ func TestLockConflict(t *testing.T) {
|
||||||
|
|
||||||
lock, _, err := Lock(context.Background(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
|
lock, _, err := Lock(context.Background(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
|
||||||
test.OK(t, err)
|
test.OK(t, err)
|
||||||
defer Unlock(lock)
|
defer lock.Unlock()
|
||||||
_, _, err = Lock(context.Background(), repo2, false, 0, func(msg string) {}, func(format string, args ...interface{}) {})
|
_, _, err = Lock(context.Background(), repo2, false, 0, func(msg string) {}, func(format string, args ...interface{}) {})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Fatal("second lock should have failed")
|
t.Fatal("second lock should have failed")
|
||||||
|
@ -137,7 +123,7 @@ func TestLockFailedRefresh(t *testing.T) {
|
||||||
t.Fatal("failed lock refresh did not cause context cancellation")
|
t.Fatal("failed lock refresh did not cause context cancellation")
|
||||||
}
|
}
|
||||||
// Unlock should not crash
|
// Unlock should not crash
|
||||||
Unlock(lock)
|
lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
type loggingBackend struct {
|
type loggingBackend struct {
|
||||||
|
@ -186,7 +172,7 @@ func TestLockSuccessfulRefresh(t *testing.T) {
|
||||||
// expected lock refresh to work
|
// expected lock refresh to work
|
||||||
}
|
}
|
||||||
// Unlock should not crash
|
// Unlock should not crash
|
||||||
Unlock(lock)
|
lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
type slowBackend struct {
|
type slowBackend struct {
|
||||||
|
@ -248,19 +234,21 @@ func TestLockSuccessfulStaleRefresh(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unlock should not crash
|
// Unlock should not crash
|
||||||
Unlock(lock)
|
lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLockWaitTimeout(t *testing.T) {
|
func TestLockWaitTimeout(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
repo := openLockTestRepo(t, nil)
|
repo := openLockTestRepo(t, nil)
|
||||||
|
|
||||||
elock, _, err := Lock(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
|
elock, _, err := Lock(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
|
||||||
test.OK(t, err)
|
test.OK(t, err)
|
||||||
|
defer elock.Unlock()
|
||||||
|
|
||||||
retryLock := 200 * time.Millisecond
|
retryLock := 200 * time.Millisecond
|
||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
lock, _, err := Lock(context.TODO(), repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
|
_, _, err = Lock(context.TODO(), repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
|
||||||
duration := time.Since(start)
|
duration := time.Since(start)
|
||||||
|
|
||||||
test.Assert(t, err != nil,
|
test.Assert(t, err != nil,
|
||||||
|
@ -269,16 +257,15 @@ func TestLockWaitTimeout(t *testing.T) {
|
||||||
"create normal lock with exclusively locked repo didn't return the correct error")
|
"create normal lock with exclusively locked repo didn't return the correct error")
|
||||||
test.Assert(t, retryLock <= duration && duration < retryLock*3/2,
|
test.Assert(t, retryLock <= duration && duration < retryLock*3/2,
|
||||||
"create normal lock with exclusively locked repo didn't wait for the specified timeout")
|
"create normal lock with exclusively locked repo didn't wait for the specified timeout")
|
||||||
|
|
||||||
test.OK(t, lock.Unlock())
|
|
||||||
test.OK(t, elock.Unlock())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLockWaitCancel(t *testing.T) {
|
func TestLockWaitCancel(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
repo := openLockTestRepo(t, nil)
|
repo := openLockTestRepo(t, nil)
|
||||||
|
|
||||||
elock, _, err := Lock(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
|
elock, _, err := Lock(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
|
||||||
test.OK(t, err)
|
test.OK(t, err)
|
||||||
|
defer elock.Unlock()
|
||||||
|
|
||||||
retryLock := 200 * time.Millisecond
|
retryLock := 200 * time.Millisecond
|
||||||
cancelAfter := 40 * time.Millisecond
|
cancelAfter := 40 * time.Millisecond
|
||||||
|
@ -287,7 +274,7 @@ func TestLockWaitCancel(t *testing.T) {
|
||||||
ctx, cancel := context.WithCancel(context.TODO())
|
ctx, cancel := context.WithCancel(context.TODO())
|
||||||
time.AfterFunc(cancelAfter, cancel)
|
time.AfterFunc(cancelAfter, cancel)
|
||||||
|
|
||||||
lock, _, err := Lock(ctx, repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
|
_, _, err = Lock(ctx, repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
|
||||||
duration := time.Since(start)
|
duration := time.Since(start)
|
||||||
|
|
||||||
test.Assert(t, err != nil,
|
test.Assert(t, err != nil,
|
||||||
|
@ -296,12 +283,10 @@ func TestLockWaitCancel(t *testing.T) {
|
||||||
"create normal lock with exclusively locked repo didn't return the correct error")
|
"create normal lock with exclusively locked repo didn't return the correct error")
|
||||||
test.Assert(t, cancelAfter <= duration && duration < retryLock-10*time.Millisecond,
|
test.Assert(t, cancelAfter <= duration && duration < retryLock-10*time.Millisecond,
|
||||||
"create normal lock with exclusively locked repo didn't return in time, duration %v", duration)
|
"create normal lock with exclusively locked repo didn't return in time, duration %v", duration)
|
||||||
|
|
||||||
test.OK(t, lock.Unlock())
|
|
||||||
test.OK(t, elock.Unlock())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestLockWaitSuccess(t *testing.T) {
|
func TestLockWaitSuccess(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
repo := openLockTestRepo(t, nil)
|
repo := openLockTestRepo(t, nil)
|
||||||
|
|
||||||
elock, _, err := Lock(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
|
elock, _, err := Lock(context.TODO(), repo, true, 0, func(msg string) {}, func(format string, args ...interface{}) {})
|
||||||
|
@ -311,11 +296,10 @@ func TestLockWaitSuccess(t *testing.T) {
|
||||||
unlockAfter := 40 * time.Millisecond
|
unlockAfter := 40 * time.Millisecond
|
||||||
|
|
||||||
time.AfterFunc(unlockAfter, func() {
|
time.AfterFunc(unlockAfter, func() {
|
||||||
test.OK(t, elock.Unlock())
|
elock.Unlock()
|
||||||
})
|
})
|
||||||
|
|
||||||
lock, _, err := Lock(context.TODO(), repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
|
lock, _, err := Lock(context.TODO(), repo, false, retryLock, func(msg string) {}, func(format string, args ...interface{}) {})
|
||||||
test.OK(t, err)
|
test.OK(t, err)
|
||||||
|
lock.Unlock()
|
||||||
test.OK(t, lock.Unlock())
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue