1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2025-02-20 21:27:32 +00:00

repository: checksum index and hints

This commit is contained in:
Marian Beermann 2017-06-01 23:28:11 +02:00
parent a1fa1b7aec
commit f61ee038d0
2 changed files with 88 additions and 23 deletions

View file

@ -24,6 +24,7 @@
from .lrucache import LRUCache
from .platform import SaveFile, SyncFile, sync_dir, safe_fadvise
from .algorithms.checksums import crc32
from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
logger = create_logger(__name__)
@ -372,13 +373,27 @@ def commit(self, save_space=False):
self.write_index()
self.rollback()
def _read_integrity(self, transaction_id, key=None):
    """Load the integrity data recorded for *transaction_id*.

    Reads ``integrity.<transaction_id>`` from the repository directory and
    unpacks it with msgpack.

    :param transaction_id: transaction whose integrity file should be read.
    :param key: optional entry to look up in the unpacked mapping (callers in
        this file pass ``b'index'`` or ``b'hints'``). With the default
        ``None`` the whole unpacked object is returned without further
        interpretation.
    :return: ``None`` if no integrity file exists for the transaction, or if
        a keyed lookup hits a file whose ``b'version'`` is not 2; otherwise
        the decoded (``str``) value stored under *key*, or the whole unpacked
        object when *key* is ``None``.
    :raises KeyError: if *key* is missing from a version 2 integrity file.
    """
    integrity_file = 'integrity.%d' % transaction_id
    integrity_path = os.path.join(self.path, integrity_file)
    try:
        with open(integrity_path, 'rb') as fd:
            integrity = msgpack.unpack(fd)
    except FileNotFoundError:
        # No integrity file was written for this transaction.
        return None
    if key is None:
        return integrity
    # write_index stamps the file with b'version': 2. Entries of a file with
    # any other version may have a different meaning, so they are not used
    # as checksums; the caller sees "no integrity data" instead.
    version = integrity.get(b'version')
    if version != 2:
        logger.warning('Unknown integrity data version %r in %s', version, integrity_file)
        return None
    return integrity[key].decode()
