mirror of
https://github.com/restic/restic.git
synced 2024-12-25 01:06:39 +00:00
Have number of connections limited by channel
Removes previous limit of 1 connection
This commit is contained in:
parent
2350419f59
commit
aaae7f33d3
1 changed files with 16 additions and 9 deletions
|
@ -5,7 +5,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
|
|
||||||
"github.com/mitchellh/goamz/aws"
|
"github.com/mitchellh/goamz/aws"
|
||||||
"github.com/mitchellh/goamz/s3"
|
"github.com/mitchellh/goamz/s3"
|
||||||
|
@ -14,6 +13,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const maxKeysInList = 1000
|
const maxKeysInList = 1000
|
||||||
|
const connLimit = 10
|
||||||
|
|
||||||
func s3path(t backend.Type, name string) string {
|
func s3path(t backend.Type, name string) string {
|
||||||
if t == backend.Config {
|
if t == backend.Config {
|
||||||
|
@ -23,18 +23,23 @@ func s3path(t backend.Type, name string) string {
|
||||||
}
|
}
|
||||||
|
|
||||||
type S3 struct {
|
type S3 struct {
|
||||||
bucket *s3.Bucket
|
bucket *s3.Bucket
|
||||||
mput sync.Mutex
|
connChan chan struct{}
|
||||||
path string
|
path string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open a backend using an S3 bucket object
|
// Open a backend using an S3 bucket object
|
||||||
func OpenS3Bucket(bucket *s3.Bucket, bucketname string) *S3 {
|
func OpenS3Bucket(bucket *s3.Bucket, bucketname string) *S3 {
|
||||||
return &S3{bucket: bucket, path: bucketname}
|
connChan := make(chan struct{}, connLimit)
|
||||||
|
for i := 0; i < connLimit; i++ {
|
||||||
|
connChan <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &S3{bucket: bucket, path: bucketname, connChan: connChan}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens the s3 backend at bucket and region.
|
// Open opens the s3 backend at bucket and region.
|
||||||
func Open(regionname, bucketname string) (*S3, error) {
|
func Open(regionname, bucketname string) (backend.Backend, error) {
|
||||||
auth, err := aws.EnvAuth()
|
auth, err := aws.EnvAuth()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -42,7 +47,7 @@ func Open(regionname, bucketname string) (*S3, error) {
|
||||||
|
|
||||||
client := s3.New(auth, aws.Regions[regionname])
|
client := s3.New(auth, aws.Regions[regionname])
|
||||||
|
|
||||||
return &S3{bucket: client.Bucket(bucketname), path: bucketname}, nil
|
return OpenS3Bucket(client.Bucket(bucketname), bucketname), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Location returns this backend's location (the bucket name).
|
// Location returns this backend's location (the bucket name).
|
||||||
|
@ -94,9 +99,9 @@ func (bb *s3Blob) Finalize(t backend.Type, name string) error {
|
||||||
return errors.New("key already exists!")
|
return errors.New("key already exists!")
|
||||||
}
|
}
|
||||||
|
|
||||||
bb.b.mput.Lock()
|
<-bb.b.connChan
|
||||||
err = bb.b.bucket.Put(path, bb.buf.Bytes(), "binary/octet-stream", "private")
|
err = bb.b.bucket.Put(path, bb.buf.Bytes(), "binary/octet-stream", "private")
|
||||||
bb.b.mput.Unlock()
|
bb.b.connChan <- struct{}{}
|
||||||
bb.buf.Reset()
|
bb.buf.Reset()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -118,7 +123,9 @@ func (b *S3) get(t backend.Type, name string) (*s3Blob, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
path := s3path(t, name)
|
path := s3path(t, name)
|
||||||
|
<-b.connChan
|
||||||
data, err := b.bucket.Get(path)
|
data, err := b.bucket.Get(path)
|
||||||
|
b.connChan <- struct{}{}
|
||||||
blob.buf = bytes.NewBuffer(data)
|
blob.buf = bytes.NewBuffer(data)
|
||||||
return blob, err
|
return blob, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue