From 84b3295a0d0c9dab9f64d22832cc4ab68b74cb47 Mon Sep 17 00:00:00 2001 From: Martin Hostettler Date: Thu, 15 Sep 2016 22:13:35 +0200 Subject: [PATCH] Archiver,RemoteRepository: Add --remote-ratelimit The --remote-ratelimit option adds a very simple rate limit for the sending data to the remote. Currently implemented by sleeping if the transmission speed is greater than the limit. --- src/borg/archiver.py | 2 ++ src/borg/remote.py | 37 +++++++++++++++++++++- src/borg/testsuite/remote.py | 60 ++++++++++++++++++++++++++++++++++++ 3 files changed, 98 insertions(+), 1 deletion(-) create mode 100644 src/borg/testsuite/remote.py diff --git a/src/borg/archiver.py b/src/borg/archiver.py index a63513de1..1327980a6 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -1322,6 +1322,8 @@ class Archiver: help='set umask to M (local and remote, default: %(default)04o)') common_group.add_argument('--remote-path', dest='remote_path', metavar='PATH', help='set remote path to executable (default: "borg")') + common_group.add_argument('--remote-ratelimit', dest='remote_ratelimit', type=int, metavar='rate', + help='set remote network upload rate limit in kiByte/s (default: 0=unlimited)') common_group.add_argument('--consider-part-files', dest='consider_part_files', action='store_true', default=False, help='treat part files like normal files (e.g. to list/extract them)') diff --git a/src/borg/remote.py b/src/borg/remote.py index 4632a50a5..294bd40cf 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -6,6 +6,7 @@ import select import shlex import sys import tempfile +import time import traceback from subprocess import Popen, PIPE @@ -25,6 +26,8 @@ BUFSIZE = 10 * 1024 * 1024 MAX_INFLIGHT = 100 +RATELIMIT_PERIOD = 0.1 + class ConnectionClosed(Error): """Connection closed by remote host""" @@ -166,6 +169,36 @@ class RepositoryServer: # pragma: no cover return self.repository.id +class SleepingBandwidthLimiter: + def __init__(self, limit): + if limit: + self.ratelimit = int(limit * RATELIMIT_PERIOD) + self.ratelimit_last = time.monotonic() + self.ratelimit_quota = self.ratelimit + else: + self.ratelimit = None + + def write(self, fd, to_send): + if self.ratelimit: + now = time.monotonic() + if self.ratelimit_last + RATELIMIT_PERIOD <= now: + self.ratelimit_quota += self.ratelimit + if self.ratelimit_quota > 2 * self.ratelimit: + self.ratelimit_quota = 2 * self.ratelimit + self.ratelimit_last = now + if self.ratelimit_quota == 0: + tosleep = self.ratelimit_last + RATELIMIT_PERIOD - now + time.sleep(tosleep) + self.ratelimit_quota += self.ratelimit + self.ratelimit_last = time.monotonic() + if len(to_send) > self.ratelimit_quota: + to_send = to_send[:self.ratelimit_quota] + written = os.write(fd, to_send) + if self.ratelimit: + self.ratelimit_quota -= written + return written + + class RemoteRepository: extra_test_args = [] @@ -185,6 +218,8 @@ class RemoteRepository: self.cache = {} self.ignore_responses = set() self.responses = {} + self.ratelimit = SleepingBandwidthLimiter(args.remote_ratelimit * 1024 if args and args.remote_ratelimit else 0) + self.unpacker = msgpack.Unpacker(use_list=False) self.p = None testing = location.host == '__testsuite__' @@ -406,7 +441,7 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+. if self.to_send: try: - self.to_send = self.to_send[os.write(self.stdin_fd, self.to_send):] + self.to_send = self.to_send[self.ratelimit.write(self.stdin_fd, self.to_send):] except OSError as e: # io.write might raise EAGAIN even though select indicates # that the fd should be writable diff --git a/src/borg/testsuite/remote.py b/src/borg/testsuite/remote.py new file mode 100644 index 000000000..b9eddabd6 --- /dev/null +++ b/src/borg/testsuite/remote.py @@ -0,0 +1,60 @@ +import os +import time + +import pytest + +from ..remote import SleepingBandwidthLimiter + + +class TestSleepingBandwidthLimiter: + def expect_write(self, fd, data): + self.expected_fd = fd + self.expected_data = data + + def check_write(self, fd, data): + assert fd == self.expected_fd + assert data == self.expected_data + return len(data) + + def test_write_unlimited(self, monkeypatch): + monkeypatch.setattr(os, "write", self.check_write) + + it = SleepingBandwidthLimiter(0) + self.expect_write(5, b"test") + it.write(5, b"test") + + def test_write(self, monkeypatch): + monkeypatch.setattr(os, "write", self.check_write) + monkeypatch.setattr(time, "monotonic", lambda: now) + monkeypatch.setattr(time, "sleep", lambda x: None) + + now = 100 + + it = SleepingBandwidthLimiter(100) + + # all fits + self.expect_write(5, b"test") + it.write(5, b"test") + + # only partial write + self.expect_write(5, b"123456") + it.write(5, b"1234567890") + + # sleeps + self.expect_write(5, b"123456") + it.write(5, b"123456") + + # long time interval between writes + now += 10 + self.expect_write(5, b"1") + it.write(5, b"1") + + # long time interval between writes, filling up quota + now += 10 + self.expect_write(5, b"1") + it.write(5, b"1") + + # long time interval between writes, filling up quota to clip to maximum + now += 10 + self.expect_write(5, b"1") + it.write(5, b"1")