def open_index(self, transaction_id, auto_recover=True):
if transaction_id is None:
return NSIndex()
index_path = os.path.join(self.path, 'index.%d' % transaction_id).encode('utf-8')
index_path = os.path.join(self.path, 'index.%d' % transaction_id)
integrity_data = self._read_integrity(transaction_id, b'index')
try:
return NSIndex.read(index_path)
except (ValueError, OSError) as exc:
with IntegrityCheckedFile(index_path, write=False, integrity_data=integrity_data) as fd:
return NSIndex.read(fd)
except (ValueError, OSError, FileIntegrityError) as exc:
logger.warning('Repository index missing or corrupted, trying to recover from: %s', exc)
os.unlink(index_path)
if not auto_recover:
@ -409,11 +424,11 @@ def prepare_txn(self, transaction_id, do_cleanup=True):
raise
if not self.index or transaction_id is None:
try:
self.index = self.open_index(transaction_id, False)
except (ValueError, OSError) as exc:
self.index = self.open_index(transaction_id, auto_recover=False)
except (ValueError, OSError, FileIntegrityError) as exc:
logger.warning('Checking repository transaction due to previous error: %s', exc)
self.check_transaction()
self.index = self.open_index(transaction_id, False)
self.index = self.open_index(transaction_id, auto_recover=False)
if transaction_id is None:
self.segments = {} # XXX bad name: usage_count_of_segment_x = self.segments[x]
self.compact = FreeSpace() # XXX bad name: freeable_space_of_segment_x = self.compact[x]
@ -424,11 +439,12 @@ def prepare_txn(self, transaction_id, do_cleanup=True):
self.io.cleanup(transaction_id)
hints_path = os.path.join(self.path, 'hints.%d' % transaction_id)
index_path = os.path.join(self.path, 'index.%d' % transaction_id)
integrity_data = self._read_integrity(transaction_id, b'hints')
try:
with open(hints_path, 'rb') as fd:
with IntegrityCheckedFile(hints_path, write=False, integrity_data=integrity_data) as fd:
hints = msgpack.unpack(fd)
except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError) as e:
logger.warning('Repository hints file missing or corrupted, trying to recover')
except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError, FileIntegrityError) as e:
logger.warning('Repository hints file missing or corrupted, trying to recover: %s', e)
if not isinstance(e, FileNotFoundError):
os.unlink(hints_path)
# index must exist at this point
@ -459,28 +475,66 @@ def prepare_txn(self, transaction_id, do_cleanup=True):
shadowed_segments.remove(segment)
def write_index(self):
hints = {b'version': 2,
b'segments': self.segments,
b'compact': self.compact,
b'storage_quota_use': self.storage_quota_use, }
transaction_id = self.io.get_segments_transaction_id()
assert transaction_id is not None
hints_file = os.path.join(self.path, 'hints.%d' % transaction_id)
with open(hints_file + '.tmp', 'wb') as fd:
msgpack.pack(hints, fd)
def flush_and_sync(fd):
fd.flush()
os.fsync(fd.fileno())
os.rename(hints_file + '.tmp', hints_file)
self.index.write(os.path.join(self.path, 'index.tmp'))
os.rename(os.path.join(self.path, 'index.tmp'),
os.path.join(self.path, 'index.%d' % transaction_id))
def rename_tmp(file):
os.rename(file + '.tmp', file)
hints = {
b'version': 2,
b'segments': self.segments,
b'compact': self.compact,
b'storage_quota_use': self.storage_quota_use,
}
integrity = {
b'version': 2,
}
transaction_id = self.io.get_segments_transaction_id()
assert transaction_id is not None
# Log transaction in append-only mode
if self.append_only:
with open(os.path.join(self.path, 'transactions'), 'a') as log:
print('transaction %d, UTC time %s' % (transaction_id, datetime.utcnow().isoformat()), file=log)
# Write hints file
hints_name = 'hints.%d' % transaction_id
hints_file = os.path.join(self.path, hints_name)
with IntegrityCheckedFile(hints_file + '.tmp', filename=hints_name, write=True) as fd:
msgpack.pack(hints, fd)
flush_and_sync(fd)
integrity[b'hints'] = fd.integrity_data
# Write repository index
index_name = 'index.%d' % transaction_id
index_file = os.path.join(self.path, index_name)
with IntegrityCheckedFile(index_file + '.tmp', filename=index_name, write=True) as fd:
# XXX: Consider using SyncFile for index write-outs.
self.index.write(fd)
flush_and_sync(fd)
integrity[b'index'] = fd.integrity_data
# Write integrity file, containing checksums of the hints and index files
integrity_name = 'integrity.%d' % transaction_id
integrity_file = os.path.join(self.path, integrity_name)
with open(integrity_file + '.tmp', 'wb') as fd:
msgpack.pack(integrity, fd)
flush_and_sync(fd)
# Rename the integrity file first
rename_tmp(integrity_file)
sync_dir(self.path)
# Rename the others after the integrity file is hypothetically on disk
rename_tmp(hints_file)
rename_tmp(index_file)
sync_dir(self.path)
# Remove old auxiliary files
current = '.%d' % transaction_id
for name in os.listdir(self.path):
if not name.startswith(('index.', 'hints.')):
if not name.startswith(('index.', 'hints.', 'integrity.')):
continue
if name.endswith(current):
continue

View file

@ -501,6 +501,11 @@ def setUp(self):
self.repository.commit()
self.repository.close()
def corrupt(self, file, amount=1):
    """Corrupt the last *amount* bytes of *file* in place.

    Every affected byte is replaced by its complement (``255 - byte``).
    A byte never equals its own complement, so the file content is
    guaranteed to change; writing a fixed value would be a no-op whenever
    the file already ended with that value.

    :param file: path of the file to damage; it must hold at least
        *amount* bytes, otherwise the seek fails.
    :param amount: number of trailing bytes to corrupt (default 1).
    """
    with open(file, 'r+b') as fd:
        fd.seek(-amount, io.SEEK_END)
        corrupted = bytes(255 - c for c in fd.read(amount))
        # Reading advanced the position; go back before overwriting.
        fd.seek(-amount, io.SEEK_END)
        fd.write(corrupted)
def do_commit(self):
with self.repository:
self.repository.put(H(0), b'fox')
@ -537,6 +542,12 @@ def test_index_outside_transaction(self):
with self.repository:
assert len(self.repository) == 1
def test_index_corrupted(self):
    """Damage ``index.1`` and check the repository content is still readable.

    After corrupting the index file, the repository is entered again and
    must report exactly one object, with ``H(0)`` mapping to ``b'foo'``.
    """
    index_path = os.path.join(self.repository.path, 'index.1')
    self.corrupt(index_path)
    with self.repository:
        assert len(self.repository) == 1
        assert self.repository.get(H(0)) == b'foo'
def test_unreadable_index(self):
index = os.path.join(self.repository.path, 'index.1')
os.unlink(index)