diff --git a/cmd/restic/global.go b/cmd/restic/global.go index 41f97b5df..8d34f8ddb 100644 --- a/cmd/restic/global.go +++ b/cmd/restic/global.go @@ -25,6 +25,7 @@ import ( "github.com/restic/restic/internal/backend/rest" "github.com/restic/restic/internal/backend/retry" "github.com/restic/restic/internal/backend/s3" + "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/backend/sftp" "github.com/restic/restic/internal/backend/swift" "github.com/restic/restic/internal/cache" @@ -744,8 +745,8 @@ func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Optio return nil, errors.Fatalf("unable to open repository at %v: %v", location.StripPassword(s), err) } - // wrap with debug logging - be = logger.New(be) + // wrap with debug logging and connection limiting + be = logger.New(sema.New(be)) // wrap backend if a test specified an inner hook if gopts.backendInnerTestHook != nil { @@ -820,5 +821,5 @@ func create(ctx context.Context, s string, opts options.Options) (restic.Backend return nil, err } - return logger.New(be), nil + return logger.New(sema.New(be)), nil } diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go index 4d7a4a57b..82d55960f 100644 --- a/internal/backend/azure/azure.go +++ b/internal/backend/azure/azure.go @@ -14,7 +14,6 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -26,7 +25,6 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" azContainer "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" - "github.com/cenkalti/backoff/v4" ) // Backend stores data on an azure endpoint. @@ -34,7 +32,6 @@ type Backend struct { cfg Config container *azContainer.Client connections uint - sem sema.Semaphore prefix string listMaxItems int layout.Layout @@ -96,16 +93,10 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) { return nil, errors.New("no azure authentication information found") } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &Backend{ container: client, cfg: cfg, connections: cfg.Connections, - sem: sem, Layout: &layout.DefaultLayout{ Path: cfg.Prefix, Join: path.Join, @@ -186,14 +177,8 @@ func (be *Backend) Path() string { // Save stores data in the backend at the handle. func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - objName := be.Filename(h) - be.sem.GetToken() - debug.Log("InsertObject(%v, %v)", be.cfg.AccountName, objName) var err error @@ -205,7 +190,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe err = be.saveLarge(ctx, objName, rd) } - be.sem.ReleaseToken() return err } @@ -297,7 +281,6 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, objName := be.Filename(h) blockBlobClient := be.container.NewBlobClient(objName) - be.sem.GetToken() resp, err := blockBlobClient.DownloadStream(ctx, &blob.DownloadStreamOptions{ Range: azblob.HTTPRange{ Offset: offset, @@ -306,11 +289,10 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, }) if err != nil { - be.sem.ReleaseToken() return nil, err } - return be.sem.ReleaseTokenOnClose(resp.Body, nil), err + return resp.Body, err } // Stat returns information about a blob. @@ -318,9 +300,7 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, objName := be.Filename(h) blobClient := be.container.NewBlobClient(objName) - be.sem.GetToken() props, err := blobClient.GetProperties(ctx, nil) - be.sem.ReleaseToken() if err != nil { return restic.FileInfo{}, errors.Wrap(err, "blob.GetProperties") @@ -338,9 +318,7 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) blob := be.container.NewBlobClient(objName) - be.sem.GetToken() _, err := blob.Delete(ctx, &azblob.DeleteBlobOptions{}) - be.sem.ReleaseToken() if be.IsNotExist(err) { return nil @@ -368,9 +346,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F lister := be.container.NewListBlobsFlatPager(opts) for lister.More() { - be.sem.GetToken() resp, err := lister.NextPage(ctx) - be.sem.ReleaseToken() if err != nil { return err diff --git a/internal/backend/b2/b2.go b/internal/backend/b2/b2.go index 10f1a715b..0827f727b 100644 --- a/internal/backend/b2/b2.go +++ b/internal/backend/b2/b2.go @@ -11,12 +11,10 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - "github.com/cenkalti/backoff/v4" "github.com/kurin/blazer/b2" "github.com/kurin/blazer/base" ) @@ -28,7 +26,6 @@ type b2Backend struct { cfg Config listMaxItems int layout.Layout - sem sema.Semaphore canDelete bool } @@ -92,11 +89,6 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend return nil, errors.Wrap(err, "Bucket") } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &b2Backend{ client: client, bucket: bucket, @@ -106,7 +98,6 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend Path: cfg.Prefix, }, listMaxItems: defaultListMaxItems, - sem: sem, canDelete: true, } @@ -134,11 +125,6 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backe return nil, errors.Wrap(err, "NewBucket") } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &b2Backend{ client: client, bucket: bucket, @@ -148,7 +134,6 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backe Path: cfg.Prefix, }, listMaxItems: defaultListMaxItems, - sem: sem, } _, err = be.Stat(ctx, restic.Handle{Type: restic.ConfigFile}) @@ -202,20 +187,18 @@ func (be *b2Backend) IsNotExist(err error) bool { // Load runs fn with a reader that yields the contents of the file at h at the // given offset. func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn) } func (be *b2Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - ctx, cancel := context.WithCancel(ctx) - - be.sem.GetToken() - name := be.Layout.Filename(h) obj := be.bucket.Object(name) if offset == 0 && length == 0 { - rd := obj.NewReader(ctx) - return be.sem.ReleaseTokenOnClose(rd, cancel), nil + return obj.NewReader(ctx), nil } // pass a negative length to NewRangeReader so that the remainder of the @@ -224,8 +207,7 @@ func (be *b2Backend) openReader(ctx context.Context, h restic.Handle, length int length = -1 } - rd := obj.NewRangeReader(ctx, offset, int64(length)) - return be.sem.ReleaseTokenOnClose(rd, cancel), nil + return obj.NewRangeReader(ctx, offset, int64(length)), nil } // Save stores data in the backend at the handle. @@ -233,15 +215,7 @@ func (be *b2Backend) Save(ctx context.Context, h restic.Handle, rd restic.Rewind ctx, cancel := context.WithCancel(ctx) defer cancel() - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - - be.sem.GetToken() - defer be.sem.ReleaseToken() - name := be.Filename(h) - debug.Log("Save %v, name %v", h, name) obj := be.bucket.Object(name) // b2 always requires sha1 checksums for uploaded file parts @@ -262,9 +236,6 @@ func (be *b2Backend) Save(ctx context.Context, h restic.Handle, rd restic.Rewind // Stat returns information about a blob. func (be *b2Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { - be.sem.GetToken() - defer be.sem.ReleaseToken() - name := be.Filename(h) obj := be.bucket.Object(name) info, err := obj.Attrs(ctx) @@ -276,9 +247,6 @@ func (be *b2Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileI // Remove removes the blob with the given name and type. func (be *b2Backend) Remove(ctx context.Context, h restic.Handle) error { - be.sem.GetToken() - defer be.sem.ReleaseToken() - // the retry backend will also repeat the remove method up to 10 times for i := 0; i < 3; i++ { obj := be.bucket.Object(be.Filename(h)) @@ -313,20 +281,13 @@ func (be *b2Backend) Remove(ctx context.Context, h restic.Handle) error { return errors.New("failed to delete all file versions") } -type semLocker struct { - sema.Semaphore -} - -func (sm *semLocker) Lock() { sm.GetToken() } -func (sm *semLocker) Unlock() { sm.ReleaseToken() } - // List returns a channel that yields all names of blobs of type t. func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { ctx, cancel := context.WithCancel(ctx) defer cancel() prefix, _ := be.Basedir(t) - iter := be.bucket.List(ctx, b2.ListPrefix(prefix), b2.ListPageSize(be.listMaxItems), b2.ListLocker(&semLocker{be.sem})) + iter := be.bucket.List(ctx, b2.ListPrefix(prefix), b2.ListPageSize(be.listMaxItems)) for iter.Next() { obj := iter.Object() diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index faf8b9858..12458a79c 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -15,7 +15,6 @@ import ( "github.com/pkg/errors" "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/restic" @@ -37,7 +36,6 @@ type Backend struct { gcsClient *storage.Client projectID string connections uint - sem sema.Semaphore bucketName string bucket *storage.BucketHandle prefix string @@ -99,16 +97,10 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) { return nil, errors.Wrap(err, "getStorageClient") } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &Backend{ gcsClient: gcsClient, projectID: cfg.ProjectID, connections: cfg.Connections, - sem: sem, bucketName: cfg.Bucket, bucket: gcsClient.Bucket(cfg.Bucket), prefix: cfg.Prefix, @@ -203,16 +195,8 @@ func (be *Backend) Path() string { // Save stores data in the backend at the handle. func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return err - } - objName := be.Filename(h) - be.sem.GetToken() - - debug.Log("InsertObject(%v, %v)", be.bucketName, objName) - // Set chunk size to zero to disable resumable uploads. // // With a non-zero chunk size (the default is @@ -247,8 +231,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe err = cerr } - be.sem.ReleaseToken() - if err != nil { return errors.Wrap(err, "service.Objects.Insert") } @@ -263,6 +245,9 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe // Load runs fn with a reader that yields the contents of the file at h at the // given offset. func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn) } @@ -274,27 +259,19 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, objName := be.Filename(h) - be.sem.GetToken() - - ctx, cancel := context.WithCancel(ctx) - r, err := be.bucket.Object(objName).NewRangeReader(ctx, offset, int64(length)) if err != nil { - cancel() - be.sem.ReleaseToken() return nil, err } - return be.sem.ReleaseTokenOnClose(r, cancel), err + return r, err } // Stat returns information about a blob. func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { objName := be.Filename(h) - be.sem.GetToken() attr, err := be.bucket.Object(objName).Attrs(ctx) - be.sem.ReleaseToken() if err != nil { return restic.FileInfo{}, errors.Wrap(err, "service.Objects.Get") @@ -307,9 +284,7 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) - be.sem.GetToken() err := be.bucket.Object(objName).Delete(ctx) - be.sem.ReleaseToken() if err == storage.ErrObjectNotExist { err = nil @@ -334,9 +309,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F itr := be.bucket.Objects(ctx, &storage.Query{Prefix: prefix}) for { - be.sem.GetToken() attrs, err := itr.Next() - be.sem.ReleaseToken() if err == iterator.Done { break } diff --git a/internal/backend/local/local.go b/internal/backend/local/local.go index a1f3c6091..ca806f754 100644 --- a/internal/backend/local/local.go +++ b/internal/backend/local/local.go @@ -10,7 +10,6 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/fs" @@ -22,7 +21,6 @@ import ( // Local is a backend in a local directory. type Local struct { Config - sem sema.Semaphore layout.Layout backend.Modes } @@ -38,11 +36,6 @@ func open(ctx context.Context, cfg Config) (*Local, error) { return nil, err } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - fi, err := fs.Stat(l.Filename(restic.Handle{Type: restic.ConfigFile})) m := backend.DeriveModesFromFileInfo(fi, err) debug.Log("using (%03O file, %03O dir) permissions", m.File, m.Dir) @@ -50,7 +43,6 @@ func open(ctx context.Context, cfg Config) (*Local, error) { return &Local{ Config: cfg, Layout: l, - sem: sem, Modes: m, }, nil } @@ -114,10 +106,6 @@ func (b *Local) IsNotExist(err error) bool { // Save stores data in the backend at the handle. func (b *Local) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) (err error) { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - finalname := b.Filename(h) dir := filepath.Dir(finalname) @@ -128,9 +116,6 @@ func (b *Local) Save(ctx context.Context, h restic.Handle, rd restic.RewindReade } }() - b.sem.GetToken() - defer b.sem.ReleaseToken() - // Create new file with a temporary name. tmpname := filepath.Base(finalname) + "-tmp-" f, err := tempFile(dir, tmpname) @@ -216,40 +201,28 @@ func (b *Local) Load(ctx context.Context, h restic.Handle, length int, offset in } func (b *Local) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - b.sem.GetToken() f, err := fs.Open(b.Filename(h)) if err != nil { - b.sem.ReleaseToken() return nil, err } if offset > 0 { _, err = f.Seek(offset, 0) if err != nil { - b.sem.ReleaseToken() _ = f.Close() return nil, err } } - r := b.sem.ReleaseTokenOnClose(f, nil) - if length > 0 { - return backend.LimitReadCloser(r, int64(length)), nil + return backend.LimitReadCloser(f, int64(length)), nil } - return r, nil + return f, nil } // Stat returns information about a blob. func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - if err := h.Valid(); err != nil { - return restic.FileInfo{}, backoff.Permanent(err) - } - - b.sem.GetToken() - defer b.sem.ReleaseToken() - fi, err := fs.Stat(b.Filename(h)) if err != nil { return restic.FileInfo{}, errors.WithStack(err) @@ -262,9 +235,6 @@ func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, err func (b *Local) Remove(ctx context.Context, h restic.Handle) error { fn := b.Filename(h) - b.sem.GetToken() - defer b.sem.ReleaseToken() - // reset read-only flag err := fs.Chmod(fn, 0666) if err != nil && !os.IsPermission(err) { diff --git a/internal/backend/mem/mem_backend.go b/internal/backend/mem/mem_backend.go index 59e89286e..4db4c9821 100644 --- a/internal/backend/mem/mem_backend.go +++ b/internal/backend/mem/mem_backend.go @@ -10,12 +10,9 @@ import ( "github.com/cespare/xxhash/v2" "github.com/restic/restic/internal/backend" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - - "github.com/cenkalti/backoff/v4" ) type memMap map[restic.Handle][]byte @@ -32,19 +29,12 @@ const connectionCount = 2 type MemoryBackend struct { data memMap m sync.Mutex - sem sema.Semaphore } // New returns a new backend that saves all data in a map in memory. func New() *MemoryBackend { - sem, err := sema.New(connectionCount) - if err != nil { - panic(err) - } - be := &MemoryBackend{ data: make(memMap), - sem: sem, } debug.Log("created new memory backend") @@ -59,13 +49,6 @@ func (be *MemoryBackend) IsNotExist(err error) bool { // Save adds new Data to the backend. func (be *MemoryBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - - be.sem.GetToken() - defer be.sem.ReleaseToken() - be.m.Lock() defer be.m.Unlock() @@ -113,8 +96,6 @@ func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int, } func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - - be.sem.GetToken() be.m.Lock() defer be.m.Unlock() @@ -123,21 +104,12 @@ func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length h.Name = "" } - debug.Log("Load %v offset %v len %v", h, offset, length) - - if offset < 0 { - be.sem.ReleaseToken() - return nil, errors.New("offset is negative") - } - if _, ok := be.data[h]; !ok { - be.sem.ReleaseToken() return nil, errNotFound } buf := be.data[h] if offset > int64(len(buf)) { - be.sem.ReleaseToken() return nil, errors.New("offset beyond end of file") } @@ -146,18 +118,11 @@ func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length buf = buf[:length] } - return be.sem.ReleaseTokenOnClose(io.NopCloser(bytes.NewReader(buf)), nil), ctx.Err() + return io.NopCloser(bytes.NewReader(buf)), ctx.Err() } // Stat returns information about a file in the backend. func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - if err := h.Valid(); err != nil { - return restic.FileInfo{}, backoff.Permanent(err) - } - - be.sem.GetToken() - defer be.sem.ReleaseToken() - be.m.Lock() defer be.m.Unlock() @@ -176,9 +141,6 @@ func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.File // Remove deletes a file from the backend. func (be *MemoryBackend) Remove(ctx context.Context, h restic.Handle) error { - be.sem.GetToken() - defer be.sem.ReleaseToken() - be.m.Lock() defer be.m.Unlock() diff --git a/internal/backend/rest/rest.go b/internal/backend/rest/rest.go index ad5af1629..a88e26daa 100644 --- a/internal/backend/rest/rest.go +++ b/internal/backend/rest/rest.go @@ -12,12 +12,9 @@ import ( "strings" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - - "github.com/cenkalti/backoff/v4" ) // make sure the rest backend implements restic.Backend @@ -27,7 +24,6 @@ var _ restic.Backend = &Backend{} type Backend struct { url *url.URL connections uint - sem sema.Semaphore client http.Client layout.Layout } @@ -40,11 +36,6 @@ const ( // Open opens the REST backend with the given config. func Open(cfg Config, rt http.RoundTripper) (*Backend, error) { - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - // use url without trailing slash for layout url := cfg.URL.String() if url[len(url)-1] == '/' { @@ -56,7 +47,6 @@ func Open(cfg Config, rt http.RoundTripper) (*Backend, error) { client: http.Client{Transport: rt}, Layout: &layout.RESTLayout{URL: url, Join: path.Join}, connections: cfg.Connections, - sem: sem, } return be, nil @@ -123,10 +113,6 @@ func (b *Backend) HasAtomicReplace() bool { // Save stores data in the backend at the handle. func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -143,9 +129,7 @@ func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRea // let's the server know what's coming. req.ContentLength = rd.Length() - b.sem.GetToken() resp, err := b.client.Do(req) - b.sem.ReleaseToken() var cerr error if resp != nil { @@ -212,18 +196,6 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset } func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } - - if offset < 0 { - return nil, errors.New("offset is negative") - } - - if length < 0 { - return nil, errors.Errorf("invalid length %d", length) - } - req, err := http.NewRequestWithContext(ctx, "GET", b.Filename(h), nil) if err != nil { return nil, errors.WithStack(err) @@ -237,9 +209,7 @@ func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, o req.Header.Set("Accept", ContentTypeV2) debug.Log("Load(%v) send range %v", h, byteRange) - b.sem.GetToken() resp, err := b.client.Do(req) - b.sem.ReleaseToken() if err != nil { if resp != nil { @@ -264,19 +234,13 @@ func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, o // Stat returns information about a blob. func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - if err := h.Valid(); err != nil { - return restic.FileInfo{}, backoff.Permanent(err) - } - req, err := http.NewRequestWithContext(ctx, http.MethodHead, b.Filename(h), nil) if err != nil { return restic.FileInfo{}, errors.WithStack(err) } req.Header.Set("Accept", ContentTypeV2) - b.sem.GetToken() resp, err := b.client.Do(req) - b.sem.ReleaseToken() if err != nil { return restic.FileInfo{}, errors.WithStack(err) } @@ -309,19 +273,13 @@ func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, e // Remove removes the blob with the given name and type. func (b *Backend) Remove(ctx context.Context, h restic.Handle) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - req, err := http.NewRequestWithContext(ctx, "DELETE", b.Filename(h), nil) if err != nil { return errors.WithStack(err) } req.Header.Set("Accept", ContentTypeV2) - b.sem.GetToken() resp, err := b.client.Do(req) - b.sem.ReleaseToken() if err != nil { return errors.Wrap(err, "client.Do") @@ -358,9 +316,7 @@ func (b *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.Fi } req.Header.Set("Accept", ContentTypeV2) - b.sem.GetToken() resp, err := b.client.Do(req) - b.sem.ReleaseToken() if err != nil { return errors.Wrap(err, "List") diff --git a/internal/backend/s3/s3.go b/internal/backend/s3/s3.go index 872fb0441..79c6453b9 100644 --- a/internal/backend/s3/s3.go +++ b/internal/backend/s3/s3.go @@ -13,12 +13,10 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - "github.com/cenkalti/backoff/v4" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" ) @@ -26,7 +24,6 @@ import ( // Backend stores data on an S3 endpoint. type Backend struct { client *minio.Client - sem sema.Semaphore cfg Config layout.Layout } @@ -102,14 +99,8 @@ func open(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, erro return nil, errors.Wrap(err, "minio.New") } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &Backend{ client: client, - sem: sem, cfg: cfg, } @@ -271,15 +262,8 @@ func (be *Backend) Path() string { // Save stores data in the backend at the handle. func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - objName := be.Filename(h) - be.sem.GetToken() - defer be.sem.ReleaseToken() - opts := minio.PutObjectOptions{StorageClass: be.cfg.StorageClass} opts.ContentType = "application/octet-stream" // the only option with the high-level api is to let the library handle the checksum computation @@ -301,6 +285,9 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe // Load runs fn with a reader that yields the contents of the file at h at the // given offset. func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn) } @@ -321,18 +308,13 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, return nil, errors.Wrap(err, "SetRange") } - be.sem.GetToken() - ctx, cancel := context.WithCancel(ctx) - coreClient := minio.Core{Client: be.client} rd, _, _, err := coreClient.GetObject(ctx, be.cfg.Bucket, objName, opts) if err != nil { - cancel() - be.sem.ReleaseToken() return nil, err } - return be.sem.ReleaseTokenOnClose(rd, cancel), err + return rd, err } // Stat returns information about a blob. @@ -342,17 +324,14 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf opts := minio.GetObjectOptions{} - be.sem.GetToken() obj, err = be.client.GetObject(ctx, be.cfg.Bucket, objName, opts) if err != nil { - be.sem.ReleaseToken() return restic.FileInfo{}, errors.Wrap(err, "client.GetObject") } // make sure that the object is closed properly. defer func() { e := obj.Close() - be.sem.ReleaseToken() if err == nil { err = errors.Wrap(e, "Close") } @@ -370,9 +349,7 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) - be.sem.GetToken() err := be.client.RemoveObject(ctx, be.cfg.Bucket, objName, minio.RemoveObjectOptions{}) - be.sem.ReleaseToken() if be.IsNotExist(err) { err = nil diff --git a/internal/backend/sema/backend.go b/internal/backend/sema/backend.go new file mode 100644 index 000000000..4b6f55b50 --- /dev/null +++ b/internal/backend/sema/backend.go @@ -0,0 +1,87 @@ +package sema + +import ( + "context" + "io" + + "github.com/cenkalti/backoff/v4" + "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/restic" +) + +// make sure that SemaphoreBackend implements restic.Backend +var _ restic.Backend = &SemaphoreBackend{} + +// SemaphoreBackend limits the number of concurrent operations. +type SemaphoreBackend struct { + restic.Backend + sem semaphore +} + +// New creates a backend that limits the concurrent operations on the underlying backend +func New(be restic.Backend) *SemaphoreBackend { + sem, err := newSemaphore(be.Connections()) + if err != nil { + panic(err) + } + + return &SemaphoreBackend{ + Backend: be, + sem: sem, + } +} + +// Save adds new Data to the backend. +func (be *SemaphoreBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { + if err := h.Valid(); err != nil { + return backoff.Permanent(err) + } + + be.sem.GetToken() + defer be.sem.ReleaseToken() + + return be.Backend.Save(ctx, h, rd) +} + +// Load runs fn with a reader that yields the contents of the file at h at the +// given offset. +func (be *SemaphoreBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + if err := h.Valid(); err != nil { + return backoff.Permanent(err) + } + if offset < 0 { + return backoff.Permanent(errors.New("offset is negative")) + } + if length < 0 { + return backoff.Permanent(errors.Errorf("invalid length %d", length)) + } + + be.sem.GetToken() + defer be.sem.ReleaseToken() + + return be.Backend.Load(ctx, h, length, offset, fn) +} + +// Stat returns information about a file in the backend. +func (be *SemaphoreBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { + if err := h.Valid(); err != nil { + return restic.FileInfo{}, backoff.Permanent(err) + } + + be.sem.GetToken() + defer be.sem.ReleaseToken() + + return be.Backend.Stat(ctx, h) +} + +// Remove deletes a file from the backend. +func (be *SemaphoreBackend) Remove(ctx context.Context, h restic.Handle) error { + if err := h.Valid(); err != nil { + return backoff.Permanent(err) + } + + be.sem.GetToken() + defer be.sem.ReleaseToken() + + return be.Backend.Remove(ctx, h) +} diff --git a/internal/backend/sema/semaphore.go b/internal/backend/sema/semaphore.go index 7ee912979..c664eef7c 100644 --- a/internal/backend/sema/semaphore.go +++ b/internal/backend/sema/semaphore.go @@ -2,64 +2,30 @@ package sema import ( - "context" - "io" - + "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" ) -// A Semaphore limits access to a restricted resource. -type Semaphore struct { +// A semaphore limits access to a restricted resource. +type semaphore struct { ch chan struct{} } -// New returns a new semaphore with capacity n. -func New(n uint) (Semaphore, error) { +// newSemaphore returns a new semaphore with capacity n. +func newSemaphore(n uint) (semaphore, error) { if n == 0 { - return Semaphore{}, errors.New("capacity must be a positive number") + return semaphore{}, errors.New("capacity must be a positive number") } - return Semaphore{ + return semaphore{ ch: make(chan struct{}, n), }, nil } // GetToken blocks until a Token is available. -func (s Semaphore) GetToken() { s.ch <- struct{}{} } +func (s semaphore) GetToken() { + s.ch <- struct{}{} + debug.Log("acquired token") +} // ReleaseToken returns a token. -func (s Semaphore) ReleaseToken() { <-s.ch } - -// ReleaseTokenOnClose wraps an io.ReadCloser to return a token on Close. -// Before returning the token, cancel, if not nil, will be run -// to free up context resources. -func (s Semaphore) ReleaseTokenOnClose(rc io.ReadCloser, cancel context.CancelFunc) io.ReadCloser { - return &wrapReader{ReadCloser: rc, sem: s, cancel: cancel} -} - -type wrapReader struct { - io.ReadCloser - eofSeen bool - sem Semaphore - cancel context.CancelFunc -} - -func (wr *wrapReader) Read(p []byte) (int, error) { - if wr.eofSeen { // XXX Why do we do this? - return 0, io.EOF - } - - n, err := wr.ReadCloser.Read(p) - if err == io.EOF { - wr.eofSeen = true - } - return n, err -} - -func (wr *wrapReader) Close() error { - err := wr.ReadCloser.Close() - if wr.cancel != nil { - wr.cancel() - } - wr.sem.ReleaseToken() - return err -} +func (s semaphore) ReleaseToken() { <-s.ch } diff --git a/internal/backend/sftp/sftp.go b/internal/backend/sftp/sftp.go index afe3fc394..e97a5f9c8 100644 --- a/internal/backend/sftp/sftp.go +++ b/internal/backend/sftp/sftp.go @@ -15,7 +15,6 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -35,7 +34,6 @@ type SFTP struct { posixRename bool - sem sema.Semaphore layout.Layout Config backend.Modes @@ -140,11 +138,7 @@ func Open(ctx context.Context, cfg Config) (*SFTP, error) { } func open(ctx context.Context, sftp *SFTP, cfg Config) (*SFTP, error) { - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - + var err error sftp.Layout, err = layout.ParseLayout(ctx, sftp, cfg.Layout, defaultLayout, cfg.Path) if err != nil { return nil, err @@ -158,7 +152,6 @@ func open(ctx context.Context, sftp *SFTP, cfg Config) (*SFTP, error) { sftp.Config = cfg sftp.p = cfg.Path - sftp.sem = sem sftp.Modes = m return sftp, nil } @@ -308,17 +301,10 @@ func (r *SFTP) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader return err } - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - filename := r.Filename(h) tmpFilename := filename + "-restic-temp-" + tempSuffix() dirname := r.Dirname(h) - r.sem.GetToken() - defer r.sem.ReleaseToken() - // create new file f, err := r.c.OpenFile(tmpFilename, os.O_CREATE|os.O_EXCL|os.O_WRONLY) @@ -414,52 +400,27 @@ func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int return backend.DefaultLoad(ctx, h, length, offset, r.openReader, fn) } -// wrapReader wraps an io.ReadCloser to run an additional function on Close. -type wrapReader struct { - io.ReadCloser - io.WriterTo - f func() -} - -func (wr *wrapReader) Close() error { - err := wr.ReadCloser.Close() - wr.f() - return err -} - func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - r.sem.GetToken() f, err := r.c.Open(r.Filename(h)) if err != nil { - r.sem.ReleaseToken() return nil, err } if offset > 0 { _, err = f.Seek(offset, 0) if err != nil { - r.sem.ReleaseToken() _ = f.Close() return nil, err } } - // use custom close wrapper to also provide WriteTo() on the wrapper - rd := &wrapReader{ - ReadCloser: f, - WriterTo: f, - f: func() { - r.sem.ReleaseToken() - }, - } - if length > 0 { // unlimited reads usually use io.Copy which needs WriteTo support at the underlying reader // limited reads are usually combined with io.ReadFull which reads all required bytes into a buffer in one go - return backend.LimitReadCloser(rd, int64(length)), nil + return backend.LimitReadCloser(f, int64(length)), nil } - return rd, nil + return f, nil } // Stat returns information about a blob. @@ -468,13 +429,6 @@ func (r *SFTP) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, erro return restic.FileInfo{}, err } - if err := h.Valid(); err != nil { - return restic.FileInfo{}, backoff.Permanent(err) - } - - r.sem.GetToken() - defer r.sem.ReleaseToken() - fi, err := r.c.Lstat(r.Filename(h)) if err != nil { return restic.FileInfo{}, errors.Wrap(err, "Lstat") @@ -489,9 +443,6 @@ func (r *SFTP) Remove(ctx context.Context, h restic.Handle) error { return err } - r.sem.GetToken() - defer r.sem.ReleaseToken() - return r.c.Remove(r.Filename(h)) } @@ -501,9 +452,7 @@ func (r *SFTP) List(ctx context.Context, t restic.FileType, fn func(restic.FileI basedir, subdirs := r.Basedir(t) walker := r.c.Walk(basedir) for { - r.sem.GetToken() ok := walker.Step() - r.sem.ReleaseToken() if !ok { break } diff --git a/internal/backend/swift/swift.go b/internal/backend/swift/swift.go index 99940df5c..dbf4ba0d1 100644 --- a/internal/backend/swift/swift.go +++ b/internal/backend/swift/swift.go @@ -15,12 +15,10 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - "github.com/cenkalti/backoff/v4" "github.com/ncw/swift/v2" ) @@ -28,7 +26,6 @@ import ( type beSwift struct { conn *swift.Connection connections uint - sem sema.Semaphore container string // Container name prefix string // Prefix of object names in the container layout.Layout @@ -42,11 +39,6 @@ var _ restic.Backend = &beSwift{} func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend, error) { debug.Log("config %#v", cfg) - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &beSwift{ conn: &swift.Connection{ UserName: cfg.UserName, @@ -72,7 +64,6 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend Transport: rt, }, connections: cfg.Connections, - sem: sem, container: cfg.Container, prefix: cfg.Prefix, Layout: &layout.DefaultLayout{ @@ -159,27 +150,17 @@ func (be *beSwift) openReader(ctx context.Context, h restic.Handle, length int, debug.Log("Load(%v) send range %v", h, headers["Range"]) } - be.sem.GetToken() obj, _, err := be.conn.ObjectOpen(ctx, be.container, objName, false, headers) if err != nil { - be.sem.ReleaseToken() return nil, errors.Wrap(err, "conn.ObjectOpen") } - return be.sem.ReleaseTokenOnClose(obj, nil), nil + return obj, nil } // Save stores data in the backend at the handle. func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - objName := be.Filename(h) - - be.sem.GetToken() - defer be.sem.ReleaseToken() - encoding := "binary/octet-stream" debug.Log("PutObject(%v, %v, %v)", be.container, objName, encoding) @@ -196,9 +177,6 @@ func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe func (be *beSwift) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { objName := be.Filename(h) - be.sem.GetToken() - defer be.sem.ReleaseToken() - obj, _, err := be.conn.Object(ctx, be.container, objName) if err != nil { return restic.FileInfo{}, errors.Wrap(err, "conn.Object") @@ -211,9 +189,6 @@ func (be *beSwift) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf func (be *beSwift) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) - be.sem.GetToken() - defer be.sem.ReleaseToken() - err := be.conn.ObjectDelete(ctx, be.container, objName) return errors.Wrap(err, "conn.ObjectDelete") } @@ -226,9 +201,7 @@ func (be *beSwift) List(ctx context.Context, t restic.FileType, fn func(restic.F err := be.conn.ObjectsWalk(ctx, be.container, &swift.ObjectsOpts{Prefix: prefix}, func(ctx context.Context, opts *swift.ObjectsOpts) (interface{}, error) { - be.sem.GetToken() newObjects, err := be.conn.Objects(ctx, be.container, opts) - be.sem.ReleaseToken() if err != nil { return nil, errors.Wrap(err, "conn.ObjectNames") diff --git a/internal/backend/utils.go b/internal/backend/utils.go index 1c1607e04..bf8a7ad6d 100644 --- a/internal/backend/utils.go +++ b/internal/backend/utils.go @@ -6,7 +6,6 @@ import ( "fmt" "io" - "github.com/cenkalti/backoff/v4" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -63,15 +62,7 @@ func LimitReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser { func DefaultLoad(ctx context.Context, h restic.Handle, length int, offset int64, openReader func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error), fn func(rd io.Reader) error) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - if offset < 0 { - return errors.New("offset is negative") - } - if length < 0 { - return errors.Errorf("invalid length %d", length) - } + rd, err := openReader(ctx, h, length, offset) if err != nil { return err