mirror of https://github.com/borgbackup/borg.git
731 lines
31 KiB
Python
731 lines
31 KiB
Python
import errno
|
|
import functools
|
|
import io
|
|
import os
|
|
import stat
|
|
import struct
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from collections import defaultdict
|
|
from signal import SIGINT
|
|
|
|
from .constants import ROBJ_FILE_STREAM
|
|
from .fuse_impl import llfuse, has_pyfuse3
|
|
|
|
|
|
if not has_pyfuse3:
    # llfuse: the Operations handlers below are plain synchronous methods, nothing to wrap.
    trio = None

    def async_wrapper(fn):
        """Return *fn* unchanged (llfuse backend)."""
        return fn

else:
    # pyfuse3: handlers have to be coroutine functions, so the synchronous implementations
    # below get exposed through a thin async shim.
    import trio

    def async_wrapper(fn):
        """Expose the synchronous handler *fn* as a coroutine function (pyfuse3 backend)."""

        @functools.wraps(fn)
        async def _as_coroutine(*args, **kwargs):
            return fn(*args, **kwargs)

        return _as_coroutine
|
|
|
|
|
|
from .logger import create_logger
|
|
|
|
logger = create_logger()  # module-level logger used by the FUSE backend / operations classes below
|
|
|
|
from .crypto.low_level import blake2b_128
|
|
from .archiver._common import build_matcher, build_filter
|
|
from .archive import Archive, get_item_uid_gid
|
|
from .hashindex import FuseVersionsIndex
|
|
from .helpers import daemonize, daemonizing, signal_handler, format_file_size
|
|
from .helpers import HardLinkManager
|
|
from .helpers import msgpack
|
|
from .helpers.lrucache import LRUCache
|
|
from .item import Item
|
|
from .platform import uid2user, gid2group
|
|
from .platformflags import is_darwin
|
|
from .remote import RemoteRepository
|
|
|
|
|
|
def fuse_main():
    """Run the main loop of the FUSE binding in use until it finishes.

    With llfuse, the return value of ``llfuse.main(workers=1)`` is passed through unchanged.
    With pyfuse3, the loop runs inside ``trio.run``; a normal end yields None, and any
    exception raised out of it is swallowed and reported as ``1``.
    """
    if not has_pyfuse3:
        return llfuse.main(workers=1)
    try:
        trio.run(llfuse.main)
    except BaseException:  # noqa - deliberately catch everything, exactly like a bare except
        return 1  # TODO return signal number if it was killed by signal
    return None
