"""
borg.compress
=============

Compression is applied to chunks after ID hashing (so the ID is a direct function of the
plain chunk, compression is irrelevant to it), and of course before encryption.

The "auto" mode (e.g. --compression auto,lzma,4) is implemented as a meta Compressor,
meaning that Auto acts like a Compressor, but defers actual work to others (namely
LZ4 as a heuristic whether compression is worth it, and the specified Compressor
for the actual compression).

Decompression is normally handled through Compressor.decompress which will detect
which compressor has been used to compress the data and dispatch to the correct
decompressor.
"""

from argparse import ArgumentTypeError
import random
from struct import Struct
import zlib

# lzma is optional in some Python builds; compressor construction checks for None.
try:
    import lzma
except ImportError:
    lzma = None

from .constants import MAX_DATA_SIZE
from .helpers import Buffer, DecompressionError

API_VERSION = '1.2_02'

# C declarations for the liblz4 functions used below (declared nogil so the
# wrappers can release the GIL around the actual (de)compression work).
cdef extern from "lz4.h":
    int LZ4_compress_default(const char* source, char* dest, int inputSize, int maxOutputSize) nogil
    int LZ4_decompress_safe(const char* source, char* dest, int inputSize, int maxOutputSize) nogil
    int LZ4_compressBound(int inputSize) nogil

# C declarations for the libzstd simple (one-shot) API used below.
cdef extern from "zstd.h":
    size_t ZSTD_compress(void* dst, size_t dstCapacity, const void* src, size_t srcSize, int compressionLevel) nogil
    size_t ZSTD_decompress(void* dst, size_t dstCapacity, const void* src, size_t compressedSize) nogil
    size_t ZSTD_compressBound(size_t srcSize) nogil
    unsigned long long ZSTD_CONTENTSIZE_UNKNOWN
    unsigned long long ZSTD_CONTENTSIZE_ERROR
    unsigned long long ZSTD_getFrameContentSize(const void *src, size_t srcSize) nogil
    unsigned ZSTD_isError(size_t code) nogil
    const char* ZSTD_getErrorName(size_t code) nogil

# Shared, growable scratch buffer for (de)compression output.
# NOTE(review): this is module-global state; the ZSTD class comment below states only
# one compression job must run at a time, which presumably covers this buffer too.
buffer = Buffer(bytearray, size=0)


cdef class CompressorBase:
    """
    base class for all (de)compression classes,
    also handles compression format auto detection and
    adding/stripping the ID header (which enable auto detection).
    """
    ID = 0xFF  # reserved and not used
               # overwrite with a unique 1-byte bytestring in child classes
    name = 'baseclass'

    @classmethod
    def detect(cls, data):
        # legacy-format auto detection: the first byte of *data* is the compressor ID
        return data and data[0] == cls.ID

    def __init__(self, level=255, legacy_mode=False, **kwargs):
        # level: compression level, 0..255 (255 used as "unknown/not applicable" by subclasses)
        assert 0 <= level <= 255
        self.level = level
        self.legacy_mode = legacy_mode  # True: support prefixed ctype/clevel bytes

    def decide(self, data):
        """
        Return which compressor will perform the actual compression for *data*.

        This exists for a very specific case: If borg recreate is instructed to
        recompress using Auto compression it needs to determine the _actual_ target
        compression of a chunk in order to detect whether it should be recompressed.

        Any compressor may return a compressor other than *self*, like e.g. the
        CNONE compressor, and should actually do so if *data* would become larger
        on compression.
        """
        return self

    def compress(self, meta, data):
        """
        Compress *data* (bytes) and return compression metadata and compressed bytes.
        """
        if not isinstance(data, bytes):
            data = bytes(data)  # code below does not work with memoryview
        if self.legacy_mode:
            # legacy format: prefix the data with the (ctype, clevel) header bytes,
            # meta dict is not used (callers pass/receive None).
            return None, bytes((self.ID, self.level)) + data
        else:
            # modern format: compression info is carried in the metadata dict,
            # the data itself stays unprefixed.
            meta["ctype"] = self.ID
            meta["clevel"] = self.level
            meta["csize"] = len(data)
            return meta, data

    def decompress(self, meta, data):
        """
        Decompress *data* (preferably a memoryview, bytes also acceptable) and return bytes result.

        Legacy mode: The leading Compressor ID bytes need to be present.

        Only handles input generated by _this_ Compressor - for a general purpose
        decompression method see *Compressor.decompress*.
        """
        if self.legacy_mode:
            # legacy format: reconstruct the meta dict from the 2-byte
            # (ctype, clevel) prefix, then strip it from the data.
            assert meta is None
            meta = {}
            meta["ctype"] = data[0]
            meta["clevel"] = data[1]
            meta["csize"] = len(data)
            return meta, data[2:]
        else:
            # modern format: meta must already carry the compression info.
            assert isinstance(meta, dict)
            assert "ctype" in meta
            assert "clevel" in meta
            return meta, data

    def check_fix_size(self, meta, data):
        """Verify (or, in legacy mode, establish) the plaintext size recorded in *meta* against *data*."""
        if "size" in meta:
            assert meta["size"] == len(data)
        elif self.legacy_mode:
            # legacy repos did not store the plaintext size - fill it in now.
            meta["size"] = len(data)
        else:
            pass  # raise ValueError("size not present and not in legacy mode")


