From 0b258cc05471f77aa2a77fd0665c35fae36764c1 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Aug 2021 19:45:52 +0200 Subject: [PATCH 1/3] backends: clean reader closing --- internal/backend/azure/azure.go | 22 +--------------------- internal/backend/gs/gs.go | 25 ++++--------------------- internal/backend/s3/s3.go | 25 ++++--------------------- 3 files changed, 9 insertions(+), 63 deletions(-) diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go index b282a49b7..b20579f2c 100644 --- a/internal/backend/azure/azure.go +++ b/internal/backend/azure/azure.go @@ -226,18 +226,6 @@ func (be *Backend) saveLarge(ctx context.Context, objName string, rd restic.Rewi return errors.Wrap(err, "PutBlockList") } -// wrapReader wraps an io.ReadCloser to run an additional function on Close. -type wrapReader struct { - io.ReadCloser - f func() -} - -func (wr wrapReader) Close() error { - err := wr.ReadCloser.Close() - wr.f() - return err -} - // Load runs fn with a reader that yields the contents of the file at h at the // given offset. func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { @@ -278,15 +266,7 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, return nil, err } - closeRd := wrapReader{ - ReadCloser: rd, - f: func() { - debug.Log("Close()") - be.sem.ReleaseToken() - }, - } - - return closeRd, err + return be.sem.ReleaseTokenOnClose(rd, nil), err } // Stat returns information about a blob. diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index 130c86112..443de70e5 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -263,18 +263,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe return nil } -// wrapReader wraps an io.ReadCloser to run an additional function on Close. -type wrapReader struct { - io.ReadCloser - f func() -} - -func (wr wrapReader) Close() error { - err := wr.ReadCloser.Close() - wr.f() - return err -} - // Load runs fn with a reader that yields the contents of the file at h at the // given offset. func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { @@ -303,21 +291,16 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, be.sem.GetToken() + ctx, cancel := context.WithCancel(ctx) + r, err := be.bucket.Object(objName).NewRangeReader(ctx, offset, int64(length)) if err != nil { + cancel() be.sem.ReleaseToken() return nil, err } - closeRd := wrapReader{ - ReadCloser: r, - f: func() { - debug.Log("Close()") - be.sem.ReleaseToken() - }, - } - - return closeRd, err + return be.sem.ReleaseTokenOnClose(r, cancel), err } // Stat returns information about a blob. diff --git a/internal/backend/s3/s3.go b/internal/backend/s3/s3.go index be1830975..0d7c74bf4 100644 --- a/internal/backend/s3/s3.go +++ b/internal/backend/s3/s3.go @@ -301,18 +301,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe return errors.Wrap(err, "client.PutObject") } -// wrapReader wraps an io.ReadCloser to run an additional function on Close. -type wrapReader struct { - io.ReadCloser - f func() -} - -func (wr wrapReader) Close() error { - err := wr.ReadCloser.Close() - wr.f() - return err -} - // Load runs fn with a reader that yields the contents of the file at h at the // given offset. func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { @@ -350,22 +338,17 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, } be.sem.GetToken() + ctx, cancel := context.WithCancel(ctx) + coreClient := minio.Core{Client: be.client} rd, _, _, err := coreClient.GetObject(ctx, be.cfg.Bucket, objName, opts) if err != nil { + cancel() be.sem.ReleaseToken() return nil, err } - closeRd := wrapReader{ - ReadCloser: rd, - f: func() { - debug.Log("Close()") - be.sem.ReleaseToken() - }, - } - - return closeRd, err + return be.sem.ReleaseTokenOnClose(rd, cancel), err } // Stat returns information about a blob. From cd783358d33b59412e3e8ad55b02fcbf83c17d0c Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Aug 2021 19:50:00 +0200 Subject: [PATCH 2/3] local: Limit concurrent backend operations Use a limit of 2 similar to the filereader concurrency in the archiver. --- internal/backend/local/config.go | 17 +++++- internal/backend/local/layout_test.go | 5 +- internal/backend/local/local.go | 55 +++++++++++++++---- internal/backend/local/local_internal_test.go | 2 +- internal/backend/local/local_test.go | 3 +- internal/backend/location/location_test.go | 36 ++++++++---- internal/repository/testing.go | 2 +- 7 files changed, 88 insertions(+), 32 deletions(-) diff --git a/internal/backend/local/config.go b/internal/backend/local/config.go index 13b7f67aa..e59d1f693 100644 --- a/internal/backend/local/config.go +++ b/internal/backend/local/config.go @@ -11,6 +11,15 @@ import ( type Config struct { Path string Layout string `option:"layout" help:"use this backend directory layout (default: auto-detect)"` + + Connections uint `option:"connections" help:"set a limit for the number of concurrent operations (default: 2)"` +} + +// NewConfig returns a new config with default options applied. +func NewConfig() Config { + return Config{ + Connections: 2, + } } func init() { @@ -18,10 +27,12 @@ func init() { } // ParseConfig parses a local backend config. -func ParseConfig(cfg string) (interface{}, error) { - if !strings.HasPrefix(cfg, "local:") { +func ParseConfig(s string) (interface{}, error) { + if !strings.HasPrefix(s, "local:") { return nil, errors.New(`invalid format, prefix "local" not found`) } - return Config{Path: cfg[6:]}, nil + cfg := NewConfig() + cfg.Path = s[6:] + return cfg, nil } diff --git a/internal/backend/local/layout_test.go b/internal/backend/local/layout_test.go index 5b1135253..9da702877 100644 --- a/internal/backend/local/layout_test.go +++ b/internal/backend/local/layout_test.go @@ -37,8 +37,9 @@ func TestLayout(t *testing.T) { repo := filepath.Join(path, "repo") be, err := Open(context.TODO(), Config{ - Path: repo, - Layout: test.layout, + Path: repo, + Layout: test.layout, + Connections: 2, }) if err != nil { t.Fatal(err) diff --git a/internal/backend/local/local.go b/internal/backend/local/local.go index 833bde26f..0ae023b8e 100644 --- a/internal/backend/local/local.go +++ b/internal/backend/local/local.go @@ -22,6 +22,7 @@ import ( // Local is a backend in a local directory. type Local struct { Config + sem *backend.Semaphore backend.Layout } @@ -30,15 +31,28 @@ var _ restic.Backend = &Local{} const defaultLayout = "default" -// Open opens the local backend as specified by config. -func Open(ctx context.Context, cfg Config) (*Local, error) { - debug.Log("open local backend at %v (layout %q)", cfg.Path, cfg.Layout) +func open(ctx context.Context, cfg Config) (*Local, error) { l, err := backend.ParseLayout(ctx, &backend.LocalFilesystem{}, cfg.Layout, defaultLayout, cfg.Path) if err != nil { return nil, err } - return &Local{Config: cfg, Layout: l}, nil + sem, err := backend.NewSemaphore(cfg.Connections) + if err != nil { + return nil, err + } + + return &Local{ + Config: cfg, + Layout: l, + sem: sem, + }, nil +} + +// Open opens the local backend as specified by config. +func Open(ctx context.Context, cfg Config) (*Local, error) { + debug.Log("open local backend at %v (layout %q)", cfg.Path, cfg.Layout) + return open(ctx, cfg) } // Create creates all the necessary files and directories for a new local @@ -46,16 +60,11 @@ func Open(ctx context.Context, cfg Config) (*Local, error) { func Create(ctx context.Context, cfg Config) (*Local, error) { debug.Log("create local backend at %v (layout %q)", cfg.Path, cfg.Layout) - l, err := backend.ParseLayout(ctx, &backend.LocalFilesystem{}, cfg.Layout, defaultLayout, cfg.Path) + be, err := open(ctx, cfg) if err != nil { return nil, err } - be := &Local{ - Config: cfg, - Layout: l, - } - // test if config file already exists _, err = fs.Lstat(be.Filename(restic.Handle{Type: restic.ConfigFile})) if err == nil { @@ -73,6 +82,10 @@ func Create(ctx context.Context, cfg Config) (*Local, error) { return be, nil } +func (b *Local) Connections() uint { + return b.Config.Connections +} + // Location returns this backend's location (the directory name). func (b *Local) Location() string { return b.Path @@ -105,6 +118,9 @@ func (b *Local) Save(ctx context.Context, h restic.Handle, rd restic.RewindReade } }() + b.sem.GetToken() + defer b.sem.ReleaseToken() + // Create new file with a temporary name. tmpname := filepath.Base(finalname) + "-tmp-" f, err := tempFile(dir, tmpname) @@ -199,24 +215,29 @@ func (b *Local) openReader(ctx context.Context, h restic.Handle, length int, off return nil, errors.New("offset is negative") } + b.sem.GetToken() f, err := fs.Open(b.Filename(h)) if err != nil { + b.sem.ReleaseToken() return nil, err } if offset > 0 { _, err = f.Seek(offset, 0) if err != nil { + b.sem.ReleaseToken() _ = f.Close() return nil, err } } + r := b.sem.ReleaseTokenOnClose(f, nil) + if length > 0 { - return backend.LimitReadCloser(f, int64(length)), nil + return backend.LimitReadCloser(r, int64(length)), nil } - return f, nil + return r, nil } // Stat returns information about a blob. @@ -226,6 +247,9 @@ func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, err return restic.FileInfo{}, backoff.Permanent(err) } + b.sem.GetToken() + defer b.sem.ReleaseToken() + fi, err := fs.Stat(b.Filename(h)) if err != nil { return restic.FileInfo{}, errors.WithStack(err) @@ -237,6 +261,10 @@ func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, err // Test returns true if a blob of the given type and name exists in the backend. func (b *Local) Test(ctx context.Context, h restic.Handle) (bool, error) { debug.Log("Test %v", h) + + b.sem.GetToken() + defer b.sem.ReleaseToken() + _, err := fs.Stat(b.Filename(h)) if err != nil { if b.IsNotExist(err) { @@ -253,6 +281,9 @@ func (b *Local) Remove(ctx context.Context, h restic.Handle) error { debug.Log("Remove %v", h) fn := b.Filename(h) + b.sem.GetToken() + defer b.sem.ReleaseToken() + // reset read-only flag err := fs.Chmod(fn, 0666) if err != nil && !os.IsPermission(err) { diff --git a/internal/backend/local/local_internal_test.go b/internal/backend/local/local_internal_test.go index 8d2ec08c3..8de3d3c2f 100644 --- a/internal/backend/local/local_internal_test.go +++ b/internal/backend/local/local_internal_test.go @@ -27,7 +27,7 @@ func TestNoSpacePermanent(t *testing.T) { dir, cleanup := rtest.TempDir(t) defer cleanup() - be, err := Open(context.Background(), Config{Path: dir}) + be, err := Open(context.Background(), Config{Path: dir, Connections: 2}) rtest.OK(t, err) defer func() { rtest.OK(t, be.Close()) diff --git a/internal/backend/local/local_test.go b/internal/backend/local/local_test.go index 70b5e771e..75c3b8ed7 100644 --- a/internal/backend/local/local_test.go +++ b/internal/backend/local/local_test.go @@ -25,7 +25,8 @@ func newTestSuite(t testing.TB) *test.Suite { t.Logf("create new backend at %v", dir) cfg := local.Config{ - Path: dir, + Path: dir, + Connections: 2, } return cfg, nil }, diff --git a/internal/backend/location/location_test.go b/internal/backend/location/location_test.go index 3160a2af7..ded9450e9 100644 --- a/internal/backend/location/location_test.go +++ b/internal/backend/location/location_test.go @@ -30,7 +30,8 @@ var parseTests = []struct { "local:/srv/repo", Location{Scheme: "local", Config: local.Config{ - Path: "/srv/repo", + Path: "/srv/repo", + Connections: 2, }, }, }, @@ -38,7 +39,8 @@ var parseTests = []struct { "local:dir1/dir2", Location{Scheme: "local", Config: local.Config{ - Path: "dir1/dir2", + Path: "dir1/dir2", + Connections: 2, }, }, }, @@ -46,7 +48,8 @@ var parseTests = []struct { "local:dir1/dir2", Location{Scheme: "local", Config: local.Config{ - Path: "dir1/dir2", + Path: "dir1/dir2", + Connections: 2, }, }, }, @@ -54,7 +57,8 @@ var parseTests = []struct { "dir1/dir2", Location{Scheme: "local", Config: local.Config{ - Path: "dir1/dir2", + Path: "dir1/dir2", + Connections: 2, }, }, }, @@ -62,7 +66,8 @@ var parseTests = []struct { "/dir1/dir2", Location{Scheme: "local", Config: local.Config{ - Path: "/dir1/dir2", + Path: "/dir1/dir2", + Connections: 2, }, }, }, @@ -70,7 +75,8 @@ var parseTests = []struct { "local:../dir1/dir2", Location{Scheme: "local", Config: local.Config{ - Path: "../dir1/dir2", + Path: "../dir1/dir2", + Connections: 2, }, }, }, @@ -78,7 +84,8 @@ var parseTests = []struct { "/dir1/dir2", Location{Scheme: "local", Config: local.Config{ - Path: "/dir1/dir2", + Path: "/dir1/dir2", + Connections: 2, }, }, }, @@ -86,7 +93,8 @@ var parseTests = []struct { "/dir1:foobar/dir2", Location{Scheme: "local", Config: local.Config{ - Path: "/dir1:foobar/dir2", + Path: "/dir1:foobar/dir2", + Connections: 2, }, }, }, @@ -94,7 +102,8 @@ var parseTests = []struct { `\dir1\foobar\dir2`, Location{Scheme: "local", Config: local.Config{ - Path: `\dir1\foobar\dir2`, + Path: `\dir1\foobar\dir2`, + Connections: 2, }, }, }, @@ -102,7 +111,8 @@ var parseTests = []struct { `c:\dir1\foobar\dir2`, Location{Scheme: "local", Config: local.Config{ - Path: `c:\dir1\foobar\dir2`, + Path: `c:\dir1\foobar\dir2`, + Connections: 2, }, }, }, @@ -110,7 +120,8 @@ var parseTests = []struct { `C:\Users\appveyor\AppData\Local\Temp\1\restic-test-879453535\repo`, Location{Scheme: "local", Config: local.Config{ - Path: `C:\Users\appveyor\AppData\Local\Temp\1\restic-test-879453535\repo`, + Path: `C:\Users\appveyor\AppData\Local\Temp\1\restic-test-879453535\repo`, + Connections: 2, }, }, }, @@ -118,7 +129,8 @@ var parseTests = []struct { `c:/dir1/foobar/dir2`, Location{Scheme: "local", Config: local.Config{ - Path: `c:/dir1/foobar/dir2`, + Path: `c:/dir1/foobar/dir2`, + Connections: 2, }, }, }, diff --git a/internal/repository/testing.go b/internal/repository/testing.go index 899b8a7e3..d752e107e 100644 --- a/internal/repository/testing.go +++ b/internal/repository/testing.go @@ -93,7 +93,7 @@ func TestRepository(t testing.TB) (r restic.Repository, cleanup func()) { // TestOpenLocal opens a local repository. func TestOpenLocal(t testing.TB, dir string) (r restic.Repository) { - be, err := local.Open(context.TODO(), local.Config{Path: dir}) + be, err := local.Open(context.TODO(), local.Config{Path: dir, Connections: 2}) if err != nil { t.Fatal(err) } From ece06f125ef7bd07a49560abb8f3aff28f60a4ee Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Aug 2021 19:56:59 +0200 Subject: [PATCH 3/3] sftp: Limit concurrent backend operations --- internal/backend/location/location_test.go | 28 +++++----- internal/backend/sftp/config.go | 22 +++++--- internal/backend/sftp/config_test.go | 30 +++++------ internal/backend/sftp/layout_test.go | 7 +-- internal/backend/sftp/sftp.go | 61 ++++++++++++++++++++-- internal/backend/sftp/sftp_test.go | 5 +- 6 files changed, 112 insertions(+), 41 deletions(-) diff --git a/internal/backend/location/location_test.go b/internal/backend/location/location_test.go index ded9450e9..809379850 100644 --- a/internal/backend/location/location_test.go +++ b/internal/backend/location/location_test.go @@ -138,9 +138,10 @@ var parseTests = []struct { "sftp:user@host:/srv/repo", Location{Scheme: "sftp", Config: sftp.Config{ - User: "user", - Host: "host", - Path: "/srv/repo", + User: "user", + Host: "host", + Path: "/srv/repo", + Connections: 5, }, }, }, @@ -148,9 +149,10 @@ var parseTests = []struct { "sftp:host:/srv/repo", Location{Scheme: "sftp", Config: sftp.Config{ - User: "", - Host: "host", - Path: "/srv/repo", + User: "", + Host: "host", + Path: "/srv/repo", + Connections: 5, }, }, }, @@ -158,9 +160,10 @@ var parseTests = []struct { "sftp://user@host/srv/repo", Location{Scheme: "sftp", Config: sftp.Config{ - User: "user", - Host: "host", - Path: "srv/repo", + User: "user", + Host: "host", + Path: "srv/repo", + Connections: 5, }, }, }, @@ -168,9 +171,10 @@ var parseTests = []struct { "sftp://user@host//srv/repo", Location{Scheme: "sftp", Config: sftp.Config{ - User: "user", - Host: "host", - Path: "/srv/repo", + User: "user", + Host: "host", + Path: "/srv/repo", + Connections: 5, }, }, }, diff --git a/internal/backend/sftp/config.go b/internal/backend/sftp/config.go index d5e0e5182..3b3d622a0 100644 --- a/internal/backend/sftp/config.go +++ b/internal/backend/sftp/config.go @@ -15,6 +15,15 @@ type Config struct { Layout string `option:"layout" help:"use this backend directory layout (default: auto-detect)"` Command string `option:"command" help:"specify command to create sftp connection"` + + Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"` +} + +// NewConfig returns a new config with default options applied. +func NewConfig() Config { + return Config{ + Connections: 5, + } } func init() { @@ -75,10 +84,11 @@ func ParseConfig(s string) (interface{}, error) { return nil, errors.Fatal("sftp path starts with the tilde (~) character, that fails for most sftp servers.\nUse a relative directory, most servers interpret this as relative to the user's home directory.") } - return Config{ - User: user, - Host: host, - Port: port, - Path: p, - }, nil + cfg := NewConfig() + cfg.User = user + cfg.Host = host + cfg.Port = port + cfg.Path = p + + return cfg, nil } diff --git a/internal/backend/sftp/config_test.go b/internal/backend/sftp/config_test.go index d785a4113..3772c038b 100644 --- a/internal/backend/sftp/config_test.go +++ b/internal/backend/sftp/config_test.go @@ -11,68 +11,68 @@ var configTests = []struct { // first form, user specified sftp://user@host/dir { "sftp://user@host/dir/subdir", - Config{User: "user", Host: "host", Path: "dir/subdir"}, + Config{User: "user", Host: "host", Path: "dir/subdir", Connections: 5}, }, { "sftp://host/dir/subdir", - Config{Host: "host", Path: "dir/subdir"}, + Config{Host: "host", Path: "dir/subdir", Connections: 5}, }, { "sftp://host//dir/subdir", - Config{Host: "host", Path: "/dir/subdir"}, + Config{Host: "host", Path: "/dir/subdir", Connections: 5}, }, { "sftp://host:10022//dir/subdir", - Config{Host: "host", Port: "10022", Path: "/dir/subdir"}, + Config{Host: "host", Port: "10022", Path: "/dir/subdir", Connections: 5}, }, { "sftp://user@host:10022//dir/subdir", - Config{User: "user", Host: "host", Port: "10022", Path: "/dir/subdir"}, + Config{User: "user", Host: "host", Port: "10022", Path: "/dir/subdir", Connections: 5}, }, { "sftp://user@host/dir/subdir/../other", - Config{User: "user", Host: "host", Path: "dir/other"}, + Config{User: "user", Host: "host", Path: "dir/other", Connections: 5}, }, { "sftp://user@host/dir///subdir", - Config{User: "user", Host: "host", Path: "dir/subdir"}, + Config{User: "user", Host: "host", Path: "dir/subdir", Connections: 5}, }, // IPv6 address. { "sftp://user@[::1]/dir", - Config{User: "user", Host: "::1", Path: "dir"}, + Config{User: "user", Host: "::1", Path: "dir", Connections: 5}, }, // IPv6 address with port. { "sftp://user@[::1]:22/dir", - Config{User: "user", Host: "::1", Port: "22", Path: "dir"}, + Config{User: "user", Host: "::1", Port: "22", Path: "dir", Connections: 5}, }, // second form, user specified sftp:user@host:/dir { "sftp:user@host:/dir/subdir", - Config{User: "user", Host: "host", Path: "/dir/subdir"}, + Config{User: "user", Host: "host", Path: "/dir/subdir", Connections: 5}, }, { "sftp:user@domain@host:/dir/subdir", - Config{User: "user@domain", Host: "host", Path: "/dir/subdir"}, + Config{User: "user@domain", Host: "host", Path: "/dir/subdir", Connections: 5}, }, { "sftp:host:../dir/subdir", - Config{Host: "host", Path: "../dir/subdir"}, + Config{Host: "host", Path: "../dir/subdir", Connections: 5}, }, { "sftp:user@host:dir/subdir:suffix", - Config{User: "user", Host: "host", Path: "dir/subdir:suffix"}, + Config{User: "user", Host: "host", Path: "dir/subdir:suffix", Connections: 5}, }, { "sftp:user@host:dir/subdir/../other", - Config{User: "user", Host: "host", Path: "dir/other"}, + Config{User: "user", Host: "host", Path: "dir/other", Connections: 5}, }, { "sftp:user@host:dir///subdir", - Config{User: "user", Host: "host", Path: "dir/subdir"}, + Config{User: "user", Host: "host", Path: "dir/subdir", Connections: 5}, }, } diff --git a/internal/backend/sftp/layout_test.go b/internal/backend/sftp/layout_test.go index 0d0214669..3b654b1bb 100644 --- a/internal/backend/sftp/layout_test.go +++ b/internal/backend/sftp/layout_test.go @@ -43,9 +43,10 @@ func TestLayout(t *testing.T) { repo := filepath.Join(path, "repo") be, err := sftp.Open(context.TODO(), sftp.Config{ - Command: fmt.Sprintf("%q -e", sftpServer), - Path: repo, - Layout: test.layout, + Command: fmt.Sprintf("%q -e", sftpServer), + Path: repo, + Layout: test.layout, + Connections: 5, }) if err != nil { t.Fatal(err) diff --git a/internal/backend/sftp/sftp.go b/internal/backend/sftp/sftp.go index 3e803a0f4..ad38e19ab 100644 --- a/internal/backend/sftp/sftp.go +++ b/internal/backend/sftp/sftp.go @@ -31,6 +31,7 @@ type SFTP struct { cmd *exec.Cmd result <-chan error + sem *backend.Semaphore backend.Layout Config } @@ -116,6 +117,11 @@ func (r *SFTP) clientError() error { func Open(ctx context.Context, cfg Config) (*SFTP, error) { debug.Log("open backend with config %#v", cfg) + sem, err := backend.NewSemaphore(cfg.Connections) + if err != nil { + return nil, err + } + cmd, args, err := buildSSHCommand(cfg) if err != nil { return nil, err @@ -136,6 +142,7 @@ func Open(ctx context.Context, cfg Config) (*SFTP, error) { sftp.Config = cfg sftp.p = cfg.Path + sftp.sem = sem return sftp, nil } @@ -238,6 +245,10 @@ func Create(ctx context.Context, cfg Config) (*SFTP, error) { return Open(ctx, cfg) } +func (r *SFTP) Connections() uint { + return r.Config.Connections +} + // Location returns this backend's location (the directory name). func (r *SFTP) Location() string { return r.p @@ -280,6 +291,9 @@ func (r *SFTP) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader tmpFilename := filename + "-restic-temp-" + tempSuffix() dirname := r.Dirname(h) + r.sem.GetToken() + defer r.sem.ReleaseToken() + // create new file f, err := r.c.OpenFile(tmpFilename, os.O_CREATE|os.O_EXCL|os.O_WRONLY) @@ -371,6 +385,19 @@ func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int return backend.DefaultLoad(ctx, h, length, offset, r.openReader, fn) } +// wrapReader wraps an io.ReadCloser to run an additional function on Close. +type wrapReader struct { + io.ReadCloser + io.WriterTo + f func() +} + +func (wr *wrapReader) Close() error { + err := wr.ReadCloser.Close() + wr.f() + return err +} + func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v", h, length, offset) if err := h.Valid(); err != nil { @@ -381,26 +408,38 @@ func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offs return nil, errors.New("offset is negative") } + r.sem.GetToken() f, err := r.c.Open(r.Filename(h)) if err != nil { + r.sem.ReleaseToken() return nil, err } if offset > 0 { _, err = f.Seek(offset, 0) if err != nil { + r.sem.ReleaseToken() _ = f.Close() return nil, err } } + // use custom close wrapper to also provide WriteTo() on the wrapper + rd := &wrapReader{ + ReadCloser: f, + WriterTo: f, + f: func() { + r.sem.ReleaseToken() + }, + } + if length > 0 { // unlimited reads usually use io.Copy which needs WriteTo support at the underlying reader // limited reads are usually combined with io.ReadFull which reads all required bytes into a buffer in one go - return backend.LimitReadCloser(f, int64(length)), nil + return backend.LimitReadCloser(rd, int64(length)), nil } - return f, nil + return rd, nil } // Stat returns information about a blob. @@ -414,6 +453,9 @@ func (r *SFTP) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, erro return restic.FileInfo{}, backoff.Permanent(err) } + r.sem.GetToken() + defer r.sem.ReleaseToken() + fi, err := r.c.Lstat(r.Filename(h)) if err != nil { return restic.FileInfo{}, errors.Wrap(err, "Lstat") @@ -429,6 +471,9 @@ func (r *SFTP) Test(ctx context.Context, h restic.Handle) (bool, error) { return false, err } + r.sem.GetToken() + defer r.sem.ReleaseToken() + _, err := r.c.Lstat(r.Filename(h)) if os.IsNotExist(errors.Cause(err)) { return false, nil @@ -448,6 +493,9 @@ func (r *SFTP) Remove(ctx context.Context, h restic.Handle) error { return err } + r.sem.GetToken() + defer r.sem.ReleaseToken() + return r.c.Remove(r.Filename(h)) } @@ -458,7 +506,14 @@ func (r *SFTP) List(ctx context.Context, t restic.FileType, fn func(restic.FileI basedir, subdirs := r.Basedir(t) walker := r.c.Walk(basedir) - for walker.Step() { + for { + r.sem.GetToken() + ok := walker.Step() + r.sem.ReleaseToken() + if !ok { + break + } + if walker.Err() != nil { if r.IsNotExist(walker.Err()) { debug.Log("ignoring non-existing directory") diff --git a/internal/backend/sftp/sftp_test.go b/internal/backend/sftp/sftp_test.go index 61bc49dc8..f0573dcb5 100644 --- a/internal/backend/sftp/sftp_test.go +++ b/internal/backend/sftp/sftp_test.go @@ -42,8 +42,9 @@ func newTestSuite(t testing.TB) *test.Suite { t.Logf("create new backend at %v", dir) cfg := sftp.Config{ - Path: dir, - Command: fmt.Sprintf("%q -e", sftpServer), + Path: dir, + Command: fmt.Sprintf("%q -e", sftpServer), + Connections: 5, } return cfg, nil },