From 5fe6c09c3402e4e25e1a4bd27dbefcd31c33e837 Mon Sep 17 00:00:00 2001 From: Marian Beermann Date: Sun, 7 Aug 2016 12:23:37 +0200 Subject: [PATCH] Refactor ArchiveRecreater.try_resume --- src/borg/archive.py | 83 ++++++++++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 35 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 13f596c33..2a3328bee 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -1532,6 +1532,51 @@ class ArchiveRecreater: """Try to resume from temporary archive. Return (target archive, resume from path) if successful.""" logger.info('Found %s, will resume interrupted operation', target_name) old_target = self.open_archive(target_name) + if not self.can_resume(archive, old_target, target_name): + return None, None + target = self.create_target_archive(target_name + '.temp') + logger.info('Replaying items from interrupted operation...') + last_old_item = self.copy_items(old_target, target) + if last_old_item: + resume_from = last_old_item.path + else: + resume_from = None + self.incref_partial_chunks(old_target, target) + old_target.delete(Statistics(), progress=self.progress) + logger.info('Done replaying items') + return target, resume_from + + def incref_partial_chunks(self, source_archive, target_archive): + target_archive.recreate_partial_chunks = source_archive.metadata.get(b'recreate_partial_chunks', []) + for chunk_id, size, csize in target_archive.recreate_partial_chunks: + if not self.cache.seen_chunk(chunk_id): + try: + # Repository has __contains__, RemoteRepository doesn't + self.repository.get(chunk_id) + except Repository.ObjectNotFound: + # delete/prune/check between invocations: these chunks are gone. + target_archive.recreate_partial_chunks = None + break + # fast-lane insert into chunks cache + self.cache.chunks[chunk_id] = (1, size, csize) + target_archive.stats.update(size, csize, True) + continue + # incref now, otherwise a source_archive.delete() might delete these chunks + self.cache.chunk_incref(chunk_id, target_archive.stats) + + def copy_items(self, source_archive, target_archive): + item = None + for item in source_archive.iter_items(): + if 'chunks' in item: + for chunk in item.chunks: + self.cache.chunk_incref(chunk.id, target_archive.stats) + target_archive.stats.nfiles += 1 + target_archive.add_item(item) + if self.progress: + source_archive.stats.show_progress(final=True) # XXX target_archive.stats? + return item + + def can_resume(self, archive, old_target, target_name): resume_id = old_target.metadata[b'recreate_source_id'] resume_args = [safe_decode(arg) for arg in old_target.metadata[b'recreate_args']] if resume_id != archive.id: @@ -1539,45 +1584,13 @@ class ArchiveRecreater: logger.warning('Saved fingerprint: %s', bin_to_hex(resume_id)) logger.warning('Current fingerprint: %s', archive.fpr) old_target.delete(Statistics(), progress=self.progress) - return None, None # can't resume + return False if resume_args != sys.argv[1:]: logger.warning('Command line changed, this might lead to inconsistencies') logger.warning('Saved: %s', repr(resume_args)) logger.warning('Current: %s', repr(sys.argv[1:])) - target = self.create_target_archive(target_name + '.temp') - logger.info('Replaying items from interrupted operation...') - item = None - for item in old_target.iter_items(): - if 'chunks' in item: - for chunk in item.chunks: - self.cache.chunk_incref(chunk.id, target.stats) - target.stats.nfiles += 1 - target.add_item(item) - if item: - resume_from = item.path - else: - resume_from = None - if self.progress: - old_target.stats.show_progress(final=True) - target.recreate_partial_chunks = old_target.metadata.get(b'recreate_partial_chunks', []) - for chunk_id, size, csize in target.recreate_partial_chunks: - if not self.cache.seen_chunk(chunk_id): - try: - # Repository has __contains__, RemoteRepository doesn't - self.repository.get(chunk_id) - except Repository.ObjectNotFound: - # delete/prune/check between invocations: these chunks are gone. - target.recreate_partial_chunks = None - break - # fast-lane insert into chunks cache - self.cache.chunks[chunk_id] = (1, size, csize) - target.stats.update(size, csize, True) - continue - # incref now, otherwise old_target.delete() might delete these chunks - self.cache.chunk_incref(chunk_id, target.stats) - old_target.delete(Statistics(), progress=self.progress) - logger.info('Done replaying items') - return target, resume_from + # Just warn in this case, don't start over + return True def create_target_archive(self, name): target = Archive(self.repository, self.key, self.manifest, name, create=True,