cdef class DecidingCompressor(CompressorBase):
    """
    base class for (de)compression classes that (based on an internal _decide
    method) decide whether and how to compress data.
    """
    name = 'decidebaseclass'

    def __init__(self, level=255, legacy_mode=False, **kwargs):
        super().__init__(level=level, legacy_mode=legacy_mode, **kwargs)

    def _decide(self, meta, data):
        """
        Decides what to do with *data*. Returns (compressor, (meta, compressed_data)).

        *compressed_data* can be the result of *data* being processed by *compressor*,
        if that is generated as a side-effect of the decision process, or None otherwise.

        This private method allows for more efficient implementation of
        compress() and decide_compress() making use of *compressed_data*,
        if already generated.
        """
        raise NotImplementedError

    def decide(self, meta, data):
        # only the chosen compressor is of interest here; discard any
        # side-effect compression result produced by _decide().
        return self._decide(meta, data)[0]

    def decide_compress(self, meta, data):
        """
        Decides what to do with *data* and handle accordingly. Returns (compressor, compressed_data).

        *compressed_data* is the result of *data* being processed by *compressor*.
        """
        compressor, (meta, compressed_data) = self._decide(meta, data)

        if compressed_data is None:
            # _decide() only made a decision, it did not compress as a side effect:
            # compress now with the chosen compressor.
            meta, compressed_data = compressor.compress(meta, data)

        if compressor is self:
            # call super class to add ID bytes
            return self, super().compress(meta, compressed_data)

        return compressor, (meta, compressed_data)

    def compress(self, meta, data):
        # record the plaintext size before deciding/compressing.
        meta["size"] = len(data)
        return self.decide_compress(meta, data)[1]


class CNONE(CompressorBase):
    """
    none - no compression, just pass through data
    """
    ID = 0x00
    name = 'none'

    def __init__(self, level=255, legacy_mode=False, **kwargs):
        super().__init__(level=level, legacy_mode=legacy_mode, **kwargs)  # no defined levels for CNONE, so just say "unknown"

    def compress(self, meta, data):
        # pass-through "compression": just record the plaintext size.
        meta["size"] = len(data)
        return super().compress(meta, data)

    def decompress(self, meta, data):
        meta, data = super().decompress(meta, data)
        if not isinstance(data, bytes):
            data = bytes(data)
        self.check_fix_size(meta, data)
        return meta, data


class LZ4(DecidingCompressor):
    """
    raw LZ4 compression / decompression (liblz4).

    Features:
        - lz4 is super fast
        - wrapper releases CPython's GIL to support multithreaded code
        - uses safe lz4 methods that never go beyond the end of the output buffer
    """
    ID = 0x01
    name = 'lz4'

    def __init__(self, level=255, legacy_mode=False, **kwargs):
        super().__init__(level=level, legacy_mode=legacy_mode, **kwargs)  # no defined levels for LZ4, so just say "unknown"

    def _decide(self, meta, idata):
        """
        Decides what to do with *data*. Returns (compressor, lz4_data).

        *lz4_data* is the LZ4 result if *compressor* is LZ4 as well, otherwise it is None.
        """
        if not isinstance(idata, bytes):
            idata = bytes(idata)  # code below does not work with memoryview
        cdef int isize = len(idata)
        cdef int osize
        cdef char *source = idata
        cdef char *dest
        # LZ4_compressBound() returns the worst-case compressed size, so the
        # scratch buffer is always big enough and compression cannot overflow it.
        osize = LZ4_compressBound(isize)
        buf = buffer.get(osize)
        dest = buf
        with nogil:
            osize = LZ4_compress_default(source, dest, isize, osize)
        if not osize:
            raise Exception('lz4 compress failed')
        # only compress if the result actually is smaller
        if osize < isize:
            return self, (meta, dest[:osize])
        else:
            return NONE_COMPRESSOR, (meta, None)

    def decompress(self, meta, data):
        meta, idata = super().decompress(meta, data)
        if not isinstance(idata, bytes):
            idata = bytes(idata)  # code below does not work with memoryview
        cdef int isize = len(idata)
        cdef int osize
        cdef int rsize
        cdef char *source = idata
        cdef char *dest
        # lz4 streams do not carry the uncompressed size, so guess a buffer size
        # and grow it on demand:
        # a bit more than 8MB is enough for the usual data sizes yielded by the chunker.
        # allocate more if isize * 3 is already bigger, to avoid having to resize often.
        osize = max(int(1.1 * 2**23), isize * 3)
        while True:
            try:
                buf = buffer.get(osize)
            except MemoryError:
                raise DecompressionError('MemoryError')
            dest = buf
            with nogil:
                rsize = LZ4_decompress_safe(source, dest, isize, osize)
            if rsize >= 0:
                break
            if osize > 2 ** 27:  # 128MiB (should be enough, considering max. repo obj size and very good compression)
                # this is insane, get out of here
                raise DecompressionError('lz4 decompress failed')
            # likely the buffer was too small, get a bigger one:
            osize = int(1.5 * osize)
        data = dest[:rsize]
        self.check_fix_size(meta, data)
        return meta, data


