mirror of
https://github.com/borgbackup/borg.git
synced 2024-12-26 01:37:20 +00:00
Merge pull request #2032 from ThomasWaldmann/fix-pipe-write
borg serve: fix transmission data loss of pipe writes, fixes #1268
This commit is contained in:
commit
986740b7fe
1 changed files with 21 additions and 3 deletions
|
@ -7,6 +7,7 @@
|
||||||
import sys
|
import sys
|
||||||
import tempfile
|
import tempfile
|
||||||
import textwrap
|
import textwrap
|
||||||
|
import time
|
||||||
from subprocess import Popen, PIPE
|
from subprocess import Popen, PIPE
|
||||||
|
|
||||||
from . import __version__
|
from . import __version__
|
||||||
|
@ -28,6 +29,23 @@
|
||||||
MAX_INFLIGHT = 100
|
MAX_INFLIGHT = 100
|
||||||
|
|
||||||
|
|
||||||
|
def os_write(fd, data):
|
||||||
|
"""os.write wrapper so we do not lose data for partial writes."""
|
||||||
|
# This is happening frequently on cygwin due to its small pipe buffer size of only 64kiB
|
||||||
|
# and also due to its different blocking pipe behaviour compared to Linux/*BSD.
|
||||||
|
# Neither Linux nor *BSD ever do partial writes on blocking pipes, unless interrupted by a
|
||||||
|
# signal, in which case serve() would terminate.
|
||||||
|
amount = remaining = len(data)
|
||||||
|
while remaining:
|
||||||
|
count = os.write(fd, data)
|
||||||
|
remaining -= count
|
||||||
|
if not remaining:
|
||||||
|
break
|
||||||
|
data = data[count:]
|
||||||
|
time.sleep(count * 1e-09)
|
||||||
|
return amount
|
||||||
|
|
||||||
|
|
||||||
class ConnectionClosed(Error):
|
class ConnectionClosed(Error):
|
||||||
"""Connection closed by remote host"""
|
"""Connection closed by remote host"""
|
||||||
|
|
||||||
|
@ -106,7 +124,7 @@ def serve(self):
|
||||||
if self.repository is not None:
|
if self.repository is not None:
|
||||||
self.repository.close()
|
self.repository.close()
|
||||||
else:
|
else:
|
||||||
os.write(stderr_fd, "Borg {}: Got connection close before repository was opened.\n"
|
os_write(stderr_fd, "Borg {}: Got connection close before repository was opened.\n"
|
||||||
.format(__version__).encode())
|
.format(__version__).encode())
|
||||||
return
|
return
|
||||||
unpacker.feed(data)
|
unpacker.feed(data)
|
||||||
|
@ -133,9 +151,9 @@ def serve(self):
|
||||||
logging.exception('Borg %s: exception in RPC call:', __version__)
|
logging.exception('Borg %s: exception in RPC call:', __version__)
|
||||||
logging.error(sysinfo())
|
logging.error(sysinfo())
|
||||||
exc = "Remote Exception (see remote log for the traceback)"
|
exc = "Remote Exception (see remote log for the traceback)"
|
||||||
os.write(stdout_fd, msgpack.packb((1, msgid, e.__class__.__name__, exc)))
|
os_write(stdout_fd, msgpack.packb((1, msgid, e.__class__.__name__, exc)))
|
||||||
else:
|
else:
|
||||||
os.write(stdout_fd, msgpack.packb((1, msgid, None, res)))
|
os_write(stdout_fd, msgpack.packb((1, msgid, None, res)))
|
||||||
if es:
|
if es:
|
||||||
self.repository.close()
|
self.repository.close()
|
||||||
return
|
return
|
||||||
|
|
Loading…
Reference in a new issue