2015-10-02 19:56:21 +00:00
|
|
|
from binascii import hexlify
|
2015-04-19 21:24:14 +00:00
|
|
|
from datetime import datetime
|
2010-10-24 20:07:54 +00:00
|
|
|
from getpass import getuser
|
2014-02-16 21:21:18 +00:00
|
|
|
from itertools import groupby
|
2014-02-23 12:15:57 +00:00
|
|
|
import errno
|
convert most print() calls to logging
the logging level varies: most is logging.info(), in some place
logging.warning() or logging.error() are used when the condition is
clearly an error or warning. in other cases, we keep using print, but
force writing to sys.stderr, unless we interact with the user.
there were 77 calls to print before this commit, now there are 7, most
of which in the archiver module, which interacts directly with the
user. in one case there, we still use print() only because logging is
not setup properly yet during argument parsing.
it could be argued that commands like info or list should use print
directly, but we have converted them anyways, without ill effects on
the unit tests
unit tests still use print() in some places
this switches all informational output to stderr, which should help
with, if not fix jborg/attic#312 directly
2015-10-01 17:41:42 +00:00
|
|
|
|
2015-10-06 16:33:55 +00:00
|
|
|
from .logger import create_logger
|
|
|
|
logger = create_logger()
|
2015-05-22 17:21:41 +00:00
|
|
|
from .key import key_factory
|
|
|
|
from .remote import cache_if_remote
|
2015-10-06 16:33:55 +00:00
|
|
|
|
2010-10-20 17:59:15 +00:00
|
|
|
import os
|
2010-10-24 18:18:18 +00:00
|
|
|
import socket
|
2010-10-20 17:59:15 +00:00
|
|
|
import stat
|
2010-10-20 19:08:46 +00:00
|
|
|
import sys
|
2011-09-10 15:19:02 +00:00
|
|
|
import time
|
2013-06-03 11:45:48 +00:00
|
|
|
from io import BytesIO
|
2015-05-22 17:21:41 +00:00
|
|
|
from . import xattr
|
2015-10-09 16:44:31 +00:00
|
|
|
from .helpers import parse_timestamp, Error, uid2user, user2uid, gid2group, group2gid, format_timedelta, \
|
2015-11-13 15:38:50 +00:00
|
|
|
Manifest, Statistics, decode_dict, make_path_safe, StableDict, int_to_bigint, bigint_to_int, \
|
2015-12-14 21:39:43 +00:00
|
|
|
ProgressIndicatorPercent
|
2015-11-13 15:38:50 +00:00
|
|
|
from .platform import acl_get, acl_set
|
|
|
|
from .chunker import Chunker
|
|
|
|
from .hashindex import ChunkIndex
|
|
|
|
import msgpack
|
2010-10-20 17:59:15 +00:00
|
|
|
|
2011-08-07 15:10:21 +00:00
|
|
|
# flush threshold for the buffered items metadata stream (see ChunkBuffer)
ITEMS_BUFFER = 1024 * 1024

# chunker parameters for file content chunks
CHUNK_MIN_EXP = 19  # 2**19 == 512kiB
CHUNK_MAX_EXP = 23  # 2**23 == 8MiB
HASH_WINDOW_SIZE = 0xfff  # 4095B
HASH_MASK_BITS = 21  # results in ~2MiB chunks statistically

# defaults, use --chunker-params to override
CHUNKER_PARAMS = (CHUNK_MIN_EXP, CHUNK_MAX_EXP, HASH_MASK_BITS, HASH_WINDOW_SIZE)

# chunker params for the items metadata stream, finer granularity
ITEMS_CHUNKER_PARAMS = (12, 16, 14, HASH_WINDOW_SIZE)

# platform capability probes - lchmod/lchflags do not exist on all OSes
# (e.g. lchmod is not available on Linux, lchflags only on BSD/OS X)
has_lchmod = hasattr(os, 'lchmod')
has_lchflags = hasattr(os, 'lchflags')
|
2010-10-27 17:30:21 +00:00
|
|
|
|
2010-10-20 17:59:15 +00:00
|
|
|
|
2014-01-22 19:58:48 +00:00
|
|
|
class DownloadPipeline:
    """Fetch encrypted chunks from a repository, decrypt them and unpack
    the msgpack-encoded item dicts they contain."""

    def __init__(self, repository, key):
        self.repository = repository
        self.key = key

    def unpack_many(self, ids, filter=None, preload=False):
        """Yield item dicts decoded from the metadata chunks given by *ids*.

        If *filter* is given, only items for which filter(item) is true are
        yielded.  If *preload* is true, the content chunks of yielded items
        are queued for preloading from the repository.
        """
        unpacker = msgpack.Unpacker(use_list=False)
        for data in self.fetch_many(ids):
            unpacker.feed(data)
            batch = [decode_dict(entry, (b'path', b'source', b'user', b'group'))
                     for entry in unpacker]
            if filter:
                batch = [entry for entry in batch if filter(entry)]
            if preload:
                for entry in batch:
                    if b'chunks' in entry:
                        self.repository.preload([chunk[0] for chunk in entry[b'chunks']])
            yield from batch

    def fetch_many(self, ids, is_preloaded=False):
        """Yield the decrypted payload of each chunk id in *ids*, in order."""
        fetched = self.repository.get_many(ids, is_preloaded=is_preloaded)
        for id_, ciphertext in zip(ids, fetched):
            yield self.key.decrypt(id_, ciphertext)
|
|
|
|
|
|
|
|
|
|
|
|
class ChunkBuffer:
    """Accumulate msgpack-packed items in a memory buffer and cut the
    buffer into chunks with a Chunker.  Subclasses decide where chunks
    are written by overriding write_chunk()."""

    # flush automatically once more than this many bytes are buffered
    BUFFER_SIZE = 1 * 1024 * 1024

    def __init__(self, key, chunker_params=ITEMS_CHUNKER_PARAMS):
        self.buffer = BytesIO()
        self.packer = msgpack.Packer(unicode_errors='surrogateescape')
        self.chunks = []  # ids of chunks written so far
        self.key = key
        self.chunker = Chunker(self.key.chunk_seed, *chunker_params)

    def add(self, item):
        """Pack *item* (stably sorted) into the buffer, flushing if full."""
        packed = self.packer.pack(StableDict(item))
        self.buffer.write(packed)
        if self.is_full():
            self.flush()

    def write_chunk(self, chunk):
        # storage backend hook - must return the id of the stored chunk
        raise NotImplementedError

    def flush(self, flush=False):
        """Chunk the buffered data and write the chunks out.

        Unless *flush* is true, the trailing partial chunk is kept in the
        buffer so it can grow with subsequently added items.
        """
        if self.buffer.tell() == 0:
            return
        self.buffer.seek(0)
        pieces = [bytes(piece) for piece in self.chunker.chunkify(self.buffer)]
        self.buffer.seek(0)
        self.buffer.truncate(0)
        # Leave the last partial chunk in the buffer unless flush is True
        end = None if flush or len(pieces) == 1 else -1
        for piece in pieces[:end]:
            self.chunks.append(self.write_chunk(piece))
        if end == -1:
            self.buffer.write(pieces[-1])

    def is_full(self):
        """True once the buffered data exceeds BUFFER_SIZE."""
        return self.buffer.tell() > self.BUFFER_SIZE