class LZMA(DecidingCompressor):
    """
    lzma compression / decompression
    """
    ID = 0x02
    name = 'lzma'

    def __init__(self, level=6, legacy_mode=False, **kwargs):
        super().__init__(level=level, legacy_mode=legacy_mode, **kwargs)
        self.level = level
        # the stdlib lzma module may be missing (see the guarded import at the top).
        if lzma is None:
            raise ValueError('No lzma support found.')

    def _decide(self, meta, data):
        """
        Decides what to do with *data*. Returns (compressor, lzma_data).
        *lzma_data* is the LZMA result if *compressor* is LZMA as well, otherwise it is None.
        """
        # we do not need integrity checks in lzma, we do that already
        lzma_data = lzma.compress(data, preset=self.level, check=lzma.CHECK_NONE)
        # only store compressed data if it actually is smaller than the input
        if len(lzma_data) < len(data):
            return self, (meta, lzma_data)
        else:
            return NONE_COMPRESSOR, (meta, None)

    def decompress(self, meta, data):
        meta, data = super().decompress(meta, data)
        try:
            data = lzma.decompress(data)
            self.check_fix_size(meta, data)
            return meta, data
        except lzma.LZMAError as e:
            # wrap library errors in borg's own exception type
            raise DecompressionError(str(e)) from None


class ZSTD(DecidingCompressor):
    """zstd compression / decompression (pypi: zstandard, gh: python-zstandard)"""
    # This is a NOT THREAD SAFE implementation.
    # Only ONE python context must be created at a time.
    # It should work flawlessly as long as borg will call ONLY ONE compression job at time.
    ID = 0x03
    name = 'zstd'

    def __init__(self, level=3, legacy_mode=False, **kwargs):
        super().__init__(level=level, legacy_mode=legacy_mode, **kwargs)
        self.level = level

    def _decide(self, meta, idata):
        """
        Decides what to do with *data*. Returns (compressor, zstd_data).

        *zstd_data* is the ZSTD result if *compressor* is ZSTD as well, otherwise it is None.
        """
        if not isinstance(idata, bytes):
            idata = bytes(idata)  # code below does not work with memoryview
        cdef int isize = len(idata)
        cdef int osize
        cdef char *source = idata
        cdef char *dest
        cdef int level = self.level
        # ZSTD_compressBound() returns the worst-case compressed size, so the
        # scratch buffer is always big enough.
        osize = ZSTD_compressBound(isize)
        buf = buffer.get(osize)
        dest = buf
        with nogil:
            osize = ZSTD_compress(dest, osize, source, isize, level)
        if ZSTD_isError(osize):
            raise Exception('zstd compress failed: %s' % ZSTD_getErrorName(osize))
        # only compress if the result actually is smaller
        if osize < isize:
            return self, (meta, dest[:osize])
        else:
            return NONE_COMPRESSOR, (meta, None)

    def decompress(self, meta, data):
        meta, idata = super().decompress(meta, data)
        if not isinstance(idata, bytes):
            idata = bytes(idata)  # code below does not work with memoryview
        cdef int isize = len(idata)
        cdef unsigned long long osize
        cdef unsigned long long rsize
        cdef char *source = idata
        cdef char *dest
        # zstd frames record the original content size in the frame header,
        # so the output buffer can be sized exactly (no grow-and-retry loop needed).
        osize = ZSTD_getFrameContentSize(source, isize)
        if osize == ZSTD_CONTENTSIZE_ERROR:
            raise DecompressionError('zstd get size failed: data was not compressed by zstd')
        if osize == ZSTD_CONTENTSIZE_UNKNOWN:
            raise DecompressionError('zstd get size failed: original size unknown')
        try:
            buf = buffer.get(osize)
        except MemoryError:
            # osize comes from the (potentially corrupt) frame header, so a huge
            # value is treated as a decompression failure, not a crash.
            raise DecompressionError('MemoryError')
        dest = buf
        with nogil:
            rsize = ZSTD_decompress(dest, osize, source, isize)
        if ZSTD_isError(rsize):
            raise DecompressionError('zstd decompress failed: %s' % ZSTD_getErrorName(rsize))
        if rsize != osize:
            raise DecompressionError('zstd decompress failed: size mismatch')
        data = dest[:osize]
        self.check_fix_size(meta, data)
        return meta, data


