Merge pull request #7305 from ThomasWaldmann/volume-based-checkpointing

volume based checkpointing, fix item_ptrs orphaned chunks
TW 2023-01-31 16:48:49 +01:00 committed by GitHub
commit d0344cb8f8
5 changed files with 97 additions and 12 deletions
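
In short: this adds a --checkpoint-volume=BYTES option (default 0 = off) next to the existing time based --checkpoint-interval. ChunksProcessor now tracks how many bytes of file content it has chunked and writes a checkpoint whenever the configured volume has accumulated since the last checkpoint; the option is wired into the create, recreate, and tar archiver commands below. Independently, write_checkpoint() now also decrefs a checkpoint archive's item_ptrs chunks, so removed checkpoint archives no longer leave orphaned objects behind, and borg check now logs the chunk IDs of any orphans it still finds.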


@@ -401,6 +401,7 @@ class CacheChunkBuffer(ChunkBuffer):
     def write_chunk(self, chunk):
         id_, _ = self.cache.add_chunk(self.key.id_hash(chunk), {}, chunk, stats=self.stats, wait=False)
+        logger.debug(f"writing item metadata stream chunk {bin_to_hex(id_)}")
         self.cache.repository.async_response(wait=False)
         return id_
@@ -444,6 +445,7 @@ def archive_put_items(chunk_ids, *, repo_objs, cache=None, stats=None, add_refer
     for i in range(0, len(chunk_ids), IDS_PER_CHUNK):
         data = msgpack.packb(chunk_ids[i : i + IDS_PER_CHUNK])
         id = repo_objs.id_hash(data)
+        logger.debug(f"writing item_ptrs chunk {bin_to_hex(id)}")
         if cache is not None and stats is not None:
             cache.add_chunk(id, {}, data, stats=stats)
         elif add_reference is not None:
@@ -471,7 +473,6 @@ class Archive:
         name,
         cache=None,
         create=False,
-        checkpoint_interval=1800,
         numeric_ids=False,
         noatime=False,
         noctime=False,
@@ -500,7 +501,6 @@ class Archive:
         self.name = name  # overwritten later with name from archive metadata
         self.name_in_manifest = name  # can differ from .name later (if borg check fixed duplicate archive names)
         self.comment = None
-        self.checkpoint_interval = checkpoint_interval
         self.numeric_ids = numeric_ids
         self.noatime = noatime
         self.noctime = noctime
@@ -650,9 +650,16 @@ Duration: {0.duration}
         self.items_buffer.add(item)

     def write_checkpoint(self):
-        self.save(self.checkpoint_name)
+        metadata = self.save(self.checkpoint_name)
+        # that .save() has committed the repo.
+        # at next commit, we won't need this checkpoint archive any more because we will then
+        # have either a newer checkpoint archive or the final archive.
+        # so we can already remove it here, the next .save() will then commit this cleanup.
+        # remove its manifest entry, remove its ArchiveItem chunk, remove its item_ptrs chunks:
         del self.manifest.archives[self.checkpoint_name]
         self.cache.chunk_decref(self.id, self.stats)
+        for id in metadata.item_ptrs:
+            self.cache.chunk_decref(id, self.stats)

     def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None):
         name = name or self.name
@@ -714,6 +721,7 @@ Duration: {0.duration}
         self.manifest.write()
         self.repository.commit(compact=False)
         self.cache.commit()
+        return metadata

     def calc_stats(self, cache, want_unique=True):
         if not want_unique:
@@ -1226,14 +1234,19 @@ def cached_hash(chunk, id_hash):
 class ChunksProcessor:
     # Processes an iterator of chunks for an Item

-    def __init__(self, *, key, cache, add_item, write_checkpoint, checkpoint_interval, rechunkify):
+    def __init__(self, *, key, cache, add_item, write_checkpoint, checkpoint_interval, checkpoint_volume, rechunkify):
         self.key = key
         self.cache = cache
         self.add_item = add_item
         self.write_checkpoint = write_checkpoint
+        self.rechunkify = rechunkify
+        # time interval based checkpointing
         self.checkpoint_interval = checkpoint_interval
         self.last_checkpoint = time.monotonic()
-        self.rechunkify = rechunkify
+        # file content volume based checkpointing
+        self.checkpoint_volume = checkpoint_volume
+        self.current_volume = 0
+        self.last_volume_checkpoint = 0

     def write_part_file(self, item, from_chunk, number):
         item = Item(internal_dict=item.as_dict())
@@ -1255,13 +1268,14 @@ class ChunksProcessor:
         if (
             forced
             or sig_int_triggered
-            or self.checkpoint_interval
-            and time.monotonic() - self.last_checkpoint > self.checkpoint_interval
+            or (self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval)
+            or (self.checkpoint_volume and self.current_volume - self.last_volume_checkpoint >= self.checkpoint_volume)
         ):
             if sig_int_triggered:
                 logger.info("checkpoint requested: starting checkpoint creation...")
             from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
             self.last_checkpoint = time.monotonic()
+            self.last_volume_checkpoint = self.current_volume
             if sig_int_triggered:
                 sig_int.action_completed()
                 logger.info("checkpoint requested: finished checkpoint creation!")
@@ -1286,7 +1300,9 @@ class ChunksProcessor:
         from_chunk = 0
         part_number = 1
         for chunk in chunk_iter:
-            item.chunks.append(chunk_processor(chunk))
+            cle = chunk_processor(chunk)
+            item.chunks.append(cle)
+            self.current_volume += cle[1]
             if show_progress:
                 stats.show_progress(item=item, dt=0.2)
             from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=False)
@@ -2159,7 +2175,7 @@ class ArchiveChecker:
                 if last and len(archive_infos) < last:
                     logger.warning("--last %d archives: only found %d archives", last, len(archive_infos))
             else:
-                archive_infos = self.manifest.archives.list(sort_by=sort_by)
+                archive_infos = self.manifest.archives.list(sort_by=sort_by, consider_checkpoints=True)
         num_archives = len(archive_infos)

         pi = ProgressIndicatorPercent(
@@ -2216,6 +2232,8 @@ class ArchiveChecker:
         orphaned = unused - self.possibly_superseded
         if orphaned:
             logger.error(f"{len(orphaned)} orphaned objects found!")
+            for chunk_id in orphaned:
+                logger.debug(f"chunk {bin_to_hex(chunk_id)} is orphaned.")
             self.error_found = True
         if self.repair and unused:
             logger.info(
@@ -2262,6 +2280,7 @@ class ArchiveRecreater:
         file_status_printer=None,
         timestamp=None,
         checkpoint_interval=1800,
+        checkpoint_volume=0,
    ):
        self.manifest = manifest
        self.repository = manifest.repository
@@ -2289,6 +2308,7 @@ class ArchiveRecreater:
         self.progress = progress
         self.print_file_status = file_status_printer or (lambda *args: None)
         self.checkpoint_interval = None if dry_run else checkpoint_interval
+        self.checkpoint_volume = None if dry_run else checkpoint_volume

     def recreate(self, archive_name, comment=None, target_name=None):
         assert not self.is_temporary_archive(archive_name)
@@ -2456,6 +2476,7 @@ class ArchiveRecreater:
             add_item=target.add_item,
             write_checkpoint=target.write_checkpoint,
             checkpoint_interval=self.checkpoint_interval,
+            checkpoint_volume=self.checkpoint_volume,
             rechunkify=target.recreate_rechunkify,
         ).process_file_chunks
         target.chunker = get_chunker(*target.chunker_params, seed=self.key.chunk_seed, sparse=False)
@@ -2469,7 +2490,6 @@ class ArchiveRecreater:
             progress=self.progress,
             chunker_params=self.chunker_params,
             cache=self.cache,
-            checkpoint_interval=self.checkpoint_interval,
         )
         return target
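
To make the trigger logic above easier to follow in isolation, here is a minimal self-contained sketch of the two-trigger policy. CheckpointPolicy and its method names are invented for illustration only; in borg this state lives directly on ChunksProcessor and the decision is made inline in maybe_checkpoint().

    import time

    class CheckpointPolicy:
        # mirrors ChunksProcessor's checkpoint state; a simplified stand-in, not borg API
        def __init__(self, checkpoint_interval=1800, checkpoint_volume=0):
            self.checkpoint_interval = checkpoint_interval  # seconds; falsy disables the time trigger
            self.checkpoint_volume = checkpoint_volume  # bytes; falsy disables the volume trigger
            self.last_checkpoint = time.monotonic()
            self.current_volume = 0
            self.last_volume_checkpoint = 0

        def add_chunk(self, size):
            # corresponds to self.current_volume += cle[1] in process_file_chunks
            self.current_volume += size

        def due(self, forced=False):
            # same condition as in maybe_checkpoint: either trigger suffices
            return (
                forced
                or (self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval)
                or (self.checkpoint_volume and self.current_volume - self.last_volume_checkpoint >= self.checkpoint_volume)
            )

        def mark_written(self):
            # reset both triggers after a checkpoint was written
            self.last_checkpoint = time.monotonic()
            self.last_volume_checkpoint = self.current_volume

    policy = CheckpointPolicy(checkpoint_interval=0, checkpoint_volume=1000)
    policy.add_chunk(1000)
    assert policy.due()  # 1000 bytes accumulated, volume trigger fires
    policy.mark_written()
    assert not policy.due()  # both triggers reset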


@@ -232,7 +232,6 @@ class CreateMixIn:
                     args.name,
                     cache=cache,
                     create=True,
-                    checkpoint_interval=args.checkpoint_interval,
                     numeric_ids=args.numeric_ids,
                     noatime=not args.atime,
                     noctime=args.noctime,
@@ -258,6 +257,7 @@ class CreateMixIn:
                     add_item=archive.add_item,
                     write_checkpoint=archive.write_checkpoint,
                     checkpoint_interval=args.checkpoint_interval,
+                    checkpoint_volume=args.checkpoint_volume,
                     rechunkify=False,
                 )
                 fso = FilesystemObjectProcessors(
@@ -845,6 +845,14 @@ class CreateMixIn:
             default=1800,
             help="write checkpoint every SECONDS seconds (Default: 1800)",
         )
+        archive_group.add_argument(
+            "--checkpoint-volume",
+            metavar="BYTES",
+            dest="checkpoint_volume",
+            type=int,
+            default=0,
+            help="write checkpoint every BYTES bytes (Default: 0, meaning no volume based checkpointing)",
+        )
         archive_group.add_argument(
             "--chunker-params",
             metavar="PARAMS",


@@ -39,6 +39,7 @@ class RecreateMixIn:
             stats=args.stats,
             file_status_printer=self.print_file_status,
             checkpoint_interval=args.checkpoint_interval,
+            checkpoint_volume=args.checkpoint_volume,
             dry_run=args.dry_run,
             timestamp=args.timestamp,
         )
@@ -160,6 +161,14 @@ class RecreateMixIn:
             metavar="SECONDS",
             help="write checkpoint every SECONDS seconds (Default: 1800)",
         )
+        archive_group.add_argument(
+            "--checkpoint-volume",
+            metavar="BYTES",
+            dest="checkpoint_volume",
+            type=int,
+            default=0,
+            help="write checkpoint every BYTES bytes (Default: 0, meaning no volume based checkpointing)",
+        )
         archive_group.add_argument(
             "--comment",
             metavar="COMMENT",


@@ -261,7 +261,6 @@ class TarMixIn:
             args.name,
             cache=cache,
             create=True,
-            checkpoint_interval=args.checkpoint_interval,
             progress=args.progress,
             chunker_params=args.chunker_params,
             start=t0,
@@ -274,6 +273,7 @@ class TarMixIn:
             add_item=archive.add_item,
             write_checkpoint=archive.write_checkpoint,
             checkpoint_interval=args.checkpoint_interval,
+            checkpoint_volume=args.checkpoint_volume,
             rechunkify=False,
         )
         tfo = TarfileObjectProcessors(
@@ -515,6 +515,14 @@ class TarMixIn:
             metavar="SECONDS",
             help="write checkpoint every SECONDS seconds (Default: 1800)",
         )
+        archive_group.add_argument(
+            "--checkpoint-volume",
+            metavar="BYTES",
+            dest="checkpoint_volume",
+            type=int,
+            default=0,
+            help="write checkpoint every BYTES bytes (Default: 0, meaning no volume based checkpointing)",
+        )
         archive_group.add_argument(
             "--chunker-params",
             dest="chunker_params",


@@ -167,6 +167,46 @@ class ArchiverTestCase(ArchiverTestCaseBase):
         )
         assert extracted_data == input_data

+    def test_create_stdin_checkpointing(self):
+        chunk_size = 1000  # fixed chunker with this size, also volume based checkpointing after that volume
+        self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
+        input_data = b"X" * (chunk_size * 2 - 1)  # one full and one partial chunk
+        self.cmd(
+            f"--repo={self.repository_location}",
+            "create",
+            f"--chunker-params=fixed,{chunk_size}",
+            f"--checkpoint-volume={chunk_size}",
+            "test",
+            "-",
+            input=input_data,
+        )
+        # repo looking good overall? checks for rc == 0.
+        self.cmd(f"--repo={self.repository_location}", "check", "--debug")
+        # verify part files
+        out = self.cmd(
+            f"--repo={self.repository_location}",
+            "extract",
+            "test",
+            "stdin.borg_part_1",
+            "--consider-part-files",
+            "--stdout",
+            binary_output=True,
+        )
+        assert out == input_data[:chunk_size]
+        out = self.cmd(
+            f"--repo={self.repository_location}",
+            "extract",
+            "test",
+            "stdin.borg_part_2",
+            "--consider-part-files",
+            "--stdout",
+            binary_output=True,
+        )
+        assert out == input_data[: chunk_size - 1]
+        # verify full file
+        out = self.cmd(f"--repo={self.repository_location}", "extract", "test", "stdin", "--stdout", binary_output=True)
+        assert out == input_data
+
     def test_create_content_from_command(self):
         self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION)
         input_data = "some test content"