|
2012-11-27 23:03:35 +00:00
|
|
|
|
2014-01-22 19:58:48 +00:00
|
|
|
|
2014-02-16 21:21:18 +00:00
|
|
|
class CacheChunkBuffer(ChunkBuffer):
    """ChunkBuffer that stores its chunks through a Cache, accounting
    the writes in a Statistics object."""

    def __init__(self, cache, key, stats, chunker_params=ITEMS_CHUNKER_PARAMS):
        super().__init__(key, chunker_params)
        self.cache = cache
        self.stats = stats

    def write_chunk(self, chunk):
        # add_chunk returns (id, size, csize); only the id is needed here
        chunk_id, _, _ = self.cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats)
        return chunk_id
|
|
|
|
|
|
|
|
|
2014-01-22 19:58:48 +00:00
|
|
|
class Archive:
|
2010-10-20 17:59:15 +00:00
|
|
|
|
2013-12-15 19:35:29 +00:00
|
|
|
class DoesNotExist(Error):
|
|
|
|
"""Archive {} does not exist"""
|
2010-10-30 11:44:25 +00:00
|
|
|
|
2013-12-15 19:35:29 +00:00
|
|
|
class AlreadyExists(Error):
|
|
|
|
"""Archive {} already exists"""
|
2011-09-10 15:19:02 +00:00
|
|
|
|
2015-04-21 20:29:10 +00:00
|
|
|
class IncompatibleFilesystemEncodingError(Error):
|
|
|
|
"""Failed to encode filename "{}" into file system encoding "{}". Consider configuring the LANG environment variable."""
|
|
|
|
|
2013-06-20 10:44:58 +00:00
|
|
|
def __init__(self, repository, key, manifest, name, cache=None, create=False,
|
2015-06-19 23:20:46 +00:00
|
|
|
checkpoint_interval=300, numeric_owner=False, progress=False,
|
2015-10-02 19:56:21 +00:00
|
|
|
chunker_params=CHUNKER_PARAMS,
|
2016-02-04 23:18:24 +00:00
|
|
|
start=datetime.utcnow(), end=datetime.utcnow()):
|
2013-06-03 11:45:48 +00:00
|
|
|
self.cwd = os.getcwd()
|
2011-07-30 19:13:48 +00:00
|
|
|
self.key = key
|
2013-06-20 10:44:58 +00:00
|
|
|
self.repository = repository
|
2011-07-30 19:13:48 +00:00
|
|
|
self.cache = cache
|
2011-09-04 21:02:47 +00:00
|
|
|
self.manifest = manifest
|
2010-10-20 20:53:58 +00:00
|
|
|
self.hard_links = {}
|
2011-08-07 15:10:21 +00:00
|
|
|
self.stats = Statistics()
|
2015-03-24 03:24:54 +00:00
|
|
|
self.show_progress = progress
|
2011-09-10 15:19:02 +00:00
|
|
|
self.name = name
|
|
|
|
self.checkpoint_interval = checkpoint_interval
|
2012-02-29 22:59:17 +00:00
|
|
|
self.numeric_owner = numeric_owner
|
2015-10-02 19:56:21 +00:00
|
|
|
self.start = start
|
|
|
|
self.end = end
|
2014-01-22 19:58:48 +00:00
|
|
|
self.pipeline = DownloadPipeline(self.repository, self.key)
|
2011-09-10 15:19:02 +00:00
|
|
|
if create:
|
2016-01-15 19:56:21 +00:00
|
|
|
self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
|
2015-06-20 23:46:41 +00:00
|
|
|
self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
|
2011-09-10 15:19:02 +00:00
|
|
|
if name in manifest.archives:
|
2012-12-09 22:06:33 +00:00
|
|
|
raise self.AlreadyExists(name)
|
2011-09-10 15:19:02 +00:00
|
|
|
self.last_checkpoint = time.time()
|
|
|
|
i = 0
|
|
|
|
while True:
|
|
|
|
self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
|
2015-03-17 22:47:21 +00:00
|
|
|
if self.checkpoint_name not in manifest.archives:
|
2011-09-10 15:19:02 +00:00
|
|
|
break
|
|
|
|
i += 1
|
|
|
|
else:
|
2012-12-10 19:48:39 +00:00
|
|
|
if name not in self.manifest.archives:
|
2012-12-09 22:06:33 +00:00
|
|
|
raise self.DoesNotExist(name)
|
2012-12-10 19:48:39 +00:00
|
|
|
info = self.manifest.archives[name]
|
2013-06-03 11:45:48 +00:00
|
|
|
self.load(info[b'id'])
|
2015-06-20 23:46:41 +00:00
|
|
|
self.zeros = b'\0' * (1 << chunker_params[1])
|
2011-08-15 20:32:26 +00:00
|
|
|
|
2015-03-24 06:11:00 +00:00
|
|
|
def _load_meta(self, id):
|
|
|
|
data = self.key.decrypt(id, self.repository.get(id))
|
|
|
|
metadata = msgpack.unpackb(data)
|
|
|
|
if metadata[b'version'] != 1:
|
|
|
|
raise Exception('Unknown archive metadata version')
|
|
|
|
return metadata
|
|
|
|
|
2010-10-21 19:21:43 +00:00
|
|
|
def load(self, id):
|
|
|
|
self.id = id
|
2015-03-24 06:11:00 +00:00
|
|
|
self.metadata = self._load_meta(self.id)
|
2016-02-05 01:02:04 +00:00
|
|
|
decode_dict(self.metadata, (b'name', b'hostname', b'username', b'time', b'time_end'))
|
2013-06-03 11:45:48 +00:00
|
|
|
self.metadata[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in self.metadata[b'cmdline']]
|
|
|
|
self.name = self.metadata[b'name']
|
2010-10-25 17:51:47 +00:00
|
|
|
|
2011-06-16 19:55:54 +00:00
|
|
|
@property
|
|
|
|
def ts(self):
|
2016-02-05 01:02:04 +00:00
|
|
|
"""Timestamp of archive creation (start) in UTC"""
|
2016-02-07 01:35:31 +00:00
|
|
|
ts = self.metadata[b'time']
|
|
|
|
return parse_timestamp(ts)
|
2011-06-16 19:55:54 +00:00
|
|
|
|
2016-02-05 01:02:04 +00:00
|
|
|
@property
|
|
|
|
def ts_end(self):
|
|
|
|
"""Timestamp of archive creation (end) in UTC"""
|
2016-02-07 01:35:31 +00:00
|
|
|
# fall back to time if there is no time_end present in metadata
|
|
|
|
ts = self.metadata.get(b'time_end') or self.metadata[b'time']
|
|
|
|
return parse_timestamp(ts)
|
2016-02-05 01:02:04 +00:00
|
|
|
|
2015-10-02 19:56:21 +00:00
|
|
|
    @property
    def fpr(self):
        """Archive fingerprint: the archive id as a hex string."""
        return hexlify(self.id).decode('ascii')
