compact_segments: save_space -> free unused segments quickly

as soon as one target segment is full, it is a good time to commit it and remove the source segments
that are already completely unused (because their data was transferred into the target segments).

so, for compact_segments(save_space=True), the additional space needed should be about 1 segment size.

note: we can't simply commit at the end of each source segment, as this might create very small
target segments, which is not wanted.
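
On the API side the behaviour is opt-in via the new save_space keyword on Repository.commit() and the check entry points. A minimal sketch of the intended call pattern, assuming the existing Repository.put()/close() API (the repository path and the key below are placeholders, not part of this commit):

    from borg.repository import Repository

    repo = Repository('/path/to/repo')           # hypothetical path
    try:
        repo.put(bytes(32), b'some chunk data')  # keys are 32 bytes, as in the test suite
        # with save_space=True, compaction frees emptied source segments while it runs
        repo.commit(save_space=True)
    finally:
        repo.close()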
Thomas Waldmann 2015-11-18 02:27:25 +01:00
parent 5f1fcb3e63
commit 0c076ad114
5 changed files with 64 additions and 30 deletions


@ -661,7 +661,7 @@ class ArchiveChecker:
self.error_found = False
self.possibly_superseded = set()
- def check(self, repository, repair=False, archive=None, last=None):
+ def check(self, repository, repair=False, archive=None, last=None, save_space=False):
logger.info('Starting archive consistency check...')
self.check_all = archive is None and last is None
self.repair = repair
@ -676,7 +676,7 @@ class ArchiveChecker:
self.manifest, _ = Manifest.load(repository, key=self.key)
self.rebuild_refcounts(archive=archive, last=last)
self.orphan_chunks_check()
- self.finish()
+ self.finish(save_space=save_space)
if self.error_found:
logger.error('Archive consistency check complete, problems found.')
else:
@ -885,7 +885,7 @@ class ArchiveChecker:
else:
logger.warning('Orphaned objects check skipped (needs all archives checked).')
- def finish(self):
+ def finish(self, save_space=False):
if self.repair:
self.manifest.write()
- self.repository.commit()
+ self.repository.commit(save_space=save_space)


@ -105,10 +105,11 @@ class Archiver:
env_var_override='BORG_CHECK_I_KNOW_WHAT_I_AM_DOING', truish=('YES', )):
return EXIT_ERROR
if not args.archives_only:
- if not repository.check(repair=args.repair):
+ if not repository.check(repair=args.repair, save_space=args.save_space):
return EXIT_WARNING
if not args.repo_only and not ArchiveChecker().check(
- repository, repair=args.repair, archive=args.repository.archive, last=args.last):
+ repository, repair=args.repair, archive=args.repository.archive,
+ last=args.last, save_space=args.save_space):
return EXIT_WARNING
return EXIT_SUCCESS
@ -332,7 +333,7 @@ class Archiver:
stats = Statistics()
archive.delete(stats)
manifest.write()
- repository.commit()
+ repository.commit(save_space=args.save_space)
cache.commit()
if args.stats:
logger.info(stats.summary.format(label='Deleted data:', stats=stats))
@ -487,7 +488,7 @@ class Archiver:
Archive(repository, key, manifest, archive.name, cache).delete(stats)
if to_delete and not args.dry_run:
manifest.write()
- repository.commit()
+ repository.commit(save_space=args.save_space)
cache.commit()
if args.stats:
logger.info(stats.summary.format(label='Deleted data:', stats=stats))
@ -762,6 +763,9 @@ class Archiver:
subparser.add_argument('--repair', dest='repair', action='store_true',
default=False,
help='attempt to repair any inconsistencies found')
+ subparser.add_argument('--save-space', dest='save_space', action='store_true',
+ default=False,
+ help='work slower, but using less space')
subparser.add_argument('--last', dest='last',
type=int, default=None, metavar='N',
help='only check last N archives (Default: all)')
@ -926,6 +930,9 @@ class Archiver:
subparser.add_argument('-c', '--cache-only', dest='cache_only',
action='store_true', default=False,
help='delete only the local cache for the given repository')
+ subparser.add_argument('--save-space', dest='save_space', action='store_true',
+ default=False,
+ help='work slower, but using less space')
subparser.add_argument('target', metavar='TARGET', nargs='?', default='',
type=location_validator(),
help='archive or repository to delete')
@ -1043,6 +1050,9 @@ class Archiver:
help='number of yearly archives to keep')
subparser.add_argument('-p', '--prefix', dest='prefix', type=str,
help='only consider archive names starting with this prefix')
+ subparser.add_argument('--save-space', dest='save_space', action='store_true',
+ default=False,
+ help='work slower, but using less space')
subparser.add_argument('repository', metavar='REPOSITORY', nargs='?', default='',
type=location_validator(archive=False),
help='repository to prune')
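
Taken together, the archiver changes above surface this as a --save-space flag on the subcommands that end up committing the repository: check (where it applies together with --repair), delete and prune, e.g. borg check --repair --save-space REPO (REPO being a placeholder for the usual repository location). The flag is pure plumbing: args.save_space is simply forwarded to repository.check() and repository.commit().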


@ -273,11 +273,11 @@ class RemoteRepository:
w_fds = []
self.ignore_responses |= set(waiting_for)
- def check(self, repair=False):
- return self.call('check', repair)
+ def check(self, repair=False, save_space=False):
+ return self.call('check', repair, save_space)
- def commit(self, *args):
- return self.call('commit')
+ def commit(self, save_space=False):
+ return self.call('commit', save_space)
def rollback(self, *args):
return self.call('rollback')


@ -158,11 +158,11 @@ class Repository:
self.lock.release()
self.lock = None
- def commit(self):
+ def commit(self, save_space=False):
"""Commit transaction
"""
self.io.write_commit()
- self.compact_segments()
+ self.compact_segments(save_space=save_space)
self.write_index()
self.rollback()
@ -220,31 +220,50 @@ class Repository:
os.unlink(os.path.join(self.path, name))
self.index = None
- def compact_segments(self):
+ def compact_segments(self, save_space=False):
"""Compact sparse segments by copying data into new segments
"""
if not self.compact:
return
index_transaction_id = self.get_index_transaction_id()
segments = self.segments
+ unused = [] # list of segments, that are not used anymore
+ def complete_xfer():
+ # complete the transfer (usually exactly when some target segment
+ # is full, or at the very end when everything is processed)
+ nonlocal unused
+ # commit the new, compact, used segments
+ self.io.write_commit()
+ # get rid of the old, sparse, unused segments. free space.
+ for segment in unused:
+ assert self.segments.pop(segment) == 0
+ self.io.delete_segment(segment)
+ unused = []
for segment in sorted(self.compact):
if self.io.segment_exists(segment):
for tag, key, offset, data in self.io.iter_objects(segment, include_data=True):
if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset):
- new_segment, offset = self.io.write_put(key, data)
+ try:
+ new_segment, offset = self.io.write_put(key, data, raise_full=save_space)
+ except LoggedIO.SegmentFull:
+ complete_xfer()
+ new_segment, offset = self.io.write_put(key, data)
self.index[key] = new_segment, offset
segments.setdefault(new_segment, 0)
segments[new_segment] += 1
segments[segment] -= 1
elif tag == TAG_DELETE:
if index_transaction_id is None or segment > index_transaction_id:
- self.io.write_delete(key)
+ try:
+ self.io.write_delete(key, raise_full=save_space)
+ except LoggedIO.SegmentFull:
+ complete_xfer()
+ self.io.write_delete(key)
assert segments[segment] == 0
- self.io.write_commit()
- for segment in sorted(self.compact):
- assert self.segments.pop(segment) == 0
- self.io.delete_segment(segment)
+ unused.append(segment)
+ complete_xfer()
self.compact = set()
def replay_segments(self, index_transaction_id, segments_transaction_id):
@ -297,7 +316,7 @@ class Repository:
if self.segments[segment] == 0:
self.compact.add(segment)
- def check(self, repair=False):
+ def check(self, repair=False, save_space=False):
"""Check repository consistency
This method verifies all segment checksums and makes sure
@ -358,7 +377,7 @@ class Repository:
if current_index.get(key, (-1, -1)) != value:
report_error('Index mismatch for key {}. {} != {}'.format(key, value, current_index.get(key, (-1, -1))))
if repair:
- self.compact_segments()
+ self.compact_segments(save_space=save_space)
self.write_index()
self.rollback()
if error_found:
@ -441,6 +460,9 @@ class Repository:
class LoggedIO:
+ class SegmentFull(Exception):
+ """raised when a segment is full, before opening next"""
header_fmt = struct.Struct('<IIB')
assert header_fmt.size == 9
put_header_fmt = struct.Struct('<IIB32s')
@ -517,8 +539,10 @@ class LoggedIO:
def segment_filename(self, segment):
return os.path.join(self.path, 'data', str(segment // self.segments_per_dir), str(segment))
- def get_write_fd(self, no_new=False):
+ def get_write_fd(self, no_new=False, raise_full=False):
if not no_new and self.offset and self.offset > self.limit:
+ if raise_full:
+ raise self.SegmentFull
self.close_segment()
if not self._write_fd:
if self.segment % self.segments_per_dir == 0:
@ -630,9 +654,9 @@ class LoggedIO:
key, data = data[:32], data[32:]
return size, tag, key, data
- def write_put(self, id, data):
+ def write_put(self, id, data, raise_full=False):
+ fd = self.get_write_fd(raise_full=raise_full)
size = len(data) + self.put_header_fmt.size
- fd = self.get_write_fd()
offset = self.offset
header = self.header_no_crc_fmt.pack(size, TAG_PUT)
crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff)
@ -640,8 +664,8 @@ class LoggedIO:
self.offset += size
return self.segment, offset
- def write_delete(self, id):
- fd = self.get_write_fd()
+ def write_delete(self, id, raise_full=False):
+ fd = self.get_write_fd(raise_full=raise_full)
header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE)
crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff)
fd.write(b''.join((crc, header, id)))


@ -311,7 +311,7 @@ class RepositoryCheckTestCase(RepositoryTestCaseBase):
# Simulate a crash before compact
with patch.object(Repository, 'compact_segments') as compact:
self.repository.commit()
- compact.assert_called_once_with()
+ compact.assert_called_once_with(save_space=False)
self.reopen()
self.check(repair=True)
self.assert_equal(self.repository.get(bytes(32)), b'data2')
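
A companion test for the save_space path would follow the same pattern; a minimal sketch, assuming the same test case fixtures and the patch import used above (the test name and data are made up):

    def test_commit_forwards_save_space(self):
        self.repository.put(bytes(32), b'data')
        # intercept compaction, as above, and only verify the forwarded keyword
        with patch.object(Repository, 'compact_segments') as compact:
            self.repository.commit(save_space=True)
            compact.assert_called_once_with(save_space=True)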