class ZLIB(DecidingCompressor):
    """
    zlib compression / decompression (python stdlib)
    """
    ID = 0x05
    name = 'zlib'

    def __init__(self, level=6, legacy_mode=False, **kwargs):
        super().__init__(level=level, legacy_mode=legacy_mode, **kwargs)
        self.level = level

    def _decide(self, meta, data):
        """
        Decides what to do with *data*. Returns (compressor, zlib_data).
        *zlib_data* is the ZLIB result if *compressor* is ZLIB as well, otherwise it is None.
        """
        zlib_data = zlib.compress(data, self.level)
        # only store compressed data if it actually is smaller than the input
        if len(zlib_data) < len(data):
            return self, (meta, zlib_data)
        else:
            return NONE_COMPRESSOR, (meta, None)

    def decompress(self, meta, data):
        meta, data = super().decompress(meta, data)
        try:
            data = zlib.decompress(data)
            self.check_fix_size(meta, data)
            return meta, data
        except zlib.error as e:
            # wrap library errors in borg's own exception type
            raise DecompressionError(str(e)) from None


class ZLIB_legacy(CompressorBase):
    """
    zlib compression / decompression (python stdlib)

    Note: This is the legacy ZLIB support as used by borg < 1.3.
          It still suffers from attic *only* supporting zlib and not having separate
          ID bytes to differentiate between differently compressed chunks.
          This just works because zlib compressed stuff always starts with 0x.8.. bytes.
          Newer borg uses the ZLIB class that has separate ID bytes (as all the other
          compressors) and does not need this hack.
    """
    ID = 0x08  # not used here, see detect()
    # avoid all 0x.8 IDs elsewhere!
    name = 'zlib_legacy'

    @classmethod
    def detect(cls, data):
        # matches misc. patterns 0x.8.. used by zlib
        # (CMF low nibble 8 = deflate; (CMF*256+FLG) % 31 == 0 is the zlib header
        # check - see RFC 1950)
        cmf, flg = data[:2]
        is_deflate = cmf & 0x0f == 8
        check_ok = (cmf * 256 + flg) % 31 == 0
        return check_ok and is_deflate

    def __init__(self, level=6, **kwargs):
        super().__init__(level=level, **kwargs)
        self.level = level

    def compress(self, meta, data):
        # note: for compatibility no super call, do not add ID bytes
        return None, zlib.compress(data, self.level)

    def decompress(self, meta, data):
        # note: for compatibility no super call, do not strip ID bytes
        assert meta is None
        meta = {}
        meta["ctype"] = ZLIB.ID  # change to non-legacy ZLIB id
        meta["clevel"] = 255  # we do not know the compression level
        meta["csize"] = len(data)
        try:
            data = zlib.decompress(data)
            self.check_fix_size(meta, data)
            return meta, data
        except zlib.error as e:
            raise DecompressionError(str(e)) from None