|
|
|
|
|
|
|
|
@property
|
|
|
|
def duration(self):
|
2016-01-30 20:32:45 +00:00
|
|
|
return format_timedelta(self.end - self.start)
|
2015-10-02 19:56:21 +00:00
|
|
|
|
|
|
|
def __str__(self):
|
2015-10-16 05:54:04 +00:00
|
|
|
return '''Archive name: {0.name}
|
2015-10-02 19:56:21 +00:00
|
|
|
Archive fingerprint: {0.fpr}
|
|
|
|
Start time: {0.start:%c}
|
|
|
|
End time: {0.end:%c}
|
|
|
|
Duration: {0.duration}
|
2015-10-16 05:54:04 +00:00
|
|
|
Number of files: {0.stats.nfiles}'''.format(self)
|
2015-10-02 19:56:21 +00:00
|
|
|
|
2011-08-11 19:18:13 +00:00
|
|
|
    def __repr__(self):
        # keep repr short: identify the archive by name only
        return 'Archive(%r)' % self.name
|
|
|
|
|
2014-01-23 21:13:08 +00:00
|
|
|
def iter_items(self, filter=None, preload=False):
|
|
|
|
for item in self.pipeline.unpack_many(self.metadata[b'items'], filter=filter, preload=preload):
|
|
|
|
yield item
|
2010-11-29 20:08:37 +00:00
|
|
|
|
2011-08-02 21:20:46 +00:00
|
|
|
def add_item(self, item):
|
2015-11-03 19:21:52 +00:00
|
|
|
unknown_keys = set(item) - ITEM_KEYS
|
|
|
|
assert not unknown_keys, ('unknown item metadata keys detected, please update ITEM_KEYS: %s',
|
|
|
|
','.join(k.decode('ascii') for k in unknown_keys))
|
2015-12-27 10:06:03 +00:00
|
|
|
if self.show_progress:
|
|
|
|
self.stats.show_progress(item=item, dt=0.2)
|
2014-01-22 19:58:48 +00:00
|
|
|
self.items_buffer.add(item)
|
2014-03-05 19:00:27 +00:00
|
|
|
if time.time() - self.last_checkpoint > self.checkpoint_interval:
|
2011-09-10 15:19:02 +00:00
|
|
|
self.write_checkpoint()
|
2014-03-05 19:00:27 +00:00
|
|
|
self.last_checkpoint = time.time()
|
2010-12-04 20:03:02 +00:00
|
|
|
|
2011-09-10 15:19:02 +00:00
|
|
|
    def write_checkpoint(self):
        """Persist the current in-progress state under the checkpoint name.

        save() commits everything; the checkpoint entry is then immediately
        removed from the manifest again and the extra reference save() took
        on the archive metadata chunk is dropped, so only the repository
        data itself survives (recoverable if the backup is interrupted).
        """
        self.save(self.checkpoint_name)
        del self.manifest.archives[self.checkpoint_name]
        self.cache.chunk_decref(self.id, self.stats)
|
2011-09-10 15:19:02 +00:00
|
|
|
|
2015-04-18 19:36:10 +00:00
|
|
|
    def save(self, name=None, timestamp=None):
        """Finalize and commit the archive under *name*.

        Flushes the items buffer, builds the archive metadata chunk, stores
        it, registers the archive in the manifest and commits manifest,
        repository and cache - in that order.

        :param timestamp: if given, used as both start and end time
            (we only have one value in that case); otherwise self.start is
            kept and self.end is set to now.
        :raises Archive.AlreadyExists: if *name* is already in the manifest
        """
        name = name or self.name
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        # write out any partially filled metadata chunk as well
        self.items_buffer.flush(flush=True)
        if timestamp is None:
            self.end = datetime.utcnow()
            start = self.start
            end = self.end
        else:
            self.end = timestamp
            start = timestamp
            end = timestamp  # we only have 1 value
        # StableDict gives a deterministic serialization -> stable archive id
        metadata = StableDict({
            'version': 1,
            'name': name,
            'items': self.items_buffer.chunks,
            'cmdline': sys.argv,
            'hostname': socket.gethostname(),
            'username': getuser(),
            'time': start.isoformat(),
            'time_end': end.isoformat(),
        })
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        # the archive id is the hash of its metadata chunk
        self.id = self.key.id_hash(data)
        self.cache.add_chunk(self.id, data, self.stats)
        self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
        self.manifest.write()
        self.repository.commit()
        self.cache.commit()
