#!/usr/bin/python -u
#
# p7zr library
#
# Copyright (c) 2019,2020 Hiroshi Miura
# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de
# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
import functools
import io
import os
import struct
from binascii import unhexlify
from functools import reduce
from io import BytesIO
from operator import and_, or_
from struct import pack, unpack
from typing import Any, BinaryIO, Dict, List, Optional, Tuple

from py7zr.compression import SevenZipCompressor, SevenZipDecompressor
from py7zr.exceptions import Bad7zFile, UnsupportedCompressionMethodError
from py7zr.helpers import ArchiveTimestamp, calculate_crc32
from py7zr.properties import MAGIC_7Z, CompressionMethod, Property

# Safety bound for NUL-terminated UTF-16 name reads.
MAX_LENGTH = 65536
P7ZIP_MAJOR_VERSION = b'\x00'
P7ZIP_MINOR_VERSION = b'\x04'


def read_crcs(file: BinaryIO, count: int) -> List[int]:
    """Read *count* CRC32 values stored as little-endian unsigned longs."""
    data = file.read(4 * count)
    # NOTE(review): the '<L' format string was garbled in the damaged source;
    # restored to match the symmetric read_uint32()/write_crcs() helpers.
    return [unpack('<L', data[i * 4:i * 4 + 4])[0] for i in range(count)]


def write_crcs(file: BinaryIO, crcs):
    """Write each CRC32 as a little-endian unsigned long.

    Restored definition: its body was swallowed in the damaged source, but
    the helper is required by SubstreamsInfo.write() later in this module.
    """
    for crc in crcs:
        write_uint32(file, crc)


def read_bytes(file: BinaryIO, length: int) -> Tuple[bytes, ...]:
    """Read *length* bytes and return them unpacked as a tuple of ints."""
    return unpack(b'B' * length, file.read(length))


def read_byte(file: BinaryIO) -> int:
    """Read a single byte and return its integer value."""
    return ord(file.read(1))


def write_bytes(file: BinaryIO, data: bytes):
    """Write raw bytes to *file* and return the number of bytes written."""
    return file.write(data)


def write_byte(file: BinaryIO, data):
    """Write exactly one byte to *file*."""
    assert len(data) == 1
    return write_bytes(file, data)
def read_real_uint64(file: BinaryIO) -> Tuple[int, bytes]:
    """read 8 bytes, return unpacked value as a little endian unsigned long long, and raw data."""
    res = file.read(8)
    # NOTE(review): '<Q' restored — the format string was garbled in the
    # damaged source; write_real_uint64() is the symmetric counterpart.
    a = unpack('<Q', res)[0]
    return a, res


def read_uint32(file: BinaryIO) -> Tuple[int, bytes]:
    """read 4 bytes, return unpacked value as a little endian unsigned long, and raw data."""
    res = file.read(4)
    a = unpack('<L', res)[0]
    return a, res


def read_uint64(file: BinaryIO) -> int:
    """read UINT64, definition show in write_uint64()"""
    b = ord(file.read(1))
    if b == 255:
        # 0xff prefix: full 8-byte value follows.
        return read_real_uint64(file)[0]
    # Each row gives the highest first-byte value for which `vlen` extra
    # bytes follow; `mask` tracks which high bits of b are length flags.
    blen = [(0b01111111, 0), (0b10111111, 1), (0b11011111, 2), (0b11101111, 3),
            (0b11110111, 4), (0b11111011, 5), (0b11111101, 6), (0b11111110, 7)]
    mask = 0x80
    vlen = 8
    for v, l in blen:
        if b <= v:
            vlen = l
            break
        mask >>= 1
    if vlen == 0:
        return b & (mask - 1)
    val = file.read(vlen)
    value = int.from_bytes(val, byteorder='little')
    highpart = b & (mask - 1)
    return value + (highpart << (vlen * 8))


def write_real_uint64(file: BinaryIO, value: int):
    """write 8 bytes, as an unsigned long long."""
    file.write(pack('<Q', value))


def write_uint32(file: BinaryIO, value: int):
    """write 4 bytes, as an unsigned long.

    Restored definition: its body was swallowed in the damaged source, but
    it is called by write_crcs() and SignatureHeader below.
    """
    file.write(pack('<L', value))


def write_uint64(file: BinaryIO, value: int):
    """Write a 7z-format variable-length UINT64.

    The number of leading one-bits in the first byte encodes how many extra
    little-endian bytes follow; remaining bits of the first byte hold the
    value's high part.  Values above 0x01ffffffffffffff are written as a
    0xff prefix followed by the full 8-byte value.
    """
    if value > 0x01ffffffffffffff:
        file.write(b'\xff')
        file.write(value.to_bytes(8, 'little'))
        return
    byte_length = (value.bit_length() + 7) // 8
    ba = bytearray(value.to_bytes(byte_length, 'little'))
    high_byte = int(ba[-1])
    if high_byte < 2 << (8 - byte_length - 1):
        # High byte fits alongside the length flags in the first byte.
        for x in range(byte_length - 1):
            high_byte |= 0x80 >> x
        file.write(pack('B', high_byte))
        file.write(ba[:byte_length - 1])
    else:
        # High byte does not fit: emit flags-only first byte, then all bytes.
        mask = 0x80
        for x in range(byte_length):
            mask |= 0x80 >> x
        file.write(pack('B', mask))
        file.write(ba)


