
repo-compress: reduce memory consumption

The worst (but frequent) case here is that all or most of the chunks
in the repo need to get recompressed; storing all chunk ids
in a Python list would then need a significant amount of memory for
large repositories.

We already have all chunk ids stored in cache.chunks, so we now just
flag the ones needing re-compression by setting the F_COMPRESS flag
(which does not need any additional memory).
Thomas Waldmann 2024-11-15 10:42:07 +01:00
parent b6ae924f30
commit f7dea6e93f
3 changed files with 19 additions and 13 deletions
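
The idea, as a minimal sketch (illustrative only: a plain dict of integer flag words stands in for borg's ChunkIndex, and needs_recompression is a made-up placeholder for the real metadata check): OR-ing a flag bit into index entries that already exist adds no per-candidate memory, whereas appending candidate ids to a list grows with the number of candidates.

# Illustrative sketch only -- not borg's actual code.
F_COMPRESS = 1 << 1  # flag bit: chunk should get (re-)compressed

chunks = {b"id-1": 0, b"id-2": 0, b"id-3": 0}  # stand-in chunk index: id -> flags

def needs_recompression(chunk_id):
    # placeholder for the real check (compare stored ctype/clevel/olevel
    # against the wanted compression settings)
    return chunk_id != b"id-2"

# pass 1: flag candidates in place instead of collecting them in a list
for chunk_id in chunks:
    if needs_recompression(chunk_id):
        chunks[chunk_id] |= F_COMPRESS

# pass 2: recompress only the flagged chunks
for chunk_id, flags in chunks.items():
    if flags & F_COMPRESS:
        print("recompress", chunk_id.decode())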


@@ -4,6 +4,7 @@
 from ._common import with_repository, Highlander
 from ..constants import *  # NOQA
 from ..compress import CompressionSpec, ObfuscateSize, Auto, COMPRESSOR_TABLE
+from ..hashindex import ChunkIndex
 from ..helpers import sig_int, ProgressIndicatorPercent, Error
 from ..repository import Repository
 from ..remote import RemoteRepository
@@ -15,20 +16,22 @@
 def find_chunks(repository, repo_objs, cache, stats, ctype, clevel, olevel):
-    """find chunks that need processing (usually: recompression)."""
-    recompress_ids = []
+    """find and flag chunks that need processing (usually: recompression)."""
     compr_keys = stats["compr_keys"] = set()
     compr_wanted = ctype, clevel, olevel
-    for id, _ in cache.chunks.iteritems():
+    recompress_count = 0
+    for id, cie in cache.chunks.iteritems():
         chunk_no_data = repository.get(id, read_data=False)
         meta = repo_objs.parse_meta(id, chunk_no_data, ro_type=ROBJ_DONTCARE)
         compr_found = meta["ctype"], meta["clevel"], meta.get("olevel", -1)
         if compr_found != compr_wanted:
-            recompress_ids.append(id)
+            flags_compress = cie.flags | ChunkIndex.F_COMPRESS
+            cache.chunks[id] = cie._replace(flags=flags_compress)
+            recompress_count += 1
         compr_keys.add(compr_found)
         stats[compr_found] += 1
         stats["checked_count"] += 1
-    return recompress_ids
+    return recompress_count
 def process_chunks(repository, repo_objs, stats, recompress_ids, olevel):
@@ -104,19 +107,20 @@ def get_csettings(c):
         stats_find = defaultdict(int)
         stats_process = defaultdict(int)
-        recompress_ids = find_chunks(repository, repo_objs, cache, stats_find, ctype, clevel, olevel)
-        recompress_candidate_count = len(recompress_ids)
-        chunks_limit = min(1000, max(100, recompress_candidate_count // 1000))
+        recompress_candidate_count = find_chunks(repository, repo_objs, cache, stats_find, ctype, clevel, olevel)
         pi = ProgressIndicatorPercent(
-            total=len(recompress_ids), msg="Recompressing %3.1f%%", step=0.1, msgid="repo_compress.process_chunks"
+            total=recompress_candidate_count,
+            msg="Recompressing %3.1f%%",
+            step=0.1,
+            msgid="repo_compress.process_chunks",
         )
-        while recompress_ids:
+        for id, cie in cache.chunks.iteritems():
             if sig_int and sig_int.action_done():
                 break
-            ids, recompress_ids = recompress_ids[:chunks_limit], recompress_ids[chunks_limit:]
-            process_chunks(repository, repo_objs, stats_process, ids, olevel)
-            pi.show(increase=len(ids))
+            if cie.flags & ChunkIndex.F_COMPRESS:
+                process_chunks(repository, repo_objs, stats_process, [id], olevel)
+                pi.show()
         pi.finish()
         if sig_int:
             # Ctrl-C / SIGINT: do not commit


@@ -13,6 +13,7 @@ CIE = Union[Tuple[int, int], Type[ChunkIndexEntry]]
 class ChunkIndex:
     F_NONE: int
     F_USED: int
+    F_COMPRESS: int
     F_NEW: int
     M_USER: int
     M_SYSTEM: int


@@ -47,6 +47,7 @@ class ChunkIndex(HTProxyMixin, MutableMapping):
     M_SYSTEM = 0xff000000  # mask for system flags
     # user flags:
     F_USED = 2 ** 0  # chunk is used/referenced
+    F_COMPRESS = 2 ** 1  # chunk shall get (re-)compressed
     # system flags (internal use, always 0 to user, not changeable by user):
     F_NEW = 2 ** 24  # a new chunk that is not present in repo/cache/chunks.* yet.
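
For context, the flag word packs per-chunk state into a single integer: user flags live in the low bits, system flags in the top byte masked by M_SYSTEM. A small sanity check of that layout in plain Python (not borg code; the M_USER value is an assumption, taken here as the complement of M_SYSTEM):

# plain-Python sanity check of the flag layout above (illustrative only)
M_SYSTEM = 0xFF000000    # mask for system flags (top byte)
M_USER = 0x00FFFFFF      # assumed mask for user flags (complement of M_SYSTEM)
F_USED = 2 ** 0          # user flag: chunk is used/referenced
F_COMPRESS = 2 ** 1      # user flag: chunk shall get (re-)compressed
F_NEW = 2 ** 24          # system flag: new chunk, not in repo/cache yet

flags = F_USED | F_COMPRESS        # a used chunk marked for recompression
assert flags & M_SYSTEM == 0       # user-set flags never touch system bits
assert F_NEW & M_SYSTEM == F_NEW   # F_NEW lives in the system range
assert flags & F_COMPRESS          # the recompression mark is a cheap bit test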