|
2010-10-20 17:59:15 +00:00
|
|
|
|
2015-04-21 18:50:19 +00:00
|
|
|
    def calc_stats(self, cache):
        """Compute size/unique-size statistics for this archive.

        Decrements chunk refcounts inside a throwaway cache transaction:
        a chunk whose count is 1 at the time it is visited is unique to
        this archive and counted as such.  The transaction is rolled back
        at the end, so the cache is left unmodified.
        """
        def add(id):
            count, size, csize = cache.chunks[id]
            # count == 1 -> this archive holds the only reference
            stats.update(size, csize, count == 1)
            cache.chunks[id] = count - 1, size, csize

        def add_file_chunks(chunks):
            for id, _, _ in chunks:
                add(id)

        # This function is a bit evil since it abuses the cache to calculate
        # the stats. The cache transaction must be rolled back afterwards
        unpacker = msgpack.Unpacker(use_list=False)
        cache.begin_txn()
        stats = Statistics()
        add(self.id)  # the archive metadata chunk itself
        for id, chunk in zip(self.metadata[b'items'], self.repository.get_many(self.metadata[b'items'])):
            add(id)  # each items metadata stream chunk
            unpacker.feed(self.key.decrypt(id, chunk))
            for item in unpacker:
                if b'chunks' in item:
                    stats.nfiles += 1
                    add_file_chunks(item[b'chunks'])
        cache.rollback()
        return stats