|
|
|
|
|
|
# Capacity of the LRU caches that keep one entry per simultaneously open file
# (FuseBackend._inode_cache and FuseOperations._last_pos).
# Keep this small: _inode_cache entries can be large, because Item.chunks may be a long list,
# and reading many files at the same time should be avoided anyway.
FILES = 4
|
|
|
|
|
|
class ItemCache:
    """
    This is the "meat" of the file system's metadata storage.

    This class generates inode numbers that efficiently index items in archives,
    and retrieves items from these inode numbers.

    An inode number handed out by iter_archive_items() is ``self.offset`` plus the
    position of that item's 9 byte entry in the meta-array ``self.meta``.
    """

    # 2 MiB are approximately ~230000 items (depends on the average number of items per metadata chunk).
    #
    # Since growing a bytearray has to copy it, growing it will converge to O(n^2), however,
    # this is not yet relevant due to the swiftness of copying memory. If it becomes an issue,
    # use an anonymous mmap and just resize that (or, if on 64 bit, make it so big you never need
    # to resize it in the first place; that's free).
    GROW_META_BY = 2 * 1024 * 1024

    # Layout of an indirect ("I") item entry in the meta-array: entry type (1 byte),
    # distance back to the chunk ID (uint32), offset of the item inside the chunk (uint32).
    # "=" selects native byte order with standard sizes and no padding, hence 9 bytes.
    indirect_entry_struct = struct.Struct("=cII")
    assert indirect_entry_struct.size == 9

    def __init__(self, decrypted_repository):
        self.decrypted_repository = decrypted_repository
        # self.meta, the "meta-array" is a densely packed array of metadata about where items can be found.
        # It is indexed by the inode number minus self.offset. (This is in a way eerily similar to how the first
        # unices did this).
        # The meta-array contains chunk IDs and item entries (described in iter_archive_items).
        # The chunk IDs are referenced by item entries through relative offsets,
        # which are bounded by the metadata chunk size.
        self.meta = bytearray()
        # The current write offset in self.meta
        self.write_offset = 0

        # Offset added to meta-indices, resulting in inodes,
        # or subtracted from inodes, resulting in meta-indices.
        # XXX: Merge FuseBackend._items and ItemCache to avoid
        # this implicit limitation / hack (on the number of synthetic inodes, degenerate
        # cases can inflate their number far beyond the number of archives).
        self.offset = 1000000

        # A temporary file that contains direct items, i.e. items directly cached in this layer.
        # These are items that span more than one chunk and thus cannot be efficiently cached
        # by the object cache (self.decrypted_repository), which would require variable-length structures;
        # possible but not worth the effort, see iter_archive_items.
        self.fd = tempfile.TemporaryFile(prefix="borg-tmp")

        # A small LRU cache for chunks requested by ItemCache.get() from the object cache,
        # this significantly speeds up directory traversal and similar operations which
        # tend to re-read the same chunks over and over.
        # The capacity is kept low because increasing it does not provide any significant advantage,
        # but makes LRUCache's square behaviour noticeable and consumes more memory.
        self.chunks = LRUCache(capacity=10)

        # Instrumentation
        # Count of indirect items, i.e. data is cached in the object cache, not directly in this cache
        self.indirect_items = 0
        # Count of direct items, i.e. data is in self.fd
        self.direct_items = 0

    def get(self, inode):
        """Return the Item stored under *inode*.

        Raises ValueError if *inode* is below ``self.offset`` or does not point at an
        item entry ("I" or "S") in the meta-array.
        """
        offset = inode - self.offset
        if offset < 0:
            raise ValueError("ItemCache.get() called with an invalid inode number")
        entry_type = self.meta[offset]
        if entry_type == ord(b"I"):
            _, chunk_id_relative_offset, chunk_offset = self.indirect_entry_struct.unpack_from(self.meta, offset)
            chunk_id_offset = offset - chunk_id_relative_offset
            # bytearray slices are bytearrays as well, explicitly convert to bytes()
            chunk_id = bytes(self.meta[chunk_id_offset : chunk_id_offset + 32])
            chunk = self.chunks.get(chunk_id)
            # compare against None: an (empty) cached chunk is falsy, and LRUCache does not
            # allow re-inserting a key that is already cached.
            if chunk is None:
                _, chunk = next(self.decrypted_repository.get_many([chunk_id]))
                self.chunks[chunk_id] = chunk
            data = memoryview(chunk)[chunk_offset:]
            unpacker = msgpack.Unpacker()
            unpacker.feed(data)
            return Item(internal_dict=next(unpacker))
        elif entry_type == ord(b"S"):
            fd_offset = int.from_bytes(self.meta[offset + 1 : offset + 9], "little")
            self.fd.seek(fd_offset, io.SEEK_SET)
            return Item(internal_dict=next(msgpack.Unpacker(self.fd, read_size=1024)))
        else:
            raise ValueError("Invalid entry type in self.meta")

    def iter_archive_items(self, archive_item_ids, filter=None):
        """Yield ``(inode, item)`` for the items stored in the metadata chunks *archive_item_ids*.

        The chunks are fetched via ``self.decrypted_repository.get_many()`` and decoded as one
        continuous msgpack stream. For every item accepted by *filter* (a callable taking the
        Item, or None to accept all), an entry is written to the meta-array and its inode is
        yielded together with the Item. Rejected items get no entry and are not yielded.
        """
        unpacker = msgpack.Unpacker()

        # Current offset in the metadata stream, which consists of all metadata chunks glued together
        stream_offset = 0
        # Offset of the current chunk in the metadata stream
        chunk_begin = 0
        # Length of the chunk preceding the current chunk
        last_chunk_length = 0
        msgpacked_bytes = b""

        write_offset = self.write_offset
        meta = self.meta
        pack_indirect_into = self.indirect_entry_struct.pack_into

        try:
            for key, (_, data) in zip(archive_item_ids, self.decrypted_repository.get_many(archive_item_ids)):
                # Store the chunk ID in the meta-array
                if write_offset + 32 >= len(meta):
                    self.meta = meta = meta + bytes(self.GROW_META_BY)
                meta[write_offset : write_offset + 32] = key
                current_id_offset = write_offset
                write_offset += 32

                chunk_begin += last_chunk_length
                last_chunk_length = len(data)

                unpacker.feed(data)
                while True:
                    try:
                        item = unpacker.unpack()
                        need_more_data = False
                    except msgpack.OutOfData:
                        need_more_data = True

                    start = stream_offset - chunk_begin
                    # tell() is not helpful for the need_more_data case, but we know it is the remainder
                    # of the data in that case. in the other case, tell() works as expected.
                    length = (len(data) - start) if need_more_data else (unpacker.tell() - stream_offset)
                    msgpacked_bytes += data[start : start + length]
                    stream_offset += length

                    if need_more_data:
                        # Need more data, feed the next chunk
                        break

                    item = Item(internal_dict=item)
                    if filter and not filter(item):
                        msgpacked_bytes = b""
                        continue

                    current_item = msgpacked_bytes
                    current_item_length = len(current_item)
                    current_spans_chunks = stream_offset - current_item_length < chunk_begin
                    msgpacked_bytes = b""

                    if write_offset + 9 >= len(meta):
                        self.meta = meta = meta + bytes(self.GROW_META_BY)

                    # item entries in the meta-array come in two different flavours, both nine bytes long.
                    # (1) for items that span chunks:
                    #
                    #     'S' + 8 byte offset into the self.fd file, where the msgpacked item starts.
                    #
                    # (2) for items that are completely contained in one chunk, which usually is the great majority
                    #     (about 700:1 for system backups)
                    #
                    #     'I' + 4 byte offset where the chunk ID is + 4 byte offset in the chunk
                    #     where the msgpacked items starts
                    #
                    #     The chunk ID offset is the number of bytes _back_ from the start of the entry, i.e.:
                    #
                    #     |Chunk ID| .... |I1234abcd|
                    #     ^------ offset ----------^

                    if current_spans_chunks:
                        pos = self.fd.seek(0, io.SEEK_END)
                        self.fd.write(current_item)
                        meta[write_offset : write_offset + 9] = b"S" + pos.to_bytes(8, "little")
                        self.direct_items += 1
                    else:
                        item_offset = stream_offset - current_item_length - chunk_begin
                        pack_indirect_into(meta, write_offset, b"I", write_offset - current_id_offset, item_offset)
                        self.indirect_items += 1
                    inode = write_offset + self.offset
                    write_offset += 9

                    yield inode, item
        finally:
            # Persist the write position even if the consumer stops early or an error occurs,
            # so a later call never overwrites entries of inodes that were already handed out.
            self.write_offset = write_offset
