Refactor ArchiveRecreater.try_resume

This commit is contained in:
Marian Beermann 2016-08-07 12:23:37 +02:00
parent 2cfe940ebf
commit 5fe6c09c34
1 changed files with 48 additions and 35 deletions

View File

@ -1532,6 +1532,51 @@ class ArchiveRecreater:
"""Try to resume from temporary archive. Return (target archive, resume from path) if successful."""
logger.info('Found %s, will resume interrupted operation', target_name)
old_target = self.open_archive(target_name)
if not self.can_resume(archive, old_target, target_name):
return None, None
target = self.create_target_archive(target_name + '.temp')
logger.info('Replaying items from interrupted operation...')
last_old_item = self.copy_items(old_target, target)
if last_old_item:
resume_from = last_old_item.path
else:
resume_from = None
self.incref_partial_chunks(old_target, target)
old_target.delete(Statistics(), progress=self.progress)
logger.info('Done replaying items')
return target, resume_from
def incref_partial_chunks(self, source_archive, target_archive):
target_archive.recreate_partial_chunks = source_archive.metadata.get(b'recreate_partial_chunks', [])
for chunk_id, size, csize in target_archive.recreate_partial_chunks:
if not self.cache.seen_chunk(chunk_id):
try:
# Repository has __contains__, RemoteRepository doesn't
self.repository.get(chunk_id)
except Repository.ObjectNotFound:
# delete/prune/check between invocations: these chunks are gone.
target_archive.recreate_partial_chunks = None
break
# fast-lane insert into chunks cache
self.cache.chunks[chunk_id] = (1, size, csize)
target_archive.stats.update(size, csize, True)
continue
# incref now, otherwise a source_archive.delete() might delete these chunks
self.cache.chunk_incref(chunk_id, target_archive.stats)
def copy_items(self, source_archive, target_archive):
item = None
for item in source_archive.iter_items():
if 'chunks' in item:
for chunk in item.chunks:
self.cache.chunk_incref(chunk.id, target_archive.stats)
target_archive.stats.nfiles += 1
target_archive.add_item(item)
if self.progress:
source_archive.stats.show_progress(final=True) # XXX target_archive.stats?
return item
def can_resume(self, archive, old_target, target_name):
resume_id = old_target.metadata[b'recreate_source_id']
resume_args = [safe_decode(arg) for arg in old_target.metadata[b'recreate_args']]
if resume_id != archive.id:
@ -1539,45 +1584,13 @@ class ArchiveRecreater:
logger.warning('Saved fingerprint: %s', bin_to_hex(resume_id))
logger.warning('Current fingerprint: %s', archive.fpr)
old_target.delete(Statistics(), progress=self.progress)
return None, None # can't resume
return False
if resume_args != sys.argv[1:]:
logger.warning('Command line changed, this might lead to inconsistencies')
logger.warning('Saved: %s', repr(resume_args))
logger.warning('Current: %s', repr(sys.argv[1:]))
target = self.create_target_archive(target_name + '.temp')
logger.info('Replaying items from interrupted operation...')
item = None
for item in old_target.iter_items():
if 'chunks' in item:
for chunk in item.chunks:
self.cache.chunk_incref(chunk.id, target.stats)
target.stats.nfiles += 1
target.add_item(item)
if item:
resume_from = item.path
else:
resume_from = None
if self.progress:
old_target.stats.show_progress(final=True)
target.recreate_partial_chunks = old_target.metadata.get(b'recreate_partial_chunks', [])
for chunk_id, size, csize in target.recreate_partial_chunks:
if not self.cache.seen_chunk(chunk_id):
try:
# Repository has __contains__, RemoteRepository doesn't
self.repository.get(chunk_id)
except Repository.ObjectNotFound:
# delete/prune/check between invocations: these chunks are gone.
target.recreate_partial_chunks = None
break
# fast-lane insert into chunks cache
self.cache.chunks[chunk_id] = (1, size, csize)
target.stats.update(size, csize, True)
continue
# incref now, otherwise old_target.delete() might delete these chunks
self.cache.chunk_incref(chunk_id, target.stats)
old_target.delete(Statistics(), progress=self.progress)
logger.info('Done replaying items')
return target, resume_from
# Just warn in this case, don't start over
return True
def create_target_archive(self, name):
target = Archive(self.repository, self.key, self.manifest, name, create=True,