Mirror of https://github.com/borgbackup/borg.git (synced 2025-03-12 07:08:47 +00:00)
transfer: do not transfer replacement chunks, deal with missing chunks in other_repo

parent 9ed75ca405
commit fa70d2f9c7

1 changed file with 55 additions and 40 deletions
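Background: in borg 1.x, `check --repair` replaced each lost file chunk with an all-zero chunk of the same size and preserved the original chunk list in `item.chunks_healthy`. This commit makes `borg transfer` skip such placeholders instead of copying them into the destination repository. A minimal illustrative sketch of that borg 1.x repair behavior (not code from this commit; `item` is assumed to be a dict-like archive item with a `chunks` list):

# Illustrative sketch only, mimicking the borg 1.x repair behavior this commit reacts to.
def replace_lost_chunk(item, size):
    if "chunks_healthy" not in item:
        item["chunks_healthy"] = list(item["chunks"])  # remember the CORRECT chunk list
    return b"\0" * size  # all-zero replacement chunk, same size as the lost one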
@@ -9,6 +9,8 @@ from ..helpers import Error
 from ..helpers import location_validator, Location, archivename_validator, comment_validator
 from ..helpers import format_file_size, bin_to_hex
 from ..manifest import Manifest
+from ..legacyrepository import LegacyRepository
+from ..repository import Repository

 from ..logger import create_logger
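The two new imports exist because a transfer can read from either repository flavor: a borg 2 `Repository` or a borg 1.x `LegacyRepository`, each raising its own `ObjectNotFound`. A minimal sketch of how the pair is used together (the helper name is hypothetical, not borg API):

from ..legacyrepository import LegacyRepository
from ..repository import Repository


def get_chunk_or_none(repo, chunk_id):
    """Fetch a chunk from either repo flavor; None signals "missing at source"."""
    try:
        return repo.get(chunk_id)
    except (Repository.ObjectNotFound, LegacyRepository.ObjectNotFound):
        return None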
|
@@ -111,51 +113,64 @@ class TransferMixIn:
                         # so let's remove them from old archives also, considering there is no
                         # code any more that deals with them in special ways (e.g. to get stats right).
                         continue
-                    if "chunks" in item:
+                    if "chunks_healthy" in item:  # legacy
+                        other_chunks = item.chunks_healthy  # chunks_healthy has the CORRECT chunks list, if present.
+                    elif "chunks" in item:
+                        other_chunks = item.chunks
+                    else:
+                        other_chunks = None
+                    if other_chunks is not None:
                         chunks = []
-                        for chunk_id, size in item.chunks:
+                        for chunk_id, size in other_chunks:
                             chunk_present = cache.seen_chunk(chunk_id, size)
                             if not chunk_present:  # target repo does not yet have this chunk
                                 if not dry_run:
-                                    cdata = other_repository.get(chunk_id)
-                                    if args.recompress == "never":
-                                        # keep compressed payload same, verify via assert_id (that will
-                                        # decompress, but avoid needing to compress it again):
-                                        meta, data = other_manifest.repo_objs.parse(
-                                            chunk_id,
-                                            cdata,
-                                            decompress=True,
-                                            want_compressed=True,
-                                            ro_type=ROBJ_FILE_STREAM,
-                                        )
-                                        meta, data = upgrader.upgrade_compressed_chunk(meta, data)
-                                        chunk_entry = cache.add_chunk(
-                                            chunk_id,
-                                            meta,
-                                            data,
-                                            stats=archive.stats,
-                                            wait=False,
-                                            compress=False,
-                                            size=size,
-                                            ctype=meta["ctype"],
-                                            clevel=meta["clevel"],
-                                            ro_type=ROBJ_FILE_STREAM,
-                                        )
-                                    elif args.recompress == "always":
-                                        # always decompress and re-compress file data chunks
-                                        meta, data = other_manifest.repo_objs.parse(
-                                            chunk_id, cdata, ro_type=ROBJ_FILE_STREAM
-                                        )
-                                        chunk_entry = cache.add_chunk(
-                                            chunk_id,
-                                            meta,
-                                            data,
-                                            stats=archive.stats,
-                                            wait=False,
-                                            ro_type=ROBJ_FILE_STREAM,
-                                        )
-                                    else:
-                                        raise ValueError(f"unsupported recompress mode: {args.recompress}")
+                                    try:
+                                        cdata = other_repository.get(chunk_id)
+                                    except (Repository.ObjectNotFound, LegacyRepository.ObjectNotFound):
+                                        # missing correct chunk in other_repository (source) will result in
+                                        # a missing chunk in repository (destination).
+                                        # we do NOT want to transfer all-zero replacement chunks from borg1 repos.
+                                        pass
+                                    else:
+                                        if args.recompress == "never":
+                                            # keep compressed payload same, verify via assert_id (that will
+                                            # decompress, but avoid needing to compress it again):
+                                            meta, data = other_manifest.repo_objs.parse(
+                                                chunk_id,
+                                                cdata,
+                                                decompress=True,
+                                                want_compressed=True,
+                                                ro_type=ROBJ_FILE_STREAM,
+                                            )
+                                            meta, data = upgrader.upgrade_compressed_chunk(meta, data)
+                                            chunk_entry = cache.add_chunk(
+                                                chunk_id,
+                                                meta,
+                                                data,
+                                                stats=archive.stats,
+                                                wait=False,
+                                                compress=False,
+                                                size=size,
+                                                ctype=meta["ctype"],
+                                                clevel=meta["clevel"],
+                                                ro_type=ROBJ_FILE_STREAM,
+                                            )
+                                        elif args.recompress == "always":
+                                            # always decompress and re-compress file data chunks
+                                            meta, data = other_manifest.repo_objs.parse(
+                                                chunk_id, cdata, ro_type=ROBJ_FILE_STREAM
+                                            )
+                                            chunk_entry = cache.add_chunk(
+                                                chunk_id,
+                                                meta,
+                                                data,
+                                                stats=archive.stats,
+                                                wait=False,
+                                                ro_type=ROBJ_FILE_STREAM,
+                                            )
+                                        else:
+                                            raise ValueError(f"unsupported recompress mode: {args.recompress}")
                                     cache.repository.async_response(wait=False)
                                     chunks.append(chunk_entry)
                                     transfer_size += size
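Taken together, the new code path prefers `item.chunks_healthy` (the correct list for items repaired under borg 1.x) over `item.chunks`, and leaves chunks that are missing at the source missing at the destination rather than transferring all-zero replacement chunks. A condensed sketch of that control flow (`transfer_item_chunks` and `store_in_destination` are illustrative helpers, not borg API):

from ..legacyrepository import LegacyRepository
from ..repository import Repository


def transfer_item_chunks(item, source_repo, store_in_destination):
    # Prefer chunks_healthy: for items repaired under borg 1.x it holds the
    # CORRECT chunk list, while item.chunks may reference all-zero placeholders.
    if "chunks_healthy" in item:
        other_chunks = item.chunks_healthy
    elif "chunks" in item:
        other_chunks = item.chunks
    else:
        return None  # item carries no file content (e.g. a directory)

    chunks = []
    for chunk_id, size in other_chunks:
        try:
            cdata = source_repo.get(chunk_id)
        except (Repository.ObjectNotFound, LegacyRepository.ObjectNotFound):
            # The correct chunk is gone at the source: keep it missing at the
            # destination too, instead of transferring a replacement chunk.
            continue
        chunks.append(store_in_destination(chunk_id, size, cdata))  # hypothetical helper
    return chunks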