mirror of https://github.com/borgbackup/borg.git
chunks index caching, fixes #8397
borg compact now uses ChunkIndex (a specialized, memory-efficient data structure), so it needs less memory. It also saves that chunks index to cache/chunks in the repository.

When the chunks index is needed, borg first tries to load it from cache/chunks. If that fails, it falls back to building the chunks index via repository.list() (which can be rather slow) and immediately caches the resulting ChunkIndex in the repository.

borg check --repair currently just deletes the chunks cache, because it might have deleted some invalid chunks from the repository.

cache.close now saves the chunks index to cache/chunks in the repository if it was modified; thus, borg create will update the cached chunks index with new chunks.

cache/chunks_hash can be used to validate cache/chunks (and also to validate or invalidate locally cached copies of it).
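For illustration, here is the load-validate-rebuild flow from the message above as a minimal, self-contained sketch. The dict-backed FakeStore and the JSON-serialized plain-dict "index" are stand-ins for borg's real Repository store and ChunkIndex, and blake2b stands in for xxh64; only the cache/chunks and cache/chunks_hash keys and the hash-validation idea come from this commit.

import hashlib
import json

class FakeStore:
    """Dict-backed stand-in for the repository's key/value store."""
    def __init__(self):
        self._kv = {}

    def store_load(self, key):
        return self._kv[key]  # KeyError stands in for borg's ObjectNotFound

    def store_store(self, key, value):
        self._kv[key] = value

def digest(data):
    return hashlib.blake2b(data, digest_size=8).digest()  # stand-in for xxh64

def load_chunkindex(store, build_slow):
    # fast path: load the centrally cached index and validate it by hash
    try:
        data = store.store_load("cache/chunks")
        if digest(data) == store.store_load("cache/chunks_hash"):
            return json.loads(data.decode())  # cache hit, validated
    except KeyError:
        pass
    # slow path: rebuild (borg does this via repository.list()), then cache immediately
    index = build_slow()
    data = json.dumps(index).encode()
    store.store_store("cache/chunks", data)
    store.store_store("cache/chunks_hash", digest(data))
    return index

store = FakeStore()
first = load_chunkindex(store, lambda: {"0011aa": 4096})  # slow path, then cached
second = load_chunkindex(store, lambda: {})               # cache hit: slow path unused
assert first == second == {"0011aa": 4096}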
This commit is contained in:
parent 1e6f71f2f5
commit 36e3d63474
3 changed files with 156 additions and 65 deletions
src/borg/archive.py

@@ -50,7 +50,7 @@ from .patterns import PathPrefixPattern, FnmatchPattern, IECommand
 from .item import Item, ArchiveItem, ItemDiff
 from .platform import acl_get, acl_set, set_flags, get_flags, swidth, hostname
 from .remote import RemoteRepository, cache_if_remote
-from .repository import Repository, NoManifestError
+from .repository import Repository, NoManifestError, StoreObjectNotFound
 from .repoobj import RepoObj

 has_link = hasattr(os, "link")
@@ -1644,7 +1644,7 @@ class ArchiveChecker:
         self.check_all = not any((first, last, match, older, newer, oldest, newest))
         self.repair = repair
         self.repository = repository
-        self.chunks = build_chunkindex_from_repo(self.repository)
+        self.chunks = build_chunkindex_from_repo(self.repository, disable_caches=True, cache_immediately=not repair)
         self.key = self.make_key(repository)
         self.repo_objs = RepoObj(self.key)
         if verify_data:
@@ -2100,6 +2100,18 @@ class ArchiveChecker:

     def finish(self):
         if self.repair:
+            logger.info("Deleting chunks cache in repository - next repository access will cause a rebuild.")
+            # we may have deleted chunks, invalidate/remove the chunks index cache!
+            try:
+                self.repository.store_delete("cache/chunks_hash")
+            except (Repository.ObjectNotFound, StoreObjectNotFound):
+                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+                pass
+            try:
+                self.repository.store_delete("cache/chunks")
+            except (Repository.ObjectNotFound, StoreObjectNotFound):
+                # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+                pass
             logger.info("Writing Manifest.")
             self.manifest.write()
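The two try/except blocks in finish() repeat one pattern: delete a store key while tolerating its absence. A hypothetical helper (not part of this commit) would express that intent directly; it uses only names already present in the diff above:

from borg.repository import Repository, StoreObjectNotFound

def store_delete_if_exists(repository, key):
    """Delete a store key, tolerating that it may already be gone (hypothetical helper)."""
    try:
        repository.store_delete(key)
    except (Repository.ObjectNotFound, StoreObjectNotFound):
        # per the TODO above: RemoteRepository seems to raise Repository.ObjectNotFound
        # instead of StoreObjectNotFound, so both are caught
        pass

# usage, equivalent to the repair branch of finish():
# for key in ("cache/chunks_hash", "cache/chunks"):
#     store_delete_if_exists(repository, key)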
src/borg/archiver/compact_cmd.py

@@ -1,9 +1,11 @@
 import argparse
-from typing import Tuple, Dict
+from typing import Tuple, Set

 from ._common import with_repository
 from ..archive import Archive
+from ..cache import write_chunkindex_to_repo_cache
 from ..constants import *  # NOQA
+from ..hashindex import ChunkIndex, ChunkIndexEntry
 from ..helpers import set_ec, EXIT_WARNING, EXIT_ERROR, format_file_size, bin_to_hex
 from ..helpers import ProgressIndicatorPercent
 from ..manifest import Manifest
@@ -20,34 +22,33 @@ class ArchiveGarbageCollector:
         self.repository = repository
         assert isinstance(repository, (Repository, RemoteRepository))
         self.manifest = manifest
-        self.repository_chunks = None  # what we have in the repository, id -> stored_size
-        self.used_chunks = None  # what archives currently reference
-        self.wanted_chunks = None  # chunks that would be nice to have for next borg check --repair
+        self.chunks = None  # a ChunkIndex, here used for: id -> (is_used, stored_size)
         self.total_files = None  # overall number of source files written to all archives in this repo
         self.total_size = None  # overall size of source file content data written to all archives
         self.archives_count = None  # number of archives

     @property
     def repository_size(self):
-        if self.repository_chunks is None:
+        if self.chunks is None:
             return None
-        return sum(self.repository_chunks.values())  # sum of stored sizes
+        return sum(entry.size for id, entry in self.chunks.iteritems())  # sum of stored sizes

     def garbage_collect(self):
         """Removes unused chunks from a repository."""
         logger.info("Starting compaction / garbage collection...")
         logger.info("Getting object IDs present in the repository...")
-        self.repository_chunks = self.get_repository_chunks()
+        self.chunks = self.get_repository_chunks()
         logger.info("Computing object IDs used by archives...")
-        (self.used_chunks, self.wanted_chunks, self.total_files, self.total_size, self.archives_count) = (
+        (self.missing_chunks, self.reappeared_chunks, self.total_files, self.total_size, self.archives_count) = (
             self.analyze_archives()
         )
         self.report_and_delete()
+        self.save_chunk_index()
         logger.info("Finished compaction / garbage collection...")

-    def get_repository_chunks(self) -> Dict[bytes, int]:
+    def get_repository_chunks(self) -> ChunkIndex:
         """Build a dict id -> size of all chunks present in the repository"""
-        repository_chunks = {}
+        chunks = ChunkIndex()
         marker = None
         while True:
             result = self.repository.list(limit=LIST_SCAN_LIMIT, marker=marker)
@@ -55,13 +56,41 @@ class ArchiveGarbageCollector:
                 break
             marker = result[-1][0]
             for id, stored_size in result:
-                repository_chunks[id] = stored_size
-        return repository_chunks
+                # we add this id to the chunks index, using refcount == 0, because
+                # we do not know yet whether it is actually referenced from some archives.
+                # we "abuse" the size field here. usually there is the plaintext size,
+                # but we use it for the size of the stored object here.
+                chunks[id] = ChunkIndexEntry(refcount=0, size=stored_size)
+        return chunks

-    def analyze_archives(self) -> Tuple[Dict[bytes, int], Dict[bytes, int], int, int, int]:
+    def save_chunk_index(self):
+        # first clean up:
+        for id, entry in self.chunks.iteritems():
+            # we already deleted the unused chunks, so everything left must be used:
+            assert entry.refcount == ChunkIndex.MAX_VALUE
+            # as we put the wrong size in there, we need to clean up the size:
+            self.chunks[id] = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=0)
+        # now self.chunks is an uptodate ChunkIndex, usable for general borg usage!
+        write_chunkindex_to_repo_cache(self.repository, self.chunks, compact=True, clear=True, force_write=True)
+        self.chunks = None  # nothing there (cleared!)
+
+    def analyze_archives(self) -> Tuple[Set, Set, int, int, int]:
         """Iterate over all items in all archives, create the dicts id -> size of all used/wanted chunks."""
-        used_chunks = {}  # chunks referenced by item.chunks
-        wanted_chunks = {}  # additional "wanted" chunks seen in item.chunks_healthy
+
+        def use_it(id, *, wanted=False):
+            entry = self.chunks.get(id)
+            if entry is not None:
+                # the chunk is in the repo, mark it used by setting refcount to max.
+                self.chunks[id] = ChunkIndexEntry(refcount=ChunkIndex.MAX_VALUE, size=entry.size)
+                if wanted:
+                    # chunk id is from chunks_healthy list: a lost chunk has re-appeared!
+                    reappeared_chunks.add(id)
+            else:
+                # we do NOT have this chunk in the repository!
+                missing_chunks.add(id)
+
+        missing_chunks: set[bytes] = set()
+        reappeared_chunks: set[bytes] = set()
         archive_infos = self.manifest.archives.list(sort_by=["ts"])
         num_archives = len(archive_infos)
         pi = ProgressIndicatorPercent(
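The hunk above packs compact's bookkeeping into one structure: get_repository_chunks() starts every chunk at refcount 0, use_it() marks referenced chunks by raising the refcount to the maximum, and whatever is still at 0 afterwards is garbage. Here is a hedged, stand-alone miniature of that mark-and-sweep idea, with a plain dict standing in for ChunkIndex and made-up chunk ids:

MAX_VALUE = 2**32 - 1  # stand-in for ChunkIndex.MAX_VALUE

# id -> (refcount, stored_size); everything starts unreferenced, as after repository.list()
chunks = {b"id-1": (0, 100), b"id-2": (0, 200), b"id-3": (0, 50)}
missing = set()

def use_it(id):
    entry = chunks.get(id)
    if entry is not None:
        chunks[id] = (MAX_VALUE, entry[1])  # mark used, keep the stored size
    else:
        missing.add(id)  # referenced by an archive, but not in the repository

for referenced in (b"id-1", b"id-3", b"id-9"):  # ids seen while iterating archive items
    use_it(referenced)

unused = {id for id, (refcount, _) in chunks.items() if refcount == 0}
assert unused == {b"id-2"}   # report_and_delete() would delete this
assert missing == {b"id-9"}  # and report this one as a missing object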
@@ -73,79 +102,60 @@ class ArchiveGarbageCollector:
             logger.info(f"Analyzing archive {info.name} {info.ts} {bin_to_hex(info.id)} ({i + 1}/{num_archives})")
             archive = Archive(self.manifest, info.id)
             # archive metadata size unknown, but usually small/irrelevant:
-            used_chunks[archive.id] = 0
+            use_it(archive.id)
             for id in archive.metadata.item_ptrs:
-                used_chunks[id] = 0
+                use_it(id)
             for id in archive.metadata.items:
-                used_chunks[id] = 0
+                use_it(id)
             # archive items content data:
             for item in archive.iter_items():
                 total_files += 1  # every fs object counts, not just regular files
                 if "chunks" in item:
                     for id, size in item.chunks:
                         total_size += size  # original, uncompressed file content size
-                        used_chunks[id] = size
+                        use_it(id)
                 if "chunks_healthy" in item:
                     # we also consider the chunks_healthy chunks as referenced - do not throw away
                     # anything that borg check --repair might still need.
                     for id, size in item.chunks_healthy:
-                        if id not in used_chunks:
-                            wanted_chunks[id] = size
+                        use_it(id, wanted=True)
         pi.finish()
-        return used_chunks, wanted_chunks, total_files, total_size, num_archives
+        return missing_chunks, reappeared_chunks, total_files, total_size, num_archives

     def report_and_delete(self):
         run_repair = " Run borg check --repair!"

-        missing_new = set(self.used_chunks) - set(self.repository_chunks)
-        if missing_new:
-            logger.error(f"Repository has {len(missing_new)} new missing objects." + run_repair)
+        if self.missing_chunks:
+            logger.error(f"Repository has {len(self.missing_chunks)} missing objects." + run_repair)
             set_ec(EXIT_ERROR)

-        missing_known = set(self.wanted_chunks) - set(self.repository_chunks)
-        if missing_known:
-            logger.warning(f"Repository has {len(missing_known)} known missing objects.")
-            set_ec(EXIT_WARNING)
-
-        missing_found = set(self.wanted_chunks) & set(self.repository_chunks)
-        if missing_found:
-            logger.warning(f"{len(missing_found)} previously missing objects re-appeared!" + run_repair)
+        if self.reappeared_chunks:
+            logger.warning(f"{len(self.reappeared_chunks)} previously missing objects re-appeared!" + run_repair)
             set_ec(EXIT_WARNING)

         repo_size_before = self.repository_size
-        referenced_chunks = set(self.used_chunks) | set(self.wanted_chunks)
-        unused = set(self.repository_chunks) - referenced_chunks
-        logger.info(f"Repository has {len(unused)} objects to delete.")
-        if unused:
-            logger.info(f"Deleting {len(unused)} unused objects...")
-            pi = ProgressIndicatorPercent(
-                total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete"
-            )
-            for i, id in enumerate(unused):
-                pi.show(i)
-                self.repository.delete(id)
-                del self.repository_chunks[id]
-            pi.finish()
+        logger.info("Determining unused objects...")
+        unused = set()
+        for id, entry in self.chunks.iteritems():
+            if entry.refcount == 0:
+                unused.add(id)
+        logger.info(f"Deleting {len(unused)} unused objects...")
+        pi = ProgressIndicatorPercent(
+            total=len(unused), msg="Deleting unused objects %3.1f%%", step=0.1, msgid="compact.report_and_delete"
+        )
+        for i, id in enumerate(unused):
+            pi.show(i)
+            self.repository.delete(id)
+            del self.chunks[id]
+        pi.finish()
         repo_size_after = self.repository_size

-        count = len(self.repository_chunks)
+        count = len(self.chunks)
         logger.info(f"Overall statistics, considering all {self.archives_count} archives in this repository:")
         logger.info(
             f"Source data size was {format_file_size(self.total_size, precision=0)} in {self.total_files} files."
         )
-        dsize = 0
-        for id in self.repository_chunks:
-            if id in self.used_chunks:
-                dsize += self.used_chunks[id]
-            elif id in self.wanted_chunks:
-                dsize += self.wanted_chunks[id]
-            else:
-                raise KeyError(bin_to_hex(id))
-        logger.info(f"Repository size is {format_file_size(self.repository_size, precision=0)} in {count} objects.")
-        if self.total_size != 0:
-            logger.info(f"Space reduction factor due to deduplication: {dsize / self.total_size:.3f}")
-        if dsize != 0:
-            logger.info(f"Space reduction factor due to compression: {self.repository_size / dsize:.3f}")
+        logger.info(f"Repository size is {format_file_size(repo_size_after, precision=0)} in {count} objects.")
+        logger.info(f"Compaction saved {format_file_size(repo_size_before - repo_size_after, precision=0)}.")
src/borg/cache.py

@@ -1,4 +1,5 @@
 import configparser
+import io
 import os
 import shutil
 import stat
@@ -30,7 +31,7 @@ from .crypto.file_integrity import IntegrityCheckedFile, FileIntegrityError
 from .manifest import Manifest
 from .platform import SaveFile
 from .remote import RemoteRepository
-from .repository import LIST_SCAN_LIMIT, Repository
+from .repository import LIST_SCAN_LIMIT, Repository, StoreObjectNotFound

 # chunks is a list of ChunkListEntry
 FileCacheEntry = namedtuple("FileCacheEntry", "age inode size ctime mtime chunks")
@@ -618,7 +619,64 @@ class FilesCacheMixin:
         )


-def build_chunkindex_from_repo(repository):
+def load_chunks_hash(repository) -> bytes:
+    try:
+        hash = repository.store_load("cache/chunks_hash")
+        logger.debug(f"cache/chunks_hash is '{bin_to_hex(hash)}'.")
+    except (Repository.ObjectNotFound, StoreObjectNotFound):
+        # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+        hash = b""
+        logger.debug("cache/chunks_hash missing!")
+    return hash
+
+
+def write_chunkindex_to_repo_cache(repository, chunks, *, compact=False, clear=False, force_write=False):
+    cached_hash = load_chunks_hash(repository)
+    if compact:
+        # if we don't need the in-memory chunks index anymore:
+        chunks.compact()  # vacuum the hash table
+    with io.BytesIO() as f:
+        chunks.write(f)
+        data = f.getvalue()
+    if clear:
+        # if we don't need the in-memory chunks index anymore:
+        chunks.clear()  # free memory, immediately
+    new_hash = xxh64(data)
+    if force_write or new_hash != cached_hash:
+        # when an updated chunks index is stored into the cache, we also store its hash into the cache.
+        # when a client is loading the chunks index from a cache, it has to compare its xxh64
+        # hash against cache/chunks_hash in the repository. if it is the same, the cache
+        # is valid. If it is different, the cache is either corrupted or out of date and
+        # has to be discarded.
+        # when some functionality is DELETING chunks from the repository, it has to either update
+        # both cache/chunks and cache/chunks_hash (like borg compact does) or it has to delete both,
+        # so that all clients will discard any client-local chunks index caches.
+        logger.debug(f"caching chunks index {bin_to_hex(new_hash)} in repository...")
+        repository.store_store("cache/chunks", data)
+        repository.store_store("cache/chunks_hash", new_hash)
+    return new_hash
+
+
+def build_chunkindex_from_repo(repository, *, disable_caches=False, cache_immediately=False):
+    chunks = None
+    # first, try to load a pre-computed and centrally cached chunks index:
+    if not disable_caches:
+        wanted_hash = load_chunks_hash(repository)
+        logger.debug(f"trying to get cached chunk index (id {bin_to_hex(wanted_hash)}) from the repo...")
+        try:
+            chunks_data = repository.store_load("cache/chunks")
+        except (Repository.ObjectNotFound, StoreObjectNotFound):
+            # TODO: ^ seem like RemoteRepository raises Repository.ONF instead of StoreONF
+            logger.debug("cache/chunks not found in the repository.")
+        else:
+            if xxh64(chunks_data) == wanted_hash:
+                logger.debug("cache/chunks is valid.")
+                with io.BytesIO(chunks_data) as f:
+                    chunks = ChunkIndex.read(f)
+                return chunks
+            else:
+                logger.debug("cache/chunks is invalid.")
+    # if we didn't get anything from the cache, compute the ChunkIndex the slow way:
+    logger.debug("querying the chunk IDs list from the repo...")
     chunks = ChunkIndex()
     t0 = perf_counter()
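The commit message notes that cache/chunks_hash can also be used to validate or invalidate locally cached copies of the index. A sketch of what such a client-side check could look like; the local file path and the function itself are hypothetical (not borg API), only the compare-against-cache/chunks_hash rule comes from this commit:

import os

def local_chunkindex_valid(local_path, repo_hash, digest):
    """Return True if a client-local copy of cache/chunks still matches the repo.

    local_path: hypothetical path of the client-local copy.
    repo_hash: the value of cache/chunks_hash loaded from the repository.
    digest: the hash function used for cache/chunks (borg: xxh64).
    """
    if not os.path.exists(local_path):
        return False
    with open(local_path, "rb") as f:
        data = f.read()
    # same rule as for the repo-side cache: a hash mismatch means the copy is
    # corrupted or out of date and must be discarded and re-fetched/rebuilt
    return digest(data) == repo_hash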
@@ -646,6 +704,9 @@ def build_chunkindex_from_repo(repository):
     # Protocol overhead is neglected in this calculation.
     speed = format_file_size(num_chunks * 34 / duration)
     logger.debug(f"queried {num_chunks} chunk IDs in {duration} s ({num_requests} requests), ~{speed}/s")
+    if cache_immediately:
+        # immediately update cache/chunks, so we only rarely have to do it the slow way:
+        write_chunkindex_to_repo_cache(repository, chunks, compact=False, clear=False, force_write=True)
     return chunks

@@ -660,7 +721,7 @@ class ChunksMixin:
     @property
     def chunks(self):
         if self._chunks is None:
-            self._chunks = build_chunkindex_from_repo(self.repository)
+            self._chunks = build_chunkindex_from_repo(self.repository, cache_immediately=True)
         return self._chunks

     def seen_chunk(self, id, size=None):
@@ -709,6 +770,11 @@ class ChunksMixin:
         stats.update(size, not exists)
         return ChunkListEntry(id, size)

+    def _write_chunks_cache(self, chunks):
+        # this is called from .close, so we can clear/compact here:
+        write_chunkindex_to_repo_cache(self.repository, self._chunks, compact=True, clear=True)
+        self._chunks = None  # nothing there (cleared!)
+

 class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
     """
@@ -794,6 +860,9 @@ class AdHocWithFilesCache(FilesCacheMixin, ChunksMixin):
         pi.output("Saving files cache")
         integrity_data = self._write_files_cache(self._files)
         self.cache_config.integrity[self.files_cache_name()] = integrity_data
+        if self._chunks is not None:
+            pi.output("Saving chunks cache")
+            self._write_chunks_cache(self._chunks)  # cache/chunks in repo has a different integrity mechanism
         pi.output("Saving cache config")
         self.cache_config.save(self.manifest)
         self.cache_config.close()
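Putting the cache.py pieces together, the resulting borg create lifecycle is roughly: the first access to the chunks property loads or builds the index (caching it immediately), the backup adds new chunks to the in-memory index, and close() writes the updated index back. Illustrated below with the stand-ins from the first sketch above (FakeStore, digest, load_chunkindex); this is a hedged miniature, not borg code:

store = FakeStore()
index = load_chunkindex(store, lambda: {"0011aa": 4096})  # chunks property: load, or build + cache
index["ff22bb"] = 8192                                    # borg create adds new chunks in memory
data = json.dumps(index).encode()
store.store_store("cache/chunks", data)                   # close(): write the index back,
store.store_store("cache/chunks_hash", digest(data))      # keeping cache/chunks_hash in sync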