mirror of https://github.com/borgbackup/borg.git
Track shadowing of log entries
Fixes (benign) index vs. log inconsistencies when segments are skipped during compaction.
This commit is contained in:
parent
21c3fb3b93
commit
ac41ebcf78
|
@ -306,6 +306,10 @@ class Repository:
|
||||||
except RuntimeError:
|
except RuntimeError:
|
||||||
self.check_transaction()
|
self.check_transaction()
|
||||||
self.index = self.open_index(transaction_id, False)
|
self.index = self.open_index(transaction_id, False)
|
||||||
|
# This is an index of shadowed log entries during this transaction. Consider the following sequence:
|
||||||
|
# segment_n PUT A, segment_x DELETE A
|
||||||
|
# After the "DELETE A" in segment_x the shadow index will contain "A -> (n,)".
|
||||||
|
self.shadow_index = defaultdict(list)
|
||||||
if transaction_id is None:
|
if transaction_id is None:
|
||||||
self.segments = {} # XXX bad name: usage_count_of_segment_x = self.segments[x]
|
self.segments = {} # XXX bad name: usage_count_of_segment_x = self.segments[x]
|
||||||
self.compact = FreeSpace() # XXX bad name: freeable_space_of_segment_x = self.compact[x]
|
self.compact = FreeSpace() # XXX bad name: freeable_space_of_segment_x = self.compact[x]
|
||||||
|
@ -420,9 +424,8 @@ class Repository:
|
||||||
# complete the current transfer (when some target segment is full)
|
# complete the current transfer (when some target segment is full)
|
||||||
nonlocal unused
|
nonlocal unused
|
||||||
# commit the new, compact, used segments
|
# commit the new, compact, used segments
|
||||||
self.io.write_commit(intermediate=intermediate)
|
segment = self.io.write_commit(intermediate=intermediate)
|
||||||
logger.debug('complete_xfer: wrote %scommit at segment %d', 'intermediate ' if intermediate else '',
|
logger.debug('complete_xfer: wrote %scommit at segment %d', 'intermediate ' if intermediate else '', segment)
|
||||||
self.io.get_latest_segment())
|
|
||||||
# get rid of the old, sparse, unused segments. free space.
|
# get rid of the old, sparse, unused segments. free space.
|
||||||
for segment in unused:
|
for segment in unused:
|
||||||
logger.debug('complete_xfer: deleting unused segment %d', segment)
|
logger.debug('complete_xfer: deleting unused segment %d', segment)
|
||||||
|
@ -445,7 +448,10 @@ class Repository:
|
||||||
logger.debug('compacting segment %d with usage count %d and %d freeable bytes',
|
logger.debug('compacting segment %d with usage count %d and %d freeable bytes',
|
||||||
segment, segments[segment], freeable_space)
|
segment, segments[segment], freeable_space)
|
||||||
for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
|
for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
|
||||||
if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
|
if tag == TAG_COMMIT:
|
||||||
|
continue
|
||||||
|
in_index = self.index.get(key) == (segment, offset)
|
||||||
|
if tag == TAG_PUT and in_index:
|
||||||
try:
|
try:
|
||||||
new_segment, offset = self.io.write_put(key, data, raise_full=True)
|
new_segment, offset = self.io.write_put(key, data, raise_full=True)
|
||||||
except LoggedIO.SegmentFull:
|
except LoggedIO.SegmentFull:
|
||||||
|
@ -455,8 +461,22 @@ class Repository:
|
||||||
segments.setdefault(new_segment, 0)
|
segments.setdefault(new_segment, 0)
|
||||||
segments[new_segment] += 1
|
segments[new_segment] += 1
|
||||||
segments[segment] -= 1
|
segments[segment] -= 1
|
||||||
|
elif tag == TAG_PUT and not in_index:
|
||||||
|
# If this is a PUT shadowed by a later tag, then it will be gone when this segment is deleted after
|
||||||
|
# this loop. Therefore it is removed from the shadow index.
|
||||||
|
try:
|
||||||
|
self.shadow_index[key].remove(segment)
|
||||||
|
except (KeyError, ValueError):
|
||||||
|
pass
|
||||||
elif tag == TAG_DELETE:
|
elif tag == TAG_DELETE:
|
||||||
if index_transaction_id is None or segment > index_transaction_id:
|
# If the shadow index doesn't contain this key, then we can't say if there's a shadowed older tag,
|
||||||
|
# therefore we do not drop the delete, but write it to a current segment.
|
||||||
|
shadowed_put_exists = key not in self.shadow_index or any(
|
||||||
|
# If the key is in the shadow index and there is any segment with an older PUT of this
|
||||||
|
# key, we have a shadowed put.
|
||||||
|
shadowed < segment for shadowed in self.shadow_index[key])
|
||||||
|
|
||||||
|
if shadowed_put_exists or index_transaction_id is None or segment > index_transaction_id:
|
||||||
# (introduced in 6425d16aa84be1eaaf88)
|
# (introduced in 6425d16aa84be1eaaf88)
|
||||||
# This is needed to avoid object un-deletion if we crash between the commit and the deletion
|
# This is needed to avoid object un-deletion if we crash between the commit and the deletion
|
||||||
# of old segments in complete_xfer().
|
# of old segments in complete_xfer().
|
||||||
|
@ -714,6 +734,7 @@ class Repository:
|
||||||
segment, offset = self.index.pop(id)
|
segment, offset = self.index.pop(id)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
raise self.ObjectNotFound(id, self.path) from None
|
raise self.ObjectNotFound(id, self.path) from None
|
||||||
|
self.shadow_index[id].append(segment)
|
||||||
self.segments[segment] -= 1
|
self.segments[segment] -= 1
|
||||||
size = self.io.read(segment, offset, id, read_data=False)
|
size = self.io.read(segment, offset, id, read_data=False)
|
||||||
self.compact[segment] += size
|
self.compact[segment] += size
|
||||||
|
@ -1026,6 +1047,7 @@ class LoggedIO:
|
||||||
crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
|
crc = self.crc_fmt.pack(crc32(header) & 0xffffffff)
|
||||||
fd.write(b''.join((crc, header)))
|
fd.write(b''.join((crc, header)))
|
||||||
self.close_segment()
|
self.close_segment()
|
||||||
|
return self.segment - 1 # close_segment() increments it
|
||||||
|
|
||||||
|
|
||||||
MAX_DATA_SIZE = MAX_OBJECT_SIZE - LoggedIO.put_header_fmt.size
|
MAX_DATA_SIZE = MAX_OBJECT_SIZE - LoggedIO.put_header_fmt.size
|
||||||
|
|
|
@ -293,7 +293,42 @@ class RepositoryCommitTestCase(RepositoryTestCaseBase):
|
||||||
assert not self.repository.io.segment_exists(last_segment)
|
assert not self.repository.io.segment_exists(last_segment)
|
||||||
for segment, _ in self.repository.io.segment_iterator():
|
for segment, _ in self.repository.io.segment_iterator():
|
||||||
for tag, key, offset, size in self.repository.io.iter_objects(segment):
|
for tag, key, offset, size in self.repository.io.iter_objects(segment):
|
||||||
assert tag != TAG_DELETE
|
if tag == TAG_DELETE:
|
||||||
|
assert segment in self.repository.compact
|
||||||
|
|
||||||
|
def test_shadowed_entries_are_preserved(self):
|
||||||
|
get_latest_segment = self.repository.io.get_latest_segment
|
||||||
|
self.repository.put(H(1), b'1')
|
||||||
|
# This is the segment with our original PUT of interest
|
||||||
|
put_segment = get_latest_segment()
|
||||||
|
self.repository.commit()
|
||||||
|
|
||||||
|
# We now delete H(1), and force this segment to not be compacted, which can happen
|
||||||
|
# if it's not sparse enough (symbolized by H(2) here).
|
||||||
|
self.repository.delete(H(1))
|
||||||
|
self.repository.put(H(2), b'1')
|
||||||
|
delete_segment = get_latest_segment()
|
||||||
|
|
||||||
|
# We pretend these are mostly dense (not sparse) and won't be compacted
|
||||||
|
del self.repository.compact[put_segment]
|
||||||
|
del self.repository.compact[delete_segment]
|
||||||
|
|
||||||
|
self.repository.commit()
|
||||||
|
|
||||||
|
# Now we perform an unrelated operation on the segment containing the DELETE,
|
||||||
|
# causing it to be compacted.
|
||||||
|
self.repository.delete(H(2))
|
||||||
|
self.repository.commit()
|
||||||
|
|
||||||
|
assert self.repository.io.segment_exists(put_segment)
|
||||||
|
assert not self.repository.io.segment_exists(delete_segment)
|
||||||
|
|
||||||
|
# Basic case, since the index survived this must be ok
|
||||||
|
assert H(1) not in self.repository
|
||||||
|
# Nuke index, force replay
|
||||||
|
os.unlink(os.path.join(self.repository.path, 'index.%d' % get_latest_segment()))
|
||||||
|
# Must not reappear
|
||||||
|
assert H(1) not in self.repository
|
||||||
|
|
||||||
|
|
||||||
class RepositoryAppendOnlyTestCase(RepositoryTestCaseBase):
|
class RepositoryAppendOnlyTestCase(RepositoryTestCaseBase):
|
||||||
|
|
Loading…
Reference in New Issue