Merge pull request #6809 from ThomasWaldmann/refactor-transfer-upgrades-borg2

transfer: use an Upgrader class

commit a44dbc1dc8 by TW, 2022-06-28 22:40:28 +02:00 (committed by GitHub)
2 changed files with 134 additions and 71 deletions

src/borg/archiver.py

@@ -347,72 +347,22 @@ class Archiver:
     def do_transfer(self, args, *,
                     repository, manifest, key, cache,
                     other_repository=None, other_manifest=None, other_key=None):
-        """archives transfer from other repository"""
+        """archives transfer from other repository, optionally upgrade data format"""
-
-        ITEM_KEY_WHITELIST = {'path', 'source', 'rdev', 'chunks', 'chunks_healthy', 'hlid',
-                              'mode', 'user', 'group', 'uid', 'gid', 'mtime', 'atime', 'ctime', 'birthtime', 'size',
-                              'xattrs', 'bsdflags', 'acl_nfs4', 'acl_access', 'acl_default', 'acl_extended',
-                              'part'}
-
-        def upgrade_item(item):
-            """upgrade item as needed, get rid of legacy crap"""
-            if hlm.borg1_hardlink_master(item):
-                item._dict['hlid'] = hlid = hlm.hardlink_id_from_path(item._dict['path'])
-                hlm.remember(id=hlid, info=(item._dict.get('chunks'), item._dict.get('chunks_healthy')))
-            elif hlm.borg1_hardlink_slave(item):
-                item._dict['hlid'] = hlid = hlm.hardlink_id_from_path(item._dict['source'])
-                chunks, chunks_healthy = hlm.retrieve(id=hlid, default=(None, None))
-                if chunks is not None:
-                    item._dict['chunks'] = chunks
-                    for chunk_id, _ in chunks:
-                        cache.chunk_incref(chunk_id, archive.stats)
-                if chunks_healthy is not None:
-                    item._dict['chunks_healthy'] = chunks
-                item._dict.pop('source')  # not used for hardlinks any more, replaced by hlid
-            # make sure we only have desired stuff in the new item. specifically, make sure to get rid of:
-            # - 'acl' remnants of bug in attic <= 0.13
-            # - 'hardlink_master' (superseded by hlid)
-            new_item_dict = {key: value for key, value in item.as_dict().items() if key in ITEM_KEY_WHITELIST}
-            new_item = Item(internal_dict=new_item_dict)
-            new_item.get_size(memorize=True)  # if not already present: compute+remember size for items with chunks
-            assert all(key in new_item for key in REQUIRED_ITEM_KEYS)
-            return new_item
-
-        def upgrade_compressed_chunk(chunk):
-            def upgrade_zlib_and_level(chunk):
-                if ZLIB_legacy.detect(chunk):
-                    ctype = ZLIB.ID
-                    chunk = ctype + level + chunk  # get rid of the attic legacy: prepend separate type/level bytes
-                else:
-                    ctype = chunk[0:1]
-                    chunk = ctype + level + chunk[2:]  # keep type same, but set level
-                return chunk
-
-            ctype = chunk[0:1]
-            level = b'\xFF'  # FF means unknown compression level
-            if ctype == ObfuscateSize.ID:
-                # in older borg, we used unusual byte order
-                old_header_fmt = Struct('>I')
-                new_header_fmt = ObfuscateSize.header_fmt
-                length = ObfuscateSize.header_len
-                size_bytes = chunk[2:2+length]
-                size = old_header_fmt.unpack(size_bytes)
-                size_bytes = new_header_fmt.pack(size)
-                compressed = chunk[2+length:]
-                compressed = upgrade_zlib_and_level(compressed)
-                chunk = ctype + level + size_bytes + compressed
-            else:
-                chunk = upgrade_zlib_and_level(chunk)
-            return chunk
-
         dry_run = args.dry_run
         args.consider_checkpoints = True
         archive_names = tuple(x.name for x in other_manifest.archives.list_considering(args))
         if not archive_names:
             return EXIT_SUCCESS
+
+        from . import upgrade as upgrade_mod
+        try:
+            UpgraderCls = getattr(upgrade_mod, f'Upgrader{args.upgrader}')
+        except AttributeError:
+            self.print_error(f'No such upgrader: {args.upgrader}')
+            return EXIT_ERROR
+        upgrader = UpgraderCls(cache=cache)
+
         for name in archive_names:
             transfer_size = 0
             present_size = 0
@@ -421,9 +371,9 @@ class Archiver:
             else:
                 if not dry_run:
                     print(f"{name}: copying archive to destination repo...")
-                hlm = HardLinkManager(id_type=bytes, info_type=tuple)  # hlid -> (chunks, chunks_healthy)
                 other_archive = Archive(other_repository, other_key, other_manifest, name)
                 archive = Archive(repository, key, manifest, name, cache=cache, create=True) if not dry_run else None
+                upgrader.new_archive(archive=archive)
                 for item in other_archive.iter_items():
                     if 'chunks' in item:
                         chunks = []
@@ -434,7 +384,7 @@ class Archiver:
                             cdata = other_repository.get(chunk_id)
                             # keep compressed payload same, avoid decompression / recompression
                             data = other_key.decrypt(chunk_id, cdata, decompress=False)
-                            data = upgrade_compressed_chunk(data)
+                            data = upgrader.upgrade_compressed_chunk(chunk=data)
                             chunk_entry = cache.add_chunk(chunk_id, data, archive.stats, wait=False,
                                                           compress=False, size=size)
                             cache.repository.async_response(wait=False)
@@ -449,15 +399,9 @@ class Archiver:
                         item.chunks = chunks  # TODO: overwrite? IDs and sizes are same.
                         archive.stats.nfiles += 1
                     if not dry_run:
-                        archive.add_item(upgrade_item(item))
+                        archive.add_item(upgrader.upgrade_item(item=item))
                 if not dry_run:
-                    additional_metadata = {}
-                    # keep all metadata except archive version and stats. also do not keep
-                    # recreate_source_id, recreate_args, recreate_partial_chunks which were used only in 1.1.0b1 .. b2.
-                    for attr in ('cmdline', 'hostname', 'username', 'time', 'time_end', 'comment',
-                                 'chunker_params', 'recreate_cmdline'):
-                        if hasattr(other_archive.metadata, attr):
-                            additional_metadata[attr] = getattr(other_archive.metadata, attr)
+                    additional_metadata = upgrader.upgrade_archive_metadata(metadata=other_archive.metadata)
                     archive.save(stats=archive.stats, additional_metadata=additional_metadata)
                     print(f"{name}: finished. "
                           f"transfer_size: {format_file_size(transfer_size)} "
@@ -4057,8 +4001,9 @@ class Archiver:
         # borg transfer
         transfer_epilog = process_epilog("""
         This command transfers archives from one repository to another repository.
+        Optionally, it can also upgrade the transferred data.

-        Suggested use::
+        Suggested use for general purpose archive transfer (not repo upgrades)::

             # initialize DST_REPO reusing key material from SRC_REPO, so that
             # chunking and chunk id generation will work in the same way as before.
@@ -4074,6 +4019,13 @@ class Archiver:
         You could use the misc. archive filter options to limit which archives it will
         transfer, e.g. using the -a option. This is recommended for big
         repositories with multiple data sets to keep the runtime per invocation lower.
+
+        For repository upgrades (e.g. from a borg 1.2 repo to a related borg 2.0 repo), usage is
+        quite similar to the above::
+
+            borg --repo=DST_REPO transfer --other-repo=SRC_REPO --upgrader=From12To20
+
         """)
         subparser = subparsers.add_parser('transfer', parents=[common_parser], add_help=False,
                                           description=self.do_transfer.__doc__,
@@ -4086,6 +4038,9 @@ class Archiver:
         subparser.add_argument('--other-repo', metavar='SRC_REPOSITORY', dest='other_location',
                                type=location_validator(other=True), default=Location(other=True),
                                help='transfer archives from the other repository')
+        subparser.add_argument('--upgrader', metavar='UPGRADER', dest='upgrader',
+                               type=str, default='NoOp',
+                               help='use the upgrader to convert transferred data (default: no conversion)')
         define_archive_filters_group(subparser)

         # borg diff
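
The dispatch added to ``do_transfer`` resolves the ``--upgrader`` name by looking up
``Upgrader{args.upgrader}`` on the ``upgrade`` module, so ``NoOp`` maps to ``UpgraderNoOp``
and ``From12To20`` to ``UpgraderFrom12To20``; an unknown name fails fast with an error
instead of silently transferring unconverted data. A minimal self-contained sketch of the
same pattern (the registry dict and stand-in classes below are illustrative, not borg's
actual module layout)::

    # sketch: resolve an upgrader name to a class, as do_transfer does via getattr()
    class UpgraderNoOp:            # stand-in; the real class lives in borg.upgrade
        def __init__(self, *, cache):
            self.cache = cache

    class UpgraderFrom12To20:      # stand-in; the real class lives in borg.upgrade
        def __init__(self, *, cache):
            self.cache = cache

    UPGRADERS = {cls.__name__: cls for cls in (UpgraderNoOp, UpgraderFrom12To20)}

    def make_upgrader(name, *, cache):
        try:
            upgrader_cls = UPGRADERS[f'Upgrader{name}']  # borg uses getattr() on the module
        except KeyError:
            raise SystemExit(f'No such upgrader: {name}')
        return upgrader_cls(cache=cache)

    upgrader = make_upgrader('From12To20', cache=None)   # -> UpgraderFrom12To20 instance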

src/borg/upgrade.py (new file, 108 lines)

@@ -0,0 +1,108 @@
from struct import Struct

from .constants import REQUIRED_ITEM_KEYS
from .compress import ZLIB, ZLIB_legacy, ObfuscateSize
from .helpers import HardLinkManager
from .item import Item
from .logger import create_logger

logger = create_logger(__name__)


class UpgraderNoOp:
    def __init__(self, *, cache):
        pass

    def new_archive(self, *, archive):
        pass

    def upgrade_item(self, *, item):
        return item

    def upgrade_compressed_chunk(self, *, chunk):
        return chunk

    def upgrade_archive_metadata(self, *, metadata):
        new_metadata = {}
        # keep all metadata except archive version and stats.
        for attr in ('cmdline', 'hostname', 'username', 'time', 'time_end', 'comment',
                     'chunker_params', 'recreate_cmdline'):
            if hasattr(metadata, attr):
                new_metadata[attr] = getattr(metadata, attr)
        return new_metadata


class UpgraderFrom12To20:
    def __init__(self, *, cache):
        self.cache = cache

    def new_archive(self, *, archive):
        self.archive = archive
        self.hlm = HardLinkManager(id_type=bytes, info_type=tuple)  # hlid -> (chunks, chunks_healthy)

    def upgrade_item(self, *, item):
        """upgrade item as needed, get rid of legacy crap"""
        ITEM_KEY_WHITELIST = {'path', 'source', 'rdev', 'chunks', 'chunks_healthy', 'hlid',
                              'mode', 'user', 'group', 'uid', 'gid', 'mtime', 'atime', 'ctime', 'birthtime', 'size',
                              'xattrs', 'bsdflags', 'acl_nfs4', 'acl_access', 'acl_default', 'acl_extended',
                              'part'}
        if self.hlm.borg1_hardlink_master(item):
            item._dict['hlid'] = hlid = self.hlm.hardlink_id_from_path(item._dict['path'])
            self.hlm.remember(id=hlid, info=(item._dict.get('chunks'), item._dict.get('chunks_healthy')))
        elif self.hlm.borg1_hardlink_slave(item):
            item._dict['hlid'] = hlid = self.hlm.hardlink_id_from_path(item._dict['source'])
            chunks, chunks_healthy = self.hlm.retrieve(id=hlid, default=(None, None))
            if chunks is not None:
                item._dict['chunks'] = chunks
                for chunk_id, _ in chunks:
                    self.cache.chunk_incref(chunk_id, self.archive.stats)
            if chunks_healthy is not None:
                item._dict['chunks_healthy'] = chunks_healthy
            item._dict.pop('source')  # not used for hardlinks any more, replaced by hlid
        # make sure we only have desired stuff in the new item. specifically, make sure to get rid of:
        # - 'acl' remnants of bug in attic <= 0.13
        # - 'hardlink_master' (superseded by hlid)
        new_item_dict = {key: value for key, value in item.as_dict().items() if key in ITEM_KEY_WHITELIST}
        new_item = Item(internal_dict=new_item_dict)
        new_item.get_size(memorize=True)  # if not already present: compute+remember size for items with chunks
        assert all(key in new_item for key in REQUIRED_ITEM_KEYS)
        return new_item

    def upgrade_compressed_chunk(self, *, chunk):
        def upgrade_zlib_and_level(chunk):
            if ZLIB_legacy.detect(chunk):
                ctype = ZLIB.ID
                chunk = ctype + level + bytes(chunk)  # get rid of the attic legacy: prepend separate type/level bytes
            else:
                ctype = bytes(chunk[0:1])
                chunk = ctype + level + bytes(chunk[2:])  # keep type same, but set level
            return chunk

        ctype = chunk[0:1]
        level = b'\xFF'  # FF means unknown compression level
        if ctype == ObfuscateSize.ID:
            # in older borg, we used unusual byte order
            old_header_fmt = Struct('>I')
            new_header_fmt = ObfuscateSize.header_fmt
            length = ObfuscateSize.header_len
            size_bytes = chunk[2:2 + length]
            size = old_header_fmt.unpack(size_bytes)[0]  # unpack() returns a tuple
            size_bytes = new_header_fmt.pack(size)
            compressed = chunk[2 + length:]
            compressed = upgrade_zlib_and_level(compressed)
            chunk = ctype + level + size_bytes + compressed
        else:
            chunk = upgrade_zlib_and_level(chunk)
        return chunk

    def upgrade_archive_metadata(self, *, metadata):
        new_metadata = {}
        # keep all metadata except archive version and stats. also do not keep
        # recreate_source_id, recreate_args, recreate_partial_chunks which were used only in 1.1.0b1 .. b2.
        for attr in ('cmdline', 'hostname', 'username', 'time', 'time_end', 'comment',
                     'chunker_params', 'recreate_cmdline'):
            if hasattr(metadata, attr):
                new_metadata[attr] = getattr(metadata, attr)
        return new_metadata
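
For reference, the byte-level transformation in ``upgrade_compressed_chunk`` can be
illustrated standalone: a borg 1.x compressed chunk starts with a type byte plus a level
byte (attic-era zlib streams carry no header at all), and ``ObfuscateSize`` stored its size
field big-endian. The upgrade keeps the compressed payload untouched, forces the level byte
to ``0xFF`` ("unknown"), and re-packs the size header. A minimal sketch under stated
assumptions (the ID bytes and the little-endian new header format are illustrative
stand-ins, not values taken from ``borg.compress``)::

    from struct import Struct

    ZLIB_ID = b'\x08'        # stand-in for ZLIB.ID
    OBFUSCATE_ID = b'\x04'   # stand-in for ObfuscateSize.ID
    UNKNOWN = b'\xFF'        # 0xFF marks "unknown compression level"
    OLD_FMT = Struct('>I')   # borg1 size header: big-endian (the "unusual byte order")
    NEW_FMT = Struct('<I')   # assumed layout of the new ObfuscateSize.header_fmt

    def upgrade_type_and_level(chunk):
        if chunk[:1] == b'\x78':                    # stand-in for ZLIB_legacy.detect():
            return ZLIB_ID + UNKNOWN + chunk        # bare zlib stream, prepend a new header
        return chunk[:1] + UNKNOWN + chunk[2:]      # keep type byte, set level to "unknown"

    def upgrade_compressed_chunk(chunk):
        if chunk[:1] == OBFUSCATE_ID:
            # re-pack the obfuscation size header, then upgrade the wrapped payload
            size = OLD_FMT.unpack(chunk[2:2 + OLD_FMT.size])[0]
            inner = upgrade_type_and_level(chunk[2 + OLD_FMT.size:])
            return OBFUSCATE_ID + UNKNOWN + NEW_FMT.pack(size) + inner
        return upgrade_type_and_level(chunk)

    old = OBFUSCATE_ID + b'\x00' + OLD_FMT.pack(4096) + b'\x78\x9cpayload'
    assert upgrade_compressed_chunk(old)[2:6] == NEW_FMT.pack(4096)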