|
2010-10-20 17:59:15 +00:00
|
|
|
|
2015-04-17 23:16:26 +00:00
|
|
|
    def extract_item(self, item, restore_attrs=True, dry_run=False, stdout=False, sparse=False):
        """Restore a single archive item to the filesystem (or stdout).

        :param restore_attrs: restore owner/mode/times/xattrs for directories
            (regular files and symlinks always get their attributes restored)
        :param dry_run: fetch data but write nothing to disk
        :param stdout: write file content to sys.stdout instead of disk
        :param sparse: detect all-zero chunks and seek over them, creating
            holes in the output file
        :raises Exception: on absolute/non-local paths or unknown item types
        """
        if dry_run or stdout:
            if b'chunks' in item:
                for data in self.pipeline.fetch_many([c[0] for c in item[b'chunks']], is_preloaded=True):
                    if stdout:
                        sys.stdout.buffer.write(data)
                if stdout:
                    sys.stdout.buffer.flush()
            return

        dest = self.cwd
        # refuse path traversal - archived paths must stay below the cwd
        if item[b'path'].startswith('/') or item[b'path'].startswith('..'):
            raise Exception('Path should be relative and local')
        path = os.path.join(dest, item[b'path'])
        # Attempt to remove existing files, ignore errors on failure
        try:
            st = os.lstat(path)
            if stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
            else:
                os.unlink(path)
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass
        mode = item[b'mode']
        if stat.S_ISREG(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            # Hard link?
            if b'source' in item:
                source = os.path.join(dest, item[b'source'])
                if os.path.exists(path):
                    os.unlink(path)
                os.link(source, path)
            else:
                with open(path, 'wb') as fd:
                    ids = [c[0] for c in item[b'chunks']]
                    for data in self.pipeline.fetch_many(ids, is_preloaded=True):
                        if sparse and self.zeros.startswith(data):
                            # all-zero chunk: create a hole in a sparse file
                            fd.seek(len(data), 1)
                        else:
                            fd.write(data)
                    # if the file ends in a hole, seek alone does not extend
                    # the file - truncate to the final position to do so
                    pos = fd.tell()
                    fd.truncate(pos)
                    fd.flush()
                    self.restore_attrs(path, item, fd=fd.fileno())
        elif stat.S_ISDIR(mode):
            if not os.path.exists(path):
                os.makedirs(path)
            if restore_attrs:
                self.restore_attrs(path, item)
        elif stat.S_ISLNK(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            source = item[b'source']
            if os.path.exists(path):
                os.unlink(path)
            try:
                os.symlink(source, path)
            except UnicodeEncodeError:
                raise self.IncompatibleFilesystemEncodingError(source, sys.getfilesystemencoding()) from None
            self.restore_attrs(path, item, symlink=True)
        elif stat.S_ISFIFO(mode):
            if not os.path.exists(os.path.dirname(path)):
                os.makedirs(os.path.dirname(path))
            os.mkfifo(path)
            self.restore_attrs(path, item)
        elif stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            os.mknod(path, item[b'mode'], item[b'rdev'])
            self.restore_attrs(path, item)
        else:
            raise Exception('Unknown archive item type %r' % item[b'mode'])
|
2010-10-20 20:53:58 +00:00
|
|
|
|
2013-06-03 11:45:48 +00:00
|
|
|
    def restore_attrs(self, path, item, symlink=False, fd=None):
        """Restore metadata (xattrs, ownership, mode, timestamps, ACLs,
        bsdflags) onto *path* (or the open file descriptor *fd*).

        Order matters: ownership is restored before mode so a chown cannot
        clear setuid bits we are about to set; timestamps go last so the
        other operations do not disturb them.
        """
        xattrs = item.get(b'xattrs', {})
        for k, v in xattrs.items():
            try:
                xattr.setxattr(fd or path, k, v, follow_symlinks=False)
            except OSError as e:
                if e.errno not in (errno.ENOTSUP, errno.EACCES, ):
                    # only raise if the errno is not on our ignore list:
                    # ENOTSUP == xattrs not supported here
                    # EACCES == permission denied to set this specific xattr
                    # (this may happen related to security.* keys)
                    raise
        uid = gid = None
        if not self.numeric_owner:
            # prefer names over numeric ids; helpers return None if unknown
            uid = user2uid(item[b'user'])
            gid = group2gid(item[b'group'])
        uid = item[b'uid'] if uid is None else uid
        gid = item[b'gid'] if gid is None else gid
        # This code is a bit of a mess due to os specific differences
        try:
            if fd:
                os.fchown(fd, uid, gid)
            else:
                os.lchown(path, uid, gid)
        except OSError:
            # chown may fail for non-root users; best effort
            pass
        if fd:
            os.fchmod(fd, item[b'mode'])
        elif not symlink:
            os.chmod(path, item[b'mode'])
        elif has_lchmod:  # Not available on Linux
            os.lchmod(path, item[b'mode'])
        mtime = bigint_to_int(item[b'mtime'])
        if b'atime' in item:
            atime = bigint_to_int(item[b'atime'])
        else:
            # old archives only had mtime in item metadata
            atime = mtime
        if fd:
            os.utime(fd, None, ns=(atime, mtime))
        else:
            os.utime(path, None, ns=(atime, mtime), follow_symlinks=False)
        acl_set(path, item, self.numeric_owner)
        # Only available on OS X and FreeBSD
        if has_lchflags and b'bsdflags' in item:
            try:
                os.lchflags(path, item[b'bsdflags'])
            except OSError:
                pass
|
2010-10-20 17:59:15 +00:00
|
|
|
|
2015-03-24 06:11:00 +00:00
|
|
|
    def rename(self, name):
        """Rename the archive to *name*.

        The archive id is the hash of its metadata, so renaming means
        writing a new metadata chunk (new id), registering it in the
        manifest, then dropping the reference to the old chunk and the
        old manifest entry.
        :raises Archive.AlreadyExists: if *name* is already taken
        """
        if name in self.manifest.archives:
            raise self.AlreadyExists(name)
        metadata = StableDict(self._load_meta(self.id))
        metadata[b'name'] = name
        data = msgpack.packb(metadata, unicode_errors='surrogateescape')
        new_id = self.key.id_hash(data)
        self.cache.add_chunk(new_id, data, self.stats)
        self.manifest.archives[name] = {'id': new_id, 'time': metadata[b'time']}
        self.cache.chunk_decref(self.id, self.stats)
        del self.manifest.archives[self.name]
|
|
|
|
|
2016-01-16 19:46:49 +00:00
|
|
|
    def delete(self, stats, progress=False):
        """Delete this archive: drop references to all its chunks.

        Decrements refcounts for every content chunk of every item, every
        items-metadata chunk, and finally the archive metadata chunk, then
        removes the manifest entry.  Freed space is accounted in *stats*.
        """
        unpacker = msgpack.Unpacker(use_list=False)
        items_ids = self.metadata[b'items']
        pi = ProgressIndicatorPercent(total=len(items_ids), msg="Decrementing references %3.0f%%", same_line=True)
        for (i, (items_id, data)) in enumerate(zip(items_ids, self.repository.get_many(items_ids))):
            if progress:
                pi.show(i)
            unpacker.feed(self.key.decrypt(items_id, data))
            self.cache.chunk_decref(items_id, stats)
            for item in unpacker:
                if b'chunks' in item:
                    for chunk_id, size, csize in item[b'chunks']:
                        self.cache.chunk_decref(chunk_id, stats)
        if progress:
            pi.finish()
        self.cache.chunk_decref(self.id, stats)
        del self.manifest.archives[self.name]
|
2010-10-20 17:59:15 +00:00
|
|
|
|
2010-10-31 19:31:56 +00:00
|
|
|
    def stat_attrs(self, st, path):
        """Build the common item metadata dict from a stat result.

        Collects mode, ownership (numeric and resolved names), nanosecond
        timestamps, xattrs, bsdflags and ACLs for *path*.
        """
        item = {
            b'mode': st.st_mode,
            b'uid': st.st_uid, b'user': uid2user(st.st_uid),
            b'gid': st.st_gid, b'group': gid2group(st.st_gid),
            # *_ns stat fields are nanosecond ints, stored msgpack-safely
            b'atime': int_to_bigint(st.st_atime_ns),
            b'ctime': int_to_bigint(st.st_ctime_ns),
            b'mtime': int_to_bigint(st.st_mtime_ns),
        }
        if self.numeric_owner:
            # numeric-owner mode: do not record (or later resolve) names
            item[b'user'] = item[b'group'] = None
        xattrs = xattr.get_all(path, follow_symlinks=False)
        if xattrs:
            item[b'xattrs'] = StableDict(xattrs)
        if has_lchflags and st.st_flags:
            item[b'bsdflags'] = st.st_flags
        acl_get(path, item, st, self.numeric_owner)
        return item
|
2010-10-30 11:44:25 +00:00
|
|
|
|
2015-03-22 14:52:43 +00:00
|
|
|
def process_dir(self, path, st):
|
2013-08-03 11:34:14 +00:00
|
|
|
item = {b'path': make_path_safe(path)}
|
2010-10-31 19:31:56 +00:00
|
|
|
item.update(self.stat_attrs(st, path))
|
2010-11-29 20:08:37 +00:00
|
|
|
self.add_item(item)
|
2015-03-22 14:52:43 +00:00
|
|
|
return 'd' # directory
|
|
|
|
|
|
|
|
def process_fifo(self, path, st):
|
|
|
|
item = {b'path': make_path_safe(path)}
|
|
|
|
item.update(self.stat_attrs(st, path))
|
|
|
|
self.add_item(item)
|
|
|
|
return 'f' # fifo
|
2010-10-31 19:12:32 +00:00
|
|
|
|
2012-03-03 13:02:22 +00:00
|
|
|
def process_dev(self, path, st):
|
2013-08-03 11:34:14 +00:00
|
|
|
item = {b'path': make_path_safe(path), b'rdev': st.st_rdev}
|
2012-03-03 13:02:22 +00:00
|
|
|
item.update(self.stat_attrs(st, path))
|
|
|
|
self.add_item(item)
|
2015-03-08 18:18:21 +00:00
|
|
|
if stat.S_ISCHR(st.st_mode):
|
2015-03-22 14:52:43 +00:00
|
|
|
return 'c' # char device
|
2015-03-08 18:18:21 +00:00
|
|
|
elif stat.S_ISBLK(st.st_mode):
|
2015-03-22 14:52:43 +00:00
|
|
|
return 'b' # block device
|
2012-03-03 13:02:22 +00:00
|
|
|
|
2010-10-20 20:53:58 +00:00
|
|
|
def process_symlink(self, path, st):
|
2010-10-20 17:59:15 +00:00
|
|
|
source = os.readlink(path)
|
2013-08-03 11:34:14 +00:00
|
|
|
item = {b'path': make_path_safe(path), b'source': source}
|
2010-10-31 19:31:56 +00:00
|
|
|
item.update(self.stat_attrs(st, path))
|
2010-11-29 20:08:37 +00:00
|
|
|
self.add_item(item)
|
2015-03-22 14:52:43 +00:00
|
|
|
return 's' # symlink
|
2010-10-30 11:44:25 +00:00
|
|
|
|
2015-03-01 03:29:44 +00:00
|
|
|
    def process_stdin(self, path, cache):
        """Archive data read from standard input as a single regular file.

        Ownership is fixed to uid/gid 0 and all three timestamps to "now",
        since stdin has no real file metadata.
        """
        uid, gid = 0, 0
        fd = sys.stdin.buffer  # binary
        chunks = []
        for chunk in self.chunker.chunkify(fd):
            chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
        self.stats.nfiles += 1
        # current time in nanoseconds, encoded like stat *_ns fields
        t = int_to_bigint(int(time.time()) * 1000000000)
        item = {
            b'path': path,
            b'chunks': chunks,
            b'mode': 0o100660,  # regular file, ug=rw
            b'uid': uid, b'user': uid2user(uid),
            b'gid': gid, b'group': gid2group(gid),
            b'mtime': t, b'atime': t, b'ctime': t,
        }
        self.add_item(item)
        return 'i'  # stdin
|
2015-03-01 03:29:44 +00:00
|
|
|
|
2010-10-21 19:21:43 +00:00
|
|
|
    def process_file(self, path, st, cache):
        """Archive a regular file, using the files cache to skip unchanged data.

        Returns a one-letter status code:
        'h' hardlink to an already archived inode, 'U' unchanged (all chunks
        reused from cache), 'A' added (not in files cache), 'M' modified.
        """
        status = None
        safe_path = make_path_safe(path)
        # Is it a hard link?
        if st.st_nlink > 1:
            source = self.hard_links.get((st.st_ino, st.st_dev))
            if (st.st_ino, st.st_dev) in self.hard_links:
                # inode already archived: store only a reference to the first path
                item = self.stat_attrs(st, path)
                item.update({b'path': safe_path, b'source': source})
                self.add_item(item)
                status = 'h'  # regular file, hardlink (to already seen inodes)
                return status
            else:
                # first occurrence of this inode: remember it, archive normally
                self.hard_links[st.st_ino, st.st_dev] = safe_path
        path_hash = self.key.id_hash(os.path.join(self.cwd, path).encode('utf-8', 'surrogateescape'))
        # note: read cache.files before file_known_and_unchanged may populate it
        first_run = not cache.files
        ids = cache.file_known_and_unchanged(path_hash, st)
        if first_run:
            logger.info('processing files')
        chunks = None
        if ids is not None:
            # Make sure all ids are available
            for id_ in ids:
                if not cache.seen_chunk(id_):
                    break
            else:
                # all chunks still in the repo: just bump their refcounts
                chunks = [cache.chunk_incref(id_, self.stats) for id_ in ids]
                status = 'U'  # regular file, unchanged
        else:
            status = 'A'  # regular file, added
        item = {b'path': safe_path}
        # Only chunkify the file if needed
        if chunks is None:
            fh = Archive._open_rb(path, st)
            with os.fdopen(fh, 'rb') as fd:
                chunks = []
                for chunk in self.chunker.chunkify(fd, fh):
                    chunks.append(cache.add_chunk(self.key.id_hash(chunk), chunk, self.stats))
                    if self.show_progress:
                        self.stats.show_progress(item=item, dt=0.2)
            cache.memorize_file(path_hash, st, [c[0] for c in chunks])
            # 'A' set above takes precedence; otherwise the cache entry was stale
            status = status or 'M'  # regular file, modified (if not 'A' already)
        item[b'chunks'] = chunks
        item.update(self.stat_attrs(st, path))
        self.stats.nfiles += 1
        self.add_item(item)
        return status
2010-10-21 19:21:43 +00:00
|
|
|
@staticmethod
|
2013-06-20 10:44:58 +00:00
|
|
|
def list_archives(repository, key, manifest, cache=None):
|
2015-05-26 00:04:41 +00:00
|
|
|
# expensive! see also Manifest.list_archive_infos.
|
2011-09-04 21:02:47 +00:00
|
|
|
for name, info in manifest.archives.items():
|
2013-06-20 10:44:58 +00:00
|
|
|
yield Archive(repository, key, manifest, name, cache=cache)
|
2014-02-16 21:21:18 +00:00
|
|
|
|
2014-08-30 13:10:41 +00:00
|
|
|
    @staticmethod
    def _open_rb(path, st):
        """Open *path* read-only/binary, preferring O_NOATIME when possible.

        Self-optimizing: on first call it picks the best open strategy for
        this platform/user and rebinds Archive._open_rb to it, so later calls
        skip the probing.  Returns an OS-level file descriptor.
        """
        flags_normal = os.O_RDONLY | getattr(os, 'O_BINARY', 0)
        flags_noatime = flags_normal | getattr(os, 'O_NOATIME', 0)
        # effective uid, lazily determined only if O_NOATIME ever fails
        euid = None

        def open_simple(p, s):
            # plain open, no O_NOATIME
            return os.open(p, flags_normal)

        def open_noatime(p, s):
            # open with O_NOATIME (only valid for root / file owner)
            return os.open(p, flags_noatime)

        def open_noatime_if_owner(p, s):
            if euid == 0 or s.st_uid == euid:
                # we are root or owner of file
                return open_noatime(p, s)
            else:
                return open_simple(p, s)

        def open_noatime_with_fallback(p, s):
            try:
                fd = os.open(p, flags_noatime)
            except PermissionError:
                # Was this EPERM due to the O_NOATIME flag?
                fd = os.open(p, flags_normal)
                # Yes, it was -- otherwise the above line would have thrown
                # another exception.
                nonlocal euid
                euid = os.geteuid()
                # So in future, let's check whether the file is owned by us
                # before attempting to use O_NOATIME.
                Archive._open_rb = open_noatime_if_owner
            return fd
        if flags_noatime != flags_normal:
            # Always use O_NOATIME version.
            Archive._open_rb = open_noatime_with_fallback
        else:
            # Always use non-O_NOATIME version.
            Archive._open_rb = open_simple
        # dispatch this very first call through the freshly chosen strategy
        return Archive._open_rb(path, st)
2014-02-16 21:21:18 +00:00
|
|
|
|
2015-11-03 19:21:52 +00:00
|
|
|
# this set must be kept complete, otherwise the RobustUnpacker might malfunction:
# (set literal instead of set([...]) -- avoids building a throwaway list, ruff C405)
ITEM_KEYS = {b'path', b'source', b'rdev', b'chunks',
             b'mode', b'user', b'group', b'uid', b'gid', b'mtime', b'atime', b'ctime',
             b'xattrs', b'bsdflags', b'acl_nfs4', b'acl_access', b'acl_default', b'acl_extended'}
|
2015-11-03 19:21:52 +00:00
|
|
|
|
|
|
|
|
2015-07-15 09:30:25 +00:00
|
|
|
class RobustUnpacker:
    """A restartable/robust version of the streaming msgpack unpacker

    After resync() it buffers incoming bytes and scans byte-by-byte for the
    start of the next valid item dict, so corrupted/missing chunks only lose
    the items they contained instead of breaking the whole stream.
    """
    def __init__(self, validator):
        # validator: callable(item) -> bool, decides whether a candidate
        # unpacked object is an acceptable item dict
        super().__init__()
        # msgpack-encoded key names used to sniff a plausible item dict start
        self.item_keys = [msgpack.packb(name) for name in ITEM_KEYS]
        self.validator = validator
        self._buffered_data = []
        self._resync = False
        self._unpacker = msgpack.Unpacker(object_hook=StableDict)

    def resync(self):
        """Drop buffered bytes and enter resync mode (scan for next item)."""
        self._buffered_data = []
        self._resync = True

    def feed(self, data):
        # while resyncing, keep raw bytes so __next__ can scan them;
        # otherwise stream straight into the unpacker
        if self._resync:
            self._buffered_data.append(data)
        else:
            self._unpacker.feed(data)

    def __iter__(self):
        return self

    def __next__(self):
        if self._resync:
            data = b''.join(self._buffered_data)
            # slide a 1-byte window over the buffer until a decodable,
            # validator-approved item dict is found
            while self._resync:
                if not data:
                    raise StopIteration
                # Abort early if the data does not look like a serialized dict
                # (0x8x = msgpack fixmap, 0xax = fixstr first key)
                if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                    data = data[1:]
                    continue
                # Make sure it looks like an item dict
                for pattern in self.item_keys:
                    if data[1:].startswith(pattern):
                        break
                else:
                    data = data[1:]
                    continue

                # plausible start: try a fresh unpacker on the remainder
                self._unpacker = msgpack.Unpacker(object_hook=StableDict)
                self._unpacker.feed(data)
                try:
                    item = next(self._unpacker)
                    if self.validator(item):
                        self._resync = False
                        return item
                # Ignore exceptions that might be raised when feeding
                # msgpack with invalid data
                except (TypeError, ValueError, StopIteration):
                    pass
                data = data[1:]
        else:
            return next(self._unpacker)
|
|
|
|
2014-02-16 21:21:18 +00:00
|
|
|
class ArchiveChecker:
    """Verify (and optionally repair) repository metadata consistency.

    Walks all chunks and archive metadata, rebuilds reference counts,
    replaces missing file chunks with zero-filled placeholders and can
    rebuild a missing manifest.  Repairs are only written when repair=True.
    """

    def __init__(self):
        # set to True on any detected problem; drives the final exit status
        self.error_found = False
        # chunk ids that may become unreferenced after rewriting metadata
        self.possibly_superseded = set()

    def check(self, repository, repair=False, archive=None, last=None, prefix=None, save_space=False):
        """Run the full consistency check.

        archive/last/prefix restrict which archives are analyzed; orphan
        detection needs all of them (check_all).  Returns True if the check
        passed or repairs were applied.
        """
        logger.info('Starting archive consistency check...')
        self.check_all = archive is None and last is None and prefix is None
        self.repair = repair
        self.repository = repository
        self.init_chunks()
        self.key = self.identify_key(repository)
        if Manifest.MANIFEST_ID not in self.chunks:
            logger.error("Repository manifest not found!")
            self.error_found = True
            self.manifest = self.rebuild_manifest()
        else:
            self.manifest, _ = Manifest.load(repository, key=self.key)
        self.rebuild_refcounts(archive=archive, last=last, prefix=prefix)
        self.orphan_chunks_check()
        self.finish(save_space=save_space)
        if self.error_found:
            logger.error('Archive consistency check complete, problems found.')
        else:
            logger.info('Archive consistency check complete, no problems found.')
        return self.repair or not self.error_found

    def init_chunks(self):
        """Fetch a list of all object keys from repository

        Initializes self.chunks with a zeroed (refcount, size, csize) entry
        per repository object.
        """
        # Explicitly set the initial hash table capacity to avoid performance issues
        # due to hash table "resonance"
        capacity = int(len(self.repository) * 1.2)
        self.chunks = ChunkIndex(capacity)
        marker = None
        # page through the repository key space, 10000 ids at a time
        while True:
            result = self.repository.list(limit=10000, marker=marker)
            if not result:
                break
            marker = result[-1]
            for id_ in result:
                self.chunks[id_] = (0, 0, 0)

    def identify_key(self, repository):
        """Detect the encryption key type from an arbitrary stored chunk."""
        cdata = repository.get(next(self.chunks.iteritems())[0])
        return key_factory(repository, cdata)

    def rebuild_manifest(self):
        """Rebuild the manifest object if it is missing

        Iterates through all objects in the repository looking for archive metadata blocks.
        """
        logger.info('Rebuilding missing manifest, this might take some time...')
        manifest = Manifest(self.key, self.repository)
        for chunk_id, _ in self.chunks.iteritems():
            cdata = self.repository.get(chunk_id)
            data = self.key.decrypt(chunk_id, cdata)
            # Some basic sanity checks of the payload before feeding it into msgpack
            if len(data) < 2 or ((data[0] & 0xf0) != 0x80) or ((data[1] & 0xe0) != 0xa0):
                continue
            # cheap byte-level pre-filter for an archive metadata dict
            if b'cmdline' not in data or b'\xa7version\x01' not in data:
                continue
            try:
                archive = msgpack.unpackb(data)
            # Ignore exceptions that might be raised when feeding
            # msgpack with invalid data
            except (TypeError, ValueError, StopIteration):
                continue
            if isinstance(archive, dict) and b'items' in archive and b'cmdline' in archive:
                logger.info('Found archive %s', archive[b'name'].decode('utf-8'))
                manifest.archives[archive[b'name'].decode('utf-8')] = {b'id': chunk_id, b'time': archive[b'time']}
        logger.info('Manifest rebuild complete.')
        return manifest

    def rebuild_refcounts(self, archive=None, last=None, prefix=None):
        """Rebuild object reference counts by walking the metadata

        Missing and/or incorrect data is repaired when detected
        """
        # Exclude the manifest from chunks
        del self.chunks[Manifest.MANIFEST_ID]

        def mark_as_possibly_superseded(id_):
            # only chunks nobody references yet can become orphans later
            if self.chunks.get(id_, (0,))[0] == 0:
                self.possibly_superseded.add(id_)

        def add_callback(chunk):
            # used as ChunkBuffer.write_chunk: account for a freshly written
            # metadata chunk and return its id
            id_ = self.key.id_hash(chunk)
            cdata = self.key.encrypt(chunk)
            add_reference(id_, len(chunk), len(cdata), cdata)
            return id_

        def add_reference(id_, size, csize, cdata=None):
            # bump refcount; unknown chunks must come with their data so
            # they can be (re)stored when repairing
            try:
                count, _, _ = self.chunks[id_]
                self.chunks[id_] = count + 1, size, csize
            except KeyError:
                assert cdata is not None
                self.chunks[id_] = 1, size, csize
                if self.repair:
                    self.repository.put(id_, cdata)

        def verify_file_chunks(item):
            """Verifies that all file chunks are present

            Missing file chunks will be replaced with new chunks of the same
            length containing all zeros.
            """
            offset = 0
            chunk_list = []
            for chunk_id, size, csize in item[b'chunks']:
                if chunk_id not in self.chunks:
                    # If a file chunk is missing, create an all empty replacement chunk
                    logger.error('{}: Missing file chunk detected (Byte {}-{})'.format(item[b'path'].decode('utf-8', 'surrogateescape'), offset, offset + size))
                    self.error_found = True
                    data = bytes(size)
                    chunk_id = self.key.id_hash(data)
                    cdata = self.key.encrypt(data)
                    csize = len(cdata)
                    add_reference(chunk_id, size, csize, cdata)
                else:
                    add_reference(chunk_id, size, csize)
                chunk_list.append((chunk_id, size, csize))
                offset += size
            item[b'chunks'] = chunk_list

        def robust_iterator(archive):
            """Iterates through all archive items

            Missing item chunks will be skipped and the msgpack stream will be restarted
            """
            unpacker = RobustUnpacker(lambda item: isinstance(item, dict) and b'path' in item)
            _state = 0

            def missing_chunk_detector(chunk_id):
                # groupby key: odd state = run of missing chunks,
                # even state = run of present chunks
                nonlocal _state
                if _state % 2 != int(chunk_id not in self.chunks):
                    _state += 1
                return _state

            def report(msg, chunk_id, chunk_no):
                cid = hexlify(chunk_id).decode('ascii')
                msg += ' [chunk: %06d_%s]' % (chunk_no, cid)  # see debug-dump-archive-items
                self.error_found = True
                logger.error(msg)

            i = 0
            for state, items in groupby(archive[b'items'], missing_chunk_detector):
                items = list(items)
                if state % 2:
                    # whole run of item metadata chunks is missing
                    for chunk_id in items:
                        report('item metadata chunk missing', chunk_id, i)
                        i += 1
                    continue
                if state > 0:
                    # we just skipped missing chunks; rescan for the next item
                    unpacker.resync()
                for chunk_id, cdata in zip(items, repository.get_many(items)):
                    unpacker.feed(self.key.decrypt(chunk_id, cdata))
                    try:
                        for item in unpacker:
                            if isinstance(item, dict):
                                yield item
                            else:
                                report('Did not get expected metadata dict when unpacking item metadata', chunk_id, i)
                    except Exception:
                        report('Exception while unpacking item metadata', chunk_id, i)
                        raise
                    i += 1

        if archive is None:
            # we need last N or all archives
            archive_items = sorted(self.manifest.archives.items(), reverse=True,
                                   key=lambda name_info: name_info[1][b'time'])
            if prefix is not None:
                archive_items = [item for item in archive_items if item[0].startswith(prefix)]
            num_archives = len(archive_items)
            end = None if last is None else min(num_archives, last)
        else:
            # we only want one specific archive
            archive_items = [item for item in self.manifest.archives.items() if item[0] == archive]
            num_archives = 1
            end = 1

        with cache_if_remote(self.repository) as repository:
            for i, (name, info) in enumerate(archive_items[:end]):
                logger.info('Analyzing archive {} ({}/{})'.format(name, num_archives - i, num_archives))
                archive_id = info[b'id']
                if archive_id not in self.chunks:
                    logger.error('Archive metadata block is missing!')
                    self.error_found = True
                    del self.manifest.archives[name]
                    continue
                mark_as_possibly_superseded(archive_id)
                cdata = self.repository.get(archive_id)
                data = self.key.decrypt(archive_id, cdata)
                archive = StableDict(msgpack.unpackb(data))
                if archive[b'version'] != 1:
                    raise Exception('Unknown archive metadata version')
                decode_dict(archive, (b'name', b'hostname', b'username', b'time', b'time_end'))
                archive[b'cmdline'] = [arg.decode('utf-8', 'surrogateescape') for arg in archive[b'cmdline']]
                # re-serialize all items through a fresh buffer; add_callback
                # does the refcounting for every (re)written metadata chunk
                items_buffer = ChunkBuffer(self.key)
                items_buffer.write_chunk = add_callback
                for item in robust_iterator(archive):
                    if b'chunks' in item:
                        verify_file_chunks(item)
                    items_buffer.add(item)
                items_buffer.flush(flush=True)
                # old item metadata chunks may now be unreferenced
                for previous_item_id in archive[b'items']:
                    mark_as_possibly_superseded(previous_item_id)
                archive[b'items'] = items_buffer.chunks
                data = msgpack.packb(archive, unicode_errors='surrogateescape')
                new_archive_id = self.key.id_hash(data)
                cdata = self.key.encrypt(data)
                add_reference(new_archive_id, len(data), len(cdata), cdata)
                info[b'id'] = new_archive_id

    def orphan_chunks_check(self):
        """Report (and with repair=True delete) unreferenced chunks.

        Only meaningful when every archive was checked, otherwise a chunk may
        simply belong to an archive that was skipped.
        """
        if self.check_all:
            unused = set()
            for id_, (count, size, csize) in self.chunks.iteritems():
                if count == 0:
                    unused.add(id_)
            orphaned = unused - self.possibly_superseded
            if orphaned:
                logger.error('{} orphaned objects found!'.format(len(orphaned)))
                self.error_found = True
            if self.repair:
                for id_ in unused:
                    self.repository.delete(id_)
        else:
            logger.warning('Orphaned objects check skipped (needs all archives checked).')

    def finish(self, save_space=False):
        """Commit repairs (manifest + repository) when running in repair mode."""
        if self.repair:
            self.manifest.write()
            self.repository.commit(save_space=save_space)
|