def read_boolean(file: BinaryIO, count: int, checkall: bool = False) -> List[bool]:
    """Read a packed bit vector of *count* booleans (MSB first).

    When *checkall* is true, a leading non-zero byte means "all defined" and
    no bit vector follows.
    """
    if checkall:
        all_defined = file.read(1)
        if all_defined != unhexlify('00'):
            return [True] * count
    result = []
    b = 0
    mask = 0
    for i in range(count):
        if mask == 0:
            b = ord(file.read(1))
            mask = 0x80
        result.append(b & mask != 0)
        mask >>= 1
    return result


def write_boolean(file: BinaryIO, booleans: List[bool], all_defined: bool = False):
    """Write a packed bit vector (MSB first), with optional all-defined flag."""
    if all_defined and reduce(and_, booleans, True):
        file.write(b'\x01')
        return
    elif all_defined:
        file.write(b'\x00')
    o = bytearray(-(-len(booleans) // 8))
    for i, b in enumerate(booleans):
        if b:
            o[i // 8] |= 1 << (7 - i % 8)
    file.write(o)
def read_utf16(file: BinaryIO) -> str:
    """Read a NUL-terminated UTF-16-LE string from *file*."""
    chars = []
    for _ in range(MAX_LENGTH):
        pair = file.read(2)
        if pair == unhexlify('0000'):
            break
        chars.append(pair.decode('utf-16LE'))
    return ''.join(chars)


def write_utf16(file: BinaryIO, val: str):
    """Write *val* as UTF-16-LE followed by a NUL terminator word."""
    for ch in val:
        file.write(ch.encode('utf-16LE'))
    file.write(b'\x00\x00')


def bits_to_bytes(bit_length: int) -> int:
    """Number of bytes needed to hold *bit_length* bits (ceiling division)."""
    return -(-bit_length // 8)


class ArchiveProperties:
    """Container for raw ARCHIVE_PROPERTIES records of a 7z header."""

    __slots__ = ['property_data']

    def __init__(self):
        self.property_data = []

    @classmethod
    def retrieve(cls, file):
        """Construct an instance populated from *file*."""
        return cls()._read(file)

    def _read(self, file):
        pid = file.read(1)
        if pid == Property.ARCHIVE_PROPERTIES:
            # Records repeat until an END marker.
            while True:
                prop_type = file.read(1)
                if prop_type == Property.END:
                    break
                length = read_uint64(file)
                self.property_data.append(read_bytes(file, length))
        return self

    def write(self, file):
        if not self.property_data:
            return
        write_byte(file, Property.ARCHIVE_PROPERTIES)
        for blob in self.property_data:
            write_uint64(file, len(blob))
            write_bytes(file, blob)
        write_byte(file, Property.END)


class PackInfo:
    """ information about packed streams """

    __slots__ = ['packpos', 'numstreams', 'packsizes', 'packpositions', 'crcs']

    def __init__(self) -> None:
        self.packpos = 0  # type: int
        self.numstreams = 0  # type: int
        self.packsizes = []  # type: List[int]
        self.crcs = None  # type: Optional[List[int]]

    @classmethod
    def retrieve(cls, file: BinaryIO):
        """Construct an instance populated from *file*."""
        return cls()._read(file)

    def _read(self, file: BinaryIO):
        self.packpos = read_uint64(file)
        self.numstreams = read_uint64(file)
        pid = file.read(1)
        if pid == Property.SIZE:
            self.packsizes = [read_uint64(file) for _ in range(self.numstreams)]
            pid = file.read(1)
        if pid == Property.CRC:
            self.crcs = [read_uint64(file) for _ in range(self.numstreams)]
            pid = file.read(1)
        if pid != Property.END:
            raise Bad7zFile('end id expected but %s found' % repr(pid))
        # Cumulative offsets of each packed stream (plus the total at the end).
        self.packpositions = [sum(self.packsizes[:i]) for i in range(self.numstreams + 1)]  # type: List[int]
        return self

    def write(self, file: BinaryIO):
        assert self.packpos is not None
        numstreams = len(self.packsizes)
        assert self.crcs is None or len(self.crcs) == numstreams
        write_byte(file, Property.PACK_INFO)
        write_uint64(file, self.packpos)
        write_uint64(file, numstreams)
        write_byte(file, Property.SIZE)
        for size in self.packsizes:
            write_uint64(file, size)
        if self.crcs is not None:
            write_byte(file, Property.CRC)
            for crc in self.crcs:
                write_uint64(file, crc)
        write_byte(file, Property.END)
class Folder:
    """
    a "Folder" represents a stream of compressed data.
    coders: list of coder
    num_coders: length of coders
    coder: hash list
        keys of coders: method, numinstreams, numoutstreams, properties
    unpacksizes: uncompressed sizes of outstreams
    """

    __slots__ = ['unpacksizes', 'solid', 'coders', 'digestdefined', 'totalin', 'totalout',
                 'bindpairs', 'packed_indices', 'crc', 'decompressor', 'compressor', 'files']

    def __init__(self) -> None:
        self.unpacksizes = None  # type: Optional[List[int]]
        self.coders = []  # type: List[Dict[str, Any]]
        self.bindpairs = []  # type: List[Any]
        self.packed_indices = []  # type: List[int]
        # calculated values
        self.totalin = 0  # type: int
        self.totalout = 0  # type: int
        # internal values
        self.solid = False  # type: bool
        self.digestdefined = False  # type: bool
        self.crc = None  # type: Optional[int]
        # compress/decompress objects
        self.decompressor = None  # type: Optional[SevenZipDecompressor]
        self.compressor = None  # type: Optional[SevenZipCompressor]
        self.files = None

    @classmethod
    def retrieve(cls, file: BinaryIO):
        """Construct a Folder populated from *file*."""
        obj = cls()
        obj._read(file)
        return obj

    def _read(self, file: BinaryIO) -> None:
        """Parse the coder list, bind pairs and packed-stream indices."""
        num_coders = read_uint64(file)
        for _ in range(num_coders):
            b = read_byte(file)
            methodsize = b & 0xf
            iscomplex = b & 0x10 == 0x10
            hasattributes = b & 0x20 == 0x20
            c = {'method': file.read(methodsize)}  # type: Dict[str, Any]
            if iscomplex:
                c['numinstreams'] = read_uint64(file)
                c['numoutstreams'] = read_uint64(file)
            else:
                c['numinstreams'] = 1
                c['numoutstreams'] = 1
            self.totalin += c['numinstreams']
            self.totalout += c['numoutstreams']
            if hasattributes:
                proplen = read_uint64(file)
                c['properties'] = file.read(proplen)
            self.coders.append(c)
        num_bindpairs = self.totalout - 1
        for i in range(num_bindpairs):
            self.bindpairs.append((read_uint64(file), read_uint64(file),))
        num_packedstreams = self.totalin - num_bindpairs
        if num_packedstreams == 1:
            for i in range(self.totalin):
                if self._find_in_bin_pair(i) < 0:  # there is no in_bin_pair
                    self.packed_indices.append(i)
        elif num_packedstreams > 1:
            for i in range(num_packedstreams):
                self.packed_indices.append(read_uint64(file))

    def write(self, file: BinaryIO):
        """Serialize this folder's coder descriptions and bind pairs."""
        num_coders = len(self.coders)
        assert num_coders > 0
        write_uint64(file, num_coders)
        for i, c in enumerate(self.coders):
            method_id = c['method']  # type: bytes  # renamed from `id` (shadowed builtin)
            id_size = len(method_id) & 0x0f
            iscomplex = 0x10 if not self.is_simple(c) else 0x00
            hasattributes = 0x20 if c['properties'] is not None else 0x00
            flag = struct.pack('B', id_size | iscomplex | hasattributes)
            write_byte(file, flag)
            write_bytes(file, method_id[:id_size])
            if not self.is_simple(c):
                write_uint64(file, c['numinstreams'])
                assert c['numoutstreams'] == 1
                write_uint64(file, c['numoutstreams'])
            if c['properties'] is not None:
                write_uint64(file, len(c['properties']))
                write_bytes(file, c['properties'])
        num_bindpairs = self.totalout - 1
        assert len(self.bindpairs) == num_bindpairs
        num_packedstreams = self.totalin - num_bindpairs
        for bp in self.bindpairs:
            write_uint64(file, bp[0])
            write_uint64(file, bp[1])
        if num_packedstreams > 1:
            for pi in self.packed_indices:
                write_uint64(file, pi)

    def is_simple(self, coder):
        """True when the coder has exactly one input and one output stream."""
        return coder['numinstreams'] == 1 and coder['numoutstreams'] == 1

    def get_decompressor(self, size: int, reset: bool = False) -> 'SevenZipDecompressor':
        """Return (creating or resetting on demand) the folder's decompressor."""
        if self.decompressor is None or reset:
            self.decompressor = SevenZipDecompressor(self.coders, size, self.crc)
        return self.decompressor

    def get_compressor(self) -> 'SevenZipCompressor':
        """Return (creating on demand) the folder's compressor."""
        if self.compressor is None:
            # FIXME: set filters
            self.compressor = SevenZipCompressor()
            self.coders = self.compressor.coders
        return self.compressor

    def get_unpack_size(self) -> int:
        """Size of the folder's final output: the out-stream that is not
        consumed by any bind pair.

        :raises TypeError: when every out-stream is bound (malformed folder).
        """
        if self.unpacksizes is None:
            return 0
        for i in range(len(self.unpacksizes) - 1, -1, -1):
            # FIX: _find_out_bin_pair returns an index (>= 0) or -1; the old
            # bare truthiness test treated index 0 as "unbound" and -1 as
            # "bound".  "Not part of any bind pair" is `< 0`.
            if self._find_out_bin_pair(i) < 0:
                return self.unpacksizes[i]
        raise TypeError('not found')

    def _find_in_bin_pair(self, index: int) -> int:
        """Index of the bind pair whose input is *index*, or -1."""
        for idx, (a, b) in enumerate(self.bindpairs):
            if a == index:
                return idx
        return -1

    def _find_out_bin_pair(self, index: int) -> int:
        """Index of the bind pair whose output is *index*, or -1."""
        for idx, (a, b) in enumerate(self.bindpairs):
            if b == index:
                return idx
        return -1

    def is_encrypted(self) -> bool:
        """True when any coder in the chain is AES-256 encryption."""
        return CompressionMethod.CRYPT_AES256_SHA256 in [x['method'] for x in self.coders]
class UnpackInfo:
    """ combines multiple folders """

    __slots__ = ['numfolders', 'folders', 'datastreamidx']

    def __init__(self):
        self.numfolders = None
        self.folders = []
        self.datastreamidx = None

    @classmethod
    def retrieve(cls, file: BinaryIO):
        """Construct an UnpackInfo populated from *file*."""
        instance = cls()
        instance._read(file)
        return instance

    def _read(self, file: BinaryIO):
        marker = file.read(1)
        if marker != Property.FOLDER:
            raise Bad7zFile('folder id expected but %s found' % repr(marker))
        self.numfolders = read_uint64(file)
        self.folders = []
        external = read_byte(file)
        if external == 0x00:
            # Folder descriptions follow inline.
            self.folders = [Folder.retrieve(file) for _ in range(self.numfolders)]
        else:
            # Folder descriptions live at an external data-stream offset;
            # jump there, read them, then restore the read position.
            datastreamidx = read_uint64(file)
            saved_pos = file.tell()
            file.seek(datastreamidx, 0)
            self.folders = [Folder.retrieve(file) for _ in range(self.numfolders)]
            file.seek(saved_pos, 0)
        self._retrieve_coders_info(file)

    def _retrieve_coders_info(self, file: BinaryIO):
        marker = file.read(1)
        if marker != Property.CODERS_UNPACK_SIZE:
            raise Bad7zFile('coders unpack size id expected but %s found' % repr(marker))
        for folder in self.folders:
            folder.unpacksizes = [read_uint64(file) for _ in range(folder.totalout)]
        marker = file.read(1)
        if marker == Property.CRC:
            defined = read_boolean(file, self.numfolders, checkall=True)
            crcs = read_crcs(file, self.numfolders)
            for idx, folder in enumerate(self.folders):
                folder.digestdefined = defined[idx]
                folder.crc = crcs[idx]
            marker = file.read(1)
        if marker != Property.END:
            raise Bad7zFile('end id expected but %s found at %d' % (repr(marker), file.tell()))

    def write(self, file: BinaryIO):
        assert self.numfolders is not None
        assert self.folders is not None
        assert self.numfolders == len(self.folders)
        file.write(Property.UNPACK_INFO)
        file.write(Property.FOLDER)
        write_uint64(file, self.numfolders)
        write_byte(file, b'\x00')
        for folder in self.folders:
            folder.write(file)
        # If support external entity, we may write
        # self.datastreamidx here.
        # folder data will be written in another place.
        #   write_byte(file, b'\x01')
        #   assert self.datastreamidx is not None
        #   write_uint64(file, self.datastreamidx)
        write_byte(file, Property.CODERS_UNPACK_SIZE)
        for folder in self.folders:
            for size in folder.unpacksizes[:folder.totalout]:
                write_uint64(file, size)
        write_byte(file, Property.END)
class SubstreamsInfo:
    """ defines the substreams of a folder """

    __slots__ = ['digests', 'digestsdefined', 'unpacksizes', 'num_unpackstreams_folders']

    def __init__(self):
        self.digests = []  # type: List[int]
        self.digestsdefined = []  # type: List[bool]
        self.unpacksizes = None  # type: Optional[List[int]]
        self.num_unpackstreams_folders = []  # type: List[int]

    @classmethod
    def retrieve(cls, file: BinaryIO, numfolders: int, folders: 'List[Folder]'):
        """Construct a SubstreamsInfo populated from *file*."""
        instance = cls()
        instance._read(file, numfolders, folders)
        return instance

    def _read(self, file: BinaryIO, numfolders: int, folders: 'List[Folder]'):
        pid = file.read(1)
        if pid == Property.NUM_UNPACK_STREAM:
            self.num_unpackstreams_folders = [read_uint64(file) for _ in range(numfolders)]
            pid = file.read(1)
        else:
            # Absent property: every folder holds exactly one substream.
            self.num_unpackstreams_folders = [1] * numfolders
        if pid == Property.SIZE:
            self.unpacksizes = []
            for i in range(len(self.num_unpackstreams_folders)):
                running = 0  # type: int
                # All but the last substream size are stored explicitly; the
                # last one is the folder's total minus the rest.
                for _ in range(1, self.num_unpackstreams_folders[i]):
                    explicit = read_uint64(file)
                    self.unpacksizes.append(explicit)
                    running += explicit
                self.unpacksizes.append(folders[i].get_unpack_size() - running)
            pid = file.read(1)
        num_digests = 0
        num_digests_total = 0
        for i in range(numfolders):
            substreams = self.num_unpackstreams_folders[i]
            if substreams != 1 or not folders[i].digestdefined:
                num_digests += substreams
            num_digests_total += substreams
        if pid == Property.CRC:
            defined = read_boolean(file, num_digests, checkall=True)
            crcs = read_crcs(file, num_digests)
            didx = 0
            for i in range(numfolders):
                folder = folders[i]
                substreams = self.num_unpackstreams_folders[i]
                if substreams == 1 and folder.digestdefined and folder.crc is not None:
                    # Folder-level CRC covers the single substream.
                    self.digestsdefined.append(True)
                    self.digests.append(folder.crc)
                else:
                    for _ in range(substreams):
                        self.digestsdefined.append(defined[didx])
                        self.digests.append(crcs[didx])
                        didx += 1
            pid = file.read(1)
        if pid != Property.END:
            raise Bad7zFile('end id expected but %r found' % pid)
        if not self.digestsdefined:
            self.digestsdefined = [False] * num_digests_total
            self.digests = [0] * num_digests_total

    def write(self, file: BinaryIO, numfolders: int):
        assert self.num_unpackstreams_folders is not None
        if len(self.num_unpackstreams_folders) == 0:
            # nothing to write
            return
        if self.unpacksizes is None:
            raise ValueError
        write_byte(file, Property.SUBSTREAMS_INFO)
        if not functools.reduce(lambda x, y: x and (y == 1), self.num_unpackstreams_folders, True):
            write_byte(file, Property.NUM_UNPACK_STREAM)
            for count in self.num_unpackstreams_folders:
                write_uint64(file, count)
        write_byte(file, Property.SIZE)
        idx = 0
        for i in range(numfolders):
            # Skip the implicit last substream size of each folder.
            for _ in range(1, self.num_unpackstreams_folders[i]):
                write_uint64(file, self.unpacksizes[idx])
                idx += 1
            idx += 1
        if functools.reduce(lambda x, y: x or y, self.digestsdefined, False):
            write_byte(file, Property.CRC)
            write_boolean(file, self.digestsdefined, all_defined=True)
            write_crcs(file, self.digests)
        write_byte(file, Property.END)


class StreamsInfo:
    """ information about compressed streams """

    __slots__ = ['packinfo', 'unpackinfo', 'substreamsinfo']

    def __init__(self):
        self.packinfo = None  # type: PackInfo
        self.unpackinfo = None  # type: UnpackInfo
        self.substreamsinfo = None  # type: Optional[SubstreamsInfo]

    @classmethod
    def retrieve(cls, file: BinaryIO):
        """Construct a StreamsInfo populated from *file*."""
        instance = cls()
        instance.read(file)
        return instance

    def read(self, file: BinaryIO) -> None:
        pid = file.read(1)
        if pid == Property.PACK_INFO:
            self.packinfo = PackInfo.retrieve(file)
            pid = file.read(1)
        if pid == Property.UNPACK_INFO:
            self.unpackinfo = UnpackInfo.retrieve(file)
            pid = file.read(1)
        if pid == Property.SUBSTREAMS_INFO:
            self.substreamsinfo = SubstreamsInfo.retrieve(file, self.unpackinfo.numfolders,
                                                          self.unpackinfo.folders)
            pid = file.read(1)
        if pid != Property.END:
            raise Bad7zFile('end id expected but %s found' % repr(pid))

    def write(self, file: BinaryIO):
        write_byte(file, Property.MAIN_STREAMS_INFO)
        self._write(file)

    def _write(self, file: BinaryIO):
        if self.packinfo is not None:
            self.packinfo.write(file)
        if self.unpackinfo is not None:
            self.unpackinfo.write(file)
        if self.substreamsinfo is not None:
            self.substreamsinfo.write(file, self.unpackinfo.numfolders)
        write_byte(file, Property.END)


class HeaderStreamsInfo(StreamsInfo):
    """StreamsInfo pre-wired with a single folder for compressing the header itself."""

    def __init__(self):
        super().__init__()
        self.packinfo = PackInfo()
        self.unpackinfo = UnpackInfo()
        folder = Folder()
        folder.compressor = SevenZipCompressor()
        folder.coders = folder.compressor.coders
        folder.solid = False
        folder.digestdefined = False
        folder.bindpairs = []
        folder.totalin = 1
        folder.totalout = 1
        # NOTE(review): overwrites the boolean set just above with a list —
        # looks suspicious but is preserved as-is; confirm against callers.
        folder.digestdefined = [True]
        self.unpackinfo.numfolders = 1
        self.unpackinfo.folders = [folder]

    def write(self, file: BinaryIO):
        # A header stream has no MAIN_STREAMS_INFO marker of its own.
        self._write(file)
class FilesInfo:
    """ holds file properties """

    __slots__ = ['files', 'emptyfiles', 'antifiles']

    def __init__(self):
        self.files = []  # type: List[Dict[str, Any]]
        self.emptyfiles = []  # type: List[bool]
        self.antifiles = None

    @classmethod
    def retrieve(cls, file: BinaryIO):
        """Construct a FilesInfo populated from *file*."""
        obj = cls()
        obj._read(file)
        return obj

    def _read(self, fp: BinaryIO):
        """Parse the FILES_INFO property records until END."""
        numfiles = read_uint64(fp)
        self.files = [{'emptystream': False} for _ in range(numfiles)]
        numemptystreams = 0
        while True:
            prop = fp.read(1)
            if prop == Property.END:
                break
            size = read_uint64(fp)
            if prop == Property.DUMMY:
                # Added by newer versions of 7z to adjust padding.
                fp.seek(size, os.SEEK_CUR)
                continue
            buffer = io.BytesIO(fp.read(size))
            if prop == Property.EMPTY_STREAM:
                isempty = read_boolean(buffer, numfiles, checkall=False)
                list(map(lambda x, y: x.update({'emptystream': y}), self.files, isempty))  # type: ignore
                numemptystreams += isempty.count(True)
            elif prop == Property.EMPTY_FILE:
                self.emptyfiles = read_boolean(buffer, numemptystreams, checkall=False)
            elif prop == Property.ANTI:
                self.antifiles = read_boolean(buffer, numemptystreams, checkall=False)
            elif prop == Property.NAME:
                external = buffer.read(1)
                if external == b'\x00':
                    self._read_name(buffer)
                else:
                    # Names stored in an external data stream.
                    dataindex = read_uint64(buffer)
                    current_pos = fp.tell()
                    fp.seek(dataindex, 0)
                    self._read_name(fp)
                    fp.seek(current_pos, 0)
            elif prop == Property.CREATION_TIME:
                self._read_times(buffer, 'creationtime')
            elif prop == Property.LAST_ACCESS_TIME:
                self._read_times(buffer, 'lastaccesstime')
            elif prop == Property.LAST_WRITE_TIME:
                self._read_times(buffer, 'lastwritetime')
            elif prop == Property.ATTRIBUTES:
                defined = read_boolean(buffer, numfiles, checkall=True)
                external = buffer.read(1)
                if external == b'\x00':
                    self._read_attributes(buffer, defined)
                else:
                    dataindex = read_uint64(buffer)
                    # try to read external data
                    current_pos = fp.tell()
                    fp.seek(dataindex, 0)
                    self._read_attributes(fp, defined)
                    fp.seek(current_pos, 0)
            elif prop == Property.START_POS:
                self._read_start_pos(buffer)
            else:
                raise Bad7zFile('invalid type %r' % prop)

    def _read_name(self, buffer: BinaryIO) -> None:
        """Read one UTF-16 name per file, normalizing path separators."""
        for f in self.files:
            f['filename'] = read_utf16(buffer).replace('\\', '/')

    def _read_attributes(self, buffer: BinaryIO, defined: List[bool]) -> None:
        for idx, f in enumerate(self.files):
            f['attributes'] = read_uint32(buffer)[0] if defined[idx] else None

    def _read_times(self, fp: BinaryIO, name: str) -> None:
        """Store an ArchiveTimestamp (or None) under *name* for each file."""
        defined = read_boolean(fp, len(self.files), checkall=True)
        # NOTE: the "external" flag is currently ignored, should be 0x00
        external = fp.read(1)
        assert external == b'\x00'
        for i, f in enumerate(self.files):
            f[name] = ArchiveTimestamp(read_real_uint64(fp)[0]) if defined[i] else None

    def _read_start_pos(self, fp: BinaryIO) -> None:
        defined = read_boolean(fp, len(self.files), checkall=True)
        # NOTE: the "external" flag is currently ignored, should be 0x00
        # FIX: fp.read(1) returns bytes; comparing against the int 0x00
        # always failed.  Compare with b'\x00' like _read_times() does.
        external = fp.read(1)
        assert external == b'\x00'
        for i, f in enumerate(self.files):
            f['startpos'] = read_real_uint64(fp)[0] if defined[i] else None

    def _write_times(self, fp: BinaryIO, propid, name: str) -> None:
        """Write one timestamp property record for all files."""
        write_byte(fp, propid)
        defined = []  # type: List[bool]
        num_defined = 0  # type: int
        for f in self.files:
            if name in f.keys() and f[name] is not None:
                defined.append(True)
                num_defined += 1
            else:
                # FIX: the original never appended False, so `defined` was
                # shorter than self.files and `defined[i]` below raised
                # IndexError whenever any file lacked this timestamp.
                defined.append(False)
        size = num_defined * 8 + 2
        if not reduce(and_, defined, True):
            # FIX: the boolean vector holds one bit per *file*, so its byte
            # size depends on len(defined), not num_defined.
            size += bits_to_bytes(len(defined))
        write_uint64(fp, size)
        write_boolean(fp, defined, all_defined=True)
        write_byte(fp, b'\x00')
        for i, f in enumerate(self.files):
            if defined[i]:
                write_real_uint64(fp, ArchiveTimestamp.from_datetime(f[name]))

    def _write_prop_bool_vector(self, fp: BinaryIO, propid, vector) -> None:
        write_byte(fp, propid)
        write_boolean(fp, vector, all_defined=True)

    @staticmethod
    def _are_there(vector) -> bool:
        """True when *vector* is non-None and contains at least one truthy entry."""
        if vector is not None:
            if functools.reduce(or_, vector, False):
                return True
        return False

    def _write_names(self, file: BinaryIO):
        name_defined = 0
        names = []
        name_size = 0
        for f in self.files:
            if f.get('filename', None) is not None:
                name_defined += 1
                names.append(f['filename'])
                name_size += len(f['filename'].encode('utf-16LE')) + 2  # len(str + NULL_WORD)
        if name_defined > 0:
            write_byte(file, Property.NAME)
            write_uint64(file, name_size + 1)
            write_byte(file, b'\x00')
            for n in names:
                write_utf16(file, n)

    def _write_attributes(self, file):
        defined = []  # type: List[bool]
        num_defined = 0
        for f in self.files:
            if 'attributes' in f.keys() and f['attributes'] is not None:
                defined.append(True)
                num_defined += 1
            else:
                defined.append(False)
        size = num_defined * 4 + 2
        if num_defined != len(defined):
            size += bits_to_bytes(num_defined)
        write_byte(file, Property.ATTRIBUTES)
        write_uint64(file, size)
        write_boolean(file, defined, all_defined=True)
        write_byte(file, b'\x00')
        for i, f in enumerate(self.files):
            if defined[i]:
                write_uint32(file, f['attributes'])

    def write(self, file: BinaryIO):
        """Serialize the FILES_INFO block."""
        assert self.files is not None
        write_byte(file, Property.FILES_INFO)
        numfiles = len(self.files)
        write_uint64(file, numfiles)
        emptystreams = []  # List[bool]
        for f in self.files:
            emptystreams.append(f['emptystream'])
        if self._are_there(emptystreams):
            write_byte(file, Property.EMPTY_STREAM)
            write_uint64(file, bits_to_bytes(numfiles))
            write_boolean(file, emptystreams, all_defined=False)
        else:
            # NOTE(review): EMPTY_FILE/ANTI are only emitted when there are
            # no empty streams — preserved from the original control flow,
            # but looks inverted; confirm against the 7z format spec.
            if self._are_there(self.emptyfiles):
                self._write_prop_bool_vector(file, Property.EMPTY_FILE, self.emptyfiles)
            if self._are_there(self.antifiles):
                self._write_prop_bool_vector(file, Property.ANTI, self.antifiles)
        # Name
        self._write_names(file)
        # timestamps
        self._write_times(file, Property.CREATION_TIME, 'creationtime')
        self._write_times(file, Property.LAST_ACCESS_TIME, 'lastaccesstime')
        self._write_times(file, Property.LAST_WRITE_TIME, 'lastwritetime')
        # start_pos
        # FIXME: TBD
        # attribute
        self._write_attributes(file)
        write_byte(file, Property.END)
class Header:
    """ the archive header """

    __slot__ = ['solid', 'properties', 'additional_streams', 'main_streams', 'files_info',
                'size', '_start_pos']

    def __init__(self) -> None:
        self.solid = False
        self.properties = None
        self.additional_streams = None
        self.main_streams = None
        self.files_info = None
        self.size = 0  # fixme. Not implemented yet
        self._start_pos = 0

    @classmethod
    def retrieve(cls, fp: BinaryIO, buffer: BytesIO, start_pos: int):
        """Construct a Header decoded from *buffer* (raw data read via *fp*)."""
        obj = cls()
        obj._read(fp, buffer, start_pos)
        return obj

    def _read(self, fp: BinaryIO, buffer: BytesIO, start_pos: int) -> None:
        self._start_pos = start_pos
        fp.seek(self._start_pos)
        self._decode_header(fp, buffer)

    def _decode_header(self, fp: BinaryIO, buffer: BytesIO) -> None:
        """
        Decode header data or encoded header data from buffer.
        When buffer consist of encoded buffer, it get stream data
        from it and call itself recursively
        """
        pid = buffer.read(1)
        if not pid:
            # empty archive
            return
        elif pid == Property.HEADER:
            self._extract_header_info(buffer)
            return
        elif pid != Property.ENCODED_HEADER:
            # FIX: the error message previously interpolated the builtin
            # `id` function instead of the unknown property byte `pid`.
            raise TypeError('Unknown field: %r' % pid)
        # get from encoded header
        streams = HeaderStreamsInfo.retrieve(buffer)
        self._decode_header(fp, self._get_headerdata_from_streams(fp, streams))

    def _get_headerdata_from_streams(self, fp: BinaryIO, streams: 'StreamsInfo') -> BytesIO:
        """get header data from given streams.unpackinfo and packinfo.
        folder data are stored in raw data positioned in afterheader."""
        buffer = io.BytesIO()
        src_start = self._start_pos
        for folder in streams.unpackinfo.folders:
            if folder.is_encrypted():
                raise UnsupportedCompressionMethodError()
            uncompressed = folder.unpacksizes
            if not isinstance(uncompressed, (list, tuple)):
                uncompressed = [uncompressed] * len(folder.coders)
            compressed_size = streams.packinfo.packsizes[0]
            uncompressed_size = uncompressed[-1]
            src_start += streams.packinfo.packpos
            fp.seek(src_start, 0)
            decompressor = folder.get_decompressor(compressed_size)
            folder_data = decompressor.decompress(fp.read(compressed_size))[:uncompressed_size]
            src_start += uncompressed_size
            if folder.digestdefined:
                if folder.crc != calculate_crc32(folder_data):
                    raise Bad7zFile('invalid block data')
            buffer.write(folder_data)
        buffer.seek(0, 0)
        return buffer

    def _encode_header(self, file: BinaryIO, afterheader: int):
        """Compress the plain header into *file* and append the
        ENCODED_HEADER stream description; returns its start position."""
        startpos = file.tell()
        packpos = startpos - afterheader
        buf = io.BytesIO()
        _, raw_header_len, raw_crc = self.write(buf, 0, False)
        streams = HeaderStreamsInfo()
        streams.packinfo.packpos = packpos
        folder = streams.unpackinfo.folders[0]
        folder.crc = [raw_crc]
        folder.unpacksizes = [raw_header_len]
        compressed_len = 0
        buf.seek(0, 0)
        data = buf.read(io.DEFAULT_BUFFER_SIZE)
        while data:
            out = folder.compressor.compress(data)
            compressed_len += len(out)
            file.write(out)
            data = buf.read(io.DEFAULT_BUFFER_SIZE)
        out = folder.compressor.flush()
        compressed_len += len(out)
        file.write(out)
        streams.packinfo.packsizes = [compressed_len]
        # actual header start position
        startpos = file.tell()
        write_byte(file, Property.ENCODED_HEADER)
        streams.write(file)
        write_byte(file, Property.END)
        return startpos

    def write(self, file: BinaryIO, afterheader: int, encoded: bool = True):
        """Write the header (optionally compressed) and return
        (start position, header length, CRC32)."""
        startpos = file.tell()
        if encoded:
            startpos = self._encode_header(file, afterheader)
        else:
            write_byte(file, Property.HEADER)
            # Archive properties
            if self.main_streams is not None:
                self.main_streams.write(file)
            # Files Info
            if self.files_info is not None:
                self.files_info.write(file)
            if self.properties is not None:
                self.properties.write(file)
            # AdditionalStreams
            if self.additional_streams is not None:
                self.additional_streams.write(file)
            write_byte(file, Property.END)
        endpos = file.tell()
        header_len = endpos - startpos
        file.seek(startpos, io.SEEK_SET)
        crc = calculate_crc32(file.read(header_len))
        file.seek(endpos, io.SEEK_SET)
        return startpos, header_len, crc

    def _extract_header_info(self, fp: BinaryIO) -> None:
        pid = fp.read(1)
        if pid == Property.ARCHIVE_PROPERTIES:
            self.properties = ArchiveProperties.retrieve(fp)
            pid = fp.read(1)
        if pid == Property.ADDITIONAL_STREAMS_INFO:
            self.additional_streams = StreamsInfo.retrieve(fp)
            pid = fp.read(1)
        if pid == Property.MAIN_STREAMS_INFO:
            self.main_streams = StreamsInfo.retrieve(fp)
            pid = fp.read(1)
        if pid == Property.FILES_INFO:
            self.files_info = FilesInfo.retrieve(fp)
            pid = fp.read(1)
        if pid != Property.END:
            raise Bad7zFile('end id expected but %s found' % (repr(pid)))

    @staticmethod
    def build_header(folders):
        """Assemble an empty skeleton header for the given folders."""
        header = Header()
        header.files_info = FilesInfo()
        header.main_streams = StreamsInfo()
        header.main_streams.packinfo = PackInfo()
        header.main_streams.packinfo.numstreams = 0
        header.main_streams.packinfo.packpos = 0
        header.main_streams.unpackinfo = UnpackInfo()
        header.main_streams.unpackinfo.numfolders = len(folders)
        header.main_streams.unpackinfo.folders = folders
        header.main_streams.substreamsinfo = SubstreamsInfo()
        header.main_streams.substreamsinfo.num_unpackstreams_folders = [len(folders)]
        header.main_streams.substreamsinfo.unpacksizes = []
        return header
class SignatureHeader:
    """The SignatureHeader class hold information of a signature header of archive."""

    __slots__ = ['version', 'startheadercrc', 'nextheaderofs', 'nextheadersize', 'nextheadercrc']

    def __init__(self) -> None:
        self.version = (P7ZIP_MAJOR_VERSION, P7ZIP_MINOR_VERSION)  # type: Tuple[bytes, ...]
        self.startheadercrc = None  # type: Optional[int]
        self.nextheaderofs = None  # type: Optional[int]
        self.nextheadersize = None  # type: Optional[int]
        self.nextheadercrc = None  # type: Optional[int]

    @classmethod
    def retrieve(cls, file: BinaryIO):
        """Construct a SignatureHeader populated from *file*."""
        instance = cls()
        instance._read(file)
        return instance

    def _read(self, file: BinaryIO) -> None:
        file.seek(len(MAGIC_7Z), 0)
        self.version = read_bytes(file, 2)
        self.startheadercrc, _ = read_uint32(file)
        # Accumulate a CRC over the three "start header" fields and verify it.
        self.nextheaderofs, raw = read_real_uint64(file)
        running_crc = calculate_crc32(raw)
        self.nextheadersize, raw = read_real_uint64(file)
        running_crc = calculate_crc32(raw, running_crc)
        self.nextheadercrc, raw = read_uint32(file)
        running_crc = calculate_crc32(raw, running_crc)
        if running_crc != self.startheadercrc:
            raise Bad7zFile('invalid header data')

    def calccrc(self, length: int, header_crc: int):
        """Record next-header size/CRC and recompute the start-header CRC."""
        self.nextheadersize = length
        self.nextheadercrc = header_crc
        assert self.nextheaderofs is not None
        scratch = io.BytesIO()
        write_real_uint64(scratch, self.nextheaderofs)
        write_real_uint64(scratch, self.nextheadersize)
        write_uint32(scratch, self.nextheadercrc)
        self.startheadercrc = calculate_crc32(scratch.getvalue())

    def write(self, file: BinaryIO):
        assert self.startheadercrc is not None
        assert self.nextheadercrc is not None
        assert self.nextheaderofs is not None
        assert self.nextheadersize is not None
        file.seek(0, 0)
        write_bytes(file, MAGIC_7Z)
        write_byte(file, self.version[0])
        write_byte(file, self.version[1])
        write_uint32(file, self.startheadercrc)
        write_real_uint64(file, self.nextheaderofs)
        write_real_uint64(file, self.nextheadersize)
        write_uint32(file, self.nextheadercrc)

    def _write_skelton(self, file: BinaryIO):
        # Placeholder values; overwritten by write() once real values exist.
        file.seek(0, 0)
        write_bytes(file, MAGIC_7Z)
        write_byte(file, self.version[0])
        write_byte(file, self.version[1])
        write_uint32(file, 1)
        write_real_uint64(file, 2)
        write_real_uint64(file, 3)
        write_uint32(file, 4)


class FinishHeader:
    """Finish header for multi-volume 7z file."""

    def __init__(self):
        self.archive_start_offset = None  # data offset from end of the finish header
        self.additional_start_block_size = None  # start signature & start header size
        self.finish_header_size = 20 + 16

    @classmethod
    def retrieve(cls, file):
        """Construct a FinishHeader populated from *file*."""
        instance = cls()
        instance._read(file)
        return instance

    def _read(self, file):
        self.archive_start_offset = read_uint64(file)
        self.additional_start_block_size = read_uint64(file)