remote: deal with partial lines, fixes #2637

due to block buffering (in borg, pipes, sshd, ssh) partial lines might
be received. for plain text, this causes cosmetic issues, for json it
causes tracebacks due to parse errors.

the code now makes sure handle_remote_line() only gets called with a
complete line (which is terminated by any universal newline char, a
pure \r seems to be needed for remote progress displays).

it also fixes a yet undiscovered partial utf-8-sequence decoding issue
that might occur for the same reason.

(cherry picked from commit 8646216a06)
This commit is contained in:
Thomas Waldmann 2017-09-25 01:15:13 +02:00
parent 1c97903a12
commit f2b48cd5c8
2 changed files with 24 additions and 9 deletions

View File

@ -528,6 +528,7 @@ class RemoteRepository:
self.rx_bytes = 0 self.rx_bytes = 0
self.tx_bytes = 0 self.tx_bytes = 0
self.to_send = b'' self.to_send = b''
self.stderr_received = b'' # incomplete stderr line bytes received (no \n yet)
self.chunkid_to_msgids = {} self.chunkid_to_msgids = {}
self.ignore_responses = set() self.ignore_responses = set()
self.responses = {} self.responses = {}
@ -828,9 +829,16 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
if not data: if not data:
raise ConnectionClosed() raise ConnectionClosed()
self.rx_bytes += len(data) self.rx_bytes += len(data)
data = data.decode('utf-8') # deal with incomplete lines (may appear due to block buffering)
for line in data.splitlines(keepends=True): if self.stderr_received:
handle_remote_line(line) data = self.stderr_received + data
self.stderr_received = b''
lines = data.splitlines(keepends=True)
if lines and not lines[-1].endswith((b'\r', b'\n')):
self.stderr_received = lines.pop()
# now we have complete lines in <lines> and any partial line in self.stderr_received.
for line in lines:
handle_remote_line(line.decode('utf-8')) # decode late, avoid partial utf-8 sequences
if w: if w:
while not self.to_send and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT: while not self.to_send and (calls or self.preload_ids) and len(waiting_for) < MAX_INFLIGHT:
if calls: if calls:
@ -961,6 +969,7 @@ def handle_remote_line(line):
This function is remarkably complex because it handles multiple wire formats. This function is remarkably complex because it handles multiple wire formats.
""" """
assert line.endswith(('\r', '\n'))
if line.startswith('{'): if line.startswith('{'):
# This format is used by Borg since 1.1.0b6 for new-protocol clients. # This format is used by Borg since 1.1.0b6 for new-protocol clients.
# It is the same format that is exposed by --log-json. # It is the same format that is exposed by --log-json.

View File

@ -944,16 +944,22 @@ class RemoteLoggerTestCase(BaseTestCase):
sys.stderr = self.old_stderr sys.stderr = self.old_stderr
def test_stderr_messages(self): def test_stderr_messages(self):
handle_remote_line("unstructured stderr message") handle_remote_line("unstructured stderr message\n")
self.assert_equal(self.stream.getvalue(), '') self.assert_equal(self.stream.getvalue(), '')
# stderr messages don't get an implicit newline # stderr messages don't get an implicit newline
self.assert_equal(self.stderr.getvalue(), 'Remote: unstructured stderr message') self.assert_equal(self.stderr.getvalue(), 'Remote: unstructured stderr message\n')
def test_stderr_progress_messages(self):
handle_remote_line("unstructured stderr progress message\r")
self.assert_equal(self.stream.getvalue(), '')
# stderr messages don't get an implicit newline
self.assert_equal(self.stderr.getvalue(), 'Remote: unstructured stderr progress message\r')
def test_pre11_format_messages(self): def test_pre11_format_messages(self):
self.handler.setLevel(logging.DEBUG) self.handler.setLevel(logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG) logging.getLogger().setLevel(logging.DEBUG)
handle_remote_line("$LOG INFO Remote: borg < 1.1 format message") handle_remote_line("$LOG INFO Remote: borg < 1.1 format message\n")
self.assert_equal(self.stream.getvalue(), 'Remote: borg < 1.1 format message\n') self.assert_equal(self.stream.getvalue(), 'Remote: borg < 1.1 format message\n')
self.assert_equal(self.stderr.getvalue(), '') self.assert_equal(self.stderr.getvalue(), '')
@ -961,7 +967,7 @@ class RemoteLoggerTestCase(BaseTestCase):
self.handler.setLevel(logging.DEBUG) self.handler.setLevel(logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG) logging.getLogger().setLevel(logging.DEBUG)
handle_remote_line("$LOG INFO borg.repository Remote: borg >= 1.1 format message") handle_remote_line("$LOG INFO borg.repository Remote: borg >= 1.1 format message\n")
self.assert_equal(self.stream.getvalue(), 'Remote: borg >= 1.1 format message\n') self.assert_equal(self.stream.getvalue(), 'Remote: borg >= 1.1 format message\n')
self.assert_equal(self.stderr.getvalue(), '') self.assert_equal(self.stderr.getvalue(), '')
@ -970,7 +976,7 @@ class RemoteLoggerTestCase(BaseTestCase):
self.handler.setLevel(logging.WARNING) self.handler.setLevel(logging.WARNING)
logging.getLogger().setLevel(logging.WARNING) logging.getLogger().setLevel(logging.WARNING)
handle_remote_line("$LOG INFO borg.repository Remote: new format info message") handle_remote_line("$LOG INFO borg.repository Remote: new format info message\n")
self.assert_equal(self.stream.getvalue(), '') self.assert_equal(self.stream.getvalue(), '')
self.assert_equal(self.stderr.getvalue(), '') self.assert_equal(self.stderr.getvalue(), '')
@ -990,7 +996,7 @@ class RemoteLoggerTestCase(BaseTestCase):
foo_handler.setLevel(logging.INFO) foo_handler.setLevel(logging.INFO)
logging.getLogger('borg.repository.foo').handlers[:] = [foo_handler] logging.getLogger('borg.repository.foo').handlers[:] = [foo_handler]
handle_remote_line("$LOG INFO borg.repository Remote: new format child message") handle_remote_line("$LOG INFO borg.repository Remote: new format child message\n")
self.assert_equal(foo_stream.getvalue(), '') self.assert_equal(foo_stream.getvalue(), '')
self.assert_equal(child_stream.getvalue(), 'Remote: new format child message\n') self.assert_equal(child_stream.getvalue(), 'Remote: new format child message\n')
self.assert_equal(self.stream.getvalue(), '') self.assert_equal(self.stream.getvalue(), '')