Merge pull request #1455 from ThomasWaldmann/fix-compr-buffer-size

LZ4: dynamically enlarge the (de)compression buffer, fixes #1453
This commit is contained in:
TW 2016-08-09 18:36:55 +02:00 committed by GitHub
commit 316c0cd6f0
5 changed files with 61 additions and 48 deletions

View File

@ -25,7 +25,7 @@ from .helpers import Error, location_validator, archivename_validator, format_li
EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR, log_multi, PatternMatcher, ErrorIgnoringTextIOWrapper EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR, log_multi, PatternMatcher, ErrorIgnoringTextIOWrapper
from .logger import create_logger, setup_logging from .logger import create_logger, setup_logging
logger = create_logger() logger = create_logger()
from .compress import Compressor, COMPR_BUFFER from .compress import Compressor
from .upgrader import AtticRepositoryUpgrader, BorgRepositoryUpgrader from .upgrader import AtticRepositoryUpgrader, BorgRepositoryUpgrader
from .repository import Repository from .repository import Repository
from .cache import Cache from .cache import Cache
@ -240,9 +240,7 @@ class Archiver:
dry_run = args.dry_run dry_run = args.dry_run
t0 = datetime.utcnow() t0 = datetime.utcnow()
if not dry_run: if not dry_run:
compr_args = dict(buffer=COMPR_BUFFER) key.compressor = Compressor(**args.compression)
compr_args.update(args.compression)
key.compressor = Compressor(**compr_args)
with Cache(repository, key, manifest, do_files=args.cache_files, lock_wait=self.lock_wait) as cache: with Cache(repository, key, manifest, do_files=args.cache_files, lock_wait=self.lock_wait) as cache:
archive = Archive(repository, key, manifest, args.location.archive, cache=cache, archive = Archive(repository, key, manifest, args.location.archive, cache=cache,
create=True, checkpoint_interval=args.checkpoint_interval, create=True, checkpoint_interval=args.checkpoint_interval,

View File

@ -1,3 +1,4 @@
import threading
import zlib import zlib
try: try:
import lzma import lzma
@ -7,6 +8,18 @@ except ImportError:
cdef extern from "lz4.h": cdef extern from "lz4.h":
int LZ4_compress_limitedOutput(const char* source, char* dest, int inputSize, int maxOutputSize) nogil int LZ4_compress_limitedOutput(const char* source, char* dest, int inputSize, int maxOutputSize) nogil
int LZ4_decompress_safe(const char* source, char* dest, int inputSize, int maxOutputSize) nogil int LZ4_decompress_safe(const char* source, char* dest, int inputSize, int maxOutputSize) nogil
int LZ4_compressBound(int inputSize) nogil
thread_local = threading.local()
# NOTE: an assignment made at import time on a threading.local() object is only
# visible in the thread that imports this module; get_buffer() creates the
# attribute lazily for every other thread.
thread_local.buffer = bytes()


cdef char *get_buffer(size):
    """
    Return a pointer to the calling thread's scratch buffer of at least *size* bytes.

    The buffer is a bytes object kept in thread-local storage. It is replaced by
    a new, bigger bytes object whenever the requested size exceeds its current
    length; it is never shrunk here.

    The returned pointer refers to the bytes object stored in
    thread_local.buffer, so it must be considered invalid after the next
    get_buffer() call made by the same thread (that call may replace the object).
    """
    size = int(size)
    # Threads other than the importing one start without a `buffer` attribute,
    # so look it up with a default instead of raising AttributeError.
    buf = getattr(thread_local, 'buffer', None)
    if buf is None or len(buf) < size:
        thread_local.buffer = bytes(size)
    return <char *> thread_local.buffer
cdef class CompressorBase: cdef class CompressorBase:
@ -52,40 +65,30 @@ class CNONE(CompressorBase):
return data return data
cdef class LZ4(CompressorBase): class LZ4(CompressorBase):
""" """
raw LZ4 compression / decompression (liblz4). raw LZ4 compression / decompression (liblz4).
Features: Features:
- lz4 is super fast - lz4 is super fast
- wrapper releases CPython's GIL to support multithreaded code - wrapper releases CPython's GIL to support multithreaded code
- buffer given by caller, avoiding frequent reallocation and buffer duplication
- uses safe lz4 methods that never go beyond the end of the output buffer - uses safe lz4 methods that never go beyond the end of the output buffer
But beware:
- this is not very generic, the given buffer MUST be large enough to
handle all compression or decompression output (or it will fail).
- you must not do method calls to the same LZ4 instance from different
threads at the same time - create one LZ4 instance per thread!
""" """
ID = b'\x01\x00' ID = b'\x01\x00'
name = 'lz4' name = 'lz4'
cdef char *buffer # helper buffer for (de)compression output def __init__(self, **kwargs):
cdef int bufsize # size of this buffer pass
def __cinit__(self, **kwargs):
buffer = kwargs['buffer']
self.buffer = buffer
self.bufsize = len(buffer)
def compress(self, idata): def compress(self, idata):
if not isinstance(idata, bytes): if not isinstance(idata, bytes):
idata = bytes(idata) # code below does not work with memoryview idata = bytes(idata) # code below does not work with memoryview
cdef int isize = len(idata) cdef int isize = len(idata)
cdef int osize = self.bufsize cdef int osize
cdef char *source = idata cdef char *source = idata
cdef char *dest = self.buffer cdef char *dest
osize = LZ4_compressBound(isize)
dest = get_buffer(osize)
with nogil: with nogil:
osize = LZ4_compress_limitedOutput(source, dest, isize, osize) osize = LZ4_compress_limitedOutput(source, dest, isize, osize)
if not osize: if not osize:
@ -97,15 +100,25 @@ cdef class LZ4(CompressorBase):
idata = bytes(idata) # code below does not work with memoryview idata = bytes(idata) # code below does not work with memoryview
idata = super().decompress(idata) idata = super().decompress(idata)
cdef int isize = len(idata) cdef int isize = len(idata)
cdef int osize = self.bufsize cdef int osize
cdef int rsize
cdef char *source = idata cdef char *source = idata
cdef char *dest = self.buffer cdef char *dest
# a bit more than 8MB is enough for the usual data sizes yielded by the chunker.
# allocate more if isize * 3 is already bigger, to avoid having to resize often.
osize = max(int(1.1 * 2**23), isize * 3)
while True:
dest = get_buffer(osize)
with nogil: with nogil:
osize = LZ4_decompress_safe(source, dest, isize, osize) rsize = LZ4_decompress_safe(source, dest, isize, osize)
if osize < 0: if rsize >= 0:
# malformed input data, buffer too small, ... break
if osize > 2 ** 30:
# this is insane, get out of here
raise Exception('lz4 decompress failed') raise Exception('lz4 decompress failed')
return dest[:osize] # likely the buffer was too small, get a bigger one:
osize = int(1.5 * osize)
return dest[:rsize]
class LZMA(CompressorBase): class LZMA(CompressorBase):
@ -192,8 +205,3 @@ class Compressor:
return cls(**self.params).decompress(data) return cls(**self.params).decompress(data)
else: else:
raise ValueError('No decompressor for this data found: %r.', data[:2]) raise ValueError('No decompressor for this data found: %r.', data[:2])
# a buffer used for (de)compression result, which can be slightly bigger
# than the chunk buffer in the worst (incompressible data) case, add 10%:
COMPR_BUFFER = bytes(int(1.1 * 2 ** 23)) # CHUNK_MAX_EXP == 23

View File

@ -492,8 +492,6 @@ def timestamp(s):
def ChunkerParams(s): def ChunkerParams(s):
chunk_min, chunk_max, chunk_mask, window_size = s.split(',') chunk_min, chunk_max, chunk_mask, window_size = s.split(',')
if int(chunk_max) > 23: if int(chunk_max) > 23:
# do not go beyond 2**23 (8MB) chunk size now,
# COMPR_BUFFER can only cope with up to this size
raise ValueError('max. chunk size exponent must not be more than 23 (2^23 = 8MiB max. chunk size)') raise ValueError('max. chunk size exponent must not be more than 23 (2^23 = 8MiB max. chunk size)')
return int(chunk_min), int(chunk_max), int(chunk_mask), int(window_size) return int(chunk_min), int(chunk_max), int(chunk_mask), int(window_size)

View File

@ -12,7 +12,7 @@ from .logger import create_logger
logger = create_logger() logger = create_logger()
from .crypto import AES, bytes_to_long, long_to_bytes, bytes_to_int, num_aes_blocks from .crypto import AES, bytes_to_long, long_to_bytes, bytes_to_int, num_aes_blocks
from .compress import Compressor, COMPR_BUFFER from .compress import Compressor
import msgpack import msgpack
PREFIX = b'\0' * 8 PREFIX = b'\0' * 8
@ -70,7 +70,7 @@ class KeyBase:
self.TYPE_STR = bytes([self.TYPE]) self.TYPE_STR = bytes([self.TYPE])
self.repository = repository self.repository = repository
self.target = None # key location file path / repo obj self.target = None # key location file path / repo obj
self.compressor = Compressor('none', buffer=COMPR_BUFFER) self.compressor = Compressor('none')
def id_hash(self, data): def id_hash(self, data):
"""Return HMAC hash using the "id" HMAC key """Return HMAC hash using the "id" HMAC key

View File

@ -1,3 +1,4 @@
import os
import zlib import zlib
try: try:
import lzma import lzma
@ -11,13 +12,13 @@ from ..compress import get_compressor, Compressor, CNONE, ZLIB, LZ4
buffer = bytes(2**16) buffer = bytes(2**16)
data = b'fooooooooobaaaaaaaar' * 10 data = b'fooooooooobaaaaaaaar' * 10
params = dict(name='zlib', level=6, buffer=buffer) params = dict(name='zlib', level=6)
def test_get_compressor(): def test_get_compressor():
c = get_compressor(name='none') c = get_compressor(name='none')
assert isinstance(c, CNONE) assert isinstance(c, CNONE)
c = get_compressor(name='lz4', buffer=buffer) c = get_compressor(name='lz4')
assert isinstance(c, LZ4) assert isinstance(c, LZ4)
c = get_compressor(name='zlib') c = get_compressor(name='zlib')
assert isinstance(c, ZLIB) assert isinstance(c, ZLIB)
@ -35,13 +36,21 @@ def test_cnull():
def test_lz4(): def test_lz4():
c = get_compressor(name='lz4', buffer=buffer) c = get_compressor(name='lz4')
cdata = c.compress(data) cdata = c.compress(data)
assert len(cdata) < len(data) assert len(cdata) < len(data)
assert data == c.decompress(cdata) assert data == c.decompress(cdata)
assert data == Compressor(**params).decompress(cdata) # autodetect assert data == Compressor(**params).decompress(cdata) # autodetect
def test_lz4_buffer_allocation():
    """Round-trip 50MiB of random (incompressible) data through lz4.

    Uses a rather huge data object to see if buffer allocation / resizing works.
    """
    payload = os.urandom(50 * 2**20)
    compressor = get_compressor(name='lz4')
    compressed = compressor.compress(payload)
    restored = compressor.decompress(compressed)
    assert payload == restored
def test_zlib(): def test_zlib():
c = get_compressor(name='zlib') c = get_compressor(name='zlib')
cdata = c.compress(data) cdata = c.compress(data)
@ -83,16 +92,16 @@ def test_zlib_compat():
def test_compressor(): def test_compressor():
params_list = [ params_list = [
dict(name='none', buffer=buffer), dict(name='none'),
dict(name='lz4', buffer=buffer), dict(name='lz4'),
dict(name='zlib', level=0, buffer=buffer), dict(name='zlib', level=0),
dict(name='zlib', level=6, buffer=buffer), dict(name='zlib', level=6),
dict(name='zlib', level=9, buffer=buffer), dict(name='zlib', level=9),
] ]
if lzma: if lzma:
params_list += [ params_list += [
dict(name='lzma', level=0, buffer=buffer), dict(name='lzma', level=0),
dict(name='lzma', level=6, buffer=buffer), dict(name='lzma', level=6),
# we do not test lzma on level 9 because of the huge memory needs # we do not test lzma on level 9 because of the huge memory needs
] ]
for params in params_list: for params in params_list: