From 0c076ad114206a370d129eae3e6771e856cb1d67 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Wed, 18 Nov 2015 02:27:25 +0100 Subject: [PATCH] compact_segments: save_space -> free unused segments quickly as soon as one target segment is full, it is a good time to commit it and remove the source segments that are already completely unused (because they were transferred into the target segment). so, for compact_segments(save_space=True), the additional space needed should be about 1 segment size. note: we can't just do that at the end of one source segment as this might create very small target segments, which is not wanted. --- borg/archive.py | 8 ++--- borg/archiver.py | 18 ++++++++--- borg/remote.py | 8 ++--- borg/repository.py | 58 +++++++++++++++++++++++++----------- borg/testsuite/repository.py | 2 +- 5 files changed, 64 insertions(+), 30 deletions(-) diff --git a/borg/archive.py b/borg/archive.py index f4d98a3a7..011d7845a 100644 --- a/borg/archive.py +++ b/borg/archive.py @@ -661,7 +661,7 @@ class ArchiveChecker: self.error_found = False self.possibly_superseded = set() - def check(self, repository, repair=False, archive=None, last=None): + def check(self, repository, repair=False, archive=None, last=None, save_space=False): logger.info('Starting archive consistency check...') self.check_all = archive is None and last is None self.repair = repair @@ -676,7 +676,7 @@ class ArchiveChecker: self.manifest, _ = Manifest.load(repository, key=self.key) self.rebuild_refcounts(archive=archive, last=last) self.orphan_chunks_check() - self.finish() + self.finish(save_space=save_space) if self.error_found: logger.error('Archive consistency check complete, problems found.') else: @@ -885,7 +885,7 @@ class ArchiveChecker: else: logger.warning('Orphaned objects check skipped (needs all archives checked).') - def finish(self): + def finish(self, save_space=False): if self.repair: self.manifest.write() - self.repository.commit() + 
self.repository.commit(save_space=save_space) diff --git a/borg/archiver.py b/borg/archiver.py index fcccaccf6..187788d22 100644 --- a/borg/archiver.py +++ b/borg/archiver.py @@ -105,10 +105,11 @@ class Archiver: env_var_override='BORG_CHECK_I_KNOW_WHAT_I_AM_DOING', truish=('YES', )): return EXIT_ERROR if not args.archives_only: - if not repository.check(repair=args.repair): + if not repository.check(repair=args.repair, save_space=args.save_space): return EXIT_WARNING if not args.repo_only and not ArchiveChecker().check( - repository, repair=args.repair, archive=args.repository.archive, last=args.last): + repository, repair=args.repair, archive=args.repository.archive, + last=args.last, save_space=args.save_space): return EXIT_WARNING return EXIT_SUCCESS @@ -332,7 +333,7 @@ class Archiver: stats = Statistics() archive.delete(stats) manifest.write() - repository.commit() + repository.commit(save_space=args.save_space) cache.commit() if args.stats: logger.info(stats.summary.format(label='Deleted data:', stats=stats)) @@ -487,7 +488,7 @@ class Archiver: Archive(repository, key, manifest, archive.name, cache).delete(stats) if to_delete and not args.dry_run: manifest.write() - repository.commit() + repository.commit(save_space=args.save_space) cache.commit() if args.stats: logger.info(stats.summary.format(label='Deleted data:', stats=stats)) @@ -762,6 +763,9 @@ class Archiver: subparser.add_argument('--repair', dest='repair', action='store_true', default=False, help='attempt to repair any inconsistencies found') + subparser.add_argument('--save-space', dest='save_space', action='store_true', + default=False, + help='work slower, but using less space') subparser.add_argument('--last', dest='last', type=int, default=None, metavar='N', help='only check last N archives (Default: all)') @@ -926,6 +930,9 @@ class Archiver: subparser.add_argument('-c', '--cache-only', dest='cache_only', action='store_true', default=False, help='delete only the local cache for the given 
repository') + subparser.add_argument('--save-space', dest='save_space', action='store_true', + default=False, + help='work slower, but using less space') subparser.add_argument('target', metavar='TARGET', nargs='?', default='', type=location_validator(), help='archive or repository to delete') @@ -1043,6 +1050,9 @@ class Archiver: help='number of yearly archives to keep') subparser.add_argument('-p', '--prefix', dest='prefix', type=str, help='only consider archive names starting with this prefix') + subparser.add_argument('--save-space', dest='save_space', action='store_true', + default=False, + help='work slower, but using less space') subparser.add_argument('repository', metavar='REPOSITORY', nargs='?', default='', type=location_validator(archive=False), help='repository to prune') diff --git a/borg/remote.py b/borg/remote.py index 76feecced..f724b80d7 100644 --- a/borg/remote.py +++ b/borg/remote.py @@ -273,11 +273,11 @@ class RemoteRepository: w_fds = [] self.ignore_responses |= set(waiting_for) - def check(self, repair=False): - return self.call('check', repair) + def check(self, repair=False, save_space=False): + return self.call('check', repair, save_space) - def commit(self, *args): - return self.call('commit') + def commit(self, save_space=False): + return self.call('commit', save_space) def rollback(self, *args): return self.call('rollback') diff --git a/borg/repository.py b/borg/repository.py index 3765d9c50..36cd0f66f 100644 --- a/borg/repository.py +++ b/borg/repository.py @@ -158,11 +158,11 @@ class Repository: self.lock.release() self.lock = None - def commit(self): + def commit(self, save_space=False): """Commit transaction """ self.io.write_commit() - self.compact_segments() + self.compact_segments(save_space=save_space) self.write_index() self.rollback() @@ -220,31 +220,50 @@ class Repository: os.unlink(os.path.join(self.path, name)) self.index = None - def compact_segments(self): + def compact_segments(self, save_space=False): """Compact sparse 
segments by copying data into new segments """ if not self.compact: return index_transaction_id = self.get_index_transaction_id() segments = self.segments + unused = [] # list of segments, that are not used anymore + + def complete_xfer(): + # complete the transfer (usually exactly when some target segment + # is full, or at the very end when everything is processed) + nonlocal unused + # commit the new, compact, used segments + self.io.write_commit() + # get rid of the old, sparse, unused segments. free space. + for segment in unused: + assert self.segments.pop(segment) == 0 + self.io.delete_segment(segment) + unused = [] + for segment in sorted(self.compact): if self.io.segment_exists(segment): for tag, key, offset, data in self.io.iter_objects(segment, include_data=True): if tag == TAG_PUT and self.index.get(key, (-1, -1)) == (segment, offset): - new_segment, offset = self.io.write_put(key, data) + try: + new_segment, offset = self.io.write_put(key, data, raise_full=save_space) + except LoggedIO.SegmentFull: + complete_xfer() + new_segment, offset = self.io.write_put(key, data) self.index[key] = new_segment, offset segments.setdefault(new_segment, 0) segments[new_segment] += 1 segments[segment] -= 1 elif tag == TAG_DELETE: if index_transaction_id is None or segment > index_transaction_id: - self.io.write_delete(key) + try: + self.io.write_delete(key, raise_full=save_space) + except LoggedIO.SegmentFull: + complete_xfer() + self.io.write_delete(key) assert segments[segment] == 0 - - self.io.write_commit() - for segment in sorted(self.compact): - assert self.segments.pop(segment) == 0 - self.io.delete_segment(segment) + unused.append(segment) + complete_xfer() self.compact = set() def replay_segments(self, index_transaction_id, segments_transaction_id): @@ -297,7 +316,7 @@ class Repository: if self.segments[segment] == 0: self.compact.add(segment) - def check(self, repair=False): + def check(self, repair=False, save_space=False): """Check repository consistency 
This method verifies all segment checksums and makes sure @@ -358,7 +377,7 @@ class Repository: if current_index.get(key, (-1, -1)) != value: report_error('Index mismatch for key {}. {} != {}'.format(key, value, current_index.get(key, (-1, -1)))) if repair: - self.compact_segments() + self.compact_segments(save_space=save_space) self.write_index() self.rollback() if error_found: @@ -441,6 +460,9 @@ class Repository: class LoggedIO: + class SegmentFull(Exception): + """raised when a segment is full, before opening next""" + header_fmt = struct.Struct(' self.limit: + if raise_full: + raise self.SegmentFull self.close_segment() if not self._write_fd: if self.segment % self.segments_per_dir == 0: @@ -630,9 +654,9 @@ class LoggedIO: key, data = data[:32], data[32:] return size, tag, key, data - def write_put(self, id, data): + def write_put(self, id, data, raise_full=False): + fd = self.get_write_fd(raise_full=raise_full) size = len(data) + self.put_header_fmt.size - fd = self.get_write_fd() offset = self.offset header = self.header_no_crc_fmt.pack(size, TAG_PUT) crc = self.crc_fmt.pack(crc32(data, crc32(id, crc32(header))) & 0xffffffff) @@ -640,8 +664,8 @@ class LoggedIO: self.offset += size return self.segment, offset - def write_delete(self, id): - fd = self.get_write_fd() + def write_delete(self, id, raise_full=False): + fd = self.get_write_fd(raise_full=raise_full) header = self.header_no_crc_fmt.pack(self.put_header_fmt.size, TAG_DELETE) crc = self.crc_fmt.pack(crc32(id, crc32(header)) & 0xffffffff) fd.write(b''.join((crc, header, id))) diff --git a/borg/testsuite/repository.py b/borg/testsuite/repository.py index 713f44029..4094df407 100644 --- a/borg/testsuite/repository.py +++ b/borg/testsuite/repository.py @@ -311,7 +311,7 @@ class RepositoryCheckTestCase(RepositoryTestCaseBase): # Simulate a crash before compact with patch.object(Repository, 'compact_segments') as compact: self.repository.commit() - compact.assert_called_once_with() + 
compact.assert_called_once_with(save_space=False) self.reopen() self.check(repair=True) self.assert_equal(self.repository.get(bytes(32)), b'data2')