From 56b6f1d2e0e8dd6e654cdf81929f8e8ae0d0e87e Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Tue, 31 Jan 2023 03:17:53 +0100 Subject: [PATCH 1/3] create/recreate/import-tar: add --checkpoint-volume option volume based checkpointing is easier to test than its time based cousin. also added first checkpointing test. --- src/borg/archive.py | 21 +++++++++--- src/borg/archiver/create_cmd.py | 9 +++++ src/borg/archiver/recreate_cmd.py | 9 +++++ src/borg/archiver/tar_cmds.py | 9 +++++ src/borg/testsuite/archiver/create_cmd.py | 40 +++++++++++++++++++++++ 5 files changed, 83 insertions(+), 5 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 28693a8d6..0e2a83037 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -1226,14 +1226,19 @@ def cached_hash(chunk, id_hash): class ChunksProcessor: # Processes an iterator of chunks for an Item - def __init__(self, *, key, cache, add_item, write_checkpoint, checkpoint_interval, rechunkify): + def __init__(self, *, key, cache, add_item, write_checkpoint, checkpoint_interval, checkpoint_volume, rechunkify): self.key = key self.cache = cache self.add_item = add_item self.write_checkpoint = write_checkpoint + self.rechunkify = rechunkify + # time interval based checkpointing self.checkpoint_interval = checkpoint_interval self.last_checkpoint = time.monotonic() - self.rechunkify = rechunkify + # file content volume based checkpointing + self.checkpoint_volume = checkpoint_volume + self.current_volume = 0 + self.last_volume_checkpoint = 0 def write_part_file(self, item, from_chunk, number): item = Item(internal_dict=item.as_dict()) @@ -1255,13 +1260,14 @@ class ChunksProcessor: if ( forced or sig_int_triggered - or self.checkpoint_interval - and time.monotonic() - self.last_checkpoint > self.checkpoint_interval + or (self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval) + or (self.checkpoint_volume and self.current_volume - self.last_volume_checkpoint >= self.checkpoint_volume) ): if sig_int_triggered: logger.info("checkpoint requested: starting checkpoint creation...") from_chunk, part_number = self.write_part_file(item, from_chunk, part_number) self.last_checkpoint = time.monotonic() + self.last_volume_checkpoint = self.current_volume if sig_int_triggered: sig_int.action_completed() logger.info("checkpoint requested: finished checkpoint creation!") @@ -1286,7 +1292,9 @@ class ChunksProcessor: from_chunk = 0 part_number = 1 for chunk in chunk_iter: - item.chunks.append(chunk_processor(chunk)) + cle = chunk_processor(chunk) + item.chunks.append(cle) + self.current_volume += cle[1] if show_progress: stats.show_progress(item=item, dt=0.2) from_chunk, part_number = self.maybe_checkpoint(item, from_chunk, part_number, forced=False) @@ -2262,6 +2270,7 @@ class ArchiveRecreater: file_status_printer=None, timestamp=None, checkpoint_interval=1800, + checkpoint_volume=0, ): self.manifest = manifest self.repository = manifest.repository @@ -2289,6 +2298,7 @@ class ArchiveRecreater: self.progress = progress self.print_file_status = file_status_printer or (lambda *args: None) self.checkpoint_interval = None if dry_run else checkpoint_interval + self.checkpoint_volume = None if dry_run else checkpoint_volume def recreate(self, archive_name, comment=None, target_name=None): assert not self.is_temporary_archive(archive_name) @@ -2456,6 +2466,7 @@ class ArchiveRecreater: add_item=target.add_item, write_checkpoint=target.write_checkpoint, checkpoint_interval=self.checkpoint_interval, + checkpoint_volume=self.checkpoint_volume, rechunkify=target.recreate_rechunkify, ).process_file_chunks target.chunker = get_chunker(*target.chunker_params, seed=self.key.chunk_seed, sparse=False) diff --git a/src/borg/archiver/create_cmd.py b/src/borg/archiver/create_cmd.py index 4f8208ab5..7fe0dc247 100644 --- a/src/borg/archiver/create_cmd.py +++ b/src/borg/archiver/create_cmd.py @@ -258,6 +258,7 @@ class CreateMixIn: add_item=archive.add_item, write_checkpoint=archive.write_checkpoint, checkpoint_interval=args.checkpoint_interval, + checkpoint_volume=args.checkpoint_volume, rechunkify=False, ) fso = FilesystemObjectProcessors( @@ -845,6 +846,14 @@ class CreateMixIn: default=1800, help="write checkpoint every SECONDS seconds (Default: 1800)", ) + archive_group.add_argument( + "--checkpoint-volume", + metavar="BYTES", + dest="checkpoint_volume", + type=int, + default=0, + help="write checkpoint every BYTES bytes (Default: 0, meaning no volume based checkpointing)", + ) archive_group.add_argument( "--chunker-params", metavar="PARAMS", diff --git a/src/borg/archiver/recreate_cmd.py b/src/borg/archiver/recreate_cmd.py index 827085d6f..c4a3d6b3c 100644 --- a/src/borg/archiver/recreate_cmd.py +++ b/src/borg/archiver/recreate_cmd.py @@ -39,6 +39,7 @@ class RecreateMixIn: stats=args.stats, file_status_printer=self.print_file_status, checkpoint_interval=args.checkpoint_interval, + checkpoint_volume=args.checkpoint_volume, dry_run=args.dry_run, timestamp=args.timestamp, ) @@ -160,6 +161,14 @@ class RecreateMixIn: metavar="SECONDS", help="write checkpoint every SECONDS seconds (Default: 1800)", ) + archive_group.add_argument( + "--checkpoint-volume", + metavar="BYTES", + dest="checkpoint_volume", + type=int, + default=0, + help="write checkpoint every BYTES bytes (Default: 0, meaning no volume based checkpointing)", + ) archive_group.add_argument( "--comment", metavar="COMMENT", diff --git a/src/borg/archiver/tar_cmds.py b/src/borg/archiver/tar_cmds.py index dfabcc8f4..6cf661752 100644 --- a/src/borg/archiver/tar_cmds.py +++ b/src/borg/archiver/tar_cmds.py @@ -274,6 +274,7 @@ class TarMixIn: add_item=archive.add_item, write_checkpoint=archive.write_checkpoint, checkpoint_interval=args.checkpoint_interval, + checkpoint_volume=args.checkpoint_volume, rechunkify=False, ) tfo = TarfileObjectProcessors( @@ -515,6 +516,14 @@ class TarMixIn: metavar="SECONDS", help="write checkpoint every SECONDS seconds (Default: 1800)", ) + archive_group.add_argument( + "--checkpoint-volume", + metavar="BYTES", + dest="checkpoint_volume", + type=int, + default=0, + help="write checkpoint every BYTES bytes (Default: 0, meaning no volume based checkpointing)", + ) archive_group.add_argument( "--chunker-params", dest="chunker_params", diff --git a/src/borg/testsuite/archiver/create_cmd.py b/src/borg/testsuite/archiver/create_cmd.py index c11d7aa10..161163c00 100644 --- a/src/borg/testsuite/archiver/create_cmd.py +++ b/src/borg/testsuite/archiver/create_cmd.py @@ -167,6 +167,46 @@ class ArchiverTestCase(ArchiverTestCaseBase): ) assert extracted_data == input_data + def test_create_stdin_checkpointing(self): + chunk_size = 1000 # fixed chunker with this size, also volume based checkpointing after that volume + self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION) + input_data = b"X" * (chunk_size * 2 - 1) # one full and one partial chunk + self.cmd( + f"--repo={self.repository_location}", + "create", + f"--chunker-params=fixed,{chunk_size}", + f"--checkpoint-volume={chunk_size}", + "test", + "-", + input=input_data, + ) + # repo looking good overall? + self.cmd(f"--repo={self.repository_location}", "check", "-v") + # verify part files + out = self.cmd( + f"--repo={self.repository_location}", + "extract", + "test", + "stdin.borg_part_1", + "--consider-part-files", + "--stdout", + binary_output=True, + ) + assert out == input_data[:chunk_size] + out = self.cmd( + f"--repo={self.repository_location}", + "extract", + "test", + "stdin.borg_part_2", + "--consider-part-files", + "--stdout", + binary_output=True, + ) + assert out == input_data[: chunk_size - 1] + # verify full file + out = self.cmd(f"--repo={self.repository_location}", "extract", "test", "stdin", "--stdout", binary_output=True) + assert out == input_data + def test_create_content_from_command(self): self.cmd(f"--repo={self.repository_location}", "rcreate", RK_ENCRYPTION) input_data = "some test content" From 7e31fab75448a734df786c4ecc93259e3640e2de Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Tue, 31 Jan 2023 03:25:47 +0100 Subject: [PATCH 2/3] cleanup: remove Archive.checkpoint_interval (not used) checkpoint_interval and checkpoint_volume are only needed for the ChunksProcessor. --- src/borg/archive.py | 3 --- src/borg/archiver/create_cmd.py | 1 - src/borg/archiver/tar_cmds.py | 1 - 3 files changed, 5 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 0e2a83037..8d2c9c432 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -471,7 +471,6 @@ class Archive: name, cache=None, create=False, - checkpoint_interval=1800, numeric_ids=False, noatime=False, noctime=False, @@ -500,7 +499,6 @@ class Archive: self.name = name # overwritten later with name from archive metadata self.name_in_manifest = name # can differ from .name later (if borg check fixed duplicate archive names) self.comment = None - self.checkpoint_interval = checkpoint_interval self.numeric_ids = numeric_ids self.noatime = noatime self.noctime = noctime @@ -2480,7 +2478,6 @@ class ArchiveRecreater: progress=self.progress, chunker_params=self.chunker_params, cache=self.cache, - checkpoint_interval=self.checkpoint_interval, ) return target diff --git a/src/borg/archiver/create_cmd.py b/src/borg/archiver/create_cmd.py index 7fe0dc247..ccb15c462 100644 --- a/src/borg/archiver/create_cmd.py +++ b/src/borg/archiver/create_cmd.py @@ -232,7 +232,6 @@ class CreateMixIn: args.name, cache=cache, create=True, - checkpoint_interval=args.checkpoint_interval, numeric_ids=args.numeric_ids, noatime=not args.atime, noctime=args.noctime, diff --git a/src/borg/archiver/tar_cmds.py b/src/borg/archiver/tar_cmds.py index 6cf661752..95196e118 100644 --- a/src/borg/archiver/tar_cmds.py +++ b/src/borg/archiver/tar_cmds.py @@ -261,7 +261,6 @@ class TarMixIn: args.name, cache=cache, create=True, - checkpoint_interval=args.checkpoint_interval, progress=args.progress, chunker_params=args.chunker_params, start=t0, From 15d1bc0c49783ee5c6c0f50d4ddc15ae906678b4 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Tue, 31 Jan 2023 05:18:00 +0100 Subject: [PATCH 3/3] fix checkpointing: add item_ptrs chunks cleanup not having this had created orphaned item_ptrs chunks for checkpoint archives. also: - borg check: show id of orphaned chunks - borg check: archive list with explicit consider_checkpoints=True (this is the default, but better make sure). --- src/borg/archive.py | 16 ++++++++++++++-- src/borg/testsuite/archiver/create_cmd.py | 4 ++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 8d2c9c432..0600093df 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -401,6 +401,7 @@ class CacheChunkBuffer(ChunkBuffer): def write_chunk(self, chunk): id_, _ = self.cache.add_chunk(self.key.id_hash(chunk), {}, chunk, stats=self.stats, wait=False) + logger.debug(f"writing item metadata stream chunk {bin_to_hex(id_)}") self.cache.repository.async_response(wait=False) return id_ @@ -444,6 +445,7 @@ def archive_put_items(chunk_ids, *, repo_objs, cache=None, stats=None, add_refer for i in range(0, len(chunk_ids), IDS_PER_CHUNK): data = msgpack.packb(chunk_ids[i : i + IDS_PER_CHUNK]) id = repo_objs.id_hash(data) + logger.debug(f"writing item_ptrs chunk {bin_to_hex(id)}") if cache is not None and stats is not None: cache.add_chunk(id, {}, data, stats=stats) elif add_reference is not None: @@ -648,9 +650,16 @@ Duration: {0.duration} self.items_buffer.add(item) def write_checkpoint(self): - self.save(self.checkpoint_name) + metadata = self.save(self.checkpoint_name) + # that .save() has committed the repo. + # at next commit, we won't need this checkpoint archive any more because we will then + # have either a newer checkpoint archive or the final archive. + # so we can already remove it here, the next .save() will then commit this cleanup. + # remove its manifest entry, remove its ArchiveItem chunk, remove its item_ptrs chunks: del self.manifest.archives[self.checkpoint_name] self.cache.chunk_decref(self.id, self.stats) + for id in metadata.item_ptrs: + self.cache.chunk_decref(id, self.stats) def save(self, name=None, comment=None, timestamp=None, stats=None, additional_metadata=None): name = name or self.name @@ -712,6 +721,7 @@ Duration: {0.duration} self.manifest.write() self.repository.commit(compact=False) self.cache.commit() + return metadata def calc_stats(self, cache, want_unique=True): if not want_unique: @@ -2165,7 +2175,7 @@ class ArchiveChecker: if last and len(archive_infos) < last: logger.warning("--last %d archives: only found %d archives", last, len(archive_infos)) else: - archive_infos = self.manifest.archives.list(sort_by=sort_by) + archive_infos = self.manifest.archives.list(sort_by=sort_by, consider_checkpoints=True) num_archives = len(archive_infos) pi = ProgressIndicatorPercent( @@ -2222,6 +2232,8 @@ class ArchiveChecker: orphaned = unused - self.possibly_superseded if orphaned: logger.error(f"{len(orphaned)} orphaned objects found!") + for chunk_id in orphaned: + logger.debug(f"chunk {bin_to_hex(chunk_id)} is orphaned.") self.error_found = True if self.repair and unused: logger.info( diff --git a/src/borg/testsuite/archiver/create_cmd.py b/src/borg/testsuite/archiver/create_cmd.py index 161163c00..24e2f4461 100644 --- a/src/borg/testsuite/archiver/create_cmd.py +++ b/src/borg/testsuite/archiver/create_cmd.py @@ -180,8 +180,8 @@ class ArchiverTestCase(ArchiverTestCaseBase): "-", input=input_data, ) - # repo looking good overall? - self.cmd(f"--repo={self.repository_location}", "check", "-v") + # repo looking good overall? checks for rc == 0. + self.cmd(f"--repo={self.repository_location}", "check", "--debug") # verify part files out = self.cmd( f"--repo={self.repository_location}",