mirror of https://github.com/borgbackup/borg.git
Merge pull request #1604 from textshell/feature/remote-ratelimit
Add --remote-ratelimit
This commit is contained in:
commit
3874d62264
|
@ -1322,6 +1322,8 @@ class Archiver:
|
||||||
help='set umask to M (local and remote, default: %(default)04o)')
|
help='set umask to M (local and remote, default: %(default)04o)')
|
||||||
common_group.add_argument('--remote-path', dest='remote_path', metavar='PATH',
|
common_group.add_argument('--remote-path', dest='remote_path', metavar='PATH',
|
||||||
help='set remote path to executable (default: "borg")')
|
help='set remote path to executable (default: "borg")')
|
||||||
|
common_group.add_argument('--remote-ratelimit', dest='remote_ratelimit', type=int, metavar='rate',
|
||||||
|
help='set remote network upload rate limit in kiByte/s (default: 0=unlimited)')
|
||||||
common_group.add_argument('--consider-part-files', dest='consider_part_files',
|
common_group.add_argument('--consider-part-files', dest='consider_part_files',
|
||||||
action='store_true', default=False,
|
action='store_true', default=False,
|
||||||
help='treat part files like normal files (e.g. to list/extract them)')
|
help='treat part files like normal files (e.g. to list/extract them)')
|
||||||
|
|
|
@ -6,6 +6,7 @@ import select
|
||||||
import shlex
|
import shlex
|
||||||
import sys
|
import sys
|
||||||
import tempfile
|
import tempfile
|
||||||
|
import time
|
||||||
import traceback
|
import traceback
|
||||||
from subprocess import Popen, PIPE
|
from subprocess import Popen, PIPE
|
||||||
|
|
||||||
|
@ -25,6 +26,8 @@ BUFSIZE = 10 * 1024 * 1024
|
||||||
|
|
||||||
MAX_INFLIGHT = 100
|
MAX_INFLIGHT = 100
|
||||||
|
|
||||||
|
RATELIMIT_PERIOD = 0.1
|
||||||
|
|
||||||
|
|
||||||
class ConnectionClosed(Error):
|
class ConnectionClosed(Error):
|
||||||
"""Connection closed by remote host"""
|
"""Connection closed by remote host"""
|
||||||
|
@ -166,6 +169,36 @@ class RepositoryServer: # pragma: no cover
|
||||||
return self.repository.id
|
return self.repository.id
|
||||||
|
|
||||||
|
|
||||||
|
class SleepingBandwidthLimiter:
|
||||||
|
def __init__(self, limit):
|
||||||
|
if limit:
|
||||||
|
self.ratelimit = int(limit * RATELIMIT_PERIOD)
|
||||||
|
self.ratelimit_last = time.monotonic()
|
||||||
|
self.ratelimit_quota = self.ratelimit
|
||||||
|
else:
|
||||||
|
self.ratelimit = None
|
||||||
|
|
||||||
|
def write(self, fd, to_send):
|
||||||
|
if self.ratelimit:
|
||||||
|
now = time.monotonic()
|
||||||
|
if self.ratelimit_last + RATELIMIT_PERIOD <= now:
|
||||||
|
self.ratelimit_quota += self.ratelimit
|
||||||
|
if self.ratelimit_quota > 2 * self.ratelimit:
|
||||||
|
self.ratelimit_quota = 2 * self.ratelimit
|
||||||
|
self.ratelimit_last = now
|
||||||
|
if self.ratelimit_quota == 0:
|
||||||
|
tosleep = self.ratelimit_last + RATELIMIT_PERIOD - now
|
||||||
|
time.sleep(tosleep)
|
||||||
|
self.ratelimit_quota += self.ratelimit
|
||||||
|
self.ratelimit_last = time.monotonic()
|
||||||
|
if len(to_send) > self.ratelimit_quota:
|
||||||
|
to_send = to_send[:self.ratelimit_quota]
|
||||||
|
written = os.write(fd, to_send)
|
||||||
|
if self.ratelimit:
|
||||||
|
self.ratelimit_quota -= written
|
||||||
|
return written
|
||||||
|
|
||||||
|
|
||||||
class RemoteRepository:
|
class RemoteRepository:
|
||||||
extra_test_args = []
|
extra_test_args = []
|
||||||
|
|
||||||
|
@ -185,6 +218,8 @@ class RemoteRepository:
|
||||||
self.cache = {}
|
self.cache = {}
|
||||||
self.ignore_responses = set()
|
self.ignore_responses = set()
|
||||||
self.responses = {}
|
self.responses = {}
|
||||||
|
self.ratelimit = SleepingBandwidthLimiter(args.remote_ratelimit * 1024 if args and args.remote_ratelimit else 0)
|
||||||
|
|
||||||
self.unpacker = msgpack.Unpacker(use_list=False)
|
self.unpacker = msgpack.Unpacker(use_list=False)
|
||||||
self.p = None
|
self.p = None
|
||||||
testing = location.host == '__testsuite__'
|
testing = location.host == '__testsuite__'
|
||||||
|
@ -406,7 +441,7 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
|
||||||
|
|
||||||
if self.to_send:
|
if self.to_send:
|
||||||
try:
|
try:
|
||||||
self.to_send = self.to_send[os.write(self.stdin_fd, self.to_send):]
|
self.to_send = self.to_send[self.ratelimit.write(self.stdin_fd, self.to_send):]
|
||||||
except OSError as e:
|
except OSError as e:
|
||||||
# io.write might raise EAGAIN even though select indicates
|
# io.write might raise EAGAIN even though select indicates
|
||||||
# that the fd should be writable
|
# that the fd should be writable
|
||||||
|
|
|
@ -0,0 +1,60 @@
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from ..remote import SleepingBandwidthLimiter
|
||||||
|
|
||||||
|
|
||||||
|
class TestSleepingBandwidthLimiter:
|
||||||
|
def expect_write(self, fd, data):
|
||||||
|
self.expected_fd = fd
|
||||||
|
self.expected_data = data
|
||||||
|
|
||||||
|
def check_write(self, fd, data):
|
||||||
|
assert fd == self.expected_fd
|
||||||
|
assert data == self.expected_data
|
||||||
|
return len(data)
|
||||||
|
|
||||||
|
def test_write_unlimited(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(os, "write", self.check_write)
|
||||||
|
|
||||||
|
it = SleepingBandwidthLimiter(0)
|
||||||
|
self.expect_write(5, b"test")
|
||||||
|
it.write(5, b"test")
|
||||||
|
|
||||||
|
def test_write(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(os, "write", self.check_write)
|
||||||
|
monkeypatch.setattr(time, "monotonic", lambda: now)
|
||||||
|
monkeypatch.setattr(time, "sleep", lambda x: None)
|
||||||
|
|
||||||
|
now = 100
|
||||||
|
|
||||||
|
it = SleepingBandwidthLimiter(100)
|
||||||
|
|
||||||
|
# all fits
|
||||||
|
self.expect_write(5, b"test")
|
||||||
|
it.write(5, b"test")
|
||||||
|
|
||||||
|
# only partial write
|
||||||
|
self.expect_write(5, b"123456")
|
||||||
|
it.write(5, b"1234567890")
|
||||||
|
|
||||||
|
# sleeps
|
||||||
|
self.expect_write(5, b"123456")
|
||||||
|
it.write(5, b"123456")
|
||||||
|
|
||||||
|
# long time interval between writes
|
||||||
|
now += 10
|
||||||
|
self.expect_write(5, b"1")
|
||||||
|
it.write(5, b"1")
|
||||||
|
|
||||||
|
# long time interval between writes, filling up quota
|
||||||
|
now += 10
|
||||||
|
self.expect_write(5, b"1")
|
||||||
|
it.write(5, b"1")
|
||||||
|
|
||||||
|
# long time interval between writes, filling up quota to clip to maximum
|
||||||
|
now += 10
|
||||||
|
self.expect_write(5, b"1")
|
||||||
|
it.write(5, b"1")
|
Loading…
Reference in New Issue