
1418 lines
66 KiB

import errno
import os
import shutil
import struct
from binascii import hexlify, unhexlify
from collections import defaultdict
from configparser import ConfigParser
from datetime import datetime
from functools import partial
from itertools import islice
import msgpack
from .constants import * # NOQA
from .hashindex import NSIndex
from .helpers import Error, ErrorWithTraceback, IntegrityError, format_file_size, parse_file_size
from .helpers import Location
from .helpers import ProgressIndicatorPercent
from .helpers import bin_to_hex
from .helpers import hostname_is_unique
from .helpers import secure_erase, truncate_and_unlink
from .locking import Lock, LockError, LockErrorT
from .logger import create_logger
from .lrucache import LRUCache
from .platform import SaveFile, SyncFile, sync_dir, safe_fadvise
from .algorithms.checksums import crc32
from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
logger = create_logger(__name__)
LIST_SCAN_LIMIT = 100000 # repo.list() / .scan() result count limit the borg client uses
FreeSpace = partial(defaultdict, int)
class Repository:
Filesystem based transactional key value store
Transactionality is achieved by using a log (aka journal) to record changes. The log is a series of numbered files
called segments. Each segment is a series of log entries. The segment number together with the offset of each
entry relative to its segment start establishes an ordering of the log entries. This is the "definition" of
time for the purposes of the log.
Log entries are either PUT, DELETE or COMMIT.
A COMMIT is always the final log entry in a segment and marks all data from the beginning of the log until the
segment ending with the COMMIT as committed and consistent. The segment number of a segment ending with a COMMIT
is called the transaction ID of that commit, and a segment ending with a COMMIT is called committed.
When reading from a repository it is first checked whether the last segment is committed. If it is not, then
all segments after the last committed segment are deleted; they contain log entries whose consistency is not
established by a COMMIT.
Note that the COMMIT can't establish consistency by itself, but only manages to do so with proper support from
the platform (including the hardware). See platform.base.SyncFile for details.
A PUT inserts a key-value pair. The value is stored in the log entry, hence the repository implements
full data logging, meaning that all data is consistent, not just metadata (which is common in file systems).
A DELETE marks a key as deleted.
For a given key only the last entry regarding the key, which is called current (all other entries are called
superseded), is relevant: If there is no entry or the last entry is a DELETE then the key does not exist.
Otherwise the last PUT defines the value of the key.
By superseding a PUT (with either another PUT or a DELETE) the log entry becomes obsolete. A segment containing
such obsolete entries is called sparse, while a segment containing no such entries is called compact.
Sparse segments can be compacted and thereby disk space freed. This destroys the transaction for which the
superseded entries where current.
On disk layout:
dir/data/<X // SEGMENTS_PER_DIR>/<X>
File system interaction
LoggedIO generally tries to rely on common behaviours across transactional file systems.
Segments that are deleted are truncated first, which avoids problems if the FS needs to
allocate space to delete the dirent of the segment. This mostly affects CoW file systems,
traditional journaling file systems have a fairly good grip on this problem.
Note that deletion, i.e. unlink(2), is atomic on every file system that uses inode reference
counts, which includes pretty much all of them. To remove a dirent the inodes refcount has
to be decreased, but you can't decrease the refcount before removing the dirent nor can you
decrease the refcount after removing the dirent. File systems solve this with a lock,
and by ensuring it all stays within the same FS transaction.
Truncation is generally not atomic in itself, and combining truncate(2) and unlink(2) is of
course never guaranteed to be atomic. Truncation in a classic extent-based FS is done in
roughly two phases, first the extents are removed then the inode is updated. (In practice
this is of course way more complex).
LoggedIO gracefully handles truncate/unlink splits as long as the truncate resulted in
a zero length file. Zero length segments are considered to not exist, while LoggedIO.cleanup()
will still get rid of them.
class DoesNotExist(Error):
"""Repository {} does not exist."""
class AlreadyExists(Error):
"""A repository already exists at {}."""
class InvalidRepository(Error):
"""{} is not a valid repository. Check repo config."""
class CheckNeeded(ErrorWithTraceback):
"""Inconsistency detected. Please run "borg check {}"."""
class ObjectNotFound(ErrorWithTraceback):
"""Object with key {} not found in repository {}."""
def __init__(self, id, repo):
if isinstance(id, bytes):
id = bin_to_hex(id)
super().__init__(id, repo)
class InsufficientFreeSpaceError(Error):
"""Insufficient free space to complete transaction (required: {}, available: {})."""
class StorageQuotaExceeded(Error):
"""The storage quota ({}) has been exceeded ({}). Try deleting some archives."""
def __init__(self, path, create=False, exclusive=False, lock_wait=None, lock=True,
append_only=False, storage_quota=None):
self.path = os.path.abspath(path)
self._location = Location('file://%s' % self.path) = None # type: LoggedIO
self.lock = None
self.index = None
# This is an index of shadowed log entries during this transaction. Consider the following sequence:
# segment_n PUT A, segment_x DELETE A
# After the "DELETE A" in segment_x the shadow index will contain "A -> [n]".
self.shadow_index = {}
self._active_txn = False
self.lock_wait = lock_wait
self.do_lock = lock
self.do_create = create
self.created = False
self.exclusive = exclusive
self.append_only = append_only
self.storage_quota = storage_quota
self.storage_quota_use = 0
self.transaction_doomed = None
def __del__(self):
if self.lock:
assert False, "cleanup happened in Repository.__del__"
def __repr__(self):
return '<%s %s>' % (self.__class__.__name__, self.path)
def __enter__(self):
if self.do_create:
self.do_create = False
self.created = True, bool(self.exclusive), lock_wait=self.lock_wait, lock=self.do_lock)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
no_space_left_on_device = exc_type is OSError and exc_val.errno == errno.ENOSPC
# The ENOSPC could have originated somewhere else besides the Repository. The cleanup is always safe, unless
# EIO or FS corruption ensues, which is why we specifically check for ENOSPC.
if self._active_txn and no_space_left_on_device:
logger.warning('No space left on device, cleaning up partial transaction to free space.')
cleanup = True
cleanup = False
def id_str(self):
return bin_to_hex(
def check_can_create_repository(self, path):
Raise self.AlreadyExists if a repository already exists at *path* or any parent directory.
Checking parent directories is done for two reasons:
(1) It's just a weird thing to do, and usually not intended. A Borg using the "parent" repository
may be confused, or we may accidentally put stuff into the "data/" or "data/<n>/" directories.
(2) When implementing repository quotas (which we currently don't), it's important to prohibit
folks from creating quota-free repositories. Since no one can create a repository within another
repository, user's can only use the quota'd repository, when their --restrict-to-path points
at the user's repository.
if os.path.exists(path) and (not os.path.isdir(path) or os.listdir(path)):
raise self.AlreadyExists(path)
while True:
# Check all parent directories for Borg's repository README
previous_path = path
# Thus, path = previous_path/..
path = os.path.abspath(os.path.join(previous_path, os.pardir))
if path == previous_path:
# We reached the root of the directory hierarchy (/.. = / and C:\.. = C:\).
# Use binary mode to avoid troubles if a README contains some stuff not in our locale
with open(os.path.join(path, 'README'), 'rb') as fd:
# Read only the first ~100 bytes (if any), in case some README file we stumble upon is large.
readme_head =
# The first comparison captures our current variant (REPOSITORY_README), the second comparison
# is an older variant of the README file (used by 1.0.x).
if b'Borg Backup repository' in readme_head or b'Borg repository' in readme_head:
raise self.AlreadyExists(path)
except OSError:
# Ignore FileNotFound, PermissionError, ...
def create(self, path):
"""Create a new empty repository at `path`
if not os.path.exists(path):
with open(os.path.join(path, 'README'), 'w') as fd:
os.mkdir(os.path.join(path, 'data'))
config = ConfigParser(interpolation=None)
config.set('repository', 'version', '1')
config.set('repository', 'segments_per_dir', str(DEFAULT_SEGMENTS_PER_DIR))
config.set('repository', 'max_segment_size', str(DEFAULT_MAX_SEGMENT_SIZE))
config.set('repository', 'append_only', str(int(self.append_only)))
if self.storage_quota:
config.set('repository', 'storage_quota', str(self.storage_quota))
config.set('repository', 'storage_quota', '0')
config.set('repository', 'additional_free_space', '0')
config.set('repository', 'id', bin_to_hex(os.urandom(32)))
self.save_config(path, config)
def save_config(self, path, config):
config_path = os.path.join(path, 'config')
old_config_path = os.path.join(path, 'config.old')
if os.path.isfile(old_config_path):
logger.warning("Old config file not securely erased on previous config update")
if os.path.isfile(config_path):
try:, old_config_path)
except OSError as e:
if e.errno in (errno.EMLINK, errno.ENOSYS, errno.EPERM):
logger.warning("Hardlink failed, cannot securely erase old config file")
with SaveFile(config_path) as fd:
if os.path.isfile(old_config_path):
def save_key(self, keydata):
assert self.config
keydata = keydata.decode('utf-8') # remote repo: msgpack issue #99, getting bytes
self.config.set('repository', 'key', keydata)
self.save_config(self.path, self.config)
def load_key(self):
keydata = self.config.get('repository', 'key')
return keydata.encode('utf-8') # remote repo: msgpack issue #99, returning bytes
def get_free_nonce(self):
if not self.lock.got_exclusive_lock():
raise AssertionError("bug in code, exclusive lock should exist here")
nonce_path = os.path.join(self.path, 'nonce')
with open(nonce_path, 'r') as fd:
return int.from_bytes(unhexlify(, byteorder='big')
except FileNotFoundError:
return None
def commit_nonce_reservation(self, next_unreserved, start_nonce):
if not self.lock.got_exclusive_lock():
raise AssertionError("bug in code, exclusive lock should exist here")
if self.get_free_nonce() != start_nonce:
raise Exception("nonce space reservation with mismatched previous state")
nonce_path = os.path.join(self.path, 'nonce')
with SaveFile(nonce_path, binary=False) as fd:
fd.write(bin_to_hex(next_unreserved.to_bytes(8, byteorder='big')))
def destroy(self):
"""Destroy the repository at `self.path`
if self.append_only:
raise ValueError(self.path + " is in append-only mode")
os.remove(os.path.join(self.path, 'config')) # kill config first
def get_index_transaction_id(self):
indices = sorted(int(fn[6:])
for fn in os.listdir(self.path)
if fn.startswith('index.') and fn[6:].isdigit() and os.stat(os.path.join(self.path, fn)).st_size != 0)
if indices:
return indices[-1]
return None
def check_transaction(self):
index_transaction_id = self.get_index_transaction_id()
segments_transaction_id =
if index_transaction_id is not None and segments_transaction_id is None:
# we have a transaction id from the index, but we did not find *any*
# commit in the segment files (thus no segments transaction id).
# this can happen if a lot of segment files are lost, e.g. due to a
# filesystem or hardware malfunction. it means we have no identifiable
# valid (committed) state of the repo which we could use.
msg = '%s" - although likely this is "beyond repair' % self.path # dirty hack
raise self.CheckNeeded(msg)
# Attempt to automatically rebuild index if we crashed between commit
# tag write and index save
if index_transaction_id != segments_transaction_id:
if index_transaction_id is not None and index_transaction_id > segments_transaction_id:
replay_from = None
replay_from = index_transaction_id
self.replay_segments(replay_from, segments_transaction_id)
def get_transaction_id(self):
return self.get_index_transaction_id()
def break_lock(self):
Lock(os.path.join(self.path, 'lock')).break_lock()
def open(self, path, exclusive, lock_wait=None, lock=True):
self.path = path
if not os.path.isdir(path):
raise self.DoesNotExist(path)
if lock:
self.lock = Lock(os.path.join(path, 'lock'), exclusive, timeout=lock_wait, kill_stale_locks=hostname_is_unique()).acquire()
self.lock = None
self.config = ConfigParser(interpolation=None), 'config'))
if 'repository' not in self.config.sections() or self.config.getint('repository', 'version') != 1:
raise self.InvalidRepository(path)
self.max_segment_size = self.config.getint('repository', 'max_segment_size')
self.segments_per_dir = self.config.getint('repository', 'segments_per_dir')
self.additional_free_space = parse_file_size(self.config.get('repository', 'additional_free_space', fallback=0))
# append_only can be set in the constructor
# it shouldn't be overridden (True -> False) here
self.append_only = self.append_only or self.config.getboolean('repository', 'append_only', fallback=False)
if self.storage_quota is None:
# self.storage_quota is None => no explicit storage_quota was specified, use repository setting.
self.storage_quota = self.config.getint('repository', 'storage_quota', fallback=0) = unhexlify(self.config.get('repository', 'id').strip()) = LoggedIO(self.path, self.max_segment_size, self.segments_per_dir)
def close(self):
if self.lock:
if = None
self.lock = None
def commit(self, save_space=False):
"""Commit transaction
# save_space is not used anymore, but stays for RPC/API compatibility.
if self.transaction_doomed:
exception = self.transaction_doomed
raise exception
if not self.append_only:
def _read_integrity(self, transaction_id, key):
integrity_file = 'integrity.%d' % transaction_id
integrity_path = os.path.join(self.path, integrity_file)
with open(integrity_path, 'rb') as fd:
integrity = msgpack.unpack(fd)
except FileNotFoundError:
if integrity.get(b'version') != 2:
logger.warning('Unknown integrity data version %r in %s', integrity.get(b'version'), integrity_file)
return integrity[key].decode()
def open_index(self, transaction_id, auto_recover=True):
if transaction_id is None:
return NSIndex()
index_path = os.path.join(self.path, 'index.%d' % transaction_id)
integrity_data = self._read_integrity(transaction_id, b'index')
with IntegrityCheckedFile(index_path, write=False, integrity_data=integrity_data) as fd:
except (ValueError, OSError, FileIntegrityError) as exc:
logger.warning('Repository index missing or corrupted, trying to recover from: %s', exc)
if not auto_recover:
# don't leave an open transaction around
return self.open_index(self.get_transaction_id())
def prepare_txn(self, transaction_id, do_cleanup=True):
self._active_txn = True
if not self.lock.got_exclusive_lock():
if self.exclusive is not None:
# self.exclusive is either True or False, thus a new client is active here.
# if it is False and we get here, the caller did not use exclusive=True although
# it is needed for a write operation. if it is True and we get here, something else
# went very wrong, because we should have a exclusive lock, but we don't.
raise AssertionError("bug in code, exclusive lock should exist here")
# if we are here, this is an old client talking to a new server (expecting lock upgrade).
# or we are replaying segments and might need a lock upgrade for that.
except (LockError, LockErrorT):
# if upgrading the lock to exclusive fails, we do not have an
# active transaction. this is important for "serve" mode, where
# the repository instance lives on - even if exceptions happened.
self._active_txn = False
if not self.index or transaction_id is None:
self.index = self.open_index(transaction_id, auto_recover=False)
except (ValueError, OSError, FileIntegrityError) as exc:
logger.warning('Checking repository transaction due to previous error: %s', exc)
self.index = self.open_index(transaction_id, auto_recover=False)
if transaction_id is None:
self.segments = {} # XXX bad name: usage_count_of_segment_x = self.segments[x]
self.compact = FreeSpace() # XXX bad name: freeable_space_of_segment_x = self.compact[x]
self.storage_quota_use = 0
if do_cleanup:
hints_path = os.path.join(self.path, 'hints.%d' % transaction_id)
index_path = os.path.join(self.path, 'index.%d' % transaction_id)
integrity_data = self._read_integrity(transaction_id, b'hints')
with IntegrityCheckedFile(hints_path, write=False, integrity_data=integrity_data) as fd:
hints = msgpack.unpack(fd)
except (msgpack.UnpackException, msgpack.ExtraData, FileNotFoundError, FileIntegrityError) as e:
logger.warning('Repository hints file missing or corrupted, trying to recover: %s', e)
if not isinstance(e, FileNotFoundError):
# index must exist at this point
if hints[b'version'] == 1:
logger.debug('Upgrading from v1 hints.%d', transaction_id)
self.segments = hints[b'segments']
self.compact = FreeSpace()
self.storage_quota_use = 0
for segment in sorted(hints[b'compact']):
logger.debug('Rebuilding sparse info for segment %d', segment)
logger.debug('Upgrade to v2 hints complete')
elif hints[b'version'] != 2:
raise ValueError('Unknown hints file version: %d' % hints[b'version'])
self.segments = hints[b'segments']
self.compact = FreeSpace(hints[b'compact'])
self.storage_quota_use = hints.get(b'storage_quota_use', 0)
# Drop uncommitted segments in the shadow index
for key, shadowed_segments in self.shadow_index.items():
for segment in list(shadowed_segments):
if segment > transaction_id:
def write_index(self):
def flush_and_sync(fd):
def rename_tmp(file):
os.rename(file + '.tmp', file)
hints = {
b'version': 2,
b'segments': self.segments,
b'compact': self.compact,
b'storage_quota_use': self.storage_quota_use,
integrity = {
# Integrity version started at 2, the current hints version.
# Thus, integrity version == hints version, for now.
b'version': 2,
transaction_id =
assert transaction_id is not None
# Log transaction in append-only mode
if self.append_only:
with open(os.path.join(self.path, 'transactions'), 'a') as log:
print('transaction %d, UTC time %s' % (transaction_id, datetime.utcnow().isoformat()), file=log)
# Write hints file
hints_name = 'hints.%d' % transaction_id
hints_file = os.path.join(self.path, hints_name)
with IntegrityCheckedFile(hints_file + '.tmp', filename=hints_name, write=True) as fd:
msgpack.pack(hints, fd)
integrity[b'hints'] = fd.integrity_data
# Write repository index
index_name = 'index.%d' % transaction_id
index_file = os.path.join(self.path, index_name)
with IntegrityCheckedFile(index_file + '.tmp', filename=index_name, write=True) as fd:
# XXX: Consider using SyncFile for index write-outs.
integrity[b'index'] = fd.integrity_data
# Write integrity file, containing checksums of the hints and index files
integrity_name = 'integrity.%d' % transaction_id
integrity_file = os.path.join(self.path, integrity_name)
with open(integrity_file + '.tmp', 'wb') as fd:
msgpack.pack(integrity, fd)
# Rename the integrity file first
# Rename the others after the integrity file is hypothetically on disk
# Remove old auxiliary files
current = '.%d' % transaction_id
for name in os.listdir(self.path):
if not name.startswith(('index.', 'hints.', 'integrity.')):
if name.endswith(current):
os.unlink(os.path.join(self.path, name))
self.index = None
def check_free_space(self):
"""Pre-commit check for sufficient free space to actually perform the commit."""
# As a baseline we take four times the current (on-disk) index size.
# At this point the index may only be updated by compaction, which won't resize it.
# We still apply a factor of four so that a later, separate invocation can free space
# (journaling all deletes for all chunks is one index size) or still make minor additions
# (which may grow the index up to twice its current size).
# Note that in a subsequent operation the committed index is still on-disk, therefore we
# arrive at index_size * (1 + 2 + 1).
# In that order: journaled deletes (1), hashtable growth (2), persisted index (1).
required_free_space = self.index.size() * 4
# Conservatively estimate hints file size:
# 10 bytes for each segment-refcount pair, 10 bytes for each segment-space pair
# Assume maximum of 5 bytes per integer. Segment numbers will usually be packed more densely (1-3 bytes),
# as will refcounts and free space integers. For 5 MiB segments this estimate is good to ~20 PB repo size.
# Add 4K to generously account for constant format overhead.
hints_size = len(self.segments) * 10 + len(self.compact) * 10 + 4096
required_free_space += hints_size
required_free_space += self.additional_free_space
if not self.append_only:
full_segment_size = self.max_segment_size + MAX_OBJECT_SIZE
if len(self.compact) < 10:
# This is mostly for the test suite to avoid overestimated free space needs. This can be annoying
# if TMP is a small-ish tmpfs.
compact_working_space = sum( - free for segment, free in self.compact.items())
logger.debug('check_free_space: few segments, not requiring a full free segment')
compact_working_space = min(compact_working_space, full_segment_size)
logger.debug('check_free_space: calculated working space for compact as %d bytes', compact_working_space)
required_free_space += compact_working_space
# Keep one full worst-case segment free in non-append-only mode
required_free_space += full_segment_size
st_vfs = os.statvfs(self.path)
except OSError as os_error:
logger.warning('Failed to check free space before committing: ' + str(os_error))
# f_bavail: even as root - don't touch the Federal Block Reserve!
free_space = st_vfs.f_bavail * st_vfs.f_bsize
logger.debug('check_free_space: required bytes {}, free bytes {}'.format(required_free_space, free_space))
if free_space < required_free_space:
if self.created:
logger.error('Not enough free space to initialize repository at this location.')
formatted_required = format_file_size(required_free_space)
formatted_free = format_file_size(free_space)
raise self.InsufficientFreeSpaceError(formatted_required, formatted_free)
def log_storage_quota(self):
if self.storage_quota:'Storage quota: %s out of %s used.',
format_file_size(self.storage_quota_use), format_file_size(self.storage_quota))
def compact_segments(self):
"""Compact sparse segments by copying data into new segments
if not self.compact:
index_transaction_id = self.get_index_transaction_id()
segments = self.segments
unused = [] # list of segments, that are not used anymore
logger = create_logger('borg.debug.compact_segments')
def complete_xfer(intermediate=True):
# complete the current transfer (when some target segment is full)
nonlocal unused
# commit the new, compact, used segments
segment =
logger.debug('complete_xfer: wrote %scommit at segment %d', 'intermediate ' if intermediate else '', segment)
# get rid of the old, sparse, unused segments. free space.
for segment in unused:
logger.debug('complete_xfer: deleting unused segment %d', segment)
assert self.segments.pop(segment) == 0, 'Corrupted segment reference count - corrupted index or hints'
del self.compact[segment]
unused = []
logger.debug('compaction started.')
pi = ProgressIndicatorPercent(total=len(self.compact), msg='Compacting segments %3.0f%%', step=1,
for segment, freeable_space in sorted(self.compact.items()):
if not
logger.warning('segment %d not found, but listed in compaction data', segment)
del self.compact[segment]
segment_size =
if segment_size > 0.2 * self.max_segment_size and freeable_space < 0.15 * segment_size:
logger.debug('not compacting segment %d (only %d bytes are sparse)', segment, freeable_space)
segments.setdefault(segment, 0)
logger.debug('compacting segment %d with usage count %d and %d freeable bytes',
segment, segments[segment], freeable_space)
for tag, key, offset, data in, include_data=True):
if tag == TAG_COMMIT:
in_index = self.index.get(key)
is_index_object = in_index == (segment, offset)
if tag == TAG_PUT and is_index_object:
new_segment, offset =, data, raise_full=True)
except LoggedIO.SegmentFull:
new_segment, offset =, data)
self.index[key] = new_segment, offset
segments.setdefault(new_segment, 0)
segments[new_segment] += 1
segments[segment] -= 1
elif tag == TAG_PUT and not is_index_object:
# If this is a PUT shadowed by a later tag, then it will be gone when this segment is deleted after
# this loop. Therefore it is removed from the shadow index.
except (KeyError, ValueError):
elif tag == TAG_DELETE and not in_index:
# If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag,
# therefore we do not drop the delete, but write it to a current segment.
shadowed_put_exists = key not in self.shadow_index or any(
# If the key is in the shadow index and there is any segment with an older PUT of this
# key, we have a shadowed put.
shadowed < segment for shadowed in self.shadow_index[key])
delete_is_not_stable = index_transaction_id is None or segment > index_transaction_id
if shadowed_put_exists or delete_is_not_stable:
# (introduced in 6425d16aa84be1eaaf88)
# This is needed to avoid object un-deletion if we crash between the commit and the deletion
# of old segments in complete_xfer().
# However, this only happens if the crash also affects the FS to the effect that file deletions
# did not materialize consistently after journal recovery. If they always materialize in-order
# then this is not a problem, because the old segment containing a deleted object would be deleted
# before the segment containing the delete.
# Consider the following series of operations if we would not do this, ie. this entire if:
# would be removed.
# Columns are segments, lines are different keys (line 1 = some key, line 2 = some other key)
# Legend: P=TAG_PUT, D=TAG_DELETE, c=commit, i=index is written for latest commit
# Segment | 1 | 2 | 3
# --------+-------+-----+------
# Key 1 | P | D |
# Key 2 | P | | P
# commits | c i | c | c i
# --------+-------+-----+------
# ^- compact_segments starts
# ^- complete_xfer commits, after that complete_xfer deletes
# segments 1 and 2 (and then the index would be written).
# Now we crash. But only segment 2 gets deleted, while segment 1 is still around. Now key 1
# is suddenly undeleted (because the delete in segment 2 is now missing).
# Again, note the requirement here. We delete these in the correct order that this doesn't happen,
# and only if the FS materialization of these deletes is reordered or parts dropped this can happen.
# In this case it doesn't cause outright corruption, 'just' an index count mismatch, which will be
# fixed by borg-check --repair.
# Note that in this check the index state is the proxy for a "most definitely settled" repository state,
# ie. the assumption is that *all* operations on segments <= index state are completed and stable.
new_segment, size =, raise_full=True)
except LoggedIO.SegmentFull:
new_segment, size =
self.compact[new_segment] += size
segments.setdefault(new_segment, 0)
assert segments[segment] == 0, 'Corrupted segment reference count - corrupted index or hints'
logger.debug('compaction completed.')
def replay_segments(self, index_transaction_id, segments_transaction_id):
# fake an old client, so that in case we do not have an exclusive lock yet, prepare_txn will upgrade the lock:
remember_exclusive = self.exclusive
self.exclusive = None
self.prepare_txn(index_transaction_id, do_cleanup=False)
segment_count = sum(1 for _ in
pi = ProgressIndicatorPercent(total=segment_count, msg='Replaying segments %3.0f%%',
for i, (segment, filename) in enumerate(
if index_transaction_id is not None and segment <= index_transaction_id:
if segment > segments_transaction_id:
objects =
self._update_index(segment, objects)
self.exclusive = remember_exclusive
def _update_index(self, segment, objects, report=None):
"""some code shared between replay_segments and check"""
self.segments[segment] = 0
for tag, key, offset, size in objects:
if tag == TAG_PUT:
# If this PUT supersedes an older PUT, mark the old segment for compaction and count the free space
s, _ = self.index[key]
self.compact[s] += size
self.segments[s] -= 1
except KeyError:
self.index[key] = segment, offset
self.segments[segment] += 1
self.storage_quota_use += size
elif tag == TAG_DELETE:
# if the deleted PUT is not in the index, there is nothing to clean up
s, offset = self.index.pop(key)
except KeyError:
# the old index is not necessarily valid for this transaction (e.g. compaction); if the segment
# is already gone, then it was already compacted.
self.segments[s] -= 1
size =, offset, key, read_data=False)
self.storage_quota_use -= size
self.compact[s] += size
elif tag == TAG_COMMIT:
msg = 'Unexpected tag {} in segment {}'.format(tag, segment)
if report is None:
raise self.CheckNeeded(msg)
if self.segments[segment] == 0:
self.compact[segment] +=
def _rebuild_sparse(self, segment):
"""Rebuild sparse bytes count for a single segment relative to the current index."""
self.compact[segment] = 0
if self.segments[segment] == 0:
self.compact[segment] +=
for tag, key, offset, size in, read_data=False):
if tag == TAG_PUT:
if self.index.get(key, (-1, -1)) != (segment, offset):
# This PUT is superseded later
self.compact[segment] += size
elif tag == TAG_DELETE:
# The outcome of the DELETE has been recorded in the PUT branch already
self.compact[segment] += size
def check(self, repair=False, save_space=False):
"""Check repository consistency
This method verifies all segment checksums and makes sure
the index is consistent with the data stored in the segments.
if self.append_only and repair:
raise ValueError(self.path + " is in append-only mode")
error_found = False
def report_error(msg):
nonlocal error_found
error_found = True
logger.error(msg)'Starting repository check')
assert not self._active_txn
transaction_id = self.get_transaction_id()
current_index = self.open_index(transaction_id)
logger.debug('Read committed index of transaction %d', transaction_id)
except Exception as exc:
transaction_id =
current_index = None
logger.debug('Failed to read committed index (%s)', exc)
if transaction_id is None:
logger.debug('No segments transaction found')
transaction_id = self.get_index_transaction_id()
if transaction_id is None:
logger.debug('No index transaction found, trying latest segment')
transaction_id =
if transaction_id is None:
report_error('This repository contains no valid data.')
return False
if repair:
segments_transaction_id =
logger.debug('Segment transaction is %s', segments_transaction_id)
logger.debug('Determined transaction is %s', transaction_id)
self.prepare_txn(None) # self.index, self.compact, self.segments all empty now!
segment_count = sum(1 for _ in
logger.debug('Found %d segments', segment_count)
pi = ProgressIndicatorPercent(total=segment_count, msg='Checking segments %3.1f%%', step=0.1,
for i, (segment, filename) in enumerate(
if segment > transaction_id:
objects = list(
except IntegrityError as err:
objects = []
if repair:, filename)
objects = list(
self._update_index(segment, objects, report_error)
# self.index, self.segments, self.compact now reflect the state of the segment files up to <transaction_id>
# We might need to add a commit tag if no committed segment is found
if repair and segments_transaction_id is None:
report_error('Adding commit tag to segment {}'.format(transaction_id)) = transaction_id + 1'Starting repository index check')
if current_index and not repair:
# current_index = "as found on disk"
# self.index = "as rebuilt in-memory from segments"
if len(current_index) != len(self.index):
report_error('Index object count mismatch.')
logger.error('committed index: %d objects', len(current_index))
logger.error('rebuilt index: %d objects', len(self.index))
line_format = '%-64s %-16s %-16s'
not_found = '<not found>'
logger.warning(line_format, 'ID', 'rebuilt index', 'committed index')
for key, value in self.index.iteritems():
current_value = current_index.get(key, not_found)
if current_value != value:
logger.warning(line_format, bin_to_hex(key), value, current_value)
for key, current_value in current_index.iteritems():
if key in self.index:
value = self.index.get(key, not_found)
if current_value != value:
logger.warning(line_format, bin_to_hex(key), value, current_value)
elif current_index:
for key, value in self.index.iteritems():
if current_index.get(key, (-1, -1)) != value:
report_error('Index mismatch for key {}. {} != {}'.format(key, value, current_index.get(key, (-1, -1))))
if repair:
if error_found:
if repair:'Completed repository check, errors found and repaired.')
logger.error('Completed repository check, errors found.')
else:'Completed repository check, no problems found.')
return not error_found or repair
def _rollback(self, *, cleanup):
if cleanup:
self.index = None
self._active_txn = False
self.transaction_doomed = None
def rollback(self):
# note: when used in remote mode, this is time limited, see RemoteRepository.shutdown_time.
def __len__(self):
if not self.index:
self.index = self.open_index(self.get_transaction_id())
return len(self.index)
def __contains__(self, id):
if not self.index:
self.index = self.open_index(self.get_transaction_id())
return id in self.index
def list(self, limit=None, marker=None):
list <limit> IDs starting from after id <marker> - in index (pseudo-random) order.
if not self.index:
self.index = self.open_index(self.get_transaction_id())
return [id_ for id_, _ in islice(self.index.iteritems(marker=marker), limit)]
def scan(self, limit=None, marker=None):
list <limit> IDs starting from after id <marker> - in on-disk order, so that a client
fetching data in this order does linear reads and reuses stuff from disk cache.
We rely on repository.check() has run already (either now or some time before) and that:
- if we are called from a borg check command, self.index is a valid, fresh, in-sync repo index.
- if we are called from elsewhere, either self.index or the on-disk index is valid and in-sync.
- the repository segments are valid (no CRC errors).
if we encounter CRC errors in segment entry headers, rest of segment is skipped.
if limit is not None and limit < 1:
raise ValueError('please use limit > 0 or limit = None')
if not self.index:
transaction_id = self.get_transaction_id()
self.index = self.open_index(transaction_id)
at_start = marker is None
# smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8
start_segment, start_offset = (0, 0) if at_start else self.index[marker]
result = []
for segment, filename in
obj_iterator =, start_offset, read_data=False, include_data=False)
while True:
tag, id, offset, size = next(obj_iterator)
except (StopIteration, IntegrityError):
# either end-of-segment or an error - we can not seek to objects at
# higher offsets than one that has an error in the header fields
if start_offset > 0:
# we are using a marker and the marker points to the last object we have already
# returned in the previous scan() call - thus, we need to skip this one object.
# also, for the next segment, we need to start at offset 0.
start_offset = 0
if tag == TAG_PUT and (segment, offset) == self.index.get(id):
# we have found an existing and current object
if len(result) == limit:
return result
return result
def get(self, id):
if not self.index:
self.index = self.open_index(self.get_transaction_id())
segment, offset = self.index[id]
return, offset, id)
except KeyError:
raise self.ObjectNotFound(id, self.path) from None
def get_many(self, ids, is_preloaded=False):
for id_ in ids:
yield self.get(id_)
def put(self, id, data, wait=True):
"""put a repo object
Note: when doing calls with wait=False this gets async and caller must
deal with async results / exceptions later.
if not self._active_txn:
segment, offset = self.index[id]
except KeyError:
self.segments[segment] -= 1
size =, offset, id, read_data=False)
self.storage_quota_use -= size
self.compact[segment] += size
segment, size =
self.compact[segment] += size
self.segments.setdefault(segment, 0)
segment, offset =, data)
self.storage_quota_use += len(data) +
self.segments.setdefault(segment, 0)
self.segments[segment] += 1
self.index[id] = segment, offset
if self.storage_quota and self.storage_quota_use > self.storage_quota:
self.transaction_doomed = self.StorageQuotaExceeded(
format_file_size(self.storage_quota), format_file_size(self.storage_quota_use))
raise self.transaction_doomed
def delete(self, id, wait=True):
"""delete a repo object
Note: when doing calls with wait=False this gets async and caller must
deal with async results / exceptions later.
if not self._active_txn:
segment, offset = self.index.pop(id)
except KeyError:
raise self.ObjectNotFound(id, self.path) from None
self.shadow_index.setdefault(id, []).append(segment)
self.segments[segment] -= 1
size =, offset, id, read_data=False)
self.storage_quota_use -= size
self.compact[segment] += size
segment, size =
self.compact[segment] += size
self.segments.setdefault(segment, 0)
def async_response(self, wait=True):
"""Get one async result (only applies to remote repositories).
async commands (== calls with wait=False, e.g. delete and put) have no results,
but may raise exceptions. These async exceptions must get collected later via
async_response() calls. Repeat the call until it returns None.
The previous calls might either return one (non-None) result or raise an exception.
If wait=True is given and there are outstanding responses, it will wait for them
to arrive. With wait=False, it will only return already received responses.
def preload(self, ids):
"""Preload objects (only applies to remote repositories)
class LoggedIO:
class SegmentFull(Exception):
"""raised when a segment is full, before opening next"""
header_fmt = struct.Struct('<IIB')
assert header_fmt.size == 9
put_header_fmt = struct.Struct('<IIB32s')
assert put_header_fmt.size == 41
header_no_crc_fmt = struct.Struct('<IB')
assert header_no_crc_fmt.size == 5
crc_fmt = struct.Struct('<I')
assert crc_fmt.size == 4
_commit = header_no_crc_fmt.pack(9, TAG_COMMIT)
COMMIT = crc_fmt.pack(crc32(_commit)) + _commit
def __init__(self, path, limit, segments_per_dir, capacity=90):
self.path = path
self.fds = LRUCache(capacity,
self.segment = 0
self.limit = limit
self.segments_per_dir = segments_per_dir
self.offset = 0
self._write_fd = None
def close(self):
self.fds = None # Just to make sure we're disabled
def close_fd(self, fd):
safe_fadvise(fd.fileno(), 0, 0, 'DONTNEED')
def segment_iterator(self, segment=None, reverse=False):
if segment is None:
segment = 0 if not reverse else 2 ** 32 - 1
data_path = os.path.join(self.path, 'data')
start_segment_dir = segment // self.segments_per_dir
dirs = os.listdir(data_path)
if not reverse:
dirs = [dir for dir in dirs if dir.isdigit() and int(dir) >= start_segment_dir]
dirs = [dir for dir in dirs if dir.isdigit() and int(dir) <= start_segment_dir]
dirs = sorted(dirs, key=int, reverse=reverse)
for dir in dirs:
filenames = os.listdir(os.path.join(data_path, dir))
if not reverse:
filenames = [filename for filename in filenames if filename.isdigit() and int(filename) >= segment]
filenames = [filename for filename in filenames if filename.isdigit() and int(filename) <= segment]
filenames = sorted(filenames, key=int, reverse=reverse)
for filename in filenames:
# Note: Do not filter out logically deleted segments (see "File system interaction" above),
# since this is used by cleanup and txn state detection as well.
yield int(filename), os.path.join(data_path, dir, filename)
def get_latest_segment(self):
for segment, filename in self.segment_iterator(reverse=True):
return segment
return None
def get_segments_transaction_id(self):
"""Return the last committed segment.
for segment, filename in self.segment_iterator(reverse=True):
if self.is_committed_segment(segment):
return segment
return None
def cleanup(self, transaction_id):
"""Delete segment files left by aborted transactions
self.segment = transaction_id + 1
for segment, filename in self.segment_iterator(reverse=True):
if segment > transaction_id:
def is_committed_segment(self, segment):
"""Check if segment ends with a COMMIT_TAG tag
iterator = self.iter_objects(segment)
except IntegrityError:
return False
with open(self.segment_filename(segment), 'rb') as fd:
try:, os.SEEK_END)
except OSError as e:
# return False if segment file is empty or too small
if e.errno == errno.EINVAL:
return False
raise e
if != self.COMMIT:
return False
seen_commit = False
while True:
tag, key, offset, _ = next(iterator)
except IntegrityError:
return False
except StopIteration:
if tag == TAG_COMMIT:
seen_commit = True
if seen_commit:
return False
return seen_commit
def segment_filename(self, segment):
return os.path.join(self.path, 'data', str(segment // self.segments_per_dir), str(segment))
def get_write_fd(self, no_new=False, raise_full=False):
if not no_new and self.offset and self.offset > self.limit:
if raise_full:
raise self.SegmentFull
if not self._write_fd:
if self.segment % self.segments_per_dir == 0:
dirname = os.path.join(self.path, 'data', str(self.segment // self.segments_per_dir))
if not os.path.exists(dirname):
sync_dir(os.path.join(self.path, 'data'))
self._write_fd = SyncFile(self.segment_filename(self.segment), binary=True)
self.offset = MAGIC_LEN
return self._write_fd
def get_fd(self, segment):
return self.fds[segment]
except KeyError:
fd = open(self.segment_filename(segment), 'rb')
self.fds[segment] = fd
return fd
def close_segment(self):
# set self._write_fd to None early to guard against reentry from error handling code paths:
fd, self._write_fd = self._write_fd, None
if fd is not None:
self.segment += 1
self.offset = 0
def delete_segment(self, segment):
if segment in self.fds:
del self.fds[segment]
except FileNotFoundError:
def segment_exists(self, segment):
filename = self.segment_filename(segment)
# When deleting segments, they are first truncated. If truncate(2) and unlink(2) are split
# across FS transactions, then logically deleted segments will show up as truncated.
return os.path.exists(filename) and os.path.getsize(filename)
def segment_size(self, segment):
return os.path.getsize(self.segment_filename(segment))
def iter_objects(self, segment, offset=0, include_data=False, read_data=True):
Return object iterator for *segment*.
If read_data is False then include_data must be False as well.
Integrity checks are skipped: all data obtained from the iterator must be considered informational.
The iterator returns four-tuples of (tag, key, offset, data|size).
fd = self.get_fd(segment)
if offset == 0:
# we are touching this segment for the first time, check the MAGIC.
# Repository.scan() calls us with segment > 0 when it continues an ongoing iteration
# from a marker position - but then we have checked the magic before already.
if != MAGIC:
raise IntegrityError('Invalid segment magic [segment {}, offset {}]'.format(segment, 0))
offset = MAGIC_LEN
header =
while header:
size, tag, key, data = self._read(fd, self.header_fmt, header, segment, offset,
if include_data:
yield tag, key, offset, data
yield tag, key, offset, size
offset += size
# we must get the fd via get_fd() here again as we yielded to our caller and it might
# have triggered closing of the fd we had before (e.g. by calling for
# different segment(s)).
# by calling get_fd() here again we also make our fd "recently used" so it likely
# does not get kicked out of self.fds LRUcache.
fd = self.get_fd(segment)
header =
def recover_segment(self, segment, filename):
if segment in self.fds:
del self.fds[segment]
with open(filename, 'rb') as fd:
# XXX: Rather use mmap, this loads the entire segment (up to 500 MB by default) into memory.
data = memoryview(
os.rename(filename, filename + '.beforerecover')'attempting to recover ' + filename)
with open(filename, 'wb') as fd:
while len(data) >= self.header_fmt.size:
crc, size, tag = self.header_fmt.unpack(data[:self.header_fmt.size])
if size < self.header_fmt.size or size > len(data):
data = data[1:]
if crc32(data[4:size]) & 0xffffffff != crc:
data = data[1:]
data = data[size:]
def read(self, segment, offset, id, read_data=True):
Read entry from *segment* at *offset* with *id*.
If read_data is False the size of the entry is returned instead and integrity checks are skipped.
The return value should thus be considered informational.
if segment == self.segment and self._write_fd:
fd = self.get_fd(segment)
header =
size, tag, key, data = self._read(fd, self.put_header_fmt, header, segment, offset, (TAG_PUT, ), read_data)
if id != key:
raise IntegrityError('Invalid segment entry header, is not for wanted id [segment {}, offset {}]'.format(
segment, offset))
return data if read_data else size
def _read(self, fd, fmt, header, segment, offset, acceptable_tags, read_data=True):
# some code shared by read() and iter_objects()
hdr_tuple = fmt.unpack(header)
except struct.error as err:
raise IntegrityError('Invalid segment entry header [segment {}, offset {}]: {}'.format(
segment, offset, err)) from None
if fmt is self.put_header_fmt:
crc, size, tag, key = hdr_tuple
elif fmt is self.header_fmt:
crc, size, tag = hdr_tuple
key = None
raise TypeError("_read called with unsupported format")
if size > MAX_OBJECT_SIZE:
# if you get this on an archive made with borg < 1.0.7 and millions of files and
# you need to restore it, you can disable this check by using "if False:" above.
raise IntegrityError('Invalid segment entry size {} - too big [segment {}, offset {}]'.format(
size, segment, offset))
if size < fmt.size:
raise IntegrityError('Invalid segment entry size {} - too small [segment {}, offset {}]'.format(
size, segment, offset))
length = size - fmt.size
if read_data:
data =
if len(data) != length:
raise IntegrityError('Segment entry data short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
segment, offset, length, len(data)))
if crc32(data, crc32(memoryview(header)[4:])) & 0xffffffff != crc:
raise IntegrityError('Segment entry checksum mismatch [segment {}, offset {}]'.format(
segment, offset))
if key is None and tag in (TAG_PUT, TAG_DELETE):
key, data = data[:32], data[32:]
if key is None and tag in (TAG_PUT, TAG_DELETE):
key =
length -= 32
if len(key) != 32:
raise IntegrityError('Segment entry key short read [segment {}, offset {}]: expected {}, got {} bytes'.format(
segment, offset, 32, len(key)))
oldpos = fd.tell()
seeked =, os.SEEK_CUR) - oldpos
data = None
if seeked != length:
raise IntegrityError('Segment entry data short seek [segment {}, offset {}]: expected {}, got {} bytes'.format(
segment, offset, length, seeked))
if tag not in acceptable_tags:
raise IntegrityError('Invalid segment entry header, did not get acceptable tag [segment {}, offset {}]'.format(
segment, offset))
return size, tag, key, data
def write_put(self, id, data, raise_full=False):
data_size = len(data)
if data_size > MAX_DATA_SIZE:
# this would push the segment entry size beyond MAX_OBJECT_SIZE.
raise IntegrityError('More than allowed put data [{} > {}]'.format(data_size, MAX_DATA_SIZE))
fd = self.get_write_fd(raise_full=raise_full)
size = data_size + self.put_header_fmt.size
offset = self.offset
header = self.header_no_crc_fmt.pack(size, TAG_PUT)
crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
fd.write(b''.join((crc, header, id, data)))
self.offset += size
return self.segment, offset
def write_delete(self, id, raise_full=False):
fd = self.get_write_fd(raise_full=raise_full)
header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE)
crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
fd.write(b''.join((crc, header, id)))
self.offset += self.put_header_fmt.size
return self.segment, self.put_header_fmt.size
def write_commit(self, intermediate=False):
if intermediate:
# Intermediate commits go directly into the current segment - this makes checking their validity more
# expensive, but is faster and reduces clobber.
fd = self.get_write_fd()
fd = self.get_write_fd()
header = self.header_no_crc_fmt.pack(self.header_fmt.size, TAG_COMMIT)
crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
fd.write(b''.join((crc, header)))
return self.segment - 1 # close_segment() increments it
# MAX_OBJECT_SIZE = <20 MiB (MAX_DATA_SIZE) + 41 bytes for a Repository PUT header, which consists of
# a 1 byte tag ID, 4 byte CRC, 4 byte size and 32 bytes for the ID.
MAX_OBJECT_SIZE = MAX_DATA_SIZE + LoggedIO.put_header_fmt.size
assert MAX_OBJECT_SIZE == 20971520 == 20 * 1024 * 1024