class Auto(CompressorBase):
    """
    Meta-Compressor that decides which compression to use based on LZ4's ratio.

    As a meta-Compressor the actual compression is deferred to other Compressors,
    therefore this Compressor has no ID, no detect() and no decompress().
    """
    ID = None
    name = 'auto'

    def __init__(self, compressor):
        super().__init__()
        # the "expensive" compressor to try when LZ4 indicates the data is compressible
        self.compressor = compressor

    def _decide(self, meta, data):
        """
        Decides what to do with *data*. Returns (compressor, compressed_data).

        *compressor* is the compressor that is decided to be best suited to compress *data*,
        *compressed_data* is the result of *data* being compressed by a compressor, which
        may or may not be *compressor*!

        There are three possible outcomes of the decision process:

        * *data* compresses well enough for trying the more expensive compressor set on
          instantiation to make sense.
          In this case, (expensive_compressor_class, lz4_compressed_data) is returned.
        * *data* compresses only slightly using the LZ4 compressor, thus trying the
          more expensive compressor for potentially little gain does not make sense.
          In this case, (LZ4_COMPRESSOR, lz4_compressed_data) is returned.
        * *data* does not compress at all using LZ4, in this case
          (NONE_COMPRESSOR, none_compressed_data) is returned.

        Note: While it makes no sense, the expensive compressor may well be set to the LZ4
              compressor.
        """
        compressor, (meta, compressed_data) = LZ4_COMPRESSOR.decide_compress(meta, data)
        # compressed_data includes the compression type header, while data does not yet
        # NOTE(review): the "+2" normalizes for the 2-byte (ctype, clevel) header, which
        # CompressorBase.compress only adds in legacy mode - confirm this is intended
        # for the non-legacy path as well.
        ratio = len(compressed_data) / (len(data) + 2)

        # lz4 compressed only slightly (or not at all)? then the expensive compressor
        # is unlikely to gain much either - keep (or discard) the cheap result.
        if ratio < 0.97:
            return self.compressor, (meta, compressed_data)
        else:
            return compressor, (meta, compressed_data)

    def decide(self, meta, data):
        return self._decide(meta, data)[0]

    def compress(self, meta, data):
        def get_meta(from_meta, to_meta):
            # copy only the compression-related keys into the caller's meta dict
            for key in "ctype", "clevel", "csize":
                if key in from_meta:
                    to_meta[key] = from_meta[key]

        compressor, (cheap_meta, cheap_compressed_data) = self._decide(dict(meta), data)
        if compressor in (LZ4_COMPRESSOR, NONE_COMPRESSOR):
            # we know that trying to compress with expensive compressor is likely pointless,
            # so we fallback to return the cheap compressed data.
            get_meta(cheap_meta, meta)
            return meta, cheap_compressed_data
        # if we get here, the decider decided to try the expensive compressor.
        # we also know that the compressed data returned by the decider is lz4 compressed.
        expensive_meta, expensive_compressed_data = compressor.compress(dict(meta), data)
        ratio = len(expensive_compressed_data) / len(cheap_compressed_data)
        if ratio < 0.99:
            # the expensive compressor managed to squeeze the data significantly better than lz4.
            get_meta(expensive_meta, meta)
            return meta, expensive_compressed_data
        else:
            # otherwise let's just store the lz4 data, which decompresses extremely fast.
            get_meta(cheap_meta, meta)
            return meta, cheap_compressed_data

    def decompress(self, data):
        # Auto never appears as the on-disk compressor, so decompression via Auto is invalid.
        raise NotImplementedError

    # NOTE(review): missing @classmethod decorator here (signature takes cls);
    # harmless since the method only raises - confirm against upstream.
    def detect(cls, data):
        raise NotImplementedError


class ObfuscateSize(CompressorBase):
    """
    Meta-Compressor that obfuscates the compressed data size.
""" ID = 0x04 name = 'obfuscate' header_fmt = Struct('= 0 and <= 9") else: raise ArgumentTypeError("too many arguments") self.level = level elif self.name in ('zstd', ): if count < 2: level = 3 # default compression level in zstd elif count == 2: level = int(values[1]) if not 1 <= level <= 22: raise ArgumentTypeError("level must be >= 1 and <= 22") else: raise ArgumentTypeError("too many arguments") self.level = level elif self.name == 'auto': if 2 <= count <= 3: compression = ','.join(values[1:]) else: raise ArgumentTypeError("bad arguments") self.inner = CompressionSpec(compression) elif self.name == 'obfuscate': if 3 <= count <= 5: level = int(values[1]) if not ((1 <= level <= 6) or (110 <= level <= 123)): raise ArgumentTypeError("level must be >= 1 and <= 6 or >= 110 and <= 123") self.level = level compression = ','.join(values[2:]) else: raise ArgumentTypeError("bad arguments") self.inner = CompressionSpec(compression) else: raise ArgumentTypeError("unsupported compression type") @property def compressor(self): if self.name in ('none', 'lz4', ): return get_compressor(self.name) elif self.name in ('zlib', 'lzma', 'zstd', 'zlib_legacy'): return get_compressor(self.name, level=self.level) elif self.name == 'auto': return get_compressor(self.name, compressor=self.inner.compressor) elif self.name == 'obfuscate': return get_compressor(self.name, level=self.level, compressor=self.inner.compressor)