diff --git a/docs/usage/common-options.rst.inc b/docs/usage/common-options.rst.inc
index 6a2be732..10966b69 100644
--- a/docs/usage/common-options.rst.inc
+++ b/docs/usage/common-options.rst.inc
@@ -14,6 +14,7 @@
 --umask M                      set umask to M (local only, default: 0077)
 --remote-path PATH             use PATH as borg executable on the remote (default: "borg")
 --remote-ratelimit RATE        set remote network upload rate limit in kiByte/s (default: 0=unlimited)
+--remote-buffer UPLOAD_BUFFER  set upload buffer size in MiB (default: 0=no buffer)
 --consider-part-files          treat part files like normal files (e.g. to list/extract them)
 --debug-profile FILE           Write execution profile in Borg format into FILE. For local use a Python-compatible file can be generated by suffixing FILE with ".pyprof".
 --rsh RSH                      Use this command to connect to the 'borg serve' process (default: 'ssh')
diff --git a/scripts/shell_completions/zsh/_borg b/scripts/shell_completions/zsh/_borg
index b6a91a84..d97fc3b1 100644
--- a/scripts/shell_completions/zsh/_borg
+++ b/scripts/shell_completions/zsh/_borg
@@ -650,6 +650,7 @@ __borg_setup_common_options() {
     '--umask=[set umask to M (local only, default: 0077)]:M'
     '--remote-path=[set remote path to executable (default: "borg")]: :_cmdstring'
     '--remote-ratelimit=[set remote network upload rate limit in kiByte/s (default: 0=unlimited)]: : _borg_guard_unsigned_number "RATE"'
+    '--remote-buffer=[set upload buffer size in MiB (default: 0=no buffer)]: : _borg_guard_unsigned_number "UPLOAD_BUFFER"'
     '--consider-part-files[treat part files like normal files (e.g. to list/extract them)]'
     '--debug-profile=[write execution profile in Borg format into FILE]:FILE:_files'
     '--rsh=[use COMMAND instead of ssh]: :_cmdstring'
diff --git a/src/borg/archiver.py b/src/borg/archiver.py
index 651dcf3b..5ab053c0 100644
--- a/src/borg/archiver.py
+++ b/src/borg/archiver.py
@@ -2708,6 +2708,8 @@ class Archiver:
                           help='use PATH as borg executable on the remote (default: "borg")')
         add_common_option('--remote-ratelimit', metavar='RATE', dest='remote_ratelimit', type=int,
                           help='set remote network upload rate limit in kiByte/s (default: 0=unlimited)')
+        add_common_option('--remote-buffer', metavar='UPLOAD_BUFFER', dest='remote_buffer', type=int,
+                          help='set upload buffer size in MiB (default: 0=no buffer)')
         add_common_option('--consider-part-files', dest='consider_part_files', action='store_true',
                           help='treat part files like normal files (e.g. to list/extract them)')
         add_common_option('--debug-profile', metavar='FILE', dest='debug_profile', default=None,
diff --git a/src/borg/helpers/datastruct.py b/src/borg/helpers/datastruct.py
index 1650d3cd..31192a8c 100644
--- a/src/borg/helpers/datastruct.py
+++ b/src/borg/helpers/datastruct.py
@@ -49,3 +49,83 @@ class Buffer:
         if size is not None:
             self.resize(size, init)
         return self.buffer
+
+
+class EfficientCollectionQueue:
+    """
+    An efficient FIFO queue that splits received elements into chunks.
+    """
+
+    class SizeUnderflow(Error):
+        """Could not pop_front first {} elements, collection only has {} elements."""
+
+    def __init__(self, split_size, member_type):
+        """
+        Initializes an empty queue.
+        Requires split_size to define the maximum chunk size.
+        Requires member_type to be the type of the base collection (e.g. bytes or list).
+        """
+        self.buffers = []
+        self.size = 0
+        self.split_size = split_size
+        self.member_type = member_type
+
+    def peek_front(self):
+        """
+        Returns the first chunk from the queue without removing it.
+        The returned collection has a length between 1 and split_size.
+        Returns an empty collection when nothing is queued.
+        """
+        if not self.buffers:
+            return self.member_type()
+        buffer = self.buffers[0]
+        return buffer
+
+    def pop_front(self, size):
+        """
+        Removes the first size elements from the queue.
+        Raises SizeUnderflow if size is larger than the whole queue.
+        """
+        if size > self.size:
+            raise EfficientCollectionQueue.SizeUnderflow(size, self.size)
+        while size > 0:
+            buffer = self.buffers[0]
+            to_remove = min(size, len(buffer))
+            buffer = buffer[to_remove:]
+            if buffer:
+                self.buffers[0] = buffer
+            else:
+                del self.buffers[0]
+            size -= to_remove
+            self.size -= to_remove
+
+    def push_back(self, data):
+        """
+        Appends data at the end of the queue.
+        Takes care to split data into chunks of at most split_size elements.
+        """
+        if not self.buffers:
+            self.buffers = [self.member_type()]
+        while data:
+            buffer = self.buffers[-1]
+            if len(buffer) >= self.split_size:
+                buffer = self.member_type()
+                self.buffers.append(buffer)
+
+            to_add = min(len(data), self.split_size - len(buffer))
+            buffer += data[:to_add]
+            data = data[to_add:]
+            self.buffers[-1] = buffer
+            self.size += to_add
+
+    def __len__(self):
+        """
+        Returns the current queue length, counting all elements across all chunks.
+        """
+        return self.size
+
+    def __bool__(self):
+        """
+        Returns True if the queue is not empty.
+        """
+        return self.size != 0
diff --git a/src/borg/remote.py b/src/borg/remote.py
index b23b9153..ab4d434d 100644
--- a/src/borg/remote.py
+++ b/src/borg/remote.py
@@ -32,6 +32,7 @@ from .helpers import msgpack
 from .repository import Repository
 from .version import parse_version, format_version
 from .algorithms.checksums import xxh64
+from .helpers.datastruct import EfficientCollectionQueue
 
 logger = create_logger(__name__)
 
@@ -535,7 +536,7 @@ class RemoteRepository:
         self.msgid = 0
         self.rx_bytes = 0
         self.tx_bytes = 0
-        self.to_send = b''
+        self.to_send = EfficientCollectionQueue(1024 * 1024, bytes)
         self.stderr_received = b''  # incomplete stderr line bytes received (no \n yet)
         self.chunkid_to_msgids = {}
         self.ignore_responses = set()
@@ -543,6 +544,7 @@ class RemoteRepository:
         self.async_responses = {}
         self.shutdown_time = None
         self.ratelimit = SleepingBandwidthLimiter(args.remote_ratelimit * 1024 if args and args.remote_ratelimit else 0)
+        self.upload_buffer_size_limit = args.remote_buffer * 1024 * 1024 if args and args.remote_buffer else 0
         self.unpacker = get_limited_unpacker('client')
         self.server_version = parse_version('1.0.8')  # fallback version if server is too old to send version information
         self.p = None
@@ -711,6 +713,19 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
         if not calls and cmd != 'async_responses':
             return
 
+        def send_buffer():
+            if self.to_send:
+                try:
+                    written = self.ratelimit.write(self.stdin_fd, self.to_send.peek_front())
+                    self.tx_bytes += written
+                    self.to_send.pop_front(written)
+                except OSError as e:
+                    # io.write might raise EAGAIN even though select indicates
+                    # that the fd should be writable.
+                    # EWOULDBLOCK is also caught, for defensive programming's sake.
+                    if e.errno not in [errno.EAGAIN, errno.EWOULDBLOCK]:
+                        raise
+
         def pop_preload_msgid(chunkid):
             msgid = self.chunkid_to_msgids[chunkid].pop(0)
             if not self.chunkid_to_msgids[chunkid]:
@@ -760,6 +775,8 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
         calls = list(calls)
         waiting_for = []
+        maximum_to_send = 0 if wait else self.upload_buffer_size_limit
+        send_buffer()  # Try to send data, as some cases (async_response) will never try to send data otherwise.
         while wait or calls:
             if self.shutdown_time and time.monotonic() > self.shutdown_time:
                 # we are shutting this RemoteRepository down already, make sure we do not waste
@@ -850,7 +867,7 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
                     for line in lines:
                         handle_remote_line(line.decode())  # decode late, avoid partial utf-8 sequences
             if w:
-                while not self.to_send and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT:
+                while (len(self.to_send) <= maximum_to_send) and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT:
                     if calls:
                         if is_preloaded:
                             assert cmd == 'get', "is_preload is only supported for 'get'"
@@ -864,29 +881,20 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
                                 self.msgid += 1
                                 waiting_for.append(self.msgid)
                                 if self.dictFormat:
-                                    self.to_send = msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args})
+                                    self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args}))
                                 else:
-                                    self.to_send = msgpack.packb((1, self.msgid, cmd, self.named_to_positional(cmd, args)))
+                                    self.to_send.push_back(msgpack.packb((1, self.msgid, cmd, self.named_to_positional(cmd, args))))
                     if not self.to_send and self.preload_ids:
                         chunk_id = self.preload_ids.pop(0)
                         args = {'id': chunk_id}
                         self.msgid += 1
                         self.chunkid_to_msgids.setdefault(chunk_id, []).append(self.msgid)
                         if self.dictFormat:
-                            self.to_send = msgpack.packb({MSGID: self.msgid, MSG: 'get', ARGS: args})
+                            self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: 'get', ARGS: args}))
                         else:
-                            self.to_send = msgpack.packb((1, self.msgid, 'get', self.named_to_positional('get', args)))
+                            self.to_send.push_back(msgpack.packb((1, self.msgid, 'get', self.named_to_positional('get', args))))
 
-            if self.to_send:
-                try:
-                    written = self.ratelimit.write(self.stdin_fd, self.to_send)
-                    self.tx_bytes += written
-                    self.to_send = self.to_send[written:]
-                except OSError as e:
-                    # io.write might raise EAGAIN even though select indicates
-                    # that the fd should be writable
-                    if e.errno != errno.EAGAIN:
-                        raise
+            send_buffer()
 
         self.ignore_responses |= set(waiting_for)  # we lose order here
 
     @api(since=parse_version('1.0.0'),
diff --git a/src/borg/testsuite/efficient_collection_queue.py b/src/borg/testsuite/efficient_collection_queue.py
new file mode 100644
index 00000000..e23673f5
--- /dev/null
+++ b/src/borg/testsuite/efficient_collection_queue.py
@@ -0,0 +1,51 @@
+import pytest
+
+from ..helpers.datastruct import EfficientCollectionQueue
+
+
+class TestEfficientQueue:
+    def test_base_usage(self):
+        queue = EfficientCollectionQueue(100, bytes)
+        assert queue.peek_front() == b''
+        queue.push_back(b'1234')
+        assert queue.peek_front() == b'1234'
+        assert len(queue) == 4
+        assert queue
+        queue.pop_front(4)
+        assert queue.peek_front() == b''
+        assert len(queue) == 0
+        assert not queue
+
+    def test_usage_with_arrays(self):
+        queue = EfficientCollectionQueue(100, list)
+        assert queue.peek_front() == []
+        queue.push_back([1, 2, 3, 4])
+        assert queue.peek_front() == [1, 2, 3, 4]
+        assert len(queue) == 4
+        assert queue
+        queue.pop_front(4)
+        assert queue.peek_front() == []
+        assert len(queue) == 0
+        assert not queue
+
+    def test_chunking(self):
+        queue = EfficientCollectionQueue(2, bytes)
+        queue.push_back(b'1')
+        queue.push_back(b'23')
+        queue.push_back(b'4567')
+        assert len(queue) == 7
+        assert queue.peek_front() == b'12'
+        queue.pop_front(3)
+        assert queue.peek_front() == b'4'
+        queue.pop_front(1)
+        assert queue.peek_front() == b'56'
+        queue.pop_front(2)
+        assert len(queue) == 1
+        assert queue
+        with pytest.raises(EfficientCollectionQueue.SizeUnderflow):
+            queue.pop_front(2)
+        assert queue.peek_front() == b'7'
+        queue.pop_front(1)
+        assert queue.peek_front() == b''
+        assert len(queue) == 0
+        assert not queue
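
Note for reviewers: a minimal usage sketch of the queue added above, complementing the tests. The split size and payloads here are arbitrary, chosen only to show the chunking behavior; it assumes borg is importable (the module lives at `src/borg/helpers/datastruct.py`).

```python
from borg.helpers.datastruct import EfficientCollectionQueue

queue = EfficientCollectionQueue(4, bytes)  # chunks of at most 4 bytes each
queue.push_back(b'hello')
queue.push_back(b'world')                   # stored internally as b'hell', b'owor', b'ld'
assert len(queue) == 10

assert queue.peek_front() == b'hell'        # first chunk, not removed
queue.pop_front(6)                          # drops b'hell' plus 2 bytes of b'owor'
assert queue.peek_front() == b'or'
queue.pop_front(len(queue))                 # drain the rest
assert not queue
```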
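The new `send_buffer()` closure in remote.py pairs this queue with partial writes: write the front chunk, pop exactly as many bytes as the fd accepted, and tolerate EAGAIN/EWOULDBLOCK from an fd that select() reported as writable. A standalone sketch of that pattern follows; `drain_once` is a hypothetical name, and plain `os.write` stands in for borg's rate-limited `self.ratelimit.write`.

```python
import errno
import os

from borg.helpers.datastruct import EfficientCollectionQueue


def drain_once(fd, queue):
    """Write at most one front chunk to a non-blocking fd; return bytes written."""
    if not queue:
        return 0
    try:
        written = os.write(fd, queue.peek_front())
    except OSError as e:
        # A write can still fail with EAGAIN even after select() said the fd
        # is writable; EWOULDBLOCK is tolerated for the same reason.
        if e.errno not in (errno.EAGAIN, errno.EWOULDBLOCK):
            raise
        return 0
    queue.pop_front(written)  # drop exactly the prefix that was sent
    return written
```

Because `pop_front(written)` removes only what the fd actually accepted, a short write leaves the remaining bytes queued for the next call, which is what lets the `--remote-buffer` backlog build up without re-slicing one large bytes object on every attempt.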