|
|
|
|
|
|
class FuseBackend:
    """Virtual filesystem based on archive(s) to provide information to fuse"""

    def __init__(self, manifest, args, decrypted_repository):
        self._args = args
        self.numeric_ids = args.numeric_ids
        self._manifest = manifest
        self.repo_objs = manifest.repo_objs
        self.repository_uncached = manifest.repository
        # Maps inode numbers to Item instances. This is used for synthetic inodes, i.e. file-system objects that are
        # made up and are not contained in the archives. For example archive directories or intermediate directories
        # not contained in archives.
        self._items = {}
        # cache up to <FILES> Items
        self._inode_cache = LRUCache(capacity=FILES)
        # inode_count is the number of synthetic inodes allocated so far (see _allocate_inode),
        # synthetic inode numbers are 1 .. inode_count.
        self.inode_count = 0
        # Maps inode numbers to the inode number of the parent
        self.parent = {}
        # Maps inode numbers to a dictionary mapping byte directory entry names to their inode numbers,
        # i.e. this contains all dirents of everything that is mounted. (It becomes really big).
        self.contents = defaultdict(dict)
        self.default_uid = os.getuid()
        self.default_gid = os.getgid()
        self.default_dir = None
        # Archives to be loaded when first accessed, mapped by their placeholder inode
        self.pending_archives = {}
        self.cache = ItemCache(decrypted_repository)
        self.allow_damaged_files = False
        self.versions = False
        self.uid_forced = None
        self.gid_forced = None
        self.umask = 0

    def _create_filesystem(self):
        """Create the root directory (inode 1) and populate it from the archives to mount.

        In versions mode all archives are processed right away (merged into the root);
        otherwise every archive gets a placeholder directory that is loaded on first access.
        """
        self._create_dir(parent=1)  # first call, create root dir (inode == 1)
        self.versions_index = FuseVersionsIndex()
        for archive in self._manifest.archives.list_considering(self._args):
            if self.versions:
                # process archives immediately
                self._process_archive(archive.name)
            else:
                # lazily load archives, create archive placeholder inode
                archive_inode = self._create_dir(parent=1, mtime=int(archive.ts.timestamp() * 1e9))
                self.contents[1][os.fsencode(archive.name)] = archive_inode
                self.pending_archives[archive_inode] = archive.name

    def get_item(self, inode):
        """Return the Item for *inode*: from the LRU cache, the synthetic items or the ItemCache."""
        item = self._inode_cache.get(inode)
        if item is not None:
            return item
        try:
            # this is a cheap get-from-dictionary operation, no need to cache the result.
            return self._items[inode]
        except KeyError:
            # while self.cache does some internal caching, it has still quite some overhead, so we cache the result.
            item = self.cache.get(inode)
            self._inode_cache[inode] = item
            return item

    def check_pending_archive(self, inode):
        """If *inode* is the placeholder of a not yet loaded archive, load that archive now."""
        # Check if this is an archive we need to load
        archive_name = self.pending_archives.pop(inode, None)
        if archive_name is not None:
            self._process_archive(archive_name, [os.fsencode(archive_name)])

    def _allocate_inode(self):
        """Return a new synthetic inode number (1, 2, 3, ...)."""
        self.inode_count += 1
        return self.inode_count

    def _create_dir(self, parent, mtime=None):
        """Create directory"""
        ino = self._allocate_inode()
        if mtime is not None:
            # copy default_dir, so the mtime change does not affect other directories
            self._items[ino] = Item(internal_dict=self.default_dir.as_dict())
            self._items[ino].mtime = mtime
        else:
            self._items[ino] = self.default_dir
        self.parent[ino] = parent
        return ino

    def find_inode(self, path, prefix=None):
        """Resolve *path* (bytes, "/"-separated) to an inode number, starting at the root.

        *prefix* is an optional list of path segments (bytes) that is prepended to *path*.
        Raises KeyError if a path segment does not exist.
        """
        segments = list(prefix or []) + path.split(b"/")
        inode = 1
        for segment in segments:
            # use .get() instead of indexing the defaultdict, so failed lookups do not
            # create empty directory listings as a side effect.
            children = self.contents.get(inode)
            if children is None:
                raise KeyError(segment)
            inode = children[segment]
        return inode

    def _process_archive(self, archive_name, prefix=None):
        """Build FUSE inode hierarchy from archive metadata"""
        prefix = [] if prefix is None else list(prefix)
        self.file_versions = {}  # for versions mode: original path -> version
        t0 = time.perf_counter()
        archive = Archive(self._manifest, archive_name)
        strip_components = self._args.strip_components
        matcher = build_matcher(self._args.patterns, self._args.paths)
        hlm = HardLinkManager(id_type=bytes, info_type=str)  # hlid -> path

        item_filter = build_filter(matcher, strip_components)
        for item_inode, item in self.cache.iter_archive_items(archive.metadata.items, filter=item_filter):
            if strip_components:
                item.path = os.sep.join(item.path.split(os.sep)[strip_components:])
            path = os.fsencode(item.path)
            is_dir = stat.S_ISDIR(item.mode)
            if is_dir:
                try:
                    # This can happen if an archive was created with a command line like
                    # $ borg create ... dir1/file dir1
                    # In this case the code below will have created a default_dir inode for dir1 already.
                    inode = self.find_inode(path, prefix)
                except KeyError:
                    pass
                else:
                    self._items[inode] = item
                    continue
            segments = prefix + path.split(b"/")
            parent = 1
            for segment in segments[:-1]:
                parent = self._process_inner(segment, parent)
            self._process_leaf(segments[-1], item, parent, prefix, is_dir, item_inode, hlm)
        duration = time.perf_counter() - t0
        logger.debug("fuse: _process_archive completed in %.1f s for archive %s", duration, archive.name)

    def _file_version(self, item, path):
        """Return the version number of *item* at *path* (bytes), or None if it has no chunks.

        The version is incremented whenever the chunk id list of the item differs from the
        one last recorded for the same path in self.versions_index.
        """
        if "chunks" not in item:
            return None
        file_id = blake2b_128(path)
        current_version, previous_id = self.versions_index.get(file_id, (0, None))

        contents_id = blake2b_128(b"".join(chunk_id for chunk_id, _ in item.chunks))

        if contents_id != previous_id:
            current_version += 1
            self.versions_index[file_id] = current_version, contents_id

        return current_version

    @staticmethod
    def _make_versioned_name(name, version, add_dir=False):
        """Return the versions-mode name for *name* (bytes).

        With *add_dir*, an intermediate directory named like the file is inserted
        (b"a/f.txt" -> b"a/f.txt/f.txt"). If *version* is not None, ".%05d" % version is
        inserted before the extension (b"f.txt" -> b"f.00003.txt").
        """
        if add_dir:
            # add intermediate directory with same name as filename
            name += b"/" + name.rsplit(b"/", 1)[-1]
        if version is None:
            # items without chunks (e.g. fifos, devices) are not versioned
            return name
        # keep original extension at end to avoid confusing tools
        base, ext = os.path.splitext(name)
        return base + os.fsencode(".%05d" % version) + ext

    def _process_leaf(self, name, item, parent, prefix, is_dir, item_inode, hlm):
        """Add *item* as entry *name* of directory *parent*, handling hard links and versions mode."""
        path = item.path
        del item.path  # save some space

        if "hlid" in item:
            link_target = hlm.retrieve(id=item.hlid, default=None)
            if link_target is not None:
                # Hard link was extracted previously, just link
                link_target = os.fsencode(link_target)
                if self.versions:
                    # adjust link target name with version. Targets without chunks (e.g. hardlinked
                    # fifos or devices) have no recorded version, but still live in an intermediate
                    # directory named like themselves, see below.
                    version = self.file_versions.get(link_target)
                    link_target = self._make_versioned_name(link_target, version, add_dir=True)
                try:
                    inode = self.find_inode(link_target, prefix)
                except KeyError:
                    logger.warning("Skipping broken hard link: %s -> %s", path, link_target)
                    return
                item = self.get_item(inode)
                item.nlink = item.get("nlink", 1) + 1
                self._items[inode] = item
            else:
                inode = item_inode
                self._items[inode] = item
                # remember extracted item path, so that following hardlinks don't extract twice.
                hlm.remember(id=item.hlid, info=path)
        else:
            inode = item_inode

        if self.versions and not is_dir:
            parent = self._process_inner(name, parent)
            enc_path = os.fsencode(path)
            version = self._file_version(item, enc_path)
            if version is not None:
                # regular file, with contents
                name = self._make_versioned_name(name, version)
                self.file_versions[enc_path] = version

        self.parent[inode] = parent
        if name:
            self.contents[parent][name] = inode

    def _process_inner(self, name, parent_inode):
        """Return the inode of directory *name* below *parent_inode*, creating it if needed."""
        dir = self.contents[parent_inode]
        if name in dir:
            inode = dir[name]
        else:
            inode = self._create_dir(parent_inode)
            if name:
                dir[name] = inode
        return inode
