archive: create FilesystemObjectProcessors class

Marian Beermann 2017-07-29 16:11:33 +02:00
parent 5abfa0b266
commit c93dba0195
2 changed files with 144 additions and 94 deletions
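
For orientation, a condensed sketch (not part of the commit) of how the collaborators introduced below are wired together in the create path. The class names, keyword arguments, and args fields are taken from the archiver hunks further down; the helper function itself is only illustrative, and it assumes the import this commit adds to the archiver (from .archive import FilesystemObjectProcessors, MetadataCollector, ChunksProcessor).

from borg.archive import MetadataCollector, ChunksProcessor, FilesystemObjectProcessors

def build_fso(archive, cache, key, args):
    # Illustrative helper, not in the commit: bundles the wiring done in the archiver.
    # MetadataCollector only gathers stat()/xattr metadata for items.
    metadata_collector = MetadataCollector(
        noatime=args.noatime, noctime=args.noctime, numeric_owner=args.numeric_owner)
    # ChunksProcessor stores an item's chunk stream and writes checkpoint part files.
    cp = ChunksProcessor(
        cache=cache, key=key,
        add_item=archive.add_item, write_checkpoint=archive.write_checkpoint,
        checkpoint_interval=args.checkpoint_interval)
    # FilesystemObjectProcessors turns filesystem objects (files, dirs, symlinks,
    # devices, fifos, stdin) into archive items, delegating to the two helpers above.
    return FilesystemObjectProcessors(
        metadata_collector=metadata_collector, cache=cache, key=key,
        process_file_chunks=cp.process_file_chunks, add_item=archive.add_item,
        chunker_params=args.chunker_params)

The split prepares for a later threading port: metadata collection, chunk storage, and filesystem traversal become separate objects with explicit dependencies, as the comment on FilesystemObjectProcessors notes.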

src/borg/archive.py

@@ -314,10 +314,8 @@ class Archive:
self.create = create
if self.create:
self.items_buffer = CacheChunkBuffer(self.cache, self.key, self.stats)
self.chunker = Chunker(self.key.chunk_seed, *chunker_params)
if name in manifest.archives:
raise self.AlreadyExists(name)
self.last_checkpoint = time.monotonic()
i = 0
while True:
self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
@@ -809,6 +807,31 @@ Utilization of max. archive size: {csize_max:.0%}
logger.warning('forced deletion succeeded, but the deleted archive was corrupted.')
logger.warning('borg check --repair is required to free all space.')
@staticmethod
def list_archives(repository, key, manifest, cache=None):
# expensive! see also Manifest.list_archive_infos.
for name in manifest.archives:
yield Archive(repository, key, manifest, name, cache=cache)
@staticmethod
def _open_rb(path):
try:
# if we have O_NOATIME, this likely will succeed if we are root or owner of file:
return os.open(path, flags_noatime)
except PermissionError:
if flags_noatime == flags_normal:
# we do not have O_NOATIME, no need to try again:
raise
# Was this EPERM due to the O_NOATIME flag? Try again without it:
return os.open(path, flags_normal)
class MetadataCollector:
def __init__(self, *, noatime, noctime, numeric_owner):
self.noatime = noatime
self.noctime = noctime
self.numeric_owner = numeric_owner
def stat_simple_attrs(self, st):
attrs = dict(
mode=st.st_mode,
@@ -847,6 +870,82 @@ Utilization of max. archive size: {csize_max:.0%}
attrs.update(self.stat_ext_attrs(st, path))
return attrs
class ChunksProcessor:
# Processes an iterator of chunks for an Item
def __init__(self, *, key, cache,
add_item, write_checkpoint,
checkpoint_interval):
self.key = key
self.cache = cache
self.add_item = add_item
self.write_checkpoint = write_checkpoint
self.checkpoint_interval = checkpoint_interval
self.last_checkpoint = time.monotonic()
def write_part_file(self, item, from_chunk, number):
item = Item(internal_dict=item.as_dict())
length = len(item.chunks)
# the item should only have the *additional* chunks we processed after the last partial item:
item.chunks = item.chunks[from_chunk:]
item.get_size(memorize=True)
item.path += '.borg_part_%d' % number
item.part = number
number += 1
self.add_item(item, show_progress=False)
self.write_checkpoint()
return length, number
def process_file_chunks(self, item, cache, stats, chunk_iter, chunk_processor=None):
if not chunk_processor:
def chunk_processor(data):
chunk_entry = cache.add_chunk(self.key.id_hash(data), data, stats, wait=False)
self.cache.repository.async_response(wait=False)
return chunk_entry
item.chunks = []
from_chunk = 0
part_number = 1
for data in chunk_iter:
item.chunks.append(chunk_processor(data))
if self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval:
from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
self.last_checkpoint = time.monotonic()
else:
if part_number > 1:
if item.chunks[from_chunk:]:
# if we already have created a part item inside this file, we want to put the final
# chunks (if any) into a part item also (so all parts can be concatenated to get
# the complete file):
from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
self.last_checkpoint = time.monotonic()
# if we created part files, we have referenced all chunks from the part files,
# but we also will reference the same chunks also from the final, complete file:
for chunk in item.chunks:
cache.chunk_incref(chunk.id, stats, size=chunk.size)
class FilesystemObjectProcessors:
# When ported to threading, then this doesn't need chunker, cache, key any more.
# write_checkpoint should then be in the item buffer,
# and process_file becomes a callback passed to __init__.
def __init__(self, *, metadata_collector, cache, key,
add_item, process_file_chunks,
chunker_params):
self.metadata_collector = metadata_collector
self.cache = cache
self.key = key
self.add_item = add_item
self.process_file_chunks = process_file_chunks
self.hard_links = {}
self.stats = Statistics() # threading: done by cache (including progress)
self.cwd = os.getcwd()
self.chunker = Chunker(key.chunk_seed, *chunker_params)
@contextmanager
def create_helper(self, path, st, status=None, hardlinkable=True):
safe_path = make_path_safe(path)
@@ -869,18 +968,18 @@ Utilization of max. archive size: {csize_max:.0%}
def process_dir(self, path, st):
with self.create_helper(path, st, 'd', hardlinkable=False) as (item, status, hardlinked, hardlink_master):
item.update(self.stat_attrs(st, path))
item.update(self.metadata_collector.stat_attrs(st, path))
return status
def process_fifo(self, path, st):
with self.create_helper(path, st, 'f') as (item, status, hardlinked, hardlink_master): # fifo
item.update(self.stat_attrs(st, path))
item.update(self.metadata_collector.stat_attrs(st, path))
return status
def process_dev(self, path, st, dev_type):
with self.create_helper(path, st, dev_type) as (item, status, hardlinked, hardlink_master): # char/block device
item.rdev = st.st_rdev
item.update(self.stat_attrs(st, path))
item.update(self.metadata_collector.stat_attrs(st, path))
return status
def process_symlink(self, path, st):
@@ -892,53 +991,9 @@ Utilization of max. archive size: {csize_max:.0%}
item.source = source
if st.st_nlink > 1:
logger.warning('hardlinked symlinks will be archived as non-hardlinked symlinks!')
item.update(self.stat_attrs(st, path))
item.update(self.metadata_collector.stat_attrs(st, path))
return status
def write_part_file(self, item, from_chunk, number):
item = Item(internal_dict=item.as_dict())
length = len(item.chunks)
# the item should only have the *additional* chunks we processed after the last partial item:
item.chunks = item.chunks[from_chunk:]
item.get_size(memorize=True)
item.path += '.borg_part_%d' % number
item.part = number
number += 1
self.add_item(item, show_progress=False)
self.write_checkpoint()
return length, number
def chunk_file(self, item, cache, stats, chunk_iter, chunk_processor=None):
if not chunk_processor:
def chunk_processor(data):
chunk_entry = cache.add_chunk(self.key.id_hash(data), data, stats, wait=False)
self.cache.repository.async_response(wait=False)
return chunk_entry
item.chunks = []
from_chunk = 0
part_number = 1
for data in chunk_iter:
item.chunks.append(chunk_processor(data))
if self.show_progress:
self.stats.show_progress(item=item, dt=0.2)
if self.checkpoint_interval and time.monotonic() - self.last_checkpoint > self.checkpoint_interval:
from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
self.last_checkpoint = time.monotonic()
else:
if part_number > 1:
if item.chunks[from_chunk:]:
# if we already have created a part item inside this file, we want to put the final
# chunks (if any) into a part item also (so all parts can be concatenated to get
# the complete file):
from_chunk, part_number = self.write_part_file(item, from_chunk, part_number)
self.last_checkpoint = time.monotonic()
# if we created part files, we have referenced all chunks from the part files,
# but we also will reference the same chunks also from the final, complete file:
for chunk in item.chunks:
cache.chunk_incref(chunk.id, stats, size=chunk.size)
def process_stdin(self, path, cache):
uid, gid = 0, 0
t = int(time.time()) * 1000000000
@@ -950,7 +1005,7 @@ Utilization of max. archive size: {csize_max:.0%}
mtime=t, atime=t, ctime=t,
)
fd = sys.stdin.buffer # binary
self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd)))
self.process_file_chunks(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd)))
item.get_size(memorize=True)
self.stats.nfiles += 1
self.add_item(item)
@@ -983,7 +1038,7 @@ Utilization of max. archive size: {csize_max:.0%}
else:
status = 'A' # regular file, added
item.hardlink_master = hardlinked
item.update(self.stat_simple_attrs(st))
item.update(self.metadata_collector.stat_simple_attrs(st))
# Only chunkify the file if needed
if chunks is not None:
item.chunks = chunks
@@ -991,14 +1046,14 @@ Utilization of max. archive size: {csize_max:.0%}
with backup_io('open'):
fh = Archive._open_rb(path)
with os.fdopen(fh, 'rb') as fd:
self.chunk_file(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd, fh)))
self.process_file_chunks(item, cache, self.stats, backup_io_iter(self.chunker.chunkify(fd, fh)))
if not is_special_file:
# we must not memorize special files, because the contents of e.g. a
# block or char device will change without its mtime/size/inode changing.
cache.memorize_file(path_hash, st, [c.id for c in item.chunks])
status = status or 'M' # regular file, modified (if not 'A' already)
self.stats.nfiles += 1
item.update(self.stat_attrs(st, path))
item.update(self.metadata_collector.stat_attrs(st, path))
item.get_size(memorize=True)
if is_special_file:
# we processed a special file like a regular file. reflect that in mode,
@@ -1006,24 +1061,6 @@ Utilization of max. archive size: {csize_max:.0%}
item.mode = stat.S_IFREG | stat.S_IMODE(item.mode)
return status
@staticmethod
def list_archives(repository, key, manifest, cache=None):
# expensive! see also Manifest.list_archive_infos.
for name in manifest.archives:
yield Archive(repository, key, manifest, name, cache=cache)
@staticmethod
def _open_rb(path):
try:
# if we have O_NOATIME, this likely will succeed if we are root or owner of file:
return os.open(path, flags_noatime)
except PermissionError:
if flags_noatime == flags_normal:
# we do not have O_NOATIME, no need to try again:
raise
# Was this EPERM due to the O_NOATIME flag? Try again without it:
return os.open(path, flags_normal)
def valid_msgpacked_dict(d, keys_serialized):
"""check if the data <d> looks like a msgpacked dict"""
@@ -1663,7 +1700,7 @@ class ArchiveRecreater:
return item.chunks
chunk_iterator = self.iter_chunks(archive, target, list(item.chunks))
chunk_processor = partial(self.chunk_processor, target)
target.chunk_file(item, self.cache, target.stats, chunk_iterator, chunk_processor)
target.process_file_chunks(item, self.cache, target.stats, chunk_iterator, chunk_processor)
def chunk_processor(self, target, data):
chunk_id = self.key.id_hash(data)
@@ -1759,6 +1796,11 @@ class ArchiveRecreater:
target.recreate_rechunkify = self.rechunkify and source_chunker_params != target.chunker_params
if target.recreate_rechunkify:
logger.debug('Rechunking archive from %s to %s', source_chunker_params or '(unknown)', target.chunker_params)
target.process_file_chunks = ChunksProcessor(
cache=self.cache, key=self.key,
add_item=target.add_item, write_checkpoint=target.write_checkpoint,
checkpoint_interval=self.checkpoint_interval).process_file_chunks
target.chunker = Chunker(self.key.chunk_seed, *target.chunker_params)
return target
def create_target_archive(self, name):

src/borg/archiver.py

@@ -33,10 +33,10 @@ import msgpack
import borg
from . import __version__
from . import helpers
from . import shellpattern
from .algorithms.checksums import crc32
from .archive import Archive, ArchiveChecker, ArchiveRecreater, Statistics, is_special
from .archive import BackupOSError, backup_io
from .archive import FilesystemObjectProcessors, MetadataCollector, ChunksProcessor
from .cache import Cache, assert_secure
from .constants import * # NOQA
from .compress import CompressionSpec
@@ -448,7 +448,7 @@ class Archiver:
matcher = PatternMatcher(fallback=True)
matcher.add_inclexcl(args.patterns)
def create_inner(archive, cache):
def create_inner(archive, cache, fso):
# Add cache dir to inode_skip list
skip_inodes = set()
try:
@@ -468,7 +468,7 @@
path = 'stdin'
if not dry_run:
try:
status = archive.process_stdin(path, cache)
status = fso.process_stdin(path, cache)
except BackupOSError as e:
status = 'E'
self.print_warning('%s: %s', path, e)
@@ -486,7 +486,7 @@
restrict_dev = st.st_dev
else:
restrict_dev = None
self._process(archive, cache, matcher, args.exclude_caches, args.exclude_if_present,
self._process(fso, cache, matcher, args.exclude_caches, args.exclude_if_present,
args.keep_exclude_tags, skip_inodes, path, restrict_dev,
read_special=args.read_special, dry_run=dry_run, st=st)
if not dry_run:
@@ -523,12 +523,20 @@
progress=args.progress,
chunker_params=args.chunker_params, start=t0, start_monotonic=t0_monotonic,
log_json=args.log_json)
create_inner(archive, cache)
metadata_collector = MetadataCollector(noatime=args.noatime, noctime=args.noctime,
numeric_owner=args.numeric_owner)
cp = ChunksProcessor(cache=cache, key=key,
add_item=archive.add_item, write_checkpoint=archive.write_checkpoint,
checkpoint_interval=args.checkpoint_interval)
fso = FilesystemObjectProcessors(metadata_collector=metadata_collector, cache=cache, key=key,
process_file_chunks=cp.process_file_chunks, add_item=archive.add_item,
chunker_params=args.chunker_params)
create_inner(archive, cache, fso)
else:
create_inner(None, None)
create_inner(None, None, None)
return self.exit_code
def _process(self, archive, cache, matcher, exclude_caches, exclude_if_present,
def _process(self, fso, cache, matcher, exclude_caches, exclude_if_present,
keep_exclude_tags, skip_inodes, path, restrict_dev,
read_special=False, dry_run=False, st=None):
"""
@@ -566,33 +574,33 @@
return
if stat.S_ISREG(st.st_mode):
if not dry_run:
status = archive.process_file(path, st, cache, self.ignore_inode)
status = fso.process_file(path, st, cache, self.ignore_inode)
elif stat.S_ISDIR(st.st_mode):
if recurse:
tag_paths = dir_is_tagged(path, exclude_caches, exclude_if_present)
if tag_paths:
if keep_exclude_tags and not dry_run:
archive.process_dir(path, st)
fso.process_dir(path, st)
for tag_path in tag_paths:
self._process(archive, cache, matcher, exclude_caches, exclude_if_present,
self._process(fso, cache, matcher, exclude_caches, exclude_if_present,
keep_exclude_tags, skip_inodes, tag_path, restrict_dev,
read_special=read_special, dry_run=dry_run)
return
if not dry_run:
if not recurse_excluded_dir:
status = archive.process_dir(path, st)
status = fso.process_dir(path, st)
if recurse:
with backup_io('scandir'):
entries = helpers.scandir_inorder(path)
for dirent in entries:
normpath = os.path.normpath(dirent.path)
self._process(archive, cache, matcher, exclude_caches, exclude_if_present,
self._process(fso, cache, matcher, exclude_caches, exclude_if_present,
keep_exclude_tags, skip_inodes, normpath, restrict_dev,
read_special=read_special, dry_run=dry_run)
elif stat.S_ISLNK(st.st_mode):
if not dry_run:
if not read_special:
status = archive.process_symlink(path, st)
status = fso.process_symlink(path, st)
else:
try:
st_target = os.stat(path)
@@ -601,27 +609,27 @@
else:
special = is_special(st_target.st_mode)
if special:
status = archive.process_file(path, st_target, cache)
status = fso.process_file(path, st_target, cache)
else:
status = archive.process_symlink(path, st)
status = fso.process_symlink(path, st)
elif stat.S_ISFIFO(st.st_mode):
if not dry_run:
if not read_special:
status = archive.process_fifo(path, st)
status = fso.process_fifo(path, st)
else:
status = archive.process_file(path, st, cache)
status = fso.process_file(path, st, cache)
elif stat.S_ISCHR(st.st_mode):
if not dry_run:
if not read_special:
status = archive.process_dev(path, st, 'c')
status = fso.process_dev(path, st, 'c')
else:
status = archive.process_file(path, st, cache)
status = fso.process_file(path, st, cache)
elif stat.S_ISBLK(st.st_mode):
if not dry_run:
if not read_special:
status = archive.process_dev(path, st, 'b')
status = fso.process_dev(path, st, 'b')
else:
status = archive.process_file(path, st, cache)
status = fso.process_file(path, st, cache)
elif stat.S_ISSOCK(st.st_mode):
# Ignore unix sockets
return