2016-05-30 22:33:13 +00:00
|
|
|
import errno
|
2016-02-08 19:17:35 +00:00
|
|
|
from datetime import datetime, timezone
|
2010-10-24 20:07:54 +00:00
|
|
|
from getpass import getuser
|
2014-02-16 21:21:18 +00:00
|
|
|
from itertools import groupby
|
convert most print() calls to logging
the logging level varies: most is logging.info(), in some place
logging.warning() or logging.error() are used when the condition is
clearly an error or warning. in other cases, we keep using print, but
force writing to sys.stderr, unless we interact with the user.
there were 77 calls to print before this commit, now there are 7, most
of which in the archiver module, which interacts directly with the
user. in one case there, we still use print() only because logging is
not setup properly yet during argument parsing.
it could be argued that commands like info or list should use print
directly, but we have converted them anyways, without ill effects on
the unit tests
unit tests still use print() in some places
this switches all informational output to stderr, which should help
with, if not fix jborg/attic#312 directly
2015-10-01 17:41:42 +00:00
|
|
|
|
2015-10-06 16:33:55 +00:00
|
|
|
from .logger import create_logger
|
|
|
|
logger = create_logger()
|
2015-05-22 17:21:41 +00:00
|
|
|
from .key import key_factory
|
|
|
|
from .remote import cache_if_remote
|
2015-10-06 16:33:55 +00:00
|
|
|
|
2010-10-20 17:59:15 +00:00
|
|
|
import os
|
2016-05-18 21:59:47 +00:00
|
|
|
from shutil import get_terminal_size
|
2010-10-24 18:18:18 +00:00
|
|
|
import socket
|
2010-10-20 17:59:15 +00:00
|
|
|
import stat
|
2010-10-20 19:08:46 +00:00
|
|
|
import sys
|
2011-09-10 15:19:02 +00:00
|
|
|
import time
|
2013-06-03 11:45:48 +00:00
|
|
|
from io import BytesIO
|
2015-05-22 17:21:41 +00:00
|
|
|
from . import xattr
|
2016-04-17 01:15:19 +00:00
|
|
|
from .constants import * # NOQA
|
2016-05-30 22:33:13 +00:00
|
|
|
from .helpers import Manifest
|
|
|
|
from .helpers import Chunk, ChunkIteratorFileWrapper, open_item
|
|
|
|
from .helpers import Error, IntegrityError
|
|
|
|
from .helpers import uid2user, user2uid, gid2group, group2gid
|
|
|
|
from .helpers import parse_timestamp, to_localtime
|
|
|
|
from .helpers import format_time, format_timedelta, format_file_size, file_status
|
|
|
|
from .helpers import safe_encode, safe_decode, make_path_safe, remove_surrogates
|
|
|
|
from .helpers import decode_dict, StableDict
|
|
|
|
from .helpers import int_to_bigint, bigint_to_int, bin_to_hex
|
|
|
|
from .helpers import ProgressIndicatorPercent, log_multi
|
|
|
|
from .helpers import PathPrefixPattern, FnmatchPattern
|
|
|
|
from .helpers import consume
|
|
|
|
from .helpers import CompressionDecider1, CompressionDecider2, CompressionSpec
|
2016-04-10 13:59:10 +00:00
|
|
|
from .repository import Repository
|
2016-05-18 21:59:47 +00:00
|
|
|
from .platform import acl_get, acl_set, set_flags, get_flags, swidth
|
2015-11-13 15:38:50 +00:00
|
|
|
from .chunker import Chunker
|
2016-04-16 15:48:47 +00:00
|
|
|
from .hashindex import ChunkIndex, ChunkIndexEntry
|
|
|
|
from .cache import ChunkListEntry
|
2015-11-13 15:38:50 +00:00
|
|
|
import msgpack
|
2010-10-20 17:59:15 +00:00
|
|
|
|
2013-06-03 11:45:48 +00:00
|
|
|
has_lchmod = hasattr(os, 'lchmod')
|
2010-10-27 17:30:21 +00:00
|
|
|
|
2016-02-18 22:01:55 +00:00
|
|
|
flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
|
|
|
|
flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)
|
|
|
|
|
2010-10-20 17:59:15 +00:00
|
|
|
|
2016-05-18 21:59:47 +00:00
|
|
|
class Statistics:
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
self.osize = self.csize = self.usize = self.nfiles = 0
|
|
|
|
self.last_progress = 0 # timestamp when last progress was shown
|
|
|
|
|
|
|
|
def update(self, size, csize, unique):
|
|
|
|
self.osize += size
|
|
|
|
self.csize += csize
|
|
|
|
if unique:
|
|
|
|
self.usize += csize
|
|
|
|
|
|
|
|
summary = """\
|
|
|
|
Original size Compressed size Deduplicated size
|
|
|
|
{label:15} {stats.osize_fmt:>20s} {stats.csize_fmt:>20s} {stats.usize_fmt:>20s}"""
|
|
|
|
|
|
|
|
def __str__(self):
|
|
|
|
return self.summary.format(stats=self, label='This archive:')
|
|
|
|
|
|
|
|
def __repr__(self):
|
|
|
|
return "<{cls} object at {hash:#x} ({self.osize}, {self.csize}, {self.usize})>".format(
|
|
|
|
cls=type(self).__name__, hash=id(self), self=self)
|
|
|
|
|
|
|
|
@property
|
|
|
|
def osize_fmt(self):
|
|
|
|
return format_file_size(self.osize)
|
|
|
|
|
|
|
|
@property
|
|
|
|
def usize_fmt(self):
|
|
|
|
return format_file_size(self.usize)
|
|
|
|
|
|
|
|
@property
|
|
|
|
def csize_fmt(self):
|
|
|
|
return format_file_size(self.csize)
|
|
|
|
|
|
|
|
def show_progress(self, item=None, final=False, stream=None, dt=None):
|
|
|
|
now = time.time()
|
|
|
|
if dt is None or now - self.last_progress > dt:
|
|
|
|
self.last_progress = now
|
|
|
|
columns, lines = get_terminal_size()
|
|
|
|
if not final:
|
|
|
|
msg = '{0.osize_fmt} O {0.csize_fmt} C {0.usize_fmt} D {0.nfiles} N '.format(self)
|
|
|
|
path = remove_surrogates(item[b'path']) if item else ''
|
|
|
|
space = columns - swidth(msg)
|
|
|
|
if space < swidth('...') + swidth(path):
|
|
|
|
path = '%s...%s' % (path[:(space // 2) - swidth('...')], path[-space // 2:])
|
|
|
|
msg += "{0:<{space}}".format(path, space=space)
|
|
|
|
else:
|
|
|
|
msg = ' ' * columns
|
|
|
|
print(msg, file=stream or sys.stderr, end="\r", flush=True)
|
|
|
|
|
|
|
|
|
2014-01-22 19:58:48 +00:00
|
|
|
class DownloadPipeline:
|
2012-11-27 23:03:35 +00:00
|
|
|
|
2014-01-22 19:58:48 +00:00
|
|
|
def __init__(self, repository, key):
|
|
|
|
self.repository = repository
|
|
|
|
self.key = key
|
2012-11-27 23:03:35 +00:00
|
|
|
|
2014-01-23 21:13:08 +00:00
|
|
|
def unpack_many(self, ids, filter=None, preload=False):
|
2014-01-22 19:58:48 +00:00
|
|
|
unpacker = msgpack.Unpacker(use_list=False)
|
2016-03-18 02:16:12 +00:00
|
|
|
for _, data in self.fetch_many(ids):
|
2014-01-22 19:58:48 +00:00
|
|
|
unpacker.feed(data)
|
2016-04-17 01:15:19 +00:00
|
|
|
items = [decode_dict(item, ITEM_TEXT_KEYS) for item in unpacker]
|
2014-01-22 19:58:48 +00:00
|
|
|
if filter:
|
|
|
|
items = [item for item in items if filter(item)]
|
2016-04-16 15:48:47 +00:00
|
|
|
for item in items:
|
|
|
|
if b'chunks' in item:
|
|
|
|
item[b'chunks'] = [ChunkListEntry(*e) for e in item[b'chunks']]
|
2014-01-23 21:13:08 +00:00
|
|
|
if preload:
|
|
|
|
for item in items:
|
|
|
|
if b'chunks' in item:
|
2016-04-16 15:48:47 +00:00
|
|
|
self.repository.preload([c.id for c in item[b'chunks']])
|
2014-01-22 19:58:48 +00:00
|
|
|
for item in items:
|
|
|
|
yield item
|
|
|
|
|
|
|
|
def fetch_many(self, ids, is_preloaded=False):
|
2014-02-16 21:21:18 +00:00
|
|
|
for id_, data in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)):
|
2014-01-22 19:58:48 +00:00
|
|
|
yield self.key.decrypt(id_, data)
|
|
|
|
|
|
|
|
|
|
|
|
class ChunkBuffer:
|
|
|
|
BUFFER_SIZE = 1 * 1024 * 1024
|
|
|
|
|
2016-01-15 19:56:21 +00:00
|
|
|
def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
|
2014-07-10 13:44:29 +00:00
|
|
|
self.buffer = BytesIO()
|
2014-01-22 19:58:48 +00:00
|
|
|
self.packer = msgpack.Packer(unicode_errors='surrogateescape')
|
|
|
|
self.chunks = []
|
|
|
|
self.key = key
|
2015-06-20 23:46:41 +00:00
|
|
|
self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
|
2012-11-27 23:03:35 +00:00
|
|
|
|
2014-01-22 19:58:48 +00:00
|
|
|
def add(self, item):
|
2014-02-16 22:36:48 +00:00
|
|
|
self.buffer.write(self.packer.pack(StableDict(item)))
|
2014-02-16 21:21:18 +00:00
|
|
|
if self.is_full():
|
|
|
|
self.flush()
|
|
|
|
|
|
|
|
def write_chunk(self, chunk):
|
|
|
|
raise NotImplementedError
|
2012-11-27 23:03:35 +00:00
|
|
|
|
2014-01-22 19:58:48 +00:00
|
|
|
def flush(self, flush=False):
|
|
|
|
if self.buffer.tell() == 0:
|
|
|
|
return
|
|
|
|
self.buffer.seek(0)
|
2016-03-18 02:16:12 +00:00
|
|
|
chunks = list(Chunk(bytes(s)) for s in self.chunker.chunkify(self.buffer))
|
2014-01-22 19:58:48 +00:00
|
|
|
self.buffer.seek(0)
|
|
|
|
self.buffer.truncate(0)
|
2014-08-30 13:10:41 +00:00
|
|
|
# Leave the last partial chunk in the buffer unless flush is True
|
2014-01-22 19:58:48 +00:00
|
|
|
end = None if flush or len(chunks) == 1 else -1
|
|
|
|
for chunk in chunks[:end]:
|
2014-02-16 21:21:18 +00:00
|
|
|
self.chunks.append(self.write_chunk(chunk))
|
2014-01-22 19:58:48 +00:00
|
|
|
if end == -1:
|
2016-04-19 07:18:46 +00:00
|
|
|
self.buffer.write(chunks[-1].data)
|
2012-11-27 23:03:35 +00:00
|
|
|
|
2014-01-22 19:58:48 +00:00
|
|
|
def is_full(self):
|
|
|
|
return self.buffer.tell() > self.BUFFER_SIZE
|
2012-11-27 23:03:35 +00:00
|
|
|
|
2014-01-22 19:58:48 +00:00
|
|
|
|
2014-02-16 21:21:18 +00:00
|
|
|
class CacheChunkBuffer(ChunkBuffer):
|
|
|
|
|
2016-01-15 19:56:21 +00:00
|
|
|
def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
|
2015-07-11 16:31:49 +00:00
|
|
|
super().__init__(key, chunker_params)
|
2014-02-16 21:21:18 +00:00
|
|
|
self.cache = cache
|
|
|
|
self.stats = stats
|
|
|
|
|
|
|
|
def write_chunk(self, chunk):
|
2016-03-18 02:16:12 +00:00
|
|
|
id_, _, _ = self.cache.add_chunk(self.key.id_hash(chunk.data), chunk, self.stats)
|
2014-02-16 21:21:18 +00:00
|
|
|
return id_
|
|
|
|
|
|
|
|
|
2014-01-22 19:58:48 +00:00
|
|
|
class Archive:
|
2010-10-20 17:59:15 +00:00
|
|
|
|
2013-12-15 19:35:29 +00:00
|
|
|
class DoesNotExist(Error):
|
|
|
|
"""Archive {} does not exist"""
|
2010-10-30 11:44:25 +00:00
|
|
|
|
2013-12-15 19:35:29 +00:00
|
|
|
class AlreadyExists(Error):
|
|
|
|
"""Archive {} already exists"""
|
2011-09-10 15:19:02 +00:00
|
|
|
|
2015-04-21 20:29:10 +00:00
|
|
|
class IncompatibleFilesystemEncodingError(Error):
|
|
|
|
"""Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""
|
|
|
|
|
2013-06-20 10:44:58 +00:00
|
|
|
def __init__(self, repository, key, manifest, name, cache=None, create=False,
|
2015-06-19 23:20:46 +00:00
|
|
|
checkpoint_interval=300, numeric_owner=False, progress=False,
|
2016-04-18 23:13:10 +00:00
|
|
|
chunker_params=CHUNKER_PARAMS, start=None, end=None, compression=None, compression_files=None):
|
2013-06-03 11:45:48 +00:00
|
|
|
self.cwd = os.getcwd()
|
2011-07-30 19:13:48 +00:00
|
|
|
self.key = key
|
2013-06-20 10:44:58 +00:00
|
|
|
self.repository = repository
|
2011-07-30 19:13:48 +00:00
|
|
|
self.cache = cache
|
2011-09-04 21:02:47 +00:00
|
|
|
self.manifest = manifest
|
2010-10-20 20:53:58 +00:00
|
|
|
self.hard_links = {}
|
2011-08-07 15:10:21 +00:00
|
|
|
self.stats = Statistics()
|
2015-03-24 03:24:54 +00:00
|
|
|
self.show_progress = progress
|
2011-09-10 15:19:02 +00:00
|
|
|
self.name = name
|
|
|
|
self.checkpoint_interval = checkpoint_interval
|
2012-02-29 22:59:17 +00:00
|
|
|
self.numeric_owner = numeric_owner
|
2016-03-11 22:37:37 +00:00
|
|
|
if start is None:
|
|
|
|
start = datetime.utcnow()
|
2016-03-12 11:40:39 +00:00
|
|
|
self.chunker_params = chunker_params
|
2015-10-02 19:56:21 +00:00
|
|
|
self.start = start
|
2016-03-11 22:37:37 +00:00
|
|
|
if end is None:
|
|
|
|
end = datetime.utcnow()
|
2015-10-02 19:56:21 +00:00
|
|
|
self.end = end
|
2014-01-22 19:58:48 +00:00
|
|
|
self.pipeline = DownloadPipeline(self.repository, self.key)
|
2011-09-10 15:19:02 +00:00
|
|
|
if create:
|
2016-01-15 19:56:21 +00:00
|
|
|
self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
|
2015-06-20 23:46:41 +00:00
|
|
|
self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
|
2016-04-18 23:13:10 +00:00
|
|
|
self.compression_decider1 = CompressionDecider1(compression or CompressionSpec('none'),
|
|
|
|
compression_files or [])
|
|
|
|
key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))
|
2011-09-10 15:19:02 +00:00
|
|
|
if name in manifest.archives:
|
2012-12-09 22:06:33 +00:00
|
|
|
raise self.AlreadyExists(name)
|
2011-09-10 15:19:02 +00:00
|
|
|
self.last_checkpoint = time.time()
|
|
|
|
i = 0
|
|
|
|
while True:
|
|
|
|
self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
|
2015-03-17 22:47:21 +00:00
|
|
|
if self.checkpoint_name not in manifest.archives:
|
2011-09-10 15:19:02 +00:00
|
|
|
break
|
|
|
|
i += 1
|
|
|
|
else:
|
2012-12-10 19:48:39 +00:00
|
|
|
if name not in self.manifest.archives:
|
2012-12-09 22:06:33 +00:00
|
|
|
raise self.DoesNotExist(name)
|
2012-12-10 19:48:39 +00:00
|
|
|
info = self.manifest.archives[name]
|
2013-06-03 11:45:48 +00:00
|
|
|
self.load(info[b'id'])
|
2015-06-20 23:46:41 +00:00
|
|
|
self.zeros = b'\0' * (1 << chunker_params[1])
|
2011-08-15 20:32:26 +00:00
|
|
|
|
2015-03-24 06:11:00 +00:00
|
|
|
def _load_meta(self, id):
|
2016-03-18 02:16:12 +00:00
|
|
|
_, data = self.key.decrypt(id, self.repository.get(id))
|
2015-03-24 06:11:00 +00:00
|
|
|
metadata = msgpack.unpackb(data)
|
|
|
|
if metadata[b'version'] != 1:
|
|
|
|
raise Exception('Unknown archive metadata version')
|
|
|
|
return metadata
|
|
|
|
|
2010-10-21 19:21:43 +00:00
|
|
|
def load(self, id):
|
|
|
|
self.id = id
|
2015-03-24 06:11:00 +00:00
|
|
|
self.metadata = self._load_meta(self.id)
|
2016-04-17 01:15:19 +00:00
|
|
|
decode_dict(self.metadata, ARCHIVE_TEXT_KEYS)
|
2016-04-23 20:57:04 +00:00
|
|
|
self.metadata[b'cmdline'] = [safe_decode(arg) for arg in self.metadata[b'cmdline']]
|
2013-06-03 11:45:48 +00:00
|
|
|
self.name = self.metadata[b'name']
|
2010-10-25 17:51:47 +00:00
|
|
|
|
2011-06-16 19:55:54 +00:00
|
|
|
@property
|
|
|
|
def ts(self):
|
2016-02-05 01:02:04 +00:00
|
|
|
"""Timestamp of archive creation (start) in UTC"""
|
2016-02-07 01:35:31 +00:00
|
|
|
ts = self.metadata[b'time']
|
|
|
|
return parse_timestamp(ts)
|
2011-06-16 19:55:54 +00:00
|
|
|
|
2016-02-05 01:02:04 +00:00
|
|
|
@property
|
|
|
|
def ts_end(self):
|
|
|
|
"""Timestamp of archive creation (end) in UTC"""
|
2016-02-07 01:35:31 +00:00
|
|
|
# fall back to time if there is no time_end present in metadata
|
|
|
|
ts = self.metadata.get(b'time_end') or self.metadata[b'time']
|
|
|
|
return parse_timestamp(ts)
|
2016-02-05 01:02:04 +00:00
|
|
|
|
2015-10-02 19:56:21 +00:00
|
|
|
@property
|
|
|
|
def fpr(self):
|
2016-04-23 20:42:56 +00:00
|
|
|
return bin_to_hex(self.id)
|
2015-10-02 19:56:21 +00:00
|
|
|
|
|
|
|
@property
|
|
|
|
def duration(self):
|
2016-01-30 20:32:45 +00:00
|
|
|
return format_timedelta(self.end - self.start)
|
2015-10-02 19:56:21 +00:00
|
|
|
|
2016-04-23 23:29:17 +00:00
|
|
|
@property
|
|
|
|
def duration_from_meta(self):
|
|
|
|
return format_timedelta(self.ts_end - self.ts)
|
|
|
|
|
2015-10-02 19:56:21 +00:00
|
|
|
def __str__(self):
|
2016-02-08 19:17:35 +00:00
|
|
|
return '''\
|
|
|
|
Archive name: {0.name}
|
2015-10-02 19:56:21 +00:00
|
|
|
Archive fingerprint: {0.fpr}
|
2016-02-08 19:17:35 +00:00
|
|
|
Time (start): {start}
|
|
|
|
Time (end): {end}
|
2015-10-02 19:56:21 +00:00
|
|
|
Duration: {0.duration}
|
2016-02-08 19:17:35 +00:00
|
|
|
Number of files: {0.stats.nfiles}'''.format(
|
|
|
|
self,
|
|
|
|
start=format_time(to_localtime(self.start.replace(tzinfo=timezone.utc))),
|
|
|
|
end=format_time(to_localtime(self.end.replace(tzinfo=timezone.utc))))
|
2015-10-02 19:56:21 +00:00
|
|
|
|
2011-08-11 19:18:13 +00:00
|
|
|
def __repr__(self):
|
|
|
|
return 'Archive(%r)' % self.name
|
|
|
|
|
2014-01-23 21:13:08 +00:00
|
|
|
def iter_items(self, filter=None, preload=False):
|
|
|
|
for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
|
|
|
|
yield item
|
2010-11-29 20:08:37 +00:00
|
|
|
|
2011-08-02 21:20:46 +00:00
|
|
|
def add_item(self, item):
|
2015-11-03 19:21:52 +00:00
|
|
|
unknown_keys = set(item) - ITEM_KEYS
|
2016-04-17 01:15:19 +00:00
|
|
|
assert not unknown_keys, ('unknown item metadata keys detected, please update constants.ITEM_KEYS: %s',
|
2015-11-03 19:21:52 +00:00
|
|
|
','.join(k.decode('ascii') for k in unknown_keys))
|
2015-12-27 10:06:03 +00:00
|
|
|
if self.show_progress:
|
|
|
|
self.stats.show_progress(item=item, dt=0.2)
|
2014-01-22 19:58:48 +00:00
|
|
|
self.items_buffer.add(item)
|
2016-04-07 09:29:52 +00:00
|
|
|
if self.checkpoint_interval and time.time() - self.last_checkpoint > self.checkpoint_interval:
|
2011-09-10 15:19:02 +00:00
|
|
|
self.write_checkpoint()
|
2014-03-05 19:00:27 +00:00
|
|
|
self.last_checkpoint = time.time()
|
2010-12-04 20:03:02 +00:00
|
|
|
|
2011-09-10 15:19:02 +00:00
|
|
|
def write_checkpoint(self):
|
|
|
|
self.save(self.checkpoint_name)
|
|
|
|
del self.manifest.archives[self.checkpoint_name]
|
2014-03-19 21:32:07 +00:00
|
|
|
self.cache.chunk_decref(self.id, self.stats)
|
2011-09-10 15:19:02 +00:00
|
|
|
|
2016-04-07 09:29:52 +00:00
|
|
|
def save(self, name=None, comment=None, timestamp=None, additional_metadata=None):
|
2011-09-10 15:19:02 +00:00
|
|
|
name = name or self.name
|
2011-09-04 21:02:47 +00:00
|
|
|
if name in self.manifest.archives:
|
2011-09-10 15:19:02 +00:00
|
|
|
raise self.AlreadyExists(name)
|
2014-01-22 19:58:48 +00:00
|
|
|
self.items_buffer.flush(flush=True)
|
2015-04-18 19:36:10 +00:00
|
|
|
if timestamp is None:
|
2016-02-07 01:35:31 +00:00
|
|
|
self.end = datetime.utcnow()
|
2016-02-05 01:02:04 +00:00
|
|
|
start = self.start
|
|
|
|
end = self.end
|
|
|
|
else:
|
2016-02-07 01:35:31 +00:00
|
|
|
self.end = timestamp
|
2016-02-05 01:02:04 +00:00
|
|
|
start = timestamp
|
|
|
|
end = timestamp # we only have 1 value
|
2016-04-07 09:29:52 +00:00
|
|
|
metadata = {
|
2010-10-20 19:08:46 +00:00
|
|
|
'version': 1,
|
2010-10-20 17:59:15 +00:00
|
|
|
'name': name,
|
2016-04-06 11:04:18 +00:00
|
|
|
'comment': comment,
|
2014-01-22 19:58:48 +00:00
|
|
|
'items': self.items_buffer.chunks,
|
2010-10-21 19:21:43 +00:00
|
|
|
'cmdline': sys.argv,
|
2010-10-24 18:18:18 +00:00
|
|
|
'hostname': socket.gethostname(),
|
2010-10-24 20:07:54 +00:00
|
|
|
'username': getuser(),
|
2016-02-05 01:02:04 +00:00
|
|
|
'time': start.isoformat(),
|
|
|
|
'time_end': end.isoformat(),
|
2016-03-12 11:40:39 +00:00
|
|
|
'chunker_params': self.chunker_params,
|
2016-04-07 09:29:52 +00:00
|
|
|
}
|
|
|
|
metadata.update(additional_metadata or {})
|
|
|
|
data = msgpack.packb(StableDict(metadata), unicode_errors='surrogateescape')
|
2011-08-15 20:32:26 +00:00
|
|
|
self.id = self.key.id_hash(data)
|
2016-03-18 02:16:12 +00:00
|
|
|
self.cache.add_chunk(self.id, Chunk(data), self.stats)
|
2011-09-04 21:02:47 +00:00
|
|
|
self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
|
|
|
|
self.manifest.write()
|
2013-06-20 10:44:58 +00:00
|
|
|
self.repository.commit()
|
2011-09-10 15:19:02 +00:00
|
|
|
self.cache.commit()
|
2010-10-20 17:59:15 +00:00
|
|
|
|
2015-04-21 18:50:19 +00:00
|
|
|
def calc_stats(self, cache):
|
2012-11-28 20:17:00 +00:00
|
|
|
def add(id):
|
2015-04-19 21:45:05 +00:00
|
|
|
count, size, csize = cache.chunks[id]
|
2012-11-28 20:17:00 +00:00
|
|
|
stats.update(size, csize, count == 1)
|
2015-04-19 22:07:26 +00:00
|
|
|
cache.chunks[id] = count - 1, size, csize
|
2015-07-11 16:31:49 +00:00
|
|
|
|
2014-01-22 19:58:48 +00:00
|
|
|
def add_file_chunks(chunks):
|
|
|
|
for id, _, _ in chunks:
|
|
|
|
add(id)
|
2015-07-11 16:31:49 +00:00
|
|
|
|
2011-07-30 19:13:48 +00:00
|
|
|
# This function is a bit evil since it abuses the cache to calculate
|
2015-04-21 18:50:19 +00:00
|
|
|
# the stats. The cache transaction must be rolled back afterwards
|
|
|
|
unpacker = msgpack.Unpacker(use_list=False)
|
2012-10-17 09:40:23 +00:00
|
|
|
cache.begin_txn()
|
|
|
|
stats = Statistics()
|
2012-11-28 20:17:00 +00:00
|
|
|
add(self.id)
|
2014-02-16 21:21:18 +00:00
|
|
|
for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
|
2012-11-29 19:38:03 +00:00
|
|
|
add(id)
|
2016-03-18 02:16:12 +00:00
|
|
|
_, data = self.key.decrypt(id, chunk)
|
|
|
|
unpacker.feed(data)
|
2011-07-30 19:13:48 +00:00
|
|
|
for item in unpacker:
|
2014-01-22 19:58:48 +00:00
|
|
|
if b'chunks' in item:
|
2012-11-28 20:17:00 +00:00
|
|
|
stats.nfiles += 1
|
2014-01-22 19:58:48 +00:00
|
|
|
add_file_chunks(item[b'chunks'])
|
2011-07-30 19:13:48 +00:00
|
|
|
cache.rollback()
|
2011-07-30 20:50:59 +00:00
|
|
|
return stats
|
2010-10-20 17:59:15 +00:00
|
|
|
|
2016-03-17 21:39:57 +00:00
|
|
|
def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False,
|
|
|
|
hardlink_masters=None, original_path=None):
|
|
|
|
"""
|
|
|
|
Extract archive item.
|
|
|
|
|
|
|
|
:param item: the item to extract
|
|
|
|
:param restore_attrs: restore file attributes
|
|
|
|
:param dry_run: do not write any data
|
|
|
|
:param stdout: write extracted data to stdout
|
|
|
|
:param sparse: write sparse files (chunk-granularity, independent of the original being sparse)
|
|
|
|
:param hardlink_masters: maps paths to (chunks, link_target) for extracting subtrees with hardlinks correctly
|
|
|
|
:param original_path: b'path' key as stored in archive
|
|
|
|
"""
|
2015-03-01 04:07:29 +00:00
|
|
|
if dry_run or stdout:
|
2014-02-18 20:33:06 +00:00
|
|
|
if b'chunks' in item:
|
2016-05-24 20:40:20 +00:00
|
|
|
for _, data in self.pipeline.fetch_many([c.id for c in item[b'chunks']], is_preloaded=True):
|
2015-03-01 04:07:29 +00:00
|
|
|
if stdout:
|
|
|
|
sys.stdout.buffer.write(data)
|
|
|
|
if stdout:
|
|
|
|
sys.stdout.buffer.flush()
|
2014-02-18 20:33:06 +00:00
|
|
|
return
|
|
|
|
|
2016-03-17 21:39:57 +00:00
|
|
|
original_path = original_path or item[b'path']
|
2013-06-30 20:32:27 +00:00
|
|
|
dest = self.cwd
|
2013-08-03 11:34:14 +00:00
|
|
|
if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
|
|
|
|
raise Exception('Path should be relative and local')
|
2013-06-03 11:45:48 +00:00
|
|
|
path = os.path.join(dest, item[b'path'])
|
2012-12-06 22:04:01 +00:00
|
|
|
# Attempt to remove existing files, ignore errors on failure
|
|
|
|
try:
|
|
|
|
st = os.lstat(path)
|
|
|
|
if stat.S_ISDIR(st.st_mode):
|
|
|
|
os.rmdir(path)
|
|
|
|
else:
|
|
|
|
os.unlink(path)
|
2015-04-21 20:29:10 +00:00
|
|
|
except UnicodeEncodeError:
|
2015-12-14 23:17:03 +00:00
|
|
|
raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
|
2012-12-06 22:04:01 +00:00
|
|
|
except OSError:
|
|
|
|
pass
|
2013-06-03 11:45:48 +00:00
|
|
|
mode = item[b'mode']
|
2015-05-31 19:53:37 +00:00
|
|
|
if stat.S_ISREG(mode):
|
2010-10-30 11:44:25 +00:00
|
|
|
if not os.path.exists(os.path.dirname(path)):
|
|
|
|
os.makedirs(os.path.dirname(path))
|
2016-03-17 21:39:57 +00:00
|
|
|
|
2010-10-30 11:44:25 +00:00
|
|
|
# Hard link?
|
2013-06-03 11:45:48 +00:00
|
|
|
if b'source' in item:
|
|
|
|
source = os.path.join(dest, item[b'source'])
|
2012-10-17 09:40:23 +00:00
|
|
|
if os.path.exists(path):
|
|
|
|
os.unlink(path)
|
2016-03-17 21:39:57 +00:00
|
|
|
if not hardlink_masters:
|
|
|
|
os.link(source, path)
|
|
|
|
return
|
|
|
|
item[b'chunks'], link_target = hardlink_masters[item[b'source']]
|
|
|
|
if link_target:
|
|
|
|
# Hard link was extracted previously, just link
|
|
|
|
os.link(link_target, path)
|
|
|
|
return
|
|
|
|
# Extract chunks, since the item which had the chunks was not extracted
|
|
|
|
with open(path, 'wb') as fd:
|
2016-04-16 15:48:47 +00:00
|
|
|
ids = [c.id for c in item[b'chunks']]
|
2016-03-18 02:16:12 +00:00
|
|
|
for _, data in self.pipeline.fetch_many(ids, is_preloaded=True):
|
2016-03-17 21:39:57 +00:00
|
|
|
if sparse and self.zeros.startswith(data):
|
|
|
|
# all-zero chunk: create a hole in a sparse file
|
|
|
|
fd.seek(len(data), 1)
|
|
|
|
else:
|
|
|
|
fd.write(data)
|
|
|
|
pos = fd.tell()
|
|
|
|
fd.truncate(pos)
|
|
|
|
fd.flush()
|
|
|
|
self.restore_attrs(path, item, fd=fd.fileno())
|
|
|
|
if hardlink_masters:
|
|
|
|
# Update master entry with extracted file path, so that following hardlinks don't extract twice.
|
|
|
|
hardlink_masters[item.get(b'source') or original_path] = (None, path)
|
2015-05-31 19:53:37 +00:00
|
|
|
elif stat.S_ISDIR(mode):
|
|
|
|
if not os.path.exists(path):
|
|
|
|
os.makedirs(path)
|
|
|
|
if restore_attrs:
|
|
|
|
self.restore_attrs(path, item)
|
2012-03-01 21:35:43 +00:00
|
|
|
elif stat.S_ISLNK(mode):
|
|
|
|
if not os.path.exists(os.path.dirname(path)):
|
|
|
|
os.makedirs(os.path.dirname(path))
|
2013-06-03 11:45:48 +00:00
|
|
|
source = item[b'source']
|
2012-03-01 21:35:43 +00:00
|
|
|
if os.path.exists(path):
|
|
|
|
os.unlink(path)
|
2015-11-08 00:05:55 +00:00
|
|
|
try:
|
|
|
|
os.symlink(source, path)
|
|
|
|
except UnicodeEncodeError:
|
2015-12-14 23:17:03 +00:00
|
|
|
raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
|
2012-03-01 21:35:43 +00:00
|
|
|
self.restore_attrs(path, item, symlink=True)
|
2015-05-31 19:53:37 +00:00
|
|
|
elif stat.S_ISFIFO(mode):
|
|
|
|
if not os.path.exists(os.path.dirname(path)):
|
|
|
|
os.makedirs(os.path.dirname(path))
|
|
|
|
os.mkfifo(path)
|
|
|
|
self.restore_attrs(path, item)
|
2012-03-01 21:35:43 +00:00
|
|
|
elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
|
2013-06-03 11:45:48 +00:00
|
|
|
os.mknod(path, item[b'mode'], item[b'rdev'])
|
2012-03-01 21:35:43 +00:00
|
|
|
self.restore_attrs(path, item)
|
2010-10-30 11:44:25 +00:00
|
|
|
else:
|
2013-06-03 11:45:48 +00:00
|
|
|
raise Exception('Unknown archive item type %r' % item[b'mode'])
|
2010-10-20 20:53:58 +00:00
|
|
|
|
2013-06-03 11:45:48 +00:00
|
|
|
def restore_attrs(self, path, item, symlink=False, fd=None):
|
2012-02-29 22:59:17 +00:00
|
|
|
uid = gid = None
|
|
|
|
if not self.numeric_owner:
|
2013-06-03 11:45:48 +00:00
|
|
|
uid = user2uid(item[b'user'])
|
|
|
|
gid = group2gid(item[b'group'])
|
2014-04-13 18:26:46 +00:00
|
|
|
uid = item[b'uid'] if uid is None else uid
|
|
|
|
gid = item[b'gid'] if gid is None else gid
|
2013-06-03 11:45:48 +00:00
|
|
|
# This code is a bit of a mess due to os specific differences
|
2010-10-20 20:53:58 +00:00
|
|
|
try:
|
2013-06-03 11:45:48 +00:00
|
|
|
if fd:
|
|
|
|
os.fchown(fd, uid, gid)
|
|
|
|
else:
|
|
|
|
os.lchown(path, uid, gid)
|
2010-10-20 20:53:58 +00:00
|
|
|
except OSError:
|
|
|
|
pass
|
2013-06-03 11:45:48 +00:00
|
|
|
if fd:
|
|
|
|
os.fchmod(fd, item[b'mode'])
|
|
|
|
elif not symlink:
|
|
|
|
os.chmod(path, item[b'mode'])
|
|
|
|
elif has_lchmod: # Not available on Linux
|
|
|
|
os.lchmod(path, item[b'mode'])
|
2014-05-18 16:28:26 +00:00
|
|
|
mtime = bigint_to_int(item[b'mtime'])
|
2015-10-26 01:07:55 +00:00
|
|
|
if b'atime' in item:
|
|
|
|
atime = bigint_to_int(item[b'atime'])
|
|
|
|
else:
|
|
|
|
# old archives only had mtime in item metadata
|
|
|
|
atime = mtime
|
2015-12-14 21:53:31 +00:00
|
|
|
if fd:
|
2015-10-26 01:07:55 +00:00
|
|
|
os.utime(fd, None, ns=(atime, mtime))
|
2015-12-14 21:53:31 +00:00
|
|
|
else:
|
2015-10-26 01:07:55 +00:00
|
|
|
os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
|
2014-04-13 18:26:46 +00:00
|
|
|
acl_set(path, item, self.numeric_owner)
|
2016-05-17 22:22:49 +00:00
|
|
|
if b'bsdflags' in item:
|
2014-04-08 19:52:26 +00:00
|
|
|
try:
|
2016-05-17 22:22:49 +00:00
|
|
|
set_flags(path, item[b'bsdflags'], fd=fd)
|
2014-04-08 19:52:26 +00:00
|
|
|
except OSError:
|
|
|
|
pass
|
2016-04-16 21:52:16 +00:00
|
|
|
# chown removes Linux capabilities, so set the extended attributes at the end, after chown, since they include
|
|
|
|
# the Linux capabilities in the "security.capability" attribute.
|
|
|
|
xattrs = item.get(b'xattrs', {})
|
|
|
|
for k, v in xattrs.items():
|
|
|
|
try:
|
|
|
|
xattr.setxattr(fd or path, k, v, follow_symlinks=False)
|
|
|
|
except OSError as e:
|
|
|
|
if e.errno not in (errno.ENOTSUP, errno.EACCES):
|
|
|
|
# only raise if the errno is not on our ignore list:
|
|
|
|
# ENOTSUP == xattrs not supported here
|
|
|
|
# EACCES == permission denied to set this specific xattr
|
|
|
|
# (this may happen related to security.* keys)
|
|
|
|
raise
|
2010-10-20 17:59:15 +00:00
|
|
|
|
2016-04-08 05:07:14 +00:00
|
|
|
def set_meta(self, key, value):
|
2015-03-24 06:11:00 +00:00
|
|
|
metadata = StableDict(self._load_meta(self.id))
|
2016-04-08 05:07:14 +00:00
|
|
|
metadata[key] = value
|
2015-03-24 06:11:00 +00:00
|
|
|
data = msgpack.packb(metadata, unicode_errors='surrogateescape')
|
|
|
|
new_id = self.key.id_hash(data)
|
2016-03-18 02:16:12 +00:00
|
|
|
self.cache.add_chunk(new_id, Chunk(data), self.stats)
|
2016-04-08 05:07:14 +00:00
|
|
|
self.manifest.archives[self.name] = {'id': new_id, 'time': metadata[b'time']}
|
2015-03-24 06:11:00 +00:00
|
|
|
self.cache.chunk_decref(self.id, self.stats)
|
2016-04-07 09:29:52 +00:00
|
|
|
self.id = new_id
|
2016-04-08 05:07:14 +00:00
|
|
|
|
|
|
|
def rename(self, name):
|
|
|
|
if name in self.manifest.archives:
|
|
|
|
raise self.AlreadyExists(name)
|
|
|
|
oldname = self.name
|
|
|
|
self.name = name
|
|
|
|
self.set_meta(b'name', name)
|
|
|
|
del self.manifest.archives[oldname]
|
2015-03-24 06:11:00 +00:00
|
|
|
|
2016-01-16 19:46:49 +00:00
|
|
|
def delete(self, stats, progress=False):
|
2013-05-17 12:30:39 +00:00
|
|
|
unpacker = msgpack.Unpacker(use_list=False)
|
2016-01-16 19:46:49 +00:00
|
|
|
items_ids = self.metadata[b'items']
|
|
|
|
pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", same_line=True)
|
|
|
|
for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
|
|
|
|
if progress:
|
|
|
|
pi.show(i)
|
2016-03-18 02:16:12 +00:00
|
|
|
_, data = self.key.decrypt(items_id, data)
|
|
|
|
unpacker.feed(data)
|
2014-03-19 21:32:07 +00:00
|
|
|
self.cache.chunk_decref(items_id, stats)
|
2011-07-30 20:50:59 +00:00
|
|
|
for item in unpacker:
|
2014-01-22 19:58:48 +00:00
|
|
|
if b'chunks' in item:
|
2013-06-03 11:45:48 +00:00
|
|
|
for chunk_id, size, csize in item[b'chunks']:
|
2014-03-19 21:32:07 +00:00
|
|
|
self.cache.chunk_decref(chunk_id, stats)
|
2016-01-16 19:46:49 +00:00
|
|
|
if progress:
|
|
|
|
pi.finish()
|
2014-03-19 21:32:07 +00:00
|
|
|
self.cache.chunk_decref(self.id, stats)
|
2011-09-04 21:02:47 +00:00
|
|
|
del self.manifest.archives[self.name]
|
2010-10-20 17:59:15 +00:00
|
|
|
|
2010-10-31 19:31:56 +00:00
|
|
|
def stat_attrs(self, st, path):
|
2010-10-31 20:55:09 +00:00
|
|
|
item = {
|
2013-06-03 11:45:48 +00:00
|
|
|
b'mode': st.st_mode,
|
|
|
|
b'uid': st.st_uid, b'user': uid2user(st.st_uid),
|
|
|
|
b'gid': st.st_gid, b'group': gid2group(st.st_gid),
|
2015-12-14 21:39:43 +00:00
|
|
|
b'atime': int_to_bigint(st.st_atime_ns),
|
|
|
|
b'ctime': int_to_bigint(st.st_ctime_ns),
|
|
|
|
b'mtime': int_to_bigint(st.st_mtime_ns),
|
2010-10-30 11:44:25 +00:00
|
|
|
}
|
2012-02-29 22:59:17 +00:00
|
|
|
if self.numeric_owner:
|
2013-06-03 11:45:48 +00:00
|
|
|
item[b'user'] = item[b'group'] = None
|
2013-07-29 19:09:31 +00:00
|
|
|
xattrs = xattr.get_all(path, follow_symlinks=False)
|
|
|
|
if xattrs:
|
2014-02-16 22:36:48 +00:00
|
|
|
item[b'xattrs'] = StableDict(xattrs)
|
2016-05-17 22:22:49 +00:00
|
|
|
bsdflags = get_flags(path, st)
|
|
|
|
if bsdflags:
|
|
|
|
item[b'bsdflags'] = bsdflags
|
2014-08-01 13:50:18 +00:00
|
|
|
acl_get(path, item, st, self.numeric_owner)
|
2010-10-31 20:55:09 +00:00
|
|
|
return item
|
2010-10-30 11:44:25 +00:00
|
|
|
|
2015-03-22 14:52:43 +00:00
|
|
|
def process_dir(self, path, st):
|
2013-08-03 11:34:14 +00:00
|
|
|
item = {b'path': make_path_safe(path)}
|
2010-10-31 19:31:56 +00:00
|
|
|
item.update(self.stat_attrs(st, path))
|
2010-11-29 20:08:37 +00:00
|
|
|
self.add_item(item)
|
2015-03-22 14:52:43 +00:00
|
|
|
return 'd' # directory
|
|
|
|
|
|
|
|
def process_fifo(self, path, st):
|
|
|
|
item = {b'path': make_path_safe(path)}
|
|
|
|
item.update(self.stat_attrs(st, path))
|
|
|
|
self.add_item(item)
|
|
|
|
return 'f' # fifo
|
2010-10-31 19:12:32 +00:00
|
|
|
|
2012-03-03 13:02:22 +00:00
|
|
|
def process_dev(self, path, st):
|
2013-08-03 11:34:14 +00:00
|
|
|
item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
|
2012-03-03 13:02:22 +00:00
|
|
|
item.update(self.stat_attrs(st, path))
|
|
|
|
self.add_item(item)
|
2015-03-08 18:18:21 +00:00
|
|
|
if stat.S_ISCHR(st.st_mode):
|
2015-03-22 14:52:43 +00:00
|
|
|
return 'c' # char device
|
2015-03-08 18:18:21 +00:00
|
|
|
elif stat.S_ISBLK(st.st_mode):
|
2015-03-22 14:52:43 +00:00
|
|
|
return 'b' # block device
|
2012-03-03 13:02:22 +00:00
|
|
|
|
2010-10-20 20:53:58 +00:00
|
|
|
def process_symlink(self, path, st):
|
2010-10-20 17:59:15 +00:00
|
|
|
source = os.readlink(path)
|
2013-08-03 11:34:14 +00:00
|
|
|
item = {b'path': make_path_safe(path), b'source': source}
|
2010-10-31 19:31:56 +00:00
|
|
|
item.update(self.stat_attrs(st, path))
|
2010-11-29 20:08:37 +00:00
|
|
|
self.add_item(item)
|
2015-03-22 14:52:43 +00:00
|
|
|
return 's' # symlink
|
2010-10-30 11:44:25 +00:00
|
|
|
|
2015-03-01 03:29:44 +00:00
|
|
|
def process_stdin(self, path, cache):
|
|
|
|
uid, gid = 0, 0
|
|
|
|
fd = sys.stdin.buffer # binary
|
|
|
|
chunks = []
|
2016-03-18 02:16:12 +00:00
|
|
|
for data in self.chunker.chunkify(fd):
|
|
|
|
chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data), self.stats))
|
2015-03-01 03:29:44 +00:00
|
|
|
self.stats.nfiles += 1
|
2015-11-03 18:52:49 +00:00
|
|
|
t = int_to_bigint(int(time.time()) * 1000000000)
|
2015-03-01 03:29:44 +00:00
|
|
|
item = {
|
|
|
|
b'path': path,
|
|
|
|
b'chunks': chunks,
|
|
|
|
b'mode': 0o100660, # regular file, ug=rw
|
|
|
|
b'uid': uid, b'user': uid2user(uid),
|
|
|
|
b'gid': gid, b'group': gid2group(gid),
|
2015-11-03 18:52:49 +00:00
|
|
|
b'mtime': t, b'atime': t, b'ctime': t,
|
2015-03-01 03:29:44 +00:00
|
|
|
}
|
|
|
|
self.add_item(item)
|
2015-09-08 01:12:45 +00:00
|
|
|
return 'i' # stdin
|
2015-03-01 03:29:44 +00:00
|
|
|
|
2016-03-15 14:38:55 +00:00
|
|
|
def process_file(self, path, st, cache, ignore_inode=False):
|
2015-03-08 18:18:21 +00:00
|
|
|
status = None
|
2013-08-03 11:34:14 +00:00
|
|
|
safe_path = make_path_safe(path)
|
2010-10-25 20:31:18 +00:00
|
|
|
# Is it a hard link?
|
2010-10-20 20:53:58 +00:00
|
|
|
if st.st_nlink > 1:
|
|
|
|
source = self.hard_links.get((st.st_ino, st.st_dev))
|
|
|
|
if (st.st_ino, st.st_dev) in self.hard_links:
|
2011-06-19 18:34:46 +00:00
|
|
|
item = self.stat_attrs(st, path)
|
2016-03-17 21:39:57 +00:00
|
|
|
item.update({
|
|
|
|
b'path': safe_path,
|
|
|
|
b'source': source,
|
|
|
|
})
|
2011-06-19 18:34:46 +00:00
|
|
|
self.add_item(item)
|
2015-03-08 18:18:21 +00:00
|
|
|
status = 'h' # regular file, hardlink (to already seen inodes)
|
|
|
|
return status
|
2010-10-20 20:53:58 +00:00
|
|
|
else:
|
|
|
|
self.hard_links[st.st_ino, st.st_dev] = safe_path
|
2016-04-23 20:57:04 +00:00
|
|
|
path_hash = self.key.id_hash(safe_encode(os.path.join(self.cwd, path)))
|
2015-10-16 01:16:31 +00:00
|
|
|
first_run = not cache.files
|
2016-03-15 14:38:55 +00:00
|
|
|
ids = cache.file_known_and_unchanged(path_hash, st, ignore_inode)
|
2015-10-16 01:16:31 +00:00
|
|
|
if first_run:
|
2016-03-26 13:31:54 +00:00
|
|
|
logger.debug('Processing files ...')
|
2011-07-30 19:13:48 +00:00
|
|
|
chunks = None
|
2010-10-25 20:31:18 +00:00
|
|
|
if ids is not None:
|
|
|
|
# Make sure all ids are available
|
2014-01-22 19:58:48 +00:00
|
|
|
for id_ in ids:
|
|
|
|
if not cache.seen_chunk(id_):
|
2010-10-25 20:31:18 +00:00
|
|
|
break
|
|
|
|
else:
|
2014-01-22 19:58:48 +00:00
|
|
|
chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
|
2015-03-08 18:18:21 +00:00
|
|
|
status = 'U' # regular file, unchanged
|
|
|
|
else:
|
|
|
|
status = 'A' # regular file, added
|
2016-03-17 21:39:57 +00:00
|
|
|
item = {
|
|
|
|
b'path': safe_path,
|
|
|
|
b'hardlink_master': st.st_nlink > 1, # item is a hard link and has the chunks
|
|
|
|
}
|
2010-10-25 20:31:18 +00:00
|
|
|
# Only chunkify the file if needed
|
2011-07-30 19:13:48 +00:00
|
|
|
if chunks is None:
|
2016-04-18 23:13:10 +00:00
|
|
|
compress = self.compression_decider1.decide(path)
|
|
|
|
logger.debug('%s -> compression %s', path, compress['name'])
|
2016-02-18 22:01:55 +00:00
|
|
|
fh = Archive._open_rb(path)
|
2015-04-08 16:43:53 +00:00
|
|
|
with os.fdopen(fh, 'rb') as fd:
|
2011-07-30 19:13:48 +00:00
|
|
|
chunks = []
|
2016-03-18 02:16:12 +00:00
|
|
|
for data in self.chunker.chunkify(fd, fh):
|
2016-04-18 23:13:10 +00:00
|
|
|
chunks.append(cache.add_chunk(self.key.id_hash(data),
|
|
|
|
Chunk(data, compress=compress),
|
|
|
|
self.stats))
|
2015-12-27 10:06:03 +00:00
|
|
|
if self.show_progress:
|
|
|
|
self.stats.show_progress(item=item, dt=0.2)
|
2016-04-16 15:48:47 +00:00
|
|
|
cache.memorize_file(path_hash, st, [c.id for c in chunks])
|
2015-03-08 18:18:21 +00:00
|
|
|
status = status or 'M' # regular file, modified (if not 'A' already)
|
2015-12-27 10:06:03 +00:00
|
|
|
item[b'chunks'] = chunks
|
2010-10-31 19:31:56 +00:00
|
|
|
item.update(self.stat_attrs(st, path))
|
2011-08-07 18:00:18 +00:00
|
|
|
self.stats.nfiles += 1
|
2011-08-02 21:20:46 +00:00
|
|
|
self.add_item(item)
|
2015-03-08 18:18:21 +00:00
|
|
|
return status
|
2010-10-20 17:59:15 +00:00
|
|
|
|
2010-10-21 19:21:43 +00:00
|
|
|
@staticmethod
|
2013-06-20 10:44:58 +00:00
|
|
|
def list_archives(repository, key, manifest, cache=None):
|
2015-05-26 00:04:41 +00:00
|
|
|
# expensive! see also Manifest.list_archive_infos.
|
2011-09-04 21:02:47 +00:00
|
|
|
for name, info in manifest.archives.items():
|
2013-06-20 10:44:58 +00:00
|
|
|
yield Archive(repository, key, manifest, name, cache=cache)
|
2014-02-16 21:21:18 +00:00
|
|
|
|
2014-08-30 13:10:41 +00:00
|
|
|
@staticmethod
|
2016-02-18 22:01:55 +00:00
|
|
|
def _open_rb(path):
|
|
|
|
try:
|
|
|
|
# if we have O_NOATIME, this likely will succeed if we are root or owner of file:
|
|
|
|
return os.open(path, flags_noatime)
|
|
|
|
except PermissionError:
|
|
|
|
if flags_noatime == flags_normal:
|
|
|
|
# we do not have O_NOATIME, no need to try again:
|
|
|
|
raise
|
|
|
|
# Was this EPERM due to the O_NOATIME flag? Try again without it:
|
|
|
|
return os.open(path, flags_normal)
|
2014-08-30 13:10:41 +00:00
|
|
|
|
2014-02-16 21:21:18 +00:00
|
|
|
|
2015-07-15 09:30:25 +00:00
|
|
|
class RobustUnpacker:
|
2014-02-24 21:43:17 +00:00
|
|
|
"""A restartable/robust version of the streaming msgpack unpacker
|
|
|
|
"""
|
|
|
|
def __init__(self, validator):
|
2015-07-11 16:31:49 +00:00
|
|
|
super().__init__()
|
2015-11-08 13:24:58 +00:00
|
|
|
self.item_keys = [msgpack.packb(name) for name in ITEM_KEYS]
|
2014-02-24 21:43:17 +00:00
|
|
|
self.validator = validator
|
|
|
|
self._buffered_data = []
|
|
|
|
self._resync = False
|
|
|
|
self._unpacker = msgpack.Unpacker(object_hook=StableDict)
|
|
|
|
|
|
|
|
def resync(self):
|
|
|
|
self._buffered_data = []
|
|
|
|
self._resync = True
|
|
|
|
|
|
|
|
def feed(self, data):
|
|
|
|
if self._resync:
|
|
|
|
self._buffered_data.append(data)
|
|
|
|
else:
|
|
|
|
self._unpacker.feed(data)
|
|
|
|
|
|
|
|
def __iter__(self):
|
|
|
|
return self
|
|
|
|
|
|
|
|
def __next__(self):
|
|
|
|
if self._resync:
|
2014-02-24 22:37:21 +00:00
|
|
|
data = b''.join(self._buffered_data)
|
2014-02-24 21:43:17 +00:00
|
|
|
while self._resync:
|
|
|
|
if not data:
|
|
|
|
raise StopIteration
|
2014-03-06 19:05:13 +00:00
|
|
|
# Abort early if the data does not look like a serialized dict
|
|
|
|
if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
|
|
|
|
data = data[1:]
|
|
|
|
continue
|
|
|
|
# Make sure it looks like an item dict
|
|
|
|
for pattern in self.item_keys:
|
|
|
|
if data[1:].startswith(pattern):
|
|
|
|
break
|
|
|
|
else:
|
2014-03-04 20:15:52 +00:00
|
|
|
data = data[1:]
|
|
|
|
continue
|
2014-03-06 19:05:13 +00:00
|
|
|
|
2014-02-24 22:37:21 +00:00
|
|
|
self._unpacker = msgpack.Unpacker(object_hook=StableDict)
|
|
|
|
self._unpacker.feed(data)
|
|
|
|
try:
|
|
|
|
item = next(self._unpacker)
|
2014-02-24 21:43:17 +00:00
|
|
|
if self.validator(item):
|
|
|
|
self._resync = False
|
2014-02-24 22:37:21 +00:00
|
|
|
return item
|
|
|
|
# Ignore exceptions that might be raised when feeding
|
|
|
|
# msgpack with invalid data
|
|
|
|
except (TypeError, ValueError, StopIteration):
|
|
|
|
pass
|
|
|
|
data = data[1:]
|
2014-02-24 21:43:17 +00:00
|
|
|
else:
|
2014-02-24 22:37:21 +00:00
|
|
|
return next(self._unpacker)
|
2014-02-24 21:43:17 +00:00
|
|
|
|
|
|
|
|
2014-02-16 21:21:18 +00:00
|
|
|
class ArchiveChecker:
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
self.error_found = False
|
|
|
|
self.possibly_superseded = set()
|
|
|
|
|
2016-05-13 20:50:34 +00:00
|
|
|
def check(self, repository, repair=False, archive=None, last=None, prefix=None, verify_data=False,
|
|
|
|
save_space=False):
|
|
|
|
"""Perform a set of checks on 'repository'
|
|
|
|
|
|
|
|
:param repair: enable repair mode, write updated or corrected data into repository
|
|
|
|
:param archive: only check this archive
|
|
|
|
:param last: only check this number of recent archives
|
|
|
|
:param prefix: only check archives with this prefix
|
|
|
|
:param verify_data: integrity verification of data referenced by archives
|
|
|
|
:param save_space: Repository.commit(save_space)
|
|
|
|
"""
|
2015-12-07 23:21:46 +00:00
|
|
|
logger.info('Starting archive consistency check...')
|
2015-12-12 23:39:15 +00:00
|
|
|
self.check_all = archive is None and last is None and prefix is None
|
2014-02-24 22:37:21 +00:00
|
|
|
self.repair = repair
|
|
|
|
self.repository = repository
|
|
|
|
self.init_chunks()
|
|
|
|
self.key = self.identify_key(repository)
|
2015-03-17 22:47:21 +00:00
|
|
|
if Manifest.MANIFEST_ID not in self.chunks:
|
2015-12-07 23:21:46 +00:00
|
|
|
logger.error("Repository manifest not found!")
|
|
|
|
self.error_found = True
|
2014-02-24 22:37:21 +00:00
|
|
|
self.manifest = self.rebuild_manifest()
|
|
|
|
else:
|
|
|
|
self.manifest, _ = Manifest.load(repository, key=self.key)
|
2015-12-12 23:39:15 +00:00
|
|
|
self.rebuild_refcounts(archive=archive, last=last, prefix=prefix)
|
2016-05-13 20:50:34 +00:00
|
|
|
if verify_data:
|
|
|
|
self.verify_data()
|
2015-08-09 10:43:57 +00:00
|
|
|
self.orphan_chunks_check()
|
2015-11-18 01:27:25 +00:00
|
|
|
self.finish(save_space=save_space)
|
2015-12-07 23:21:46 +00:00
|
|
|
if self.error_found:
|
|
|
|
logger.error('Archive consistency check complete, problems found.')
|
|
|
|
else:
|
2015-10-02 14:58:08 +00:00
|
|
|
logger.info('Archive consistency check complete, no problems found.')
|
2014-02-24 22:37:21 +00:00
|
|
|
return self.repair or not self.error_found
|
|
|
|
|
2014-02-16 21:21:18 +00:00
|
|
|
def init_chunks(self):
|
2014-02-24 22:37:21 +00:00
|
|
|
"""Fetch a list of all object keys from repository
|
|
|
|
"""
|
2015-08-08 22:36:17 +00:00
|
|
|
# Explicitly set the initial hash table capacity to avoid performance issues
|
2014-02-17 17:25:25 +00:00
|
|
|
# due to hash table "resonance"
|
|
|
|
capacity = int(len(self.repository) * 1.2)
|
2014-07-10 13:32:12 +00:00
|
|
|
self.chunks = ChunkIndex(capacity)
|
2014-02-16 21:21:18 +00:00
|
|
|
marker = None
|
|
|
|
while True:
|
|
|
|
result = self.repository.list(limit=10000, marker=marker)
|
|
|
|
if not result:
|
|
|
|
break
|
|
|
|
marker = result[-1]
|
2016-04-16 15:48:47 +00:00
|
|
|
init_entry = ChunkIndexEntry(refcount=0, size=0, csize=0)
|
2014-02-16 21:21:18 +00:00
|
|
|
for id_ in result:
|
2016-04-16 15:48:47 +00:00
|
|
|
self.chunks[id_] = init_entry
|
2014-02-16 21:21:18 +00:00
|
|
|
|
|
|
|
def identify_key(self, repository):
|
|
|
|
cdata = repository.get(next(self.chunks.iteritems())[0])
|
|
|
|
return key_factory(repository, cdata)
|
|
|
|
|
2016-05-13 20:50:34 +00:00
|
|
|
def verify_data(self):
|
|
|
|
logger.info('Starting cryptographic data integrity verification...')
|
|
|
|
pi = ProgressIndicatorPercent(total=len(self.chunks), msg="Verifying data %6.2f%%", step=0.01, same_line=True)
|
|
|
|
count = errors = 0
|
|
|
|
for chunk_id, (refcount, *_) in self.chunks.iteritems():
|
|
|
|
pi.show()
|
|
|
|
if not refcount:
|
|
|
|
continue
|
|
|
|
encrypted_data = self.repository.get(chunk_id)
|
|
|
|
try:
|
|
|
|
_, data = self.key.decrypt(chunk_id, encrypted_data)
|
|
|
|
except IntegrityError as integrity_error:
|
|
|
|
self.error_found = True
|
|
|
|
errors += 1
|
|
|
|
logger.error('chunk %s, integrity error: %s', bin_to_hex(chunk_id), integrity_error)
|
|
|
|
count += 1
|
|
|
|
pi.finish()
|
|
|
|
log = logger.error if errors else logger.info
|
|
|
|
log('Finished cryptographic data integrity verification, verified %d chunks with %d integrity errors.', count, errors)
|
|
|
|
|
2014-02-16 21:21:18 +00:00
|
|
|
def rebuild_manifest(self):
|
2014-02-24 22:37:21 +00:00
|
|
|
"""Rebuild the manifest object if it is missing
|
|
|
|
|
|
|
|
Iterates through all objects in the repository looking for archive metadata blocks.
|
|
|
|
"""
|
2015-12-07 23:21:46 +00:00
|
|
|
logger.info('Rebuilding missing manifest, this might take some time...')
|
2014-02-16 21:21:18 +00:00
|
|
|
manifest = Manifest(self.key, self.repository)
|
|
|
|
for chunk_id, _ in self.chunks.iteritems():
|
|
|
|
cdata = self.repository.get(chunk_id)
|
2016-03-18 02:16:12 +00:00
|
|
|
_, data = self.key.decrypt(chunk_id, cdata)
|
2014-03-01 14:00:21 +00:00
|
|
|
# Some basic sanity checks of the payload before feeding it into msgpack
|
|
|
|
if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
|
|
|
|
continue
|
2015-03-17 22:47:21 +00:00
|
|
|
if b'cmdline' not in data or b'\xa7version\x01' not in data:
|
2014-03-01 14:00:21 +00:00
|
|
|
continue
|
2014-02-16 21:21:18 +00:00
|
|
|
try:
|
|
|
|
archive = msgpack.unpackb(data)
|
2015-03-20 01:31:39 +00:00
|
|
|
# Ignore exceptions that might be raised when feeding
|
|
|
|
# msgpack with invalid data
|
|
|
|
except (TypeError, ValueError, StopIteration):
|
2014-02-16 21:21:18 +00:00
|
|
|
continue
|
|
|
|
if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive:
|
2015-12-07 23:21:46 +00:00
|
|
|
logger.info('Found archive %s', archive[b'name'].decode('utf-8'))
|
2014-02-16 21:21:18 +00:00
|
|
|
manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
|
2015-12-07 23:21:46 +00:00
|
|
|
logger.info('Manifest rebuild complete.')
|
2014-02-16 21:21:18 +00:00
|
|
|
return manifest
|
|
|
|
|
2015-12-12 23:39:15 +00:00
|
|
|
def rebuild_refcounts(self, archive=None, last=None, prefix=None):
|
2014-02-24 22:37:21 +00:00
|
|
|
"""Rebuild object reference counts by walking the metadata
|
2014-02-16 21:21:18 +00:00
|
|
|
|
2014-02-24 22:37:21 +00:00
|
|
|
Missing and/or incorrect data is repaired when detected
|
|
|
|
"""
|
2014-02-16 21:21:18 +00:00
|
|
|
# Exclude the manifest from chunks
|
|
|
|
del self.chunks[Manifest.MANIFEST_ID]
|
2014-02-18 20:16:36 +00:00
|
|
|
|
2014-02-24 22:37:21 +00:00
|
|
|
def mark_as_possibly_superseded(id_):
|
2016-04-16 15:48:47 +00:00
|
|
|
if self.chunks.get(id_, ChunkIndexEntry(0, 0, 0)).refcount == 0:
|
2014-02-16 21:21:18 +00:00
|
|
|
self.possibly_superseded.add(id_)
|
|
|
|
|
|
|
|
def add_callback(chunk):
|
2016-03-18 02:16:12 +00:00
|
|
|
id_ = self.key.id_hash(chunk.data)
|
2014-02-16 21:21:18 +00:00
|
|
|
cdata = self.key.encrypt(chunk)
|
2016-03-18 02:16:12 +00:00
|
|
|
add_reference(id_, len(chunk.data), len(cdata), cdata)
|
2014-02-16 21:21:18 +00:00
|
|
|
return id_
|
|
|
|
|
|
|
|
def add_reference(id_, size, csize, cdata=None):
|
|
|
|
try:
|
2016-04-11 22:10:44 +00:00
|
|
|
self.chunks.incref(id_)
|
2014-02-16 21:21:18 +00:00
|
|
|
except KeyError:
|
|
|
|
assert cdata is not None
|
2016-04-16 15:48:47 +00:00
|
|
|
self.chunks[id_] = ChunkIndexEntry(refcount=1, size=size, csize=csize)
|
2014-02-16 21:21:18 +00:00
|
|
|
if self.repair:
|
|
|
|
self.repository.put(id_, cdata)
|
|
|
|
|
|
|
|
def verify_file_chunks(item):
|
2014-02-24 22:37:21 +00:00
|
|
|
"""Verifies that all file chunks are present
|
|
|
|
|
|
|
|
Missing file chunks will be replaced with new chunks of the same
|
|
|
|
length containing all zeros.
|
|
|
|
"""
|
2014-02-16 21:21:18 +00:00
|
|
|
offset = 0
|
|
|
|
chunk_list = []
|
|
|
|
for chunk_id, size, csize in item[b'chunks']:
|
2015-03-17 22:47:21 +00:00
|
|
|
if chunk_id not in self.chunks:
|
2014-02-16 21:21:18 +00:00
|
|
|
# If a file chunk is missing, create an all empty replacement chunk
|
2016-04-23 20:57:04 +00:00
|
|
|
logger.error('{}: Missing file chunk detected (Byte {}-{})'.format(safe_decode(item[b'path']), offset, offset + size))
|
2015-12-07 23:21:46 +00:00
|
|
|
self.error_found = True
|
2014-02-16 21:21:18 +00:00
|
|
|
data = bytes(size)
|
|
|
|
chunk_id = self.key.id_hash(data)
|
2016-03-18 02:16:12 +00:00
|
|
|
cdata = self.key.encrypt(Chunk(data))
|
2014-02-16 21:21:18 +00:00
|
|
|
csize = len(cdata)
|
|
|
|
add_reference(chunk_id, size, csize, cdata)
|
|
|
|
else:
|
|
|
|
add_reference(chunk_id, size, csize)
|
|
|
|
chunk_list.append((chunk_id, size, csize))
|
|
|
|
offset += size
|
|
|
|
item[b'chunks'] = chunk_list
|
|
|
|
|
|
|
|
def robust_iterator(archive):
|
2014-02-24 22:37:21 +00:00
|
|
|
"""Iterates through all archive items
|
|
|
|
|
|
|
|
Missing item chunks will be skipped and the msgpack stream will be restarted
|
|
|
|
"""
|
2014-02-24 21:43:17 +00:00
|
|
|
unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
|
|
|
|
_state = 0
|
2015-03-17 22:47:21 +00:00
|
|
|
|
2014-02-16 21:21:18 +00:00
|
|
|
def missing_chunk_detector(chunk_id):
|
2014-02-24 21:43:17 +00:00
|
|
|
nonlocal _state
|
2015-03-17 22:47:21 +00:00
|
|
|
if _state % 2 != int(chunk_id not in self.chunks):
|
2014-02-24 21:43:17 +00:00
|
|
|
_state += 1
|
|
|
|
return _state
|
2015-03-17 22:47:21 +00:00
|
|
|
|
2015-11-03 22:45:49 +00:00
|
|
|
def report(msg, chunk_id, chunk_no):
|
2016-04-23 20:42:56 +00:00
|
|
|
cid = bin_to_hex(chunk_id)
|
2015-11-03 22:45:49 +00:00
|
|
|
msg += ' [chunk: %06d_%s]' % (chunk_no, cid) # see debug-dump-archive-items
|
2015-12-07 23:21:46 +00:00
|
|
|
self.error_found = True
|
|
|
|
logger.error(msg)
|
2015-11-03 22:45:49 +00:00
|
|
|
|
|
|
|
i = 0
|
2014-02-16 21:21:18 +00:00
|
|
|
for state, items in groupby(archive[b'items'], missing_chunk_detector):
|
2014-02-24 21:43:17 +00:00
|
|
|
items = list(items)
|
2014-02-16 21:21:18 +00:00
|
|
|
if state % 2:
|
2015-11-03 22:45:49 +00:00
|
|
|
for chunk_id in items:
|
|
|
|
report('item metadata chunk missing', chunk_id, i)
|
|
|
|
i += 1
|
2014-02-24 21:43:17 +00:00
|
|
|
continue
|
|
|
|
if state > 0:
|
|
|
|
unpacker.resync()
|
2014-03-13 21:29:47 +00:00
|
|
|
for chunk_id, cdata in zip(items, repository.get_many(items)):
|
2016-03-18 02:16:12 +00:00
|
|
|
_, data = self.key.decrypt(chunk_id, cdata)
|
|
|
|
unpacker.feed(data)
|
2015-11-03 22:45:49 +00:00
|
|
|
try:
|
|
|
|
for item in unpacker:
|
|
|
|
if isinstance(item, dict):
|
|
|
|
yield item
|
|
|
|
else:
|
|
|
|
report('Did not get expected metadata dict when unpacking item metadata', chunk_id, i)
|
|
|
|
except Exception:
|
|
|
|
report('Exception while unpacking item metadata', chunk_id, i)
|
|
|
|
raise
|
|
|
|
i += 1
|
2014-02-16 21:21:18 +00:00
|
|
|
|
2015-08-08 20:11:40 +00:00
|
|
|
if archive is None:
|
|
|
|
# we need last N or all archives
|
|
|
|
archive_items = sorted(self.manifest.archives.items(), reverse=True,
|
|
|
|
key=lambda name_info: name_info[1][b'time'])
|
2015-12-12 23:39:15 +00:00
|
|
|
if prefix is not None:
|
|
|
|
archive_items = [item for item in archive_items if item[0].startswith(prefix)]
|
|
|
|
num_archives = len(archive_items)
|
2015-08-08 20:11:40 +00:00
|
|
|
end = None if last is None else min(num_archives, last)
|
|
|
|
else:
|
|
|
|
# we only want one specific archive
|
|
|
|
archive_items = [item for item in self.manifest.archives.items() if item[0] == archive]
|
2016-05-13 20:50:34 +00:00
|
|
|
if not archive_items:
|
|
|
|
logger.error("Archive '%s' not found.", archive)
|
2015-08-08 20:11:40 +00:00
|
|
|
num_archives = 1
|
|
|
|
end = 1
|
2016-01-16 22:42:54 +00:00
|
|
|
|
|
|
|
with cache_if_remote(self.repository) as repository:
|
|
|
|
for i, (name, info) in enumerate(archive_items[:end]):
|
|
|
|
logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
|
|
|
|
archive_id = info[b'id']
|
|
|
|
if archive_id not in self.chunks:
|
|
|
|
logger.error('Archive metadata block is missing!')
|
|
|
|
self.error_found = True
|
|
|
|
del self.manifest.archives[name]
|
|
|
|
continue
|
|
|
|
mark_as_possibly_superseded(archive_id)
|
|
|
|
cdata = self.repository.get(archive_id)
|
2016-03-18 02:16:12 +00:00
|
|
|
_, data = self.key.decrypt(archive_id, cdata)
|
2016-01-16 22:42:54 +00:00
|
|
|
archive = StableDict(msgpack.unpackb(data))
|
|
|
|
if archive[b'version'] != 1:
|
|
|
|
raise Exception('Unknown archive metadata version')
|
2016-04-17 01:15:19 +00:00
|
|
|
decode_dict(archive, ARCHIVE_TEXT_KEYS)
|
2016-04-23 20:57:04 +00:00
|
|
|
archive[b'cmdline'] = [safe_decode(arg) for arg in archive[b'cmdline']]
|
2016-01-16 22:42:54 +00:00
|
|
|
items_buffer = ChunkBuffer(self.key)
|
|
|
|
items_buffer.write_chunk = add_callback
|
|
|
|
for item in robust_iterator(archive):
|
|
|
|
if b'chunks' in item:
|
|
|
|
verify_file_chunks(item)
|
|
|
|
items_buffer.add(item)
|
|
|
|
items_buffer.flush(flush=True)
|
|
|
|
for previous_item_id in archive[b'items']:
|
|
|
|
mark_as_possibly_superseded(previous_item_id)
|
|
|
|
archive[b'items'] = items_buffer.chunks
|
|
|
|
data = msgpack.packb(archive, unicode_errors='surrogateescape')
|
|
|
|
new_archive_id = self.key.id_hash(data)
|
2016-03-18 02:16:12 +00:00
|
|
|
cdata = self.key.encrypt(Chunk(data))
|
2016-01-16 22:42:54 +00:00
|
|
|
add_reference(new_archive_id, len(data), len(cdata), cdata)
|
|
|
|
info[b'id'] = new_archive_id
|
2014-02-24 22:37:21 +00:00
|
|
|
|
2015-08-09 10:43:57 +00:00
|
|
|
def orphan_chunks_check(self):
|
|
|
|
if self.check_all:
|
2016-04-16 15:48:47 +00:00
|
|
|
unused = {id_ for id_, entry in self.chunks.iteritems() if entry.refcount == 0}
|
2015-08-09 10:43:57 +00:00
|
|
|
orphaned = unused - self.possibly_superseded
|
|
|
|
if orphaned:
|
2015-12-07 23:21:46 +00:00
|
|
|
logger.error('{} orphaned objects found!'.format(len(orphaned)))
|
|
|
|
self.error_found = True
|
2015-08-09 10:43:57 +00:00
|
|
|
if self.repair:
|
|
|
|
for id_ in unused:
|
|
|
|
self.repository.delete(id_)
|
|
|
|
else:
|
2016-04-03 15:58:15 +00:00
|
|
|
logger.info('Orphaned objects check skipped (needs all archives checked).')
|
2015-08-09 10:43:57 +00:00
|
|
|
|
2015-11-18 01:27:25 +00:00
|
|
|
def finish(self, save_space=False):
|
2014-02-24 22:37:21 +00:00
|
|
|
if self.repair:
|
|
|
|
self.manifest.write()
|
2015-11-18 01:27:25 +00:00
|
|
|
self.repository.commit(save_space=save_space)
|
2016-04-07 09:29:52 +00:00
|
|
|
|
|
|
|
|
|
|
|
class ArchiveRecreater:
|
|
|
|
AUTOCOMMIT_THRESHOLD = 512 * 1024 * 1024
|
|
|
|
"""Commit (compact segments) after this many (or 1 % of repository size, whichever is greater) bytes."""
|
|
|
|
|
|
|
|
class FakeTargetArchive:
|
|
|
|
def __init__(self):
|
|
|
|
self.stats = Statistics()
|
|
|
|
|
|
|
|
class Interrupted(Exception):
|
|
|
|
def __init__(self, metadata=None):
|
|
|
|
self.metadata = metadata or {}
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def is_temporary_archive(archive_name):
|
|
|
|
return archive_name.endswith('.recreate')
|
|
|
|
|
|
|
|
def __init__(self, repository, manifest, key, cache, matcher,
|
|
|
|
exclude_caches=False, exclude_if_present=None, keep_tag_files=False,
|
2016-04-18 23:13:10 +00:00
|
|
|
chunker_params=None, compression=None, compression_files=None,
|
2016-04-07 09:29:52 +00:00
|
|
|
dry_run=False, stats=False, progress=False, file_status_printer=None):
|
|
|
|
self.repository = repository
|
|
|
|
self.key = key
|
|
|
|
self.manifest = manifest
|
|
|
|
self.cache = cache
|
|
|
|
|
|
|
|
self.matcher = matcher
|
|
|
|
self.exclude_caches = exclude_caches
|
|
|
|
self.exclude_if_present = exclude_if_present or []
|
|
|
|
self.keep_tag_files = keep_tag_files
|
|
|
|
|
|
|
|
self.chunker_params = chunker_params or CHUNKER_PARAMS
|
|
|
|
self.recompress = bool(compression)
|
2016-04-18 23:13:10 +00:00
|
|
|
self.compression = compression or CompressionSpec('none')
|
|
|
|
self.seen_chunks = set()
|
|
|
|
self.compression_decider1 = CompressionDecider1(compression or CompressionSpec('none'),
|
|
|
|
compression_files or [])
|
|
|
|
key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none'))
|
2016-04-07 09:29:52 +00:00
|
|
|
|
|
|
|
self.autocommit_threshold = max(self.AUTOCOMMIT_THRESHOLD, self.cache.chunks_stored_size() / 100)
|
|
|
|
logger.debug("Autocommit threshold: %s", format_file_size(self.autocommit_threshold))
|
|
|
|
|
|
|
|
self.dry_run = dry_run
|
|
|
|
self.stats = stats
|
|
|
|
self.progress = progress
|
|
|
|
self.print_file_status = file_status_printer or (lambda *args: None)
|
|
|
|
|
|
|
|
self.interrupt = False
|
|
|
|
self.errors = False
|
|
|
|
|
2016-04-10 12:09:05 +00:00
|
|
|
def recreate(self, archive_name, comment=None):
|
2016-04-07 09:29:52 +00:00
|
|
|
assert not self.is_temporary_archive(archive_name)
|
|
|
|
archive = self.open_archive(archive_name)
|
|
|
|
target, resume_from = self.create_target_or_resume(archive)
|
|
|
|
if self.exclude_if_present or self.exclude_caches:
|
|
|
|
self.matcher_add_tagged_dirs(archive)
|
2016-04-10 12:09:05 +00:00
|
|
|
if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None:
|
2016-04-07 09:29:52 +00:00
|
|
|
logger.info("Skipping archive %s, nothing to do", archive_name)
|
|
|
|
return True
|
|
|
|
try:
|
|
|
|
self.process_items(archive, target, resume_from)
|
|
|
|
except self.Interrupted as e:
|
|
|
|
return self.save(archive, target, completed=False, metadata=e.metadata)
|
2016-04-10 12:09:05 +00:00
|
|
|
return self.save(archive, target, comment)
|
2016-04-07 09:29:52 +00:00
|
|
|
|
|
|
|
def process_items(self, archive, target, resume_from=None):
|
|
|
|
matcher = self.matcher
|
|
|
|
target_is_subset = not matcher.empty()
|
|
|
|
hardlink_masters = {} if target_is_subset else None
|
|
|
|
|
|
|
|
def item_is_hardlink_master(item):
|
|
|
|
return (target_is_subset and
|
|
|
|
stat.S_ISREG(item[b'mode']) and
|
|
|
|
item.get(b'hardlink_master', True) and
|
|
|
|
b'source' not in item and
|
|
|
|
not matcher.match(item[b'path']))
|
|
|
|
|
|
|
|
for item in archive.iter_items():
|
|
|
|
if item_is_hardlink_master(item):
|
|
|
|
# Re-visit all of these items in the archive even when fast-forwarding to rebuild hardlink_masters
|
|
|
|
hardlink_masters[item[b'path']] = (item.get(b'chunks'), None)
|
|
|
|
continue
|
|
|
|
if resume_from:
|
|
|
|
# Fast forward to after the last processed file
|
|
|
|
if item[b'path'] == resume_from:
|
|
|
|
logger.info('Fast-forwarded to %s', remove_surrogates(item[b'path']))
|
|
|
|
resume_from = None
|
|
|
|
continue
|
|
|
|
if not matcher.match(item[b'path']):
|
|
|
|
self.print_file_status('x', item[b'path'])
|
|
|
|
continue
|
|
|
|
if target_is_subset and stat.S_ISREG(item[b'mode']) and item.get(b'source') in hardlink_masters:
|
|
|
|
# master of this hard link is outside the target subset
|
|
|
|
chunks, new_source = hardlink_masters[item[b'source']]
|
|
|
|
if new_source is None:
|
|
|
|
# First item to use this master, move the chunks
|
|
|
|
item[b'chunks'] = chunks
|
|
|
|
hardlink_masters[item[b'source']] = (None, item[b'path'])
|
|
|
|
del item[b'source']
|
|
|
|
else:
|
|
|
|
# Master was already moved, only update this item's source
|
|
|
|
item[b'source'] = new_source
|
|
|
|
if self.dry_run:
|
|
|
|
self.print_file_status('-', item[b'path'])
|
|
|
|
else:
|
|
|
|
try:
|
|
|
|
self.process_item(archive, target, item)
|
|
|
|
except self.Interrupted:
|
|
|
|
if self.progress:
|
|
|
|
target.stats.show_progress(final=True)
|
|
|
|
raise
|
|
|
|
if self.progress:
|
|
|
|
target.stats.show_progress(final=True)
|
|
|
|
|
|
|
|
def process_item(self, archive, target, item):
|
|
|
|
if b'chunks' in item:
|
|
|
|
item[b'chunks'] = self.process_chunks(archive, target, item)
|
|
|
|
target.stats.nfiles += 1
|
|
|
|
target.add_item(item)
|
|
|
|
self.print_file_status(file_status(item[b'mode']), item[b'path'])
|
|
|
|
if self.interrupt:
|
|
|
|
raise self.Interrupted
|
|
|
|
|
|
|
|
def process_chunks(self, archive, target, item):
|
|
|
|
"""Return new chunk ID list for 'item'."""
|
2016-04-18 23:13:10 +00:00
|
|
|
# TODO: support --compression-from
|
2016-04-07 09:29:52 +00:00
|
|
|
if not self.recompress and not target.recreate_rechunkify:
|
|
|
|
for chunk_id, size, csize in item[b'chunks']:
|
|
|
|
self.cache.chunk_incref(chunk_id, target.stats)
|
|
|
|
return item[b'chunks']
|
|
|
|
new_chunks = self.process_partial_chunks(target)
|
|
|
|
chunk_iterator = self.create_chunk_iterator(archive, target, item)
|
|
|
|
consume(chunk_iterator, len(new_chunks))
|
|
|
|
for chunk in chunk_iterator:
|
2016-03-18 02:16:12 +00:00
|
|
|
chunk_id = self.key.id_hash(chunk.data)
|
2016-04-07 09:29:52 +00:00
|
|
|
if chunk_id in self.seen_chunks:
|
|
|
|
new_chunks.append(self.cache.chunk_incref(chunk_id, target.stats))
|
|
|
|
else:
|
|
|
|
# TODO: detect / skip / --always-recompress
|
|
|
|
chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=self.recompress)
|
|
|
|
new_chunks.append((chunk_id, size, csize))
|
|
|
|
self.seen_chunks.add(chunk_id)
|
|
|
|
if self.recompress:
|
|
|
|
# This tracks how many bytes are uncommitted but compactable, since we are recompressing
|
|
|
|
# existing chunks.
|
|
|
|
target.recreate_uncomitted_bytes += csize
|
|
|
|
if target.recreate_uncomitted_bytes >= self.autocommit_threshold:
|
|
|
|
# Issue commits to limit additional space usage when recompressing chunks
|
|
|
|
target.recreate_uncomitted_bytes = 0
|
|
|
|
self.repository.commit()
|
|
|
|
if self.progress:
|
|
|
|
target.stats.show_progress(item=item, dt=0.2)
|
|
|
|
if self.interrupt:
|
|
|
|
raise self.Interrupted({
|
|
|
|
'recreate_partial_chunks': new_chunks,
|
|
|
|
})
|
|
|
|
return new_chunks
|
|
|
|
|
|
|
|
def create_chunk_iterator(self, archive, target, item):
|
|
|
|
"""Return iterator of chunks to store for 'item' from 'archive' in 'target'."""
|
|
|
|
chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in item[b'chunks']])
|
|
|
|
if target.recreate_rechunkify:
|
|
|
|
# The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk
|
|
|
|
# (does not load the entire file into memory)
|
|
|
|
file = ChunkIteratorFileWrapper(chunk_iterator)
|
2016-03-18 02:16:12 +00:00
|
|
|
|
|
|
|
def _chunk_iterator():
|
|
|
|
for data in target.chunker.chunkify(file):
|
|
|
|
yield Chunk(data)
|
|
|
|
|
|
|
|
chunk_iterator = _chunk_iterator()
|
2016-04-07 09:29:52 +00:00
|
|
|
return chunk_iterator
|
|
|
|
|
|
|
|
def process_partial_chunks(self, target):
|
|
|
|
"""Return chunks from a previous run for archive 'target' (if any) or an empty list."""
|
|
|
|
if not target.recreate_partial_chunks:
|
|
|
|
return []
|
|
|
|
# No incref, create_target_or_resume already did that before to deleting the old target archive
|
|
|
|
# So just copy these over
|
|
|
|
partial_chunks = target.recreate_partial_chunks
|
|
|
|
target.recreate_partial_chunks = None
|
|
|
|
for chunk_id, size, csize in partial_chunks:
|
|
|
|
self.seen_chunks.add(chunk_id)
|
|
|
|
logger.debug('Copied %d chunks from a partially processed item', len(partial_chunks))
|
|
|
|
return partial_chunks
|
|
|
|
|
2016-04-10 12:09:05 +00:00
|
|
|
def save(self, archive, target, comment=None, completed=True, metadata=None):
|
2016-04-07 09:29:52 +00:00
|
|
|
"""Save target archive. If completed, replace source. If not, save temporary with additional 'metadata' dict."""
|
|
|
|
if self.dry_run:
|
|
|
|
return completed
|
|
|
|
if completed:
|
|
|
|
timestamp = archive.ts.replace(tzinfo=None)
|
2016-04-10 12:09:05 +00:00
|
|
|
if comment is None:
|
|
|
|
comment = archive.metadata.get(b'comment', '')
|
|
|
|
target.save(timestamp=timestamp, comment=comment, additional_metadata={
|
2016-04-07 09:29:52 +00:00
|
|
|
'cmdline': archive.metadata[b'cmdline'],
|
|
|
|
'recreate_cmdline': sys.argv,
|
|
|
|
})
|
|
|
|
archive.delete(Statistics(), progress=self.progress)
|
|
|
|
target.rename(archive.name)
|
|
|
|
if self.stats:
|
|
|
|
target.end = datetime.utcnow()
|
|
|
|
log_multi(DASHES,
|
|
|
|
str(target),
|
|
|
|
DASHES,
|
|
|
|
str(target.stats),
|
|
|
|
str(self.cache),
|
|
|
|
DASHES)
|
|
|
|
else:
|
|
|
|
additional_metadata = metadata or {}
|
|
|
|
additional_metadata.update({
|
|
|
|
'recreate_source_id': archive.id,
|
|
|
|
'recreate_args': sys.argv[1:],
|
|
|
|
})
|
|
|
|
target.save(name=archive.name + '.recreate', additional_metadata=additional_metadata)
|
|
|
|
logger.info('Run the same command again to resume.')
|
|
|
|
return completed
|
|
|
|
|
|
|
|
def matcher_add_tagged_dirs(self, archive):
|
|
|
|
"""Add excludes to the matcher created by exclude_cache and exclude_if_present."""
|
|
|
|
def exclude(dir, tag_item):
|
|
|
|
if self.keep_tag_files:
|
|
|
|
tag_files.append(PathPrefixPattern(tag_item[b'path']))
|
|
|
|
tagged_dirs.append(FnmatchPattern(dir + '/'))
|
|
|
|
else:
|
|
|
|
tagged_dirs.append(PathPrefixPattern(dir))
|
|
|
|
|
|
|
|
matcher = self.matcher
|
|
|
|
tag_files = []
|
|
|
|
tagged_dirs = []
|
2016-04-17 01:15:19 +00:00
|
|
|
# build hardlink masters, but only for paths ending in CACHE_TAG_NAME, so we can read hard-linked TAGs
|
2016-04-07 09:29:52 +00:00
|
|
|
cachedir_masters = {}
|
|
|
|
|
|
|
|
for item in archive.iter_items(
|
2016-04-17 01:15:19 +00:00
|
|
|
filter=lambda item: item[b'path'].endswith(CACHE_TAG_NAME) or matcher.match(item[b'path'])):
|
|
|
|
if item[b'path'].endswith(CACHE_TAG_NAME):
|
2016-04-07 09:29:52 +00:00
|
|
|
cachedir_masters[item[b'path']] = item
|
|
|
|
if stat.S_ISREG(item[b'mode']):
|
|
|
|
dir, tag_file = os.path.split(item[b'path'])
|
|
|
|
if tag_file in self.exclude_if_present:
|
|
|
|
exclude(dir, item)
|
2016-04-17 01:15:19 +00:00
|
|
|
if self.exclude_caches and tag_file == CACHE_TAG_NAME:
|
2016-04-07 09:29:52 +00:00
|
|
|
if b'chunks' in item:
|
|
|
|
file = open_item(archive, item)
|
|
|
|
else:
|
|
|
|
file = open_item(archive, cachedir_masters[item[b'source']])
|
2016-04-17 01:15:19 +00:00
|
|
|
if file.read(len(CACHE_TAG_CONTENTS)).startswith(CACHE_TAG_CONTENTS):
|
2016-04-07 09:29:52 +00:00
|
|
|
exclude(dir, item)
|
|
|
|
matcher.add(tag_files, True)
|
|
|
|
matcher.add(tagged_dirs, False)
|
|
|
|
|
|
|
|
def create_target_or_resume(self, archive):
|
|
|
|
"""Create new target archive or resume from temporary archive, if it exists. Return archive, resume from path"""
|
|
|
|
if self.dry_run:
|
|
|
|
return self.FakeTargetArchive(), None
|
|
|
|
target_name = archive.name + '.recreate'
|
|
|
|
resume = target_name in self.manifest.archives
|
|
|
|
target, resume_from = None, None
|
|
|
|
if resume:
|
|
|
|
target, resume_from = self.try_resume(archive, target_name)
|
|
|
|
if not target:
|
|
|
|
target = self.create_target_archive(target_name)
|
|
|
|
# If the archives use the same chunker params, then don't rechunkify
|
|
|
|
target.recreate_rechunkify = tuple(archive.metadata.get(b'chunker_params')) != self.chunker_params
|
|
|
|
return target, resume_from
|
|
|
|
|
|
|
|
def try_resume(self, archive, target_name):
|
|
|
|
"""Try to resume from temporary archive. Return (target archive, resume from path) if successful."""
|
|
|
|
logger.info('Found %s, will resume interrupted operation', target_name)
|
|
|
|
old_target = self.open_archive(target_name)
|
|
|
|
resume_id = old_target.metadata[b'recreate_source_id']
|
2016-04-23 20:57:04 +00:00
|
|
|
resume_args = [safe_decode(arg) for arg in old_target.metadata[b'recreate_args']]
|
2016-04-07 09:29:52 +00:00
|
|
|
if resume_id != archive.id:
|
|
|
|
logger.warning('Source archive changed, will discard %s and start over', target_name)
|
2016-04-23 20:42:56 +00:00
|
|
|
logger.warning('Saved fingerprint: %s', bin_to_hex(resume_id))
|
2016-04-07 09:29:52 +00:00
|
|
|
logger.warning('Current fingerprint: %s', archive.fpr)
|
|
|
|
old_target.delete(Statistics(), progress=self.progress)
|
|
|
|
return None, None # can't resume
|
|
|
|
if resume_args != sys.argv[1:]:
|
|
|
|
logger.warning('Command line changed, this might lead to inconsistencies')
|
|
|
|
logger.warning('Saved: %s', repr(resume_args))
|
|
|
|
logger.warning('Current: %s', repr(sys.argv[1:]))
|
|
|
|
target = self.create_target_archive(target_name + '.temp')
|
|
|
|
logger.info('Replaying items from interrupted operation...')
|
|
|
|
item = None
|
|
|
|
for item in old_target.iter_items():
|
|
|
|
if b'chunks' in item:
|
|
|
|
for chunk in item[b'chunks']:
|
2016-04-16 15:48:47 +00:00
|
|
|
self.cache.chunk_incref(chunk.id, target.stats)
|
2016-04-07 09:29:52 +00:00
|
|
|
target.stats.nfiles += 1
|
|
|
|
target.add_item(item)
|
|
|
|
if item:
|
|
|
|
resume_from = item[b'path']
|
|
|
|
else:
|
|
|
|
resume_from = None
|
|
|
|
if self.progress:
|
|
|
|
old_target.stats.show_progress(final=True)
|
|
|
|
target.recreate_partial_chunks = old_target.metadata.get(b'recreate_partial_chunks', [])
|
2016-04-10 13:59:10 +00:00
|
|
|
for chunk_id, size, csize in target.recreate_partial_chunks:
|
|
|
|
if not self.cache.seen_chunk(chunk_id):
|
|
|
|
try:
|
|
|
|
# Repository has __contains__, RemoteRepository doesn't
|
|
|
|
self.repository.get(chunk_id)
|
|
|
|
except Repository.ObjectNotFound:
|
|
|
|
# delete/prune/check between invocations: these chunks are gone.
|
|
|
|
target.recreate_partial_chunks = None
|
|
|
|
break
|
|
|
|
# fast-lane insert into chunks cache
|
|
|
|
self.cache.chunks[chunk_id] = (1, size, csize)
|
|
|
|
target.stats.update(size, csize, True)
|
|
|
|
continue
|
2016-04-07 09:29:52 +00:00
|
|
|
# incref now, otherwise old_target.delete() might delete these chunks
|
|
|
|
self.cache.chunk_incref(chunk_id, target.stats)
|
|
|
|
old_target.delete(Statistics(), progress=self.progress)
|
|
|
|
logger.info('Done replaying items')
|
|
|
|
return target, resume_from
|
|
|
|
|
|
|
|
def create_target_archive(self, name):
|
|
|
|
target = Archive(self.repository, self.key, self.manifest, name, create=True,
|
|
|
|
progress=self.progress, chunker_params=self.chunker_params, cache=self.cache,
|
2016-04-18 23:13:10 +00:00
|
|
|
checkpoint_interval=0, compression=self.compression)
|
2016-04-07 09:29:52 +00:00
|
|
|
target.recreate_partial_chunks = None
|
|
|
|
target.recreate_uncomitted_bytes = 0
|
|
|
|
return target
|
|
|
|
|
|
|
|
def open_archive(self, name, **kwargs):
|
|
|
|
return Archive(self.repository, self.key, self.manifest, name, cache=self.cache, **kwargs)
|