diff --git a/borg/archiver.py b/borg/archiver.py index bfd56bf0b..41373e259 100644 --- a/borg/archiver.py +++ b/borg/archiver.py @@ -25,7 +25,7 @@ from .helpers import Error, location_validator, archivename_validator, format_li EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR, log_multi, PatternMatcher, ErrorIgnoringTextIOWrapper from .logger import create_logger, setup_logging logger = create_logger() -from .compress import Compressor, COMPR_BUFFER +from .compress import Compressor from .upgrader import AtticRepositoryUpgrader, BorgRepositoryUpgrader from .repository import Repository from .cache import Cache @@ -240,9 +240,7 @@ class Archiver: dry_run = args.dry_run t0 = datetime.utcnow() if not dry_run: - compr_args = dict(buffer=COMPR_BUFFER) - compr_args.update(args.compression) - key.compressor = Compressor(**compr_args) + key.compressor = Compressor(**args.compression) with Cache(repository, key, manifest, do_files=args.cache_files, lock_wait=self.lock_wait) as cache: archive = Archive(repository, key, manifest, args.location.archive, cache=cache, create=True, checkpoint_interval=args.checkpoint_interval, diff --git a/borg/compress.pyx b/borg/compress.pyx index 3bb88def7..1330fbf2f 100644 --- a/borg/compress.pyx +++ b/borg/compress.pyx @@ -7,6 +7,7 @@ except ImportError: cdef extern from "lz4.h": int LZ4_compress_limitedOutput(const char* source, char* dest, int inputSize, int maxOutputSize) nogil int LZ4_decompress_safe(const char* source, char* dest, int inputSize, int maxOutputSize) nogil + int LZ4_compressBound(int inputSize) nogil cdef class CompressorBase: @@ -52,40 +53,35 @@ class CNONE(CompressorBase): return data -cdef class LZ4(CompressorBase): +class LZ4(CompressorBase): """ raw LZ4 compression / decompression (liblz4). Features: - lz4 is super fast - wrapper releases CPython's GIL to support multithreaded code - - buffer given by caller, avoiding frequent reallocation and buffer duplication - uses safe lz4 methods that never go beyond the end of the output buffer - - But beware: - - this is not very generic, the given buffer MUST be large enough to - handle all compression or decompression output (or it will fail). - - you must not do method calls to the same LZ4 instance from different - threads at the same time - create one LZ4 instance per thread! """ ID = b'\x01\x00' name = 'lz4' - cdef char *buffer # helper buffer for (de)compression output - cdef int bufsize # size of this buffer + def __init__(self, **kwargs): + self.buffer = None - def __cinit__(self, **kwargs): - buffer = kwargs['buffer'] - self.buffer = buffer - self.bufsize = len(buffer) + def _create_buffer(self, size): + # we keep a reference to the buffer until this instance is destroyed + self.buffer = bytes(int(size)) def compress(self, idata): if not isinstance(idata, bytes): idata = bytes(idata) # code below does not work with memoryview cdef int isize = len(idata) - cdef int osize = self.bufsize + cdef int osize cdef char *source = idata - cdef char *dest = self.buffer + cdef char *dest + osize = LZ4_compressBound(isize) + self._create_buffer(osize) + dest = self.buffer with nogil: osize = LZ4_compress_limitedOutput(source, dest, isize, osize) if not osize: @@ -97,15 +93,26 @@ cdef class LZ4(CompressorBase): idata = bytes(idata) # code below does not work with memoryview idata = super().decompress(idata) cdef int isize = len(idata) - cdef int osize = self.bufsize + cdef int osize + cdef int rsize cdef char *source = idata - cdef char *dest = self.buffer - with nogil: - osize = LZ4_decompress_safe(source, dest, isize, osize) - if osize < 0: - # malformed input data, buffer too small, ... - raise Exception('lz4 decompress failed') - return dest[:osize] + cdef char *dest + # a bit more than 8MB is enough for the usual data sizes yielded by the chunker. + # allocate more if isize * 3 is already bigger, to avoid having to resize often. + osize = max(int(1.1 * 2**23), isize * 3) + while True: + self._create_buffer(osize) + dest = self.buffer + with nogil: + rsize = LZ4_decompress_safe(source, dest, isize, osize) + if rsize >= 0: + break + if osize > 2 ** 30: + # this is insane, get out of here + raise Exception('lz4 decompress failed') + # likely the buffer was too small, get a bigger one: + osize = int(1.5 * osize) + return dest[:rsize] class LZMA(CompressorBase): @@ -192,8 +199,3 @@ class Compressor: return cls(**self.params).decompress(data) else: raise ValueError('No decompressor for this data found: %r.', data[:2]) - - -# a buffer used for (de)compression result, which can be slightly bigger -# than the chunk buffer in the worst (incompressible data) case, add 10%: -COMPR_BUFFER = bytes(int(1.1 * 2 ** 23)) # CHUNK_MAX_EXP == 23 diff --git a/borg/helpers.py b/borg/helpers.py index bacb434ba..4275d783e 100644 --- a/borg/helpers.py +++ b/borg/helpers.py @@ -492,8 +492,6 @@ def timestamp(s): def ChunkerParams(s): chunk_min, chunk_max, chunk_mask, window_size = s.split(',') if int(chunk_max) > 23: - # do not go beyond 2**23 (8MB) chunk size now, - # COMPR_BUFFER can only cope with up to this size raise ValueError('max. chunk size exponent must not be more than 23 (2^23 = 8MiB max. chunk size)') return int(chunk_min), int(chunk_max), int(chunk_mask), int(window_size) diff --git a/borg/key.py b/borg/key.py index be79dfc14..95178f7c8 100644 --- a/borg/key.py +++ b/borg/key.py @@ -12,7 +12,7 @@ from .logger import create_logger logger = create_logger() from .crypto import AES, bytes_to_long, long_to_bytes, bytes_to_int, num_aes_blocks -from .compress import Compressor, COMPR_BUFFER +from .compress import Compressor import msgpack PREFIX = b'\0' * 8 @@ -70,7 +70,7 @@ class KeyBase: self.TYPE_STR = bytes([self.TYPE]) self.repository = repository self.target = None # key location file path / repo obj - self.compressor = Compressor('none', buffer=COMPR_BUFFER) + self.compressor = Compressor('none') def id_hash(self, data): """Return HMAC hash using the "id" HMAC key diff --git a/borg/testsuite/compress.py b/borg/testsuite/compress.py index 1a4353583..ff9d42713 100644 --- a/borg/testsuite/compress.py +++ b/borg/testsuite/compress.py @@ -1,3 +1,4 @@ +import os import zlib try: import lzma @@ -11,13 +12,13 @@ from ..compress import get_compressor, Compressor, CNONE, ZLIB, LZ4 buffer = bytes(2**16) data = b'fooooooooobaaaaaaaar' * 10 -params = dict(name='zlib', level=6, buffer=buffer) +params = dict(name='zlib', level=6) def test_get_compressor(): c = get_compressor(name='none') assert isinstance(c, CNONE) - c = get_compressor(name='lz4', buffer=buffer) + c = get_compressor(name='lz4') assert isinstance(c, LZ4) c = get_compressor(name='zlib') assert isinstance(c, ZLIB) @@ -35,13 +36,21 @@ def test_cnull(): def test_lz4(): - c = get_compressor(name='lz4', buffer=buffer) + c = get_compressor(name='lz4') cdata = c.compress(data) assert len(cdata) < len(data) assert data == c.decompress(cdata) assert data == Compressor(**params).decompress(cdata) # autodetect +def test_lz4_buffer_allocation(): + # test with a rather huge data object to see if buffer allocation / resizing works + data = os.urandom(50 * 2**20) # 50MiB incompressible data + c = get_compressor(name='lz4') + cdata = c.compress(data) + assert data == c.decompress(cdata) + + def test_zlib(): c = get_compressor(name='zlib') cdata = c.compress(data) @@ -83,16 +92,16 @@ def test_zlib_compat(): def test_compressor(): params_list = [ - dict(name='none', buffer=buffer), - dict(name='lz4', buffer=buffer), - dict(name='zlib', level=0, buffer=buffer), - dict(name='zlib', level=6, buffer=buffer), - dict(name='zlib', level=9, buffer=buffer), + dict(name='none'), + dict(name='lz4'), + dict(name='zlib', level=0), + dict(name='zlib', level=6), + dict(name='zlib', level=9), ] if lzma: params_list += [ - dict(name='lzma', level=0, buffer=buffer), - dict(name='lzma', level=6, buffer=buffer), + dict(name='lzma', level=0), + dict(name='lzma', level=6), # we do not test lzma on level 9 because of the huge memory needs ] for params in params_list: