From 44935aa8eacf2742c3ed7edd7b7de409ddad89ad Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sat, 19 Nov 2016 16:49:20 +0100 Subject: [PATCH 1/3] recreate: remove interruption blah, autocommit blah, resuming blah --- src/borg/archive.py | 136 +++------------------------------ src/borg/archiver.py | 50 +++++------- src/borg/testsuite/archiver.py | 100 ------------------------ 3 files changed, 29 insertions(+), 257 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 263536f20..269263f9d 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -1371,9 +1371,6 @@ def finish(self, save_space=False): class ArchiveRecreater: - AUTOCOMMIT_THRESHOLD = 512 * 1024 * 1024 - """Commit (compact segments) after this many (or 1 % of repository size, whichever is greater) bytes.""" - class FakeTargetArchive: def __init__(self): self.stats = Statistics() @@ -1409,9 +1406,6 @@ def __init__(self, repository, manifest, key, cache, matcher, compression_files or []) key.compression_decider2 = CompressionDecider2(compression or CompressionSpec('none')) - self.autocommit_threshold = max(self.AUTOCOMMIT_THRESHOLD, self.cache.chunks_stored_size() / 100) - logger.debug("Autocommit threshold: %s", format_file_size(self.autocommit_threshold)) - self.dry_run = dry_run self.stats = stats self.progress = progress @@ -1423,20 +1417,17 @@ def __init__(self, repository, manifest, key, cache, matcher, def recreate(self, archive_name, comment=None, target_name=None): assert not self.is_temporary_archive(archive_name) archive = self.open_archive(archive_name) - target, resume_from = self.create_target_or_resume(archive, target_name) + target = self.create_target(archive, target_name) if self.exclude_if_present or self.exclude_caches: self.matcher_add_tagged_dirs(archive) if self.matcher.empty() and not self.recompress and not target.recreate_rechunkify and comment is None: logger.info("Skipping archive %s, nothing to do", archive_name) return True - try: - self.process_items(archive, target, resume_from) - except self.Interrupted as e: - return self.save(archive, target, completed=False, metadata=e.metadata) + self.process_items(archive, target) replace_original = target_name is None return self.save(archive, target, comment, replace_original=replace_original) - def process_items(self, archive, target, resume_from=None): + def process_items(self, archive, target): matcher = self.matcher target_is_subset = not matcher.empty() hardlink_masters = {} if target_is_subset else None @@ -1450,15 +1441,8 @@ def item_is_hardlink_master(item): for item in archive.iter_items(): if item_is_hardlink_master(item): - # Re-visit all of these items in the archive even when fast-forwarding to rebuild hardlink_masters hardlink_masters[item.path] = (item.get('chunks'), None) continue - if resume_from: - # Fast forward to after the last processed file - if item.path == resume_from: - logger.info('Fast-forwarded to %s', remove_surrogates(item.path)) - resume_from = None - continue if not matcher.match(item.path): self.print_file_status('x', item.path) continue @@ -1476,12 +1460,7 @@ def item_is_hardlink_master(item): if self.dry_run: self.print_file_status('-', item.path) else: - try: - self.process_item(archive, target, item) - except self.Interrupted: - if self.progress: - target.stats.show_progress(final=True) - raise + self.process_item(archive, target, item) if self.progress: target.stats.show_progress(final=True) @@ -1491,8 +1470,6 @@ def process_item(self, archive, target, item): target.stats.nfiles += 1 
target.add_item(item) self.print_file_status(file_status(item.mode), item.path) - if self.interrupt: - raise self.Interrupted def process_chunks(self, archive, target, item): """Return new chunk ID list for 'item'.""" @@ -1500,9 +1477,8 @@ def process_chunks(self, archive, target, item): for chunk_id, size, csize in item.chunks: self.cache.chunk_incref(chunk_id, target.stats) return item.chunks - new_chunks = self.process_partial_chunks(target) + new_chunks = [] chunk_iterator = self.create_chunk_iterator(archive, target, item) - consume(chunk_iterator, len(new_chunks)) compress = self.compression_decider1.decide(item.path) for chunk in chunk_iterator: chunk.meta['compress'] = compress @@ -1521,20 +1497,8 @@ def process_chunks(self, archive, target, item): chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite) new_chunks.append((chunk_id, size, csize)) self.seen_chunks.add(chunk_id) - if self.recompress and self.cache.seen_chunk(chunk_id) == 1: - # This tracks how many bytes are uncommitted but compactable, since we are recompressing - # existing chunks. - target.recreate_uncomitted_bytes += csize - if target.recreate_uncomitted_bytes >= self.autocommit_threshold: - # Issue commits to limit additional space usage when recompressing chunks - target.recreate_uncomitted_bytes = 0 - self.repository.commit() if self.progress: target.stats.show_progress(item=item, dt=0.2) - if self.interrupt: - raise self.Interrupted({ - 'recreate_partial_chunks': new_chunks, - }) return new_chunks def create_chunk_iterator(self, archive, target, item): @@ -1552,19 +1516,6 @@ def _chunk_iterator(): chunk_iterator = _chunk_iterator() return chunk_iterator - def process_partial_chunks(self, target): - """Return chunks from a previous run for archive 'target' (if any) or an empty list.""" - if not target.recreate_partial_chunks: - return [] - # No incref, create_target_or_resume already did that before to deleting the old target archive - # So just copy these over - partial_chunks = target.recreate_partial_chunks - target.recreate_partial_chunks = None - for chunk_id, size, csize in partial_chunks: - self.seen_chunks.add(chunk_id) - logger.debug('Copied %d chunks from a partially processed item', len(partial_chunks)) - return partial_chunks - def save(self, archive, target, comment=None, completed=True, metadata=None, replace_original=True): """Save target archive. If completed, replace source. If not, save temporary with additional 'metadata' dict.""" if self.dry_run: @@ -1631,84 +1582,15 @@ def exclude(dir, tag_item): matcher.add(tag_files, True) matcher.add(tagged_dirs, False) - def create_target_or_resume(self, archive, target_name=None): - """Create new target archive or resume from temporary archive, if it exists. 
Return archive, resume from path""" + def create_target(self, archive, target_name=None): + """Create target archive.""" if self.dry_run: return self.FakeTargetArchive(), None target_name = target_name or archive.name + '.recreate' - resume = target_name in self.manifest.archives - target, resume_from = None, None - if resume: - target, resume_from = self.try_resume(archive, target_name) - if not target: - target = self.create_target_archive(target_name) + target = self.create_target_archive(target_name) # If the archives use the same chunker params, then don't rechunkify target.recreate_rechunkify = tuple(archive.metadata.get('chunker_params', [])) != self.chunker_params - return target, resume_from - - def try_resume(self, archive, target_name): - """Try to resume from temporary archive. Return (target archive, resume from path) if successful.""" - logger.info('Found %s, will resume interrupted operation', target_name) - old_target = self.open_archive(target_name) - if not self.can_resume(archive, old_target, target_name): - return None, None - target = self.create_target_archive(target_name + '.temp') - logger.info('Replaying items from interrupted operation...') - last_old_item = self.copy_items(old_target, target) - resume_from = getattr(last_old_item, 'path', None) - self.incref_partial_chunks(old_target, target) - old_target.delete(Statistics(), progress=self.progress) - logger.info('Done replaying items') - return target, resume_from - - def incref_partial_chunks(self, source_archive, target_archive): - target_archive.recreate_partial_chunks = source_archive.metadata.get('recreate_partial_chunks', []) - for chunk_id, size, csize in target_archive.recreate_partial_chunks: - if not self.cache.seen_chunk(chunk_id): - try: - # Repository has __contains__, RemoteRepository doesn't - # `chunk_id in repo` doesn't read the data though, so we try to use that if possible. - get_or_in = getattr(self.repository, '__contains__', self.repository.get) - if get_or_in(chunk_id) is False: - raise Repository.ObjectNotFound(chunk_id, self.repository) - except Repository.ObjectNotFound: - # delete/prune/check between invocations: these chunks are gone. 
- target_archive.recreate_partial_chunks = None - break - # fast-lane insert into chunks cache - self.cache.chunks[chunk_id] = (1, size, csize) - target_archive.stats.update(size, csize, True) - continue - # incref now, otherwise a source_archive.delete() might delete these chunks - self.cache.chunk_incref(chunk_id, target_archive.stats) - - def copy_items(self, source_archive, target_archive): - item = None - for item in source_archive.iter_items(): - if 'chunks' in item: - for chunk in item.chunks: - self.cache.chunk_incref(chunk.id, target_archive.stats) - target_archive.stats.nfiles += 1 - target_archive.add_item(item) - if self.progress: - target_archive.stats.show_progress(final=True) - return item - - def can_resume(self, archive, old_target, target_name): - resume_id = old_target.metadata.recreate_source_id - resume_args = [safe_decode(arg) for arg in old_target.metadata.recreate_args] - if resume_id != archive.id: - logger.warning('Source archive changed, will discard %s and start over', target_name) - logger.warning('Saved fingerprint: %s', bin_to_hex(resume_id)) - logger.warning('Current fingerprint: %s', archive.fpr) - old_target.delete(Statistics(), progress=self.progress) - return False - if resume_args != sys.argv[1:]: - logger.warning('Command line changed, this might lead to inconsistencies') - logger.warning('Saved: %s', repr(resume_args)) - logger.warning('Current: %s', repr(sys.argv[1:])) - # Just warn in this case, don't start over - return True + return target def create_target_archive(self, name): target = Archive(self.repository, self.key, self.manifest, name, create=True, diff --git a/src/borg/archiver.py b/src/borg/archiver.py index 59f4e17bb..3c63e37e5 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -1058,13 +1058,6 @@ def do_upgrade(self, args): @with_repository(cache=True, exclusive=True) def do_recreate(self, args, repository, manifest, key, cache): """Re-create archives""" - def interrupt(signal_num, stack_frame): - if recreater.interrupt: - print("\nReceived signal, again. 
I'm not deaf.", file=sys.stderr) - else: - print("\nReceived signal, will exit cleanly.", file=sys.stderr) - recreater.interrupt = True - msg = ("recreate is an experimental feature.\n" "Type 'YES' if you understand this and want to continue: ") if not yes(msg, false_msg="Aborting.", truish=('YES',), @@ -1084,30 +1077,27 @@ def interrupt(signal_num, stack_frame): file_status_printer=self.print_file_status, dry_run=args.dry_run) - with signal_handler(signal.SIGTERM, interrupt), \ - signal_handler(signal.SIGINT, interrupt), \ - signal_handler(signal.SIGHUP, interrupt): - if args.location.archive: - name = args.location.archive + if args.location.archive: + name = args.location.archive + if recreater.is_temporary_archive(name): + self.print_error('Refusing to work on temporary archive of prior recreate: %s', name) + return self.exit_code + recreater.recreate(name, args.comment, args.target) + else: + if args.target is not None: + self.print_error('--target: Need to specify single archive') + return self.exit_code + for archive in manifest.archives.list(sort_by=['ts']): + name = archive.name if recreater.is_temporary_archive(name): - self.print_error('Refusing to work on temporary archive of prior recreate: %s', name) - return self.exit_code - recreater.recreate(name, args.comment, args.target) - else: - if args.target is not None: - self.print_error('--target: Need to specify single archive') - return self.exit_code - for archive in manifest.archives.list(sort_by=['ts']): - name = archive.name - if recreater.is_temporary_archive(name): - continue - print('Processing', name) - if not recreater.recreate(name, args.comment): - break - manifest.write() - repository.commit() - cache.commit() - return self.exit_code + continue + print('Processing', name) + if not recreater.recreate(name, args.comment): + break + manifest.write() + repository.commit() + cache.commit() + return self.exit_code @with_repository(manifest=False, exclusive=True) def do_with_lock(self, args, repository): diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index ffa7cccd6..da3720563 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -1717,106 +1717,6 @@ def test_recreate_dry_run(self): archives_after = self.cmd('list', self.repository_location + '::test') assert archives_after == archives_before - def _recreate_interrupt_patch(self, interrupt_after_n_1_files): - def interrupt(self, *args): - if interrupt_after_n_1_files: - self.interrupt = True - pi_save(self, *args) - else: - raise ArchiveRecreater.Interrupted - - def process_item_patch(*args): - return pi_call.pop(0)(*args) - - pi_save = ArchiveRecreater.process_item - pi_call = [pi_save] * interrupt_after_n_1_files + [interrupt] - return process_item_patch - - def _test_recreate_interrupt(self, change_args, interrupt_early): - self.create_test_files() - self.create_regular_file('dir2/abcdef', size=1024 * 80) - self.cmd('init', self.repository_location) - self.cmd('create', self.repository_location + '::test', 'input') - process_files = 1 - if interrupt_early: - process_files = 0 - with patch.object(ArchiveRecreater, 'process_item', self._recreate_interrupt_patch(process_files)): - self.cmd('recreate', self.repository_location, 'input/dir2') - assert 'test.recreate' in self.cmd('list', self.repository_location) - if change_args: - with patch.object(sys, 'argv', sys.argv + ['non-forking tests don\'t use sys.argv']): - output = self.cmd('recreate', '-sv', '--list', '-pC', 'lz4', self.repository_location, 
'input/dir2') - else: - output = self.cmd('recreate', '-sv', '--list', self.repository_location, 'input/dir2') - assert 'Found test.recreate, will resume' in output - assert change_args == ('Command line changed' in output) - if not interrupt_early: - assert 'Fast-forwarded to input/dir2/abcdef' in output - assert 'A input/dir2/abcdef' not in output - assert 'A input/dir2/file2' in output - archives = self.cmd('list', self.repository_location) - assert 'test.recreate' not in archives - assert 'test' in archives - files = self.cmd('list', self.repository_location + '::test') - assert 'dir2/file2' in files - assert 'dir2/abcdef' in files - assert 'file1' not in files - - # The _test_create_interrupt requires a deterministic (alphabetic) order of the files to easily check if - # resumption works correctly. Patch scandir_inorder to work in alphabetic order. - - def test_recreate_interrupt(self): - with patch.object(helpers, 'scandir_inorder', helpers.scandir_generic): - self._test_recreate_interrupt(False, True) - - def test_recreate_interrupt2(self): - with patch.object(helpers, 'scandir_inorder', helpers.scandir_generic): - self._test_recreate_interrupt(True, False) - - def _test_recreate_chunker_interrupt_patch(self): - real_add_chunk = Cache.add_chunk - - def add_chunk(*args, **kwargs): - frame = inspect.stack()[2] - try: - caller_self = frame[0].f_locals['self'] - if isinstance(caller_self, ArchiveRecreater): - caller_self.interrupt = True - finally: - del frame - return real_add_chunk(*args, **kwargs) - return add_chunk - - def test_recreate_rechunkify_interrupt(self): - self.create_regular_file('file1', size=1024 * 80) - self.cmd('init', self.repository_location) - self.cmd('create', self.repository_location + '::test', 'input') - archive_before = self.cmd('list', self.repository_location + '::test', '--format', '{sha512}') - with patch.object(Cache, 'add_chunk', self._test_recreate_chunker_interrupt_patch()): - self.cmd('recreate', '-pv', '--chunker-params', '10,13,11,4095', self.repository_location) - assert 'test.recreate' in self.cmd('list', self.repository_location) - output = self.cmd('recreate', '-svp', '--debug', '--chunker-params', '10,13,11,4095', self.repository_location) - assert 'Found test.recreate, will resume' in output - assert 'Copied 1 chunks from a partially processed item' in output - archive_after = self.cmd('list', self.repository_location + '::test', '--format', '{sha512}') - assert archive_after == archive_before - - def test_recreate_changed_source(self): - self.create_test_files() - self.cmd('init', self.repository_location) - self.cmd('create', self.repository_location + '::test', 'input') - with patch.object(ArchiveRecreater, 'process_item', self._recreate_interrupt_patch(1)): - self.cmd('recreate', self.repository_location, 'input/dir2') - assert 'test.recreate' in self.cmd('list', self.repository_location) - self.cmd('delete', self.repository_location + '::test') - self.cmd('create', self.repository_location + '::test', 'input') - output = self.cmd('recreate', self.repository_location, 'input/dir2') - assert 'Source archive changed, will discard test.recreate and start over' in output - - def test_recreate_refuses_temporary(self): - self.cmd('init', self.repository_location) - self.cmd('recreate', self.repository_location + '::cba.recreate', exit_code=2) - def test_recreate_skips_nothing_to_do(self): self.create_regular_file('file1', size=1024 * 80) self.cmd('init', self.repository_location) From 93b03ea23109004d8876c1af1690e8c9ec33bcfc Mon Sep 17 00:00:00 
2001 From: Marian Beermann Date: Sat, 19 Nov 2016 19:09:47 +0100 Subject: [PATCH 2/3] recreate: re-use existing checkpoint functionality --- src/borg/archive.py | 161 ++++++++++++++++++++----------------------- src/borg/archiver.py | 6 +- 2 files changed, 79 insertions(+), 88 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 269263f9d..cc90f165f 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -6,6 +6,7 @@ import time from contextlib import contextmanager from datetime import datetime, timezone +from functools import partial from getpass import getuser from io import BytesIO from itertools import groupby @@ -741,28 +742,32 @@ def process_symlink(self, path, st): self.add_item(item) return 's' # symlink - def chunk_file(self, item, cache, stats, fd, fh=-1, **chunk_kw): - def write_part(item, from_chunk, number): - item = Item(internal_dict=item.as_dict()) - length = len(item.chunks) - # the item should only have the *additional* chunks we processed after the last partial item: - item.chunks = item.chunks[from_chunk:] - item.path += '.borg_part_%d' % number - item.part = number - number += 1 - self.add_item(item, show_progress=False) - self.write_checkpoint() - return length, number + def write_part_file(self, item, from_chunk, number): + item = Item(internal_dict=item.as_dict()) + length = len(item.chunks) + # the item should only have the *additional* chunks we processed after the last partial item: + item.chunks = item.chunks[from_chunk:] + item.path += '.borg_part_%d' % number + item.part = number + number += 1 + self.add_item(item, show_progress=False) + self.write_checkpoint() + return length, number + + def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None, **chunk_kw): + if not chunk_processor: + def chunk_processor(data): + return cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats) item.chunks = [] from_chunk = 0 part_number = 1 - for data in backup_io_iter(self.chunker.chunkify(fd, fh)): - item.chunks.append(cache.add_chunk(self.key.id_hash(data), Chunk(data, **chunk_kw), stats)) + for data in chunk_iter: + item.chunks.append(chunk_processor(data)) if self.show_progress: self.stats.show_progress(item=item, dt=0.2) if self.checkpoint_interval and time.time() - self.last_checkpoint > self.checkpoint_interval: - from_chunk, part_number = write_part(item, from_chunk, part_number) + from_chunk, part_number = self.write_part_file(item, from_chunk, part_number) self.last_checkpoint = time.time() else: if part_number > 1: @@ -770,7 +775,7 @@ def write_part(item, from_chunk, number): # if we already have created a part item inside this file, we want to put the final # chunks (if any) into a part item also (so all parts can be concatenated to get # the complete file): - from_chunk, part_number = write_part(item, from_chunk, part_number) + from_chunk, part_number = self.write_part_file(item, from_chunk, part_number) self.last_checkpoint = time.time() # if we created part files, we have referenced all chunks from the part files, @@ -789,7 +794,7 @@ def process_stdin(self, path, cache): mtime=t, atime=t, ctime=t, ) fd = sys.stdin.buffer # binary - self.chunk_file(item, cache, self.stats, fd) + self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd))) self.stats.nfiles += 1 self.add_item(item) return 'i' # stdin @@ -845,7 +850,7 @@ def process_file(self, path, st, cache, ignore_inode=False): with backup_io(): fh = Archive._open_rb(path) with os.fdopen(fh, 'rb') as fd: - self.chunk_file(item, 
cache, self.stats, fd, fh, compress=compress) + self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd, fh)), compress=compress) if not is_special_file: # we must not memorize special files, because the contents of e.g. a # block or char device will change without its mtime/size/inode changing. @@ -1386,7 +1391,8 @@ def is_temporary_archive(archive_name): def __init__(self, repository, manifest, key, cache, matcher, exclude_caches=False, exclude_if_present=None, keep_tag_files=False, chunker_params=None, compression=None, compression_files=None, always_recompress=False, - dry_run=False, stats=False, progress=False, file_status_printer=None): + dry_run=False, stats=False, progress=False, file_status_printer=None, + checkpoint_interval=1800): self.repository = repository self.key = key self.manifest = manifest @@ -1410,9 +1416,7 @@ def __init__(self, repository, manifest, key, cache, matcher, self.stats = stats self.progress = progress self.print_file_status = file_status_printer or (lambda *args: None) - - self.interrupt = False - self.errors = False + self.checkpoint_interval = checkpoint_interval def recreate(self, archive_name, comment=None, target_name=None): assert not self.is_temporary_archive(archive_name) @@ -1466,7 +1470,7 @@ def item_is_hardlink_master(item): def process_item(self, archive, target, item): if 'chunks' in item: - item.chunks = self.process_chunks(archive, target, item) + self.process_chunks(archive, target, item) target.stats.nfiles += 1 target.add_item(item) self.print_file_status(file_status(item.mode), item.path) @@ -1477,77 +1481,62 @@ def process_chunks(self, archive, target, item): for chunk_id, size, csize in item.chunks: self.cache.chunk_incref(chunk_id, target.stats) return item.chunks - new_chunks = [] - chunk_iterator = self.create_chunk_iterator(archive, target, item) + chunk_iterator = self.create_chunk_iterator(archive, target, list(item.chunks)) compress = self.compression_decider1.decide(item.path) - for chunk in chunk_iterator: - chunk.meta['compress'] = compress - chunk_id = self.key.id_hash(chunk.data) - if chunk_id in self.seen_chunks: - new_chunks.append(self.cache.chunk_incref(chunk_id, target.stats)) - else: - compression_spec, chunk = self.key.compression_decider2.decide(chunk) - overwrite = self.recompress - if self.recompress and not self.always_recompress and chunk_id in self.cache.chunks: - # Check if this chunk is already compressed the way we want it - old_chunk = self.key.decrypt(None, self.repository.get(chunk_id), decompress=False) - if Compressor.detect(old_chunk.data).name == compression_spec['name']: - # Stored chunk has the same compression we wanted - overwrite = False - chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite) - new_chunks.append((chunk_id, size, csize)) - self.seen_chunks.add(chunk_id) - if self.progress: - target.stats.show_progress(item=item, dt=0.2) - return new_chunks + chunk_processor = partial(self.chunk_processor, target, compress) + target.chunk_file(item, self.cache, target.stats, chunk_iterator, chunk_processor) - def create_chunk_iterator(self, archive, target, item): + def chunk_processor(self, target, compress, data): + chunk_id = self.key.id_hash(data) + if chunk_id in self.seen_chunks: + return self.cache.chunk_incref(chunk_id, target.stats) + chunk = Chunk(data, compress=compress) + compression_spec, chunk = self.key.compression_decider2.decide(chunk) + overwrite = self.recompress + if self.recompress and not 
self.always_recompress and chunk_id in self.cache.chunks: + # Check if this chunk is already compressed the way we want it + old_chunk = self.key.decrypt(None, self.repository.get(chunk_id), decompress=False) + if Compressor.detect(old_chunk.data).name == compression_spec['name']: + # Stored chunk has the same compression we wanted + overwrite = False + chunk_id, size, csize = self.cache.add_chunk(chunk_id, chunk, target.stats, overwrite=overwrite) + self.seen_chunks.add(chunk_id) + return chunk_id, size, csize + + def create_chunk_iterator(self, archive, target, chunks): """Return iterator of chunks to store for 'item' from 'archive' in 'target'.""" - chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in item.chunks]) + chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _, _ in chunks]) if target.recreate_rechunkify: # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk # (does not load the entire file into memory) file = ChunkIteratorFileWrapper(chunk_iterator) + return target.chunker.chunkify(file) + else: + for chunk in chunk_iterator: + yield chunk.data - def _chunk_iterator(): - for data in target.chunker.chunkify(file): - yield Chunk(data) - - chunk_iterator = _chunk_iterator() - return chunk_iterator - - def save(self, archive, target, comment=None, completed=True, metadata=None, replace_original=True): + def save(self, archive, target, comment=None, replace_original=True): """Save target archive. If completed, replace source. If not, save temporary with additional 'metadata' dict.""" if self.dry_run: - return completed - if completed: - timestamp = archive.ts.replace(tzinfo=None) - if comment is None: - comment = archive.metadata.get('comment', '') - target.save(timestamp=timestamp, comment=comment, additional_metadata={ - 'cmdline': archive.metadata.cmdline, - 'recreate_cmdline': sys.argv, - }) - if replace_original: - archive.delete(Statistics(), progress=self.progress) - target.rename(archive.name) - if self.stats: - target.end = datetime.utcnow() - log_multi(DASHES, - str(target), - DASHES, - str(target.stats), - str(self.cache), - DASHES) - else: - additional_metadata = metadata or {} - additional_metadata.update({ - 'recreate_source_id': archive.id, - 'recreate_args': sys.argv[1:], - }) - target.save(name=archive.name + '.recreate', additional_metadata=additional_metadata) - logger.info('Run the same command again to resume.') - return completed + return + timestamp = archive.ts.replace(tzinfo=None) + if comment is None: + comment = archive.metadata.get('comment', '') + target.save(timestamp=timestamp, comment=comment, additional_metadata={ + 'cmdline': archive.metadata.cmdline, + 'recreate_cmdline': sys.argv, + }) + if replace_original: + archive.delete(Statistics(), progress=self.progress) + target.rename(archive.name) + if self.stats: + target.end = datetime.utcnow() + log_multi(DASHES, + str(target), + DASHES, + str(target.stats), + str(self.cache), + DASHES) def matcher_add_tagged_dirs(self, archive): """Add excludes to the matcher created by exclude_cache and exclude_if_present.""" @@ -1595,9 +1584,7 @@ def create_target(self, archive, target_name=None): def create_target_archive(self, name): target = Archive(self.repository, self.key, self.manifest, name, create=True, progress=self.progress, chunker_params=self.chunker_params, cache=self.cache, - checkpoint_interval=0, compression=self.compression) - target.recreate_partial_chunks = None - target.recreate_uncomitted_bytes = 0 + 
checkpoint_interval=self.checkpoint_interval, compression=self.compression) return target def open_archive(self, name, **kwargs): diff --git a/src/borg/archiver.py b/src/borg/archiver.py index 3c63e37e5..1eff7c5e7 100644 --- a/src/borg/archiver.py +++ b/src/borg/archiver.py @@ -1075,6 +1075,7 @@ def do_recreate(self, args, repository, manifest, key, cache): always_recompress=args.always_recompress, progress=args.progress, stats=args.stats, file_status_printer=self.print_file_status, + checkpoint_interval=args.checkpoint_interval, dry_run=args.dry_run) if args.location.archive: @@ -2412,6 +2413,9 @@ def build_parser(self, prog=None): type=archivename_validator(), help='create a new archive with the name ARCHIVE, do not replace existing archive ' '(only applies for a single archive)') + archive_group.add_argument('-c', '--checkpoint-interval', dest='checkpoint_interval', + type=int, default=1800, metavar='SECONDS', + help='write checkpoint every SECONDS seconds (Default: 1800)') archive_group.add_argument('--comment', dest='comment', metavar='COMMENT', default=None, help='add a comment text to the archive') archive_group.add_argument('--timestamp', dest='timestamp', @@ -2424,7 +2428,7 @@ def build_parser(self, prog=None): help='select compression algorithm, see the output of the ' '"borg help compression" command for details.') archive_group.add_argument('--always-recompress', dest='always_recompress', action='store_true', - help='always recompress chunks, don\'t skip chunks already compressed with the same' + help='always recompress chunks, don\'t skip chunks already compressed with the same ' 'algorithm.') archive_group.add_argument('--compression-from', dest='compression_files', type=argparse.FileType('r'), action='append', From 15cefe8ddebc7d226c6f05be44d1a9f1f94d9bea Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sun, 20 Nov 2016 13:31:46 +0100 Subject: [PATCH 3/3] recreate/archiver tests: add check_cache tool - lints refcounts --- src/borg/testsuite/archiver.py | 55 +++++++++++++++++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/src/borg/testsuite/archiver.py b/src/borg/testsuite/archiver.py index da3720563..847b4ca98 100644 --- a/src/borg/testsuite/archiver.py +++ b/src/borg/testsuite/archiver.py @@ -29,7 +29,7 @@ from ..cache import Cache from ..constants import * # NOQA from ..crypto import bytes_to_long, num_aes_blocks -from ..helpers import PatternMatcher, parse_pattern +from ..helpers import PatternMatcher, parse_pattern, Location from ..helpers import Chunk, Manifest from ..helpers import EXIT_SUCCESS, EXIT_WARNING, EXIT_ERROR from ..helpers import bin_to_hex @@ -260,6 +260,9 @@ def open_archive(self, name): archive = Archive(repository, key, manifest, name) return archive, repository + def open_repository(self): + return Repository(self.repository_path, exclusive=True) + def create_regular_file(self, name, size=0, contents=None): filename = os.path.join(self.input_path, name) if not os.path.exists(os.path.dirname(filename)): @@ -1626,6 +1629,40 @@ def raise_eof(*args): self.cmd('init', self.repository_location, exit_code=1) assert not os.path.exists(self.repository_location) + def check_cache(self): + # First run a regular borg check + self.cmd('check', self.repository_location) + # Then check that the cache on disk matches exactly what's in the repo. 
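+ # (Open the cache with sync=False to capture the chunk index as stored on disk, destroy it, and let a
+ # fresh Cache rebuild the index from the repository; the two indexes are then compared entry by entry,
+ # in both directions, so any stale refcount, wrong size/csize, or missing/extra entry trips an assertion.)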
+ with self.open_repository() as repository: + manifest, key = Manifest.load(repository) + with Cache(repository, key, manifest, sync=False) as cache: + original_chunks = cache.chunks + cache.destroy(repository) + with Cache(repository, key, manifest) as cache: + correct_chunks = cache.chunks + assert original_chunks is not correct_chunks + seen = set() + for id, (refcount, size, csize) in correct_chunks.iteritems(): + o_refcount, o_size, o_csize = original_chunks[id] + assert refcount == o_refcount + assert size == o_size + assert csize == o_csize + seen.add(id) + for id, (refcount, size, csize) in original_chunks.iteritems(): + assert id in seen + + def test_check_cache(self): + self.cmd('init', self.repository_location) + self.cmd('create', self.repository_location + '::test', 'input') + with self.open_repository() as repository: + manifest, key = Manifest.load(repository) + with Cache(repository, key, manifest, sync=False) as cache: + cache.begin_txn() + cache.chunks.incref(list(cache.chunks.iteritems())[0][0]) + cache.commit() + with pytest.raises(AssertionError): + self.check_cache() + def test_recreate_target_rc(self): self.cmd('init', self.repository_location) output = self.cmd('recreate', self.repository_location, '--target=asdf', exit_code=2) @@ -1634,10 +1671,13 @@ def test_recreate_target_rc(self): def test_recreate_target(self): self.create_test_files() self.cmd('init', self.repository_location) + self.check_cache() archive = self.repository_location + '::test0' self.cmd('create', archive, 'input') + self.check_cache() original_archive = self.cmd('list', self.repository_location) self.cmd('recreate', archive, 'input/dir2', '-e', 'input/dir2/file3', '--target=new-archive') + self.check_cache() archives = self.cmd('list', self.repository_location) assert original_archive in archives assert 'new-archive' in archives @@ -1655,6 +1695,7 @@ def test_recreate_basic(self): archive = self.repository_location + '::test0' self.cmd('create', archive, 'input') self.cmd('recreate', archive, 'input/dir2', '-e', 'input/dir2/file3') + self.check_cache() listing = self.cmd('list', '--short', archive) assert 'file1' not in listing assert 'dir2/file2' in listing @@ -1666,6 +1707,7 @@ def test_recreate_subtree_hardlinks(self): self._extract_hardlinks_setup() self.cmd('create', self.repository_location + '::test2', 'input') self.cmd('recreate', self.repository_location + '::test', 'input/dir1') + self.check_cache() with changedir('output'): self.cmd('extract', self.repository_location + '::test') assert os.stat('input/dir1/hardlink').st_nlink == 2 @@ -1689,6 +1731,7 @@ def test_recreate_rechunkify(self): # test1 and test2 do not deduplicate assert num_chunks == unique_chunks self.cmd('recreate', self.repository_location, '--chunker-params', 'default') + self.check_cache() # test1 and test2 do deduplicate after recreate assert not int(self.cmd('list', self.repository_location + '::test1', 'input/large_file', '--format', '{unique_chunks}')) @@ -1702,6 +1745,7 @@ def test_recreate_recompress(self): size, csize, sha256_before = file_list.split(' ') assert int(csize) >= int(size) # >= due to metadata overhead self.cmd('recreate', self.repository_location, '-C', 'lz4') + self.check_cache() file_list = self.cmd('list', self.repository_location + '::test', 'input/compressible', '--format', '{size} {csize} {sha256}') size, csize, sha256_after = file_list.split(' ') @@ -1714,6 +1758,7 @@ def test_recreate_dry_run(self): self.cmd('create', self.repository_location + '::test', 'input') archives_before = 
self.cmd('list', self.repository_location + '::test') self.cmd('recreate', self.repository_location, '-n', '-e', 'input/compressible') + self.check_cache() archives_after = self.cmd('list', self.repository_location + '::test') assert archives_after == archives_before @@ -1723,6 +1768,7 @@ def test_recreate_skips_nothing_to_do(self): self.cmd('create', self.repository_location + '::test', 'input') info_before = self.cmd('info', self.repository_location + '::test') self.cmd('recreate', self.repository_location, '--chunker-params', 'default') + self.check_cache() info_after = self.cmd('info', self.repository_location + '::test') assert info_before == info_after # includes archive ID @@ -1743,18 +1789,22 @@ def test_recreate_list_output(self): self.cmd('create', self.repository_location + '::test', 'input') output = self.cmd('recreate', '--list', '--info', self.repository_location + '::test', '-e', 'input/file2') + self.check_cache() self.assert_in("input/file1", output) self.assert_in("x input/file2", output) output = self.cmd('recreate', '--list', self.repository_location + '::test', '-e', 'input/file3') + self.check_cache() self.assert_in("input/file1", output) self.assert_in("x input/file3", output) output = self.cmd('recreate', self.repository_location + '::test', '-e', 'input/file4') + self.check_cache() self.assert_not_in("input/file1", output) self.assert_not_in("x input/file4", output) output = self.cmd('recreate', '--info', self.repository_location + '::test', '-e', 'input/file5') + self.check_cache() self.assert_not_in("input/file1", output) self.assert_not_in("x input/file5", output) @@ -2095,6 +2145,9 @@ def as_dict(): class RemoteArchiverTestCase(ArchiverTestCase): prefix = '__testsuite__:' + def open_repository(self): + return RemoteRepository(Location(self.repository_location)) + def test_remote_repo_restrict_to_path(self): # restricted to repo directory itself: with patch.object(RemoteRepository, 'extra_test_args', ['--restrict-to-path', self.repository_path]):
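The core of PATCH 2/3 is that chunk_file() no longer reads a file descriptor itself: it consumes any iterator of data blocks and stores each block through a pluggable chunk_processor callback, cutting ".borg_part_N" items whenever the checkpoint interval elapses. Below is a minimal, self-contained sketch of that callback-plus-checkpoint pattern, under the assumption it helps to see it outside the Borg classes; TinyArchive, dedup_processor and the other names are illustrative stand-ins, not Borg's actual API.

# Minimal sketch of the "iterator + pluggable chunk processor + periodic part files" pattern.
# All names here are hypothetical; only the control flow mirrors chunk_file()/write_part_file() above.
import time
from functools import partial


class TinyArchive:
    def __init__(self, checkpoint_interval=1800):
        self.items = []                       # (name, list-of-stored-blocks) pairs
        self.checkpoint_interval = checkpoint_interval
        self.last_checkpoint = time.time()

    def write_part_file(self, name, blocks, from_block, number):
        # emit only the blocks added since the last part, like write_part_file() in the patch
        self.items.append(('%s.part_%d' % (name, number), blocks[from_block:]))
        return len(blocks), number + 1

    def chunk_file(self, name, chunk_iter, chunk_processor=None):
        if chunk_processor is None:
            def chunk_processor(data):        # default processor: store the raw block
                return data
        blocks, from_block, part_number = [], 0, 1
        for data in chunk_iter:
            blocks.append(chunk_processor(data))
            if self.checkpoint_interval and time.time() - self.last_checkpoint > self.checkpoint_interval:
                from_block, part_number = self.write_part_file(name, blocks, from_block, part_number)
                self.last_checkpoint = time.time()
        self.items.append((name, blocks))


def dedup_processor(seen, data):
    # stand-in for ArchiveRecreater.chunk_processor(): re-reference known blocks, "store" new ones
    if data in seen:
        return ('ref', data)
    seen.add(data)
    return ('new', data)


archive = TinyArchive()
seen = set()
archive.chunk_file('file1', iter([b'aa', b'bb', b'aa']), partial(dedup_processor, seen))
print(archive.items)      # [('file1', [('new', b'aa'), ('new', b'bb'), ('ref', b'aa')])]

In the patch itself the reuse is a single line, chunk_processor = partial(self.chunk_processor, target, compress), so recreate inherits the existing checkpoint/part-file machinery (plus the new -c/--checkpoint-interval option) instead of carrying its own interrupt, autocommit and resume code, which is exactly what PATCH 1/3 removes.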