|
|
|
|
|
|
class FuseOperations(llfuse.Operations, FuseBackend):
    """Export archive as a FUSE filesystem"""

    def __init__(self, manifest, args, decrypted_repository):
        llfuse.Operations.__init__(self)
        FuseBackend.__init__(self, manifest, args, decrypted_repository)
        self.decrypted_repository = decrypted_repository
        data_cache_capacity = int(os.environ.get("BORG_MOUNT_DATA_CACHE_ENTRIES", os.cpu_count() or 1))
        logger.debug("mount data cache capacity: %d chunks", data_cache_capacity)
        # chunk id -> decrypted file content chunk that was only partially read so far (see read())
        self.data_cache = LRUCache(capacity=data_cache_capacity)
        # file handle -> (chunk index, in-file offset of that chunk) where the last read() ended
        self._last_pos = LRUCache(capacity=FILES)

    def sig_info_handler(self, sig_no, stack):
        """Log statistics about the inode structures and caches (SIGUSR1 / SIGINFO handler)."""
        logger.debug(
            "fuse: %d synth inodes, %d edges (%s)",
            self.inode_count,
            len(self.parent),
            # getsizeof is the size of the dict itself; key and value are two small-ish integers,
            # which are shared due to code structure (this has been verified).
            format_file_size(sys.getsizeof(self.parent) + len(self.parent) * sys.getsizeof(self.inode_count)),
        )
        logger.debug("fuse: %d pending archives", len(self.pending_archives))
        logger.debug(
            "fuse: ItemCache %d entries (%d direct, %d indirect), meta-array size %s, direct items size %s",
            self.cache.direct_items + self.cache.indirect_items,
            self.cache.direct_items,
            self.cache.indirect_items,
            format_file_size(sys.getsizeof(self.cache.meta)),
            format_file_size(os.stat(self.cache.fd.fileno()).st_size),
        )
        logger.debug(
            "fuse: data cache: %d/%d entries, %s",
            len(self.data_cache.items()),
            self.data_cache._capacity,
            format_file_size(sum(len(chunk) for key, chunk in self.data_cache.items())),
        )
        self.decrypted_repository.log_instrumentation()

    def mount(self, mountpoint, mount_options, foreground=False):
        """Mount filesystem on *mountpoint* with *mount_options*.

        *mount_options* is a comma separated string of mount options (may be empty or None).
        The borg specific options ignore_permissions, allow_damaged_files, versions, uid, gid
        and umask are consumed here and not passed on to FUSE (on macOS, volname is replaced
        by a normalized volname option). Unless *foreground* is true, the process goes to the
        background after the filesystem was set up.
        """

        def pop_option(options, key, present, not_present, wanted_type, int_base=0):
            """Remove option *key* from *options* and return its value.

            Returns *present* if the option is given without a value, *not_present* if it is
            not given, else its "=value" part converted to *wanted_type*.
            Raises ValueError if the value can not be converted.
            """
            assert isinstance(options, list)  # we mutate this
            for idx, option in enumerate(options):
                if option == key:
                    options.pop(idx)
                    return present
                if option.startswith(key + "="):
                    options.pop(idx)
                    value = option.split("=", 1)[1]
                    if wanted_type is bool:
                        v = value.lower()
                        if v in ("y", "yes", "true", "1"):
                            return True
                        if v in ("n", "no", "false", "0"):
                            return False
                        raise ValueError("unsupported value in option: %s" % option)
                    if wanted_type is int:
                        try:
                            return int(value, base=int_base)
                        except ValueError:
                            raise ValueError("unsupported value in option: %s" % option) from None
                    try:
                        return wanted_type(value)
                    except ValueError:
                        raise ValueError("unsupported value in option: %s" % option) from None
            else:
                return not_present

        # default_permissions enables permission checking by the kernel. Without
        # this, any umask (or uid/gid) would not have an effect and this could
        # cause security issues if used with allow_other mount option.
        # When not using allow_other or allow_root, access is limited to the
        # mounting user anyway.
        options = ["fsname=borgfs", "ro", "default_permissions"]
        if mount_options:
            # skip empty items, e.g. resulting from a trailing or doubled comma
            options.extend(option for option in mount_options.split(",") if option)
        if is_darwin:
            # macFUSE supports a volname mount option to give what finder displays on desktop / in directory list.
            volname = pop_option(options, "volname", "", "", str)
            # if the user did not specify it, we make something up,
            # because otherwise it would be "macFUSE Volume 0 (Python)", #7690.
            volname = volname or f"{os.path.basename(mountpoint)} (borgfs)"
            options.append(f"volname={volname}")
        ignore_permissions = pop_option(options, "ignore_permissions", True, False, bool)
        if ignore_permissions:
            # in case users have a use-case that requires NOT giving "default_permissions",
            # this is enabled by the custom "ignore_permissions" mount option which just
            # removes "default_permissions" again:
            pop_option(options, "default_permissions", True, False, bool)
        self.allow_damaged_files = pop_option(options, "allow_damaged_files", True, False, bool)
        self.versions = pop_option(options, "versions", True, False, bool)
        self.uid_forced = pop_option(options, "uid", None, None, int)
        self.gid_forced = pop_option(options, "gid", None, None, int)
        self.umask = pop_option(options, "umask", 0, 0, int, int_base=8)  # umask is octal, e.g. 222 or 0222
        dir_uid = self.uid_forced if self.uid_forced is not None else self.default_uid
        dir_gid = self.gid_forced if self.gid_forced is not None else self.default_gid
        dir_user = uid2user(dir_uid)
        dir_group = gid2group(dir_gid)
        assert isinstance(dir_user, str)
        assert isinstance(dir_group, str)
        dir_mode = 0o40755 & ~self.umask
        self.default_dir = Item(
            mode=dir_mode, mtime=int(time.time() * 1e9), user=dir_user, group=dir_group, uid=dir_uid, gid=dir_gid
        )
        self._create_filesystem()
        llfuse.init(self, mountpoint, options)
        if not foreground:
            if isinstance(self.repository_uncached, RemoteRepository):
                daemonize()
            else:
                with daemonizing() as (old_id, new_id):
                    # local repo: the locking process' PID is changing, migrate it:
                    logger.debug("fuse: mount local repo, going to background: migrating lock.")
                    self.repository_uncached.migrate_lock(old_id, new_id)

        # If the file system crashes, we do not want to umount because in that
        # case the mountpoint suddenly appears to become empty. This can have
        # nasty consequences, imagine the user has e.g. an active rsync mirror
        # job - seeing the mountpoint empty, rsync would delete everything in the
        # mirror.
        umount = False
        try:
            with signal_handler("SIGUSR1", self.sig_info_handler), signal_handler("SIGINFO", self.sig_info_handler):
                signal = fuse_main()
            # no crash and no signal (or it's ^C and we're in the foreground) -> umount request
            umount = signal is None or (signal == SIGINT and foreground)
        finally:
            llfuse.close(umount)

    @async_wrapper
    def statfs(self, ctx=None):
        """Return filesystem statistics; the read-only FS reports no size and no free space."""
        stat_ = llfuse.StatvfsData()
        stat_.f_bsize = 512  # Filesystem block size
        stat_.f_frsize = 512  # Fragment size
        stat_.f_blocks = 0  # Size of fs in f_frsize units
        stat_.f_bfree = 0  # Number of free blocks
        stat_.f_bavail = 0  # Number of free blocks for unprivileged users
        stat_.f_files = 0  # Number of inodes
        stat_.f_ffree = 0  # Number of free inodes
        stat_.f_favail = 0  # Number of free inodes for unprivileged users
        stat_.f_namemax = 255  # == NAME_MAX (depends on archive source OS / FS)
        return stat_

    def _getattr(self, inode, ctx=None):
        """Build the EntryAttributes for *inode*; permission bits are masked with self.umask."""
        item = self.get_item(inode)
        entry = llfuse.EntryAttributes()
        entry.st_ino = inode
        entry.generation = 0
        entry.entry_timeout = 300
        entry.attr_timeout = 300
        entry.st_mode = item.mode & ~self.umask
        entry.st_nlink = item.get("nlink", 1)
        entry.st_uid, entry.st_gid = get_item_uid_gid(
            item,
            numeric=self.numeric_ids,
            uid_default=self.default_uid,
            gid_default=self.default_gid,
            uid_forced=self.uid_forced,
            gid_forced=self.gid_forced,
        )
        entry.st_rdev = item.get("rdev", 0)
        entry.st_size = item.get_size()
        entry.st_blksize = 512
        entry.st_blocks = (entry.st_size + entry.st_blksize - 1) // entry.st_blksize
        # note: older archives only have mtime (not atime nor ctime)
        entry.st_mtime_ns = mtime_ns = item.mtime
        entry.st_atime_ns = item.get("atime", mtime_ns)
        entry.st_ctime_ns = item.get("ctime", mtime_ns)
        entry.st_birthtime_ns = item.get("birthtime", mtime_ns)
        return entry

    @async_wrapper
    def getattr(self, inode, ctx=None):
        return self._getattr(inode, ctx=ctx)

    @async_wrapper
    def listxattr(self, inode, ctx=None):
        item = self.get_item(inode)
        return item.get("xattrs", {}).keys()

    @async_wrapper
    def getxattr(self, inode, name, ctx=None):
        item = self.get_item(inode)
        try:
            return item.get("xattrs", {})[name] or b""
        except KeyError:
            raise llfuse.FUSEError(llfuse.ENOATTR) from None

    @async_wrapper
    def lookup(self, parent_inode, name, ctx=None):
        """Return the attributes of entry *name* in directory *parent_inode* (ENOENT if missing)."""
        self.check_pending_archive(parent_inode)
        if name == b".":
            inode = parent_inode
        elif name == b"..":
            inode = self.parent[parent_inode]
        else:
            inode = self.contents[parent_inode].get(name)
            if not inode:
                raise llfuse.FUSEError(errno.ENOENT)
        return self._getattr(inode)

    @async_wrapper
    def open(self, inode, flags, ctx=None):
        """Open file *inode*; the inode number is used as file handle."""
        if not self.allow_damaged_files:
            item = self.get_item(inode)
            if "chunks_healthy" in item:
                # Processed archive items don't carry the path anymore; for converting the inode
                # to the path we'd either have to store the inverse of the current structure,
                # or search the entire archive. So we just don't print it. It's easy to correlate anyway.
                logger.warning(
                    "File has damaged (all-zero) chunks. Try running borg check --repair. "
                    "Mount with allow_damaged_files to read damaged files."
                )
                raise llfuse.FUSEError(errno.EIO)
        return llfuse.FileInfo(fh=inode) if has_pyfuse3 else inode

    @async_wrapper
    def opendir(self, inode, ctx=None):
        self.check_pending_archive(inode)
        return inode

    @async_wrapper
    def read(self, fh, offset, size):
        """Return up to *size* bytes of file *fh* (an inode number), starting at *offset*.

        Chunks that were only partially read are kept in self.data_cache, and the chunk
        position where a read ended is remembered in self._last_pos, so linear reads do not
        have to scan the chunk list from the start or fetch/decrypt chunks repeatedly.
        """
        if size <= 0:
            return b""
        parts = []
        item = self.get_item(fh)

        # optimize for linear reads:
        # we cache the chunk number and the in-file offset of the chunk in _last_pos[fh]
        chunk_no, chunk_offset = self._last_pos.get(fh, (0, 0))
        if chunk_offset > offset:
            # this is not a linear read, so we lost track and need to start from beginning again...
            chunk_no, chunk_offset = (0, 0)

        offset -= chunk_offset
        chunks = item.chunks
        # note: using index iteration to avoid frequently copying big (sub)lists by slicing
        for idx in range(chunk_no, len(chunks)):
            chunk_id, chunk_size = chunks[idx]
            if chunk_size <= offset:
                # the requested range starts behind this chunk (or exactly at its end):
                # skip it without fetching and decrypting it.
                offset -= chunk_size
                chunk_offset += chunk_size
                chunk_no += 1
                continue
            n = min(size, chunk_size - offset)
            if chunk_id in self.data_cache:
                data = self.data_cache[chunk_id]
                if offset + n == len(data):
                    # evict fully read chunk from cache
                    del self.data_cache[chunk_id]
            else:
                _, data = self.repo_objs.parse(
                    chunk_id, self.repository_uncached.get(chunk_id), ro_type=ROBJ_FILE_STREAM
                )
                if offset + n < len(data):
                    # chunk was only partially read, cache it
                    self.data_cache[chunk_id] = data
            parts.append(data[offset : offset + n])
            offset = 0
            size -= n
            if not size:
                if fh in self._last_pos:
                    self._last_pos.replace(fh, (chunk_no, chunk_offset))
                else:
                    self._last_pos[fh] = (chunk_no, chunk_offset)
                break
        return b"".join(parts)

    # note: we can't have a generator (with yield) and not a generator (async) in the same method
    if has_pyfuse3:

        async def readdir(self, fh, off, token):  # type: ignore[misc]
            entries = [(b".", fh), (b"..", self.parent[fh])]
            entries.extend(self.contents[fh].items())
            for i, (name, inode) in enumerate(entries[off:], off):
                attrs = self._getattr(inode)
                if not llfuse.readdir_reply(token, name, attrs, i + 1):
                    break

    else:

        def readdir(self, fh, off):  # type: ignore[misc]
            entries = [(b".", fh), (b"..", self.parent[fh])]
            entries.extend(self.contents[fh].items())
            for i, (name, inode) in enumerate(entries[off:], off):
                attrs = self._getattr(inode)
                yield name, attrs, i + 1

    @async_wrapper
    def readlink(self, inode, ctx=None):
        item = self.get_item(inode)
        return os.fsencode(item.target)
|