bazarr/libs/py7zr/archiveinfo.py

1104 lines
40 KiB
Python

#!/usr/bin/python -u
#
# p7zr library
#
# Copyright (c) 2019,2020 Hiroshi Miura <miurahr@linux.com>
# Copyright (c) 2004-2015 by Joachim Bauch, mail@joachim-bauch.de
# 7-Zip Copyright (C) 1999-2010 Igor Pavlov
# LZMA SDK Copyright (C) 1999-2010 Igor Pavlov
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
import functools
import io
import os
import struct
from binascii import unhexlify
from functools import reduce
from io import BytesIO
from operator import and_, or_
from struct import pack, unpack
from typing import Any, BinaryIO, Dict, List, Optional, Tuple
from py7zr.compression import SevenZipCompressor, SevenZipDecompressor
from py7zr.exceptions import Bad7zFile, UnsupportedCompressionMethodError
from py7zr.helpers import ArchiveTimestamp, calculate_crc32
from py7zr.properties import MAGIC_7Z, CompressionMethod, Property
MAX_LENGTH = 65536
P7ZIP_MAJOR_VERSION = b'\x00'
P7ZIP_MINOR_VERSION = b'\x04'
def read_crcs(file: BinaryIO, count: int) -> List[int]:
data = file.read(4 * count)
return [unpack('<L', data[i * 4:i * 4 + 4])[0] for i in range(count)]
def write_crcs(file: BinaryIO, crcs):
for crc in crcs:
write_uint32(file, crc)
def read_bytes(file: BinaryIO, length: int) -> Tuple[bytes, ...]:
return unpack(b'B' * length, file.read(length))
def read_byte(file: BinaryIO) -> int:
return ord(file.read(1))
def write_bytes(file: BinaryIO, data: bytes):
return file.write(data)
def write_byte(file: BinaryIO, data):
assert len(data) == 1
return write_bytes(file, data)
def read_real_uint64(file: BinaryIO) -> Tuple[int, bytes]:
"""read 8 bytes, return unpacked value as a little endian unsigned long long, and raw data."""
res = file.read(8)
a = unpack('<Q', res)[0]
return a, res
def read_uint32(file: BinaryIO) -> Tuple[int, bytes]:
"""read 4 bytes, return unpacked value as a little endian unsigned long, and raw data."""
res = file.read(4)
a = unpack('<L', res)[0]
return a, res
def write_uint32(file: BinaryIO, value):
"""write uint32 value in 4 bytes."""
b = pack('<L', value)
file.write(b)
def read_uint64(file: BinaryIO) -> int:
"""read UINT64, definition show in write_uint64()"""
b = ord(file.read(1))
if b == 255:
return read_real_uint64(file)[0]
blen = [(0b01111111, 0), (0b10111111, 1), (0b11011111, 2), (0b11101111, 3),
(0b11110111, 4), (0b11111011, 5), (0b11111101, 6), (0b11111110, 7)]
mask = 0x80
vlen = 8
for v, l in blen:
if b <= v:
vlen = l
break
mask >>= 1
if vlen == 0:
return b & (mask - 1)
val = file.read(vlen)
value = int.from_bytes(val, byteorder='little')
highpart = b & (mask - 1)
return value + (highpart << (vlen * 8))
def write_real_uint64(file: BinaryIO, value: int):
"""write 8 bytes, as an unsigned long long."""
file.write(pack('<Q', value))
def write_uint64(file: BinaryIO, value: int):
"""
UINT64 means real UINT64 encoded with the following scheme:
| Size of encoding sequence depends from first byte:
| First_Byte Extra_Bytes Value
| (binary)
| 0xxxxxxx : ( xxxxxxx )
| 10xxxxxx BYTE y[1] : ( xxxxxx << (8 * 1)) + y
| 110xxxxx BYTE y[2] : ( xxxxx << (8 * 2)) + y
| ...
| 1111110x BYTE y[6] : ( x << (8 * 6)) + y
| 11111110 BYTE y[7] : y
| 11111111 BYTE y[8] : y
"""
if value < 0x80:
file.write(pack('B', value))
return
if value > 0x01ffffffffffffff:
file.write(b'\xff')
file.write(value.to_bytes(8, 'little'))
return
byte_length = (value.bit_length() + 7) // 8
ba = bytearray(value.to_bytes(byte_length, 'little'))
high_byte = int(ba[-1])
if high_byte < 2 << (8 - byte_length - 1):
for x in range(byte_length - 1):
high_byte |= 0x80 >> x
file.write(pack('B', high_byte))
file.write(ba[:byte_length - 1])
else:
mask = 0x80
for x in range(byte_length):
mask |= 0x80 >> x
file.write(pack('B', mask))
file.write(ba)
def read_boolean(file: BinaryIO, count: int, checkall: bool = False) -> List[bool]:
if checkall:
all_defined = file.read(1)
if all_defined != unhexlify('00'):
return [True] * count
result = []
b = 0
mask = 0
for i in range(count):
if mask == 0:
b = ord(file.read(1))
mask = 0x80
result.append(b & mask != 0)
mask >>= 1
return result
def write_boolean(file: BinaryIO, booleans: List[bool], all_defined: bool = False):
if all_defined and reduce(and_, booleans, True):
file.write(b'\x01')
return
elif all_defined:
file.write(b'\x00')
o = bytearray(-(-len(booleans) // 8))
for i, b in enumerate(booleans):
if b:
o[i // 8] |= 1 << (7 - i % 8)
file.write(o)
def read_utf16(file: BinaryIO) -> str:
"""read a utf-16 string from file"""
val = ''
for _ in range(MAX_LENGTH):
ch = file.read(2)
if ch == unhexlify('0000'):
break
val += ch.decode('utf-16LE')
return val
def write_utf16(file: BinaryIO, val: str):
"""write a utf-16 string to file"""
for c in val:
file.write(c.encode('utf-16LE'))
file.write(b'\x00\x00')
def bits_to_bytes(bit_length: int) -> int:
return - (-bit_length // 8)
class ArchiveProperties:
__slots__ = ['property_data']
def __init__(self):
self.property_data = []
@classmethod
def retrieve(cls, file):
return cls()._read(file)
def _read(self, file):
pid = file.read(1)
if pid == Property.ARCHIVE_PROPERTIES:
while True:
ptype = file.read(1)
if ptype == Property.END:
break
size = read_uint64(file)
props = read_bytes(file, size)
self.property_data.append(props)
return self
def write(self, file):
if len(self.property_data) > 0:
write_byte(file, Property.ARCHIVE_PROPERTIES)
for data in self.property_data:
write_uint64(file, len(data))
write_bytes(file, data)
write_byte(file, Property.END)
class PackInfo:
""" information about packed streams """
__slots__ = ['packpos', 'numstreams', 'packsizes', 'packpositions', 'crcs']
def __init__(self) -> None:
self.packpos = 0 # type: int
self.numstreams = 0 # type: int
self.packsizes = [] # type: List[int]
self.crcs = None # type: Optional[List[int]]
@classmethod
def retrieve(cls, file: BinaryIO):
return cls()._read(file)
def _read(self, file: BinaryIO):
self.packpos = read_uint64(file)
self.numstreams = read_uint64(file)
pid = file.read(1)
if pid == Property.SIZE:
self.packsizes = [read_uint64(file) for _ in range(self.numstreams)]
pid = file.read(1)
if pid == Property.CRC:
self.crcs = [read_uint64(file) for _ in range(self.numstreams)]
pid = file.read(1)
if pid != Property.END:
raise Bad7zFile('end id expected but %s found' % repr(pid))
self.packpositions = [sum(self.packsizes[:i]) for i in range(self.numstreams + 1)] # type: List[int]
return self
def write(self, file: BinaryIO):
assert self.packpos is not None
numstreams = len(self.packsizes)
assert self.crcs is None or len(self.crcs) == numstreams
write_byte(file, Property.PACK_INFO)
write_uint64(file, self.packpos)
write_uint64(file, numstreams)
write_byte(file, Property.SIZE)
for size in self.packsizes:
write_uint64(file, size)
if self.crcs is not None:
write_bytes(file, Property.CRC)
for crc in self.crcs:
write_uint64(file, crc)
write_byte(file, Property.END)
class Folder:
""" a "Folder" represents a stream of compressed data.
coders: list of coder
num_coders: length of coders
coder: hash list
keys of coders: method, numinstreams, numoutstreams, properties
unpacksizes: uncompressed sizes of outstreams
"""
__slots__ = ['unpacksizes', 'solid', 'coders', 'digestdefined', 'totalin', 'totalout',
'bindpairs', 'packed_indices', 'crc', 'decompressor', 'compressor', 'files']
def __init__(self) -> None:
self.unpacksizes = None # type: Optional[List[int]]
self.coders = [] # type: List[Dict[str, Any]]
self.bindpairs = [] # type: List[Any]
self.packed_indices = [] # type: List[int]
# calculated values
self.totalin = 0 # type: int
self.totalout = 0 # type: int
# internal values
self.solid = False # type: bool
self.digestdefined = False # type: bool
self.crc = None # type: Optional[int]
# compress/decompress objects
self.decompressor = None # type: Optional[SevenZipDecompressor]
self.compressor = None # type: Optional[SevenZipCompressor]
self.files = None
@classmethod
def retrieve(cls, file: BinaryIO):
obj = cls()
obj._read(file)
return obj
def _read(self, file: BinaryIO) -> None:
num_coders = read_uint64(file)
for _ in range(num_coders):
b = read_byte(file)
methodsize = b & 0xf
iscomplex = b & 0x10 == 0x10
hasattributes = b & 0x20 == 0x20
c = {'method': file.read(methodsize)} # type: Dict[str, Any]
if iscomplex:
c['numinstreams'] = read_uint64(file)
c['numoutstreams'] = read_uint64(file)
else:
c['numinstreams'] = 1
c['numoutstreams'] = 1
self.totalin += c['numinstreams']
self.totalout += c['numoutstreams']
if hasattributes:
proplen = read_uint64(file)
c['properties'] = file.read(proplen)
self.coders.append(c)
num_bindpairs = self.totalout - 1
for i in range(num_bindpairs):
self.bindpairs.append((read_uint64(file), read_uint64(file),))
num_packedstreams = self.totalin - num_bindpairs
if num_packedstreams == 1:
for i in range(self.totalin):
if self._find_in_bin_pair(i) < 0: # there is no in_bin_pair
self.packed_indices.append(i)
elif num_packedstreams > 1:
for i in range(num_packedstreams):
self.packed_indices.append(read_uint64(file))
def write(self, file: BinaryIO):
num_coders = len(self.coders)
assert num_coders > 0
write_uint64(file, num_coders)
for i, c in enumerate(self.coders):
id = c['method'] # type: bytes
id_size = len(id) & 0x0f
iscomplex = 0x10 if not self.is_simple(c) else 0x00
hasattributes = 0x20 if c['properties'] is not None else 0x00
flag = struct.pack('B', id_size | iscomplex | hasattributes)
write_byte(file, flag)
write_bytes(file, id[:id_size])
if not self.is_simple(c):
write_uint64(file, c['numinstreams'])
assert c['numoutstreams'] == 1
write_uint64(file, c['numoutstreams'])
if c['properties'] is not None:
write_uint64(file, len(c['properties']))
write_bytes(file, c['properties'])
num_bindpairs = self.totalout - 1
assert len(self.bindpairs) == num_bindpairs
num_packedstreams = self.totalin - num_bindpairs
for bp in self.bindpairs:
write_uint64(file, bp[0])
write_uint64(file, bp[1])
if num_packedstreams > 1:
for pi in self.packed_indices:
write_uint64(file, pi)
def is_simple(self, coder):
return coder['numinstreams'] == 1 and coder['numoutstreams'] == 1
def get_decompressor(self, size: int, reset: bool = False) -> SevenZipDecompressor:
if self.decompressor is not None and not reset:
return self.decompressor
else:
try:
self.decompressor = SevenZipDecompressor(self.coders, size, self.crc)
except Exception as e:
raise e
if self.decompressor is not None:
return self.decompressor
else:
raise
def get_compressor(self) -> SevenZipCompressor:
if self.compressor is not None:
return self.compressor
else:
try:
# FIXME: set filters
self.compressor = SevenZipCompressor()
self.coders = self.compressor.coders
return self.compressor
except Exception as e:
raise e
def get_unpack_size(self) -> int:
if self.unpacksizes is None:
return 0
for i in range(len(self.unpacksizes) - 1, -1, -1):
if self._find_out_bin_pair(i):
return self.unpacksizes[i]
raise TypeError('not found')
def _find_in_bin_pair(self, index: int) -> int:
for idx, (a, b) in enumerate(self.bindpairs):
if a == index:
return idx
return -1
def _find_out_bin_pair(self, index: int) -> int:
for idx, (a, b) in enumerate(self.bindpairs):
if b == index:
return idx
return -1
def is_encrypted(self) -> bool:
return CompressionMethod.CRYPT_AES256_SHA256 in [x['method'] for x in self.coders]
class UnpackInfo:
""" combines multiple folders """
__slots__ = ['numfolders', 'folders', 'datastreamidx']
@classmethod
def retrieve(cls, file: BinaryIO):
obj = cls()
obj._read(file)
return obj
def __init__(self):
self.numfolders = None
self.folders = []
self.datastreamidx = None
def _read(self, file: BinaryIO):
pid = file.read(1)
if pid != Property.FOLDER:
raise Bad7zFile('folder id expected but %s found' % repr(pid))
self.numfolders = read_uint64(file)
self.folders = []
external = read_byte(file)
if external == 0x00:
self.folders = [Folder.retrieve(file) for _ in range(self.numfolders)]
else:
datastreamidx = read_uint64(file)
current_pos = file.tell()
file.seek(datastreamidx, 0)
self.folders = [Folder.retrieve(file) for _ in range(self.numfolders)]
file.seek(current_pos, 0)
self._retrieve_coders_info(file)
def _retrieve_coders_info(self, file: BinaryIO):
pid = file.read(1)
if pid != Property.CODERS_UNPACK_SIZE:
raise Bad7zFile('coders unpack size id expected but %s found' % repr(pid))
for folder in self.folders:
folder.unpacksizes = [read_uint64(file) for _ in range(folder.totalout)]
pid = file.read(1)
if pid == Property.CRC:
defined = read_boolean(file, self.numfolders, checkall=True)
crcs = read_crcs(file, self.numfolders)
for idx, folder in enumerate(self.folders):
folder.digestdefined = defined[idx]
folder.crc = crcs[idx]
pid = file.read(1)
if pid != Property.END:
raise Bad7zFile('end id expected but %s found at %d' % (repr(pid), file.tell()))
def write(self, file: BinaryIO):
assert self.numfolders is not None
assert self.folders is not None
assert self.numfolders == len(self.folders)
file.write(Property.UNPACK_INFO)
file.write(Property.FOLDER)
write_uint64(file, self.numfolders)
write_byte(file, b'\x00')
for folder in self.folders:
folder.write(file)
# If support external entity, we may write
# self.datastreamidx here.
# folder data will be written in another place.
# write_byte(file, b'\x01')
# assert self.datastreamidx is not None
# write_uint64(file, self.datastreamidx)
write_byte(file, Property.CODERS_UNPACK_SIZE)
for folder in self.folders:
for i in range(folder.totalout):
write_uint64(file, folder.unpacksizes[i])
write_byte(file, Property.END)
class SubstreamsInfo:
""" defines the substreams of a folder """
__slots__ = ['digests', 'digestsdefined', 'unpacksizes', 'num_unpackstreams_folders']
def __init__(self):
self.digests = [] # type: List[int]
self.digestsdefined = [] # type: List[bool]
self.unpacksizes = None # type: Optional[List[int]]
self.num_unpackstreams_folders = [] # type: List[int]
@classmethod
def retrieve(cls, file: BinaryIO, numfolders: int, folders: List[Folder]):
obj = cls()
obj._read(file, numfolders, folders)
return obj
def _read(self, file: BinaryIO, numfolders: int, folders: List[Folder]):
pid = file.read(1)
if pid == Property.NUM_UNPACK_STREAM:
self.num_unpackstreams_folders = [read_uint64(file) for _ in range(numfolders)]
pid = file.read(1)
else:
self.num_unpackstreams_folders = [1] * numfolders
if pid == Property.SIZE:
self.unpacksizes = []
for i in range(len(self.num_unpackstreams_folders)):
totalsize = 0 # type: int
for j in range(1, self.num_unpackstreams_folders[i]):
size = read_uint64(file)
self.unpacksizes.append(size)
totalsize += size
self.unpacksizes.append(folders[i].get_unpack_size() - totalsize)
pid = file.read(1)
num_digests = 0
num_digests_total = 0
for i in range(numfolders):
numsubstreams = self.num_unpackstreams_folders[i]
if numsubstreams != 1 or not folders[i].digestdefined:
num_digests += numsubstreams
num_digests_total += numsubstreams
if pid == Property.CRC:
defined = read_boolean(file, num_digests, checkall=True)
crcs = read_crcs(file, num_digests)
didx = 0
for i in range(numfolders):
folder = folders[i]
numsubstreams = self.num_unpackstreams_folders[i]
if numsubstreams == 1 and folder.digestdefined and folder.crc is not None:
self.digestsdefined.append(True)
self.digests.append(folder.crc)
else:
for j in range(numsubstreams):
self.digestsdefined.append(defined[didx])
self.digests.append(crcs[didx])
didx += 1
pid = file.read(1)
if pid != Property.END:
raise Bad7zFile('end id expected but %r found' % pid)
if not self.digestsdefined:
self.digestsdefined = [False] * num_digests_total
self.digests = [0] * num_digests_total
def write(self, file: BinaryIO, numfolders: int):
assert self.num_unpackstreams_folders is not None
if len(self.num_unpackstreams_folders) == 0:
# nothing to write
return
if self.unpacksizes is None:
raise ValueError
write_byte(file, Property.SUBSTREAMS_INFO)
if not functools.reduce(lambda x, y: x and (y == 1), self.num_unpackstreams_folders, True):
write_byte(file, Property.NUM_UNPACK_STREAM)
for n in self.num_unpackstreams_folders:
write_uint64(file, n)
write_byte(file, Property.SIZE)
idx = 0
for i in range(numfolders):
for j in range(1, self.num_unpackstreams_folders[i]):
size = self.unpacksizes[idx]
write_uint64(file, size)
idx += 1
idx += 1
if functools.reduce(lambda x, y: x or y, self.digestsdefined, False):
write_byte(file, Property.CRC)
write_boolean(file, self.digestsdefined, all_defined=True)
write_crcs(file, self.digests)
write_byte(file, Property.END)
class StreamsInfo:
""" information about compressed streams """
__slots__ = ['packinfo', 'unpackinfo', 'substreamsinfo']
def __init__(self):
self.packinfo = None # type: PackInfo
self.unpackinfo = None # type: UnpackInfo
self.substreamsinfo = None # type: Optional[SubstreamsInfo]
@classmethod
def retrieve(cls, file: BinaryIO):
obj = cls()
obj.read(file)
return obj
def read(self, file: BinaryIO) -> None:
pid = file.read(1)
if pid == Property.PACK_INFO:
self.packinfo = PackInfo.retrieve(file)
pid = file.read(1)
if pid == Property.UNPACK_INFO:
self.unpackinfo = UnpackInfo.retrieve(file)
pid = file.read(1)
if pid == Property.SUBSTREAMS_INFO:
self.substreamsinfo = SubstreamsInfo.retrieve(file, self.unpackinfo.numfolders, self.unpackinfo.folders)
pid = file.read(1)
if pid != Property.END:
raise Bad7zFile('end id expected but %s found' % repr(pid))
def write(self, file: BinaryIO):
write_byte(file, Property.MAIN_STREAMS_INFO)
self._write(file)
def _write(self, file: BinaryIO):
if self.packinfo is not None:
self.packinfo.write(file)
if self.unpackinfo is not None:
self.unpackinfo.write(file)
if self.substreamsinfo is not None:
self.substreamsinfo.write(file, self.unpackinfo.numfolders)
write_byte(file, Property.END)
class HeaderStreamsInfo(StreamsInfo):
def __init__(self):
super().__init__()
self.packinfo = PackInfo()
self.unpackinfo = UnpackInfo()
folder = Folder()
folder.compressor = SevenZipCompressor()
folder.coders = folder.compressor.coders
folder.solid = False
folder.digestdefined = False
folder.bindpairs = []
folder.totalin = 1
folder.totalout = 1
folder.digestdefined = [True]
self.unpackinfo.numfolders = 1
self.unpackinfo.folders = [folder]
def write(self, file: BinaryIO):
self._write(file)
class FilesInfo:
""" holds file properties """
__slots__ = ['files', 'emptyfiles', 'antifiles']
def __init__(self):
self.files = [] # type: List[Dict[str, Any]]
self.emptyfiles = [] # type: List[bool]
self.antifiles = None
@classmethod
def retrieve(cls, file: BinaryIO):
obj = cls()
obj._read(file)
return obj
def _read(self, fp: BinaryIO):
numfiles = read_uint64(fp)
self.files = [{'emptystream': False} for _ in range(numfiles)]
numemptystreams = 0
while True:
prop = fp.read(1)
if prop == Property.END:
break
size = read_uint64(fp)
if prop == Property.DUMMY:
# Added by newer versions of 7z to adjust padding.
fp.seek(size, os.SEEK_CUR)
continue
buffer = io.BytesIO(fp.read(size))
if prop == Property.EMPTY_STREAM:
isempty = read_boolean(buffer, numfiles, checkall=False)
list(map(lambda x, y: x.update({'emptystream': y}), self.files, isempty)) # type: ignore
numemptystreams += isempty.count(True)
elif prop == Property.EMPTY_FILE:
self.emptyfiles = read_boolean(buffer, numemptystreams, checkall=False)
elif prop == Property.ANTI:
self.antifiles = read_boolean(buffer, numemptystreams, checkall=False)
elif prop == Property.NAME:
external = buffer.read(1)
if external == b'\x00':
self._read_name(buffer)
else:
dataindex = read_uint64(buffer)
current_pos = fp.tell()
fp.seek(dataindex, 0)
self._read_name(fp)
fp.seek(current_pos, 0)
elif prop == Property.CREATION_TIME:
self._read_times(buffer, 'creationtime')
elif prop == Property.LAST_ACCESS_TIME:
self._read_times(buffer, 'lastaccesstime')
elif prop == Property.LAST_WRITE_TIME:
self._read_times(buffer, 'lastwritetime')
elif prop == Property.ATTRIBUTES:
defined = read_boolean(buffer, numfiles, checkall=True)
external = buffer.read(1)
if external == b'\x00':
self._read_attributes(buffer, defined)
else:
dataindex = read_uint64(buffer)
# try to read external data
current_pos = fp.tell()
fp.seek(dataindex, 0)
self._read_attributes(fp, defined)
fp.seek(current_pos, 0)
elif prop == Property.START_POS:
self._read_start_pos(buffer)
else:
raise Bad7zFile('invalid type %r' % prop)
def _read_name(self, buffer: BinaryIO) -> None:
for f in self.files:
f['filename'] = read_utf16(buffer).replace('\\', '/')
def _read_attributes(self, buffer: BinaryIO, defined: List[bool]) -> None:
for idx, f in enumerate(self.files):
f['attributes'] = read_uint32(buffer)[0] if defined[idx] else None
def _read_times(self, fp: BinaryIO, name: str) -> None:
defined = read_boolean(fp, len(self.files), checkall=True)
# NOTE: the "external" flag is currently ignored, should be 0x00
external = fp.read(1)
assert external == b'\x00'
for i, f in enumerate(self.files):
f[name] = ArchiveTimestamp(read_real_uint64(fp)[0]) if defined[i] else None
def _read_start_pos(self, fp: BinaryIO) -> None:
defined = read_boolean(fp, len(self.files), checkall=True)
# NOTE: the "external" flag is currently ignored, should be 0x00
external = fp.read(1)
assert external == 0x00
for i, f in enumerate(self.files):
f['startpos'] = read_real_uint64(fp)[0] if defined[i] else None
def _write_times(self, fp: BinaryIO, propid, name: str) -> None:
write_byte(fp, propid)
defined = [] # type: List[bool]
num_defined = 0 # type: int
for f in self.files:
if name in f.keys():
if f[name] is not None:
defined.append(True)
num_defined += 1
size = num_defined * 8 + 2
if not reduce(and_, defined, True):
size += bits_to_bytes(num_defined)
write_uint64(fp, size)
write_boolean(fp, defined, all_defined=True)
write_byte(fp, b'\x00')
for i, file in enumerate(self.files):
if defined[i]:
write_real_uint64(fp, ArchiveTimestamp.from_datetime(file[name]))
else:
pass
def _write_prop_bool_vector(self, fp: BinaryIO, propid, vector) -> None:
write_byte(fp, propid)
write_boolean(fp, vector, all_defined=True)
@staticmethod
def _are_there(vector) -> bool:
if vector is not None:
if functools.reduce(or_, vector, False):
return True
return False
def _write_names(self, file: BinaryIO):
name_defined = 0
names = []
name_size = 0
for f in self.files:
if f.get('filename', None) is not None:
name_defined += 1
names.append(f['filename'])
name_size += len(f['filename'].encode('utf-16LE')) + 2 # len(str + NULL_WORD)
if name_defined > 0:
write_byte(file, Property.NAME)
write_uint64(file, name_size + 1)
write_byte(file, b'\x00')
for n in names:
write_utf16(file, n)
def _write_attributes(self, file):
defined = [] # type: List[bool]
num_defined = 0
for f in self.files:
if 'attributes' in f.keys() and f['attributes'] is not None:
defined.append(True)
num_defined += 1
else:
defined.append(False)
size = num_defined * 4 + 2
if num_defined != len(defined):
size += bits_to_bytes(num_defined)
write_byte(file, Property.ATTRIBUTES)
write_uint64(file, size)
write_boolean(file, defined, all_defined=True)
write_byte(file, b'\x00')
for i, f in enumerate(self.files):
if defined[i]:
write_uint32(file, f['attributes'])
def write(self, file: BinaryIO):
assert self.files is not None
write_byte(file, Property.FILES_INFO)
numfiles = len(self.files)
write_uint64(file, numfiles)
emptystreams = [] # List[bool]
for f in self.files:
emptystreams.append(f['emptystream'])
if self._are_there(emptystreams):
write_byte(file, Property.EMPTY_STREAM)
write_uint64(file, bits_to_bytes(numfiles))
write_boolean(file, emptystreams, all_defined=False)
else:
if self._are_there(self.emptyfiles):
self._write_prop_bool_vector(file, Property.EMPTY_FILE, self.emptyfiles)
if self._are_there(self.antifiles):
self._write_prop_bool_vector(file, Property.ANTI, self.antifiles)
# Name
self._write_names(file)
# timestamps
self._write_times(file, Property.CREATION_TIME, 'creationtime')
self._write_times(file, Property.LAST_ACCESS_TIME, 'lastaccesstime')
self._write_times(file, Property.LAST_WRITE_TIME, 'lastwritetime')
# start_pos
# FIXME: TBD
# attribute
self._write_attributes(file)
write_byte(file, Property.END)
class Header:
""" the archive header """
__slot__ = ['solid', 'properties', 'additional_streams', 'main_streams', 'files_info',
'size', '_start_pos']
def __init__(self) -> None:
self.solid = False
self.properties = None
self.additional_streams = None
self.main_streams = None
self.files_info = None
self.size = 0 # fixme. Not implemented yet
self._start_pos = 0
@classmethod
def retrieve(cls, fp: BinaryIO, buffer: BytesIO, start_pos: int):
obj = cls()
obj._read(fp, buffer, start_pos)
return obj
def _read(self, fp: BinaryIO, buffer: BytesIO, start_pos: int) -> None:
self._start_pos = start_pos
fp.seek(self._start_pos)
self._decode_header(fp, buffer)
def _decode_header(self, fp: BinaryIO, buffer: BytesIO) -> None:
"""
Decode header data or encoded header data from buffer.
When buffer consist of encoded buffer, it get stream data
from it and call itself recursively
"""
pid = buffer.read(1)
if not pid:
# empty archive
return
elif pid == Property.HEADER:
self._extract_header_info(buffer)
return
elif pid != Property.ENCODED_HEADER:
raise TypeError('Unknown field: %r' % id)
# get from encoded header
streams = HeaderStreamsInfo.retrieve(buffer)
self._decode_header(fp, self._get_headerdata_from_streams(fp, streams))
def _get_headerdata_from_streams(self, fp: BinaryIO, streams: StreamsInfo) -> BytesIO:
"""get header data from given streams.unpackinfo and packinfo.
folder data are stored in raw data positioned in afterheader."""
buffer = io.BytesIO()
src_start = self._start_pos
for folder in streams.unpackinfo.folders:
if folder.is_encrypted():
raise UnsupportedCompressionMethodError()
uncompressed = folder.unpacksizes
if not isinstance(uncompressed, (list, tuple)):
uncompressed = [uncompressed] * len(folder.coders)
compressed_size = streams.packinfo.packsizes[0]
uncompressed_size = uncompressed[-1]
src_start += streams.packinfo.packpos
fp.seek(src_start, 0)
decompressor = folder.get_decompressor(compressed_size)
folder_data = decompressor.decompress(fp.read(compressed_size))[:uncompressed_size]
src_start += uncompressed_size
if folder.digestdefined:
if folder.crc != calculate_crc32(folder_data):
raise Bad7zFile('invalid block data')
buffer.write(folder_data)
buffer.seek(0, 0)
return buffer
def _encode_header(self, file: BinaryIO, afterheader: int):
startpos = file.tell()
packpos = startpos - afterheader
buf = io.BytesIO()
_, raw_header_len, raw_crc = self.write(buf, 0, False)
streams = HeaderStreamsInfo()
streams.packinfo.packpos = packpos
folder = streams.unpackinfo.folders[0]
folder.crc = [raw_crc]
folder.unpacksizes = [raw_header_len]
compressed_len = 0
buf.seek(0, 0)
data = buf.read(io.DEFAULT_BUFFER_SIZE)
while data:
out = folder.compressor.compress(data)
compressed_len += len(out)
file.write(out)
data = buf.read(io.DEFAULT_BUFFER_SIZE)
out = folder.compressor.flush()
compressed_len += len(out)
file.write(out)
#
streams.packinfo.packsizes = [compressed_len]
# actual header start position
startpos = file.tell()
write_byte(file, Property.ENCODED_HEADER)
streams.write(file)
write_byte(file, Property.END)
return startpos
def write(self, file: BinaryIO, afterheader: int, encoded: bool = True):
startpos = file.tell()
if encoded:
startpos = self._encode_header(file, afterheader)
else:
write_byte(file, Property.HEADER)
# Archive properties
if self.main_streams is not None:
self.main_streams.write(file)
# Files Info
if self.files_info is not None:
self.files_info.write(file)
if self.properties is not None:
self.properties.write(file)
# AdditionalStreams
if self.additional_streams is not None:
self.additional_streams.write(file)
write_byte(file, Property.END)
endpos = file.tell()
header_len = endpos - startpos
file.seek(startpos, io.SEEK_SET)
crc = calculate_crc32(file.read(header_len))
file.seek(endpos, io.SEEK_SET)
return startpos, header_len, crc
def _extract_header_info(self, fp: BinaryIO) -> None:
pid = fp.read(1)
if pid == Property.ARCHIVE_PROPERTIES:
self.properties = ArchiveProperties.retrieve(fp)
pid = fp.read(1)
if pid == Property.ADDITIONAL_STREAMS_INFO:
self.additional_streams = StreamsInfo.retrieve(fp)
pid = fp.read(1)
if pid == Property.MAIN_STREAMS_INFO:
self.main_streams = StreamsInfo.retrieve(fp)
pid = fp.read(1)
if pid == Property.FILES_INFO:
self.files_info = FilesInfo.retrieve(fp)
pid = fp.read(1)
if pid != Property.END:
raise Bad7zFile('end id expected but %s found' % (repr(pid)))
@staticmethod
def build_header(folders):
header = Header()
header.files_info = FilesInfo()
header.main_streams = StreamsInfo()
header.main_streams.packinfo = PackInfo()
header.main_streams.packinfo.numstreams = 0
header.main_streams.packinfo.packpos = 0
header.main_streams.unpackinfo = UnpackInfo()
header.main_streams.unpackinfo.numfolders = len(folders)
header.main_streams.unpackinfo.folders = folders
header.main_streams.substreamsinfo = SubstreamsInfo()
header.main_streams.substreamsinfo.num_unpackstreams_folders = [len(folders)]
header.main_streams.substreamsinfo.unpacksizes = []
return header
class SignatureHeader:
"""The SignatureHeader class hold information of a signature header of archive."""
__slots__ = ['version', 'startheadercrc', 'nextheaderofs', 'nextheadersize', 'nextheadercrc']
def __init__(self) -> None:
self.version = (P7ZIP_MAJOR_VERSION, P7ZIP_MINOR_VERSION) # type: Tuple[bytes, ...]
self.startheadercrc = None # type: Optional[int]
self.nextheaderofs = None # type: Optional[int]
self.nextheadersize = None # type: Optional[int]
self.nextheadercrc = None # type: Optional[int]
@classmethod
def retrieve(cls, file: BinaryIO):
obj = cls()
obj._read(file)
return obj
def _read(self, file: BinaryIO) -> None:
file.seek(len(MAGIC_7Z), 0)
self.version = read_bytes(file, 2)
self.startheadercrc, _ = read_uint32(file)
self.nextheaderofs, data = read_real_uint64(file)
crc = calculate_crc32(data)
self.nextheadersize, data = read_real_uint64(file)
crc = calculate_crc32(data, crc)
self.nextheadercrc, data = read_uint32(file)
crc = calculate_crc32(data, crc)
if crc != self.startheadercrc:
raise Bad7zFile('invalid header data')
def calccrc(self, length: int, header_crc: int):
self.nextheadersize = length
self.nextheadercrc = header_crc
assert self.nextheaderofs is not None
buf = io.BytesIO()
write_real_uint64(buf, self.nextheaderofs)
write_real_uint64(buf, self.nextheadersize)
write_uint32(buf, self.nextheadercrc)
startdata = buf.getvalue()
self.startheadercrc = calculate_crc32(startdata)
def write(self, file: BinaryIO):
assert self.startheadercrc is not None
assert self.nextheadercrc is not None
assert self.nextheaderofs is not None
assert self.nextheadersize is not None
file.seek(0, 0)
write_bytes(file, MAGIC_7Z)
write_byte(file, self.version[0])
write_byte(file, self.version[1])
write_uint32(file, self.startheadercrc)
write_real_uint64(file, self.nextheaderofs)
write_real_uint64(file, self.nextheadersize)
write_uint32(file, self.nextheadercrc)
def _write_skelton(self, file: BinaryIO):
file.seek(0, 0)
write_bytes(file, MAGIC_7Z)
write_byte(file, self.version[0])
write_byte(file, self.version[1])
write_uint32(file, 1)
write_real_uint64(file, 2)
write_real_uint64(file, 3)
write_uint32(file, 4)
class FinishHeader():
"""Finish header for multi-volume 7z file."""
def __init__(self):
self.archive_start_offset = None # data offset from end of the finish header
self.additional_start_block_size = None # start signature & start header size
self.finish_header_size = 20 + 16
@classmethod
def retrieve(cls, file):
obj = cls()
obj._read(file)
return obj
def _read(self, file):
self.archive_start_offset = read_uint64(file)
self.additional_start_block_size = read_uint64(file)