borg/src/borg/fuse.py

import errno
import functools
import io
import os
import stat
import struct
import sys
import tempfile
import time
from collections import defaultdict
from signal import SIGINT
from .constants import ROBJ_FILE_STREAM
from .fuse_impl import llfuse, has_pyfuse3
if has_pyfuse3:
import trio
def async_wrapper(fn):
@functools.wraps(fn)
async def wrapper(*args, **kwargs):
return fn(*args, **kwargs)
return wrapper
else:
trio = None
def async_wrapper(fn):
return fn
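# Note: pyfuse3 expects its request handlers to be coroutines, so async_wrapper turns the
# synchronous handler implementations below into trivial awaitables; with llfuse it is a
# no-op and the handlers are called synchronously as-is.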
from .logger import create_logger
logger = create_logger()
from .crypto.low_level import blake2b_128
from .archiver._common import build_matcher, build_filter
from .archive import Archive, get_item_uid_gid
from .hashindex import FuseVersionsIndex
from .helpers import daemonize, daemonizing, signal_handler, format_file_size
from .helpers import HardLinkManager
from .helpers import msgpack
from .helpers.lrucache import LRUCache
from .item import Item
from .platform import uid2user, gid2group
from .platformflags import is_darwin
from .remote import RemoteRepository
def fuse_main():
if has_pyfuse3:
try:
trio.run(llfuse.main)
except: # noqa
return 1 # TODO return signal number if it was killed by signal
else:
return None
else:
return llfuse.main(workers=1)
# size of some LRUCaches (1 element per simultaneously open file)
# note: _inode_cache might have rather large elements - Item.chunks can be large!
# also, simultaneously reading too many files should be avoided anyway.
# thus, do not set FILES to high values.
FILES = 4
class ItemCache:
"""
This is the "meat" of the file system's metadata storage.
This class generates inode numbers that efficiently index items in archives,
and retrieves items from these inode numbers.
"""
# 2 MiB correspond to approximately 230000 items (depends on the average number of items per metadata chunk).
#
# Since growing a bytearray has to copy it, growing it will converge to O(n^2), however,
# this is not yet relevant due to the swiftness of copying memory. If it becomes an issue,
# use an anonymous mmap and just resize that (or, if on 64 bit, make it so big you never need
# to resize it in the first place; that's free).
GROW_META_BY = 2 * 1024 * 1024
indirect_entry_struct = struct.Struct("=cII")
assert indirect_entry_struct.size == 9
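# Layout of an indirect entry (see also the entry-format comment in iter_archive_items):
# 1 tag byte (b"I") + 4-byte offset pointing back to the 32-byte chunk ID stored earlier in
# the meta-array + 4-byte offset of the msgpacked item inside that chunk = 9 bytes total.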
def __init__(self, decrypted_repository):
self.decrypted_repository = decrypted_repository
# self.meta, the "meta-array" is a densely packed array of metadata about where items can be found.
# It is indexed by the inode number minus self.offset. (This is in a way eerily similar to how the first
# unices did this).
# The meta-array contains chunk IDs and item entries (described in iter_archive_items).
# The chunk IDs are referenced by item entries through relative offsets,
# which are bounded by the metadata chunk size.
self.meta = bytearray()
# The current write offset in self.meta
self.write_offset = 0
# Offset added to meta-indices, resulting in inodes,
# or subtracted from inodes, resulting in meta-indices.
# XXX: Merge FuseOperations.items and ItemCache to avoid
# this implicit limitation / hack (on the number of synthetic inodes, degenerate
# cases can inflate their number far beyond the number of archives).
self.offset = 1000000
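# Illustration: an item entry written at meta-array offset 42 is exposed as inode 1000042;
# synthetic inodes allocated by FuseBackend start at 1 and are expected to stay below
# self.offset (the XXX above concerns degenerate cases where they do not).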
# A temporary file that contains direct items, i.e. items directly cached in this layer.
# These are items that span more than one chunk and thus cannot be efficiently cached
# by the object cache (self.decrypted_repository), which would require variable-length structures;
# possible but not worth the effort, see iter_archive_items.
self.fd = tempfile.TemporaryFile(prefix="borg-tmp")
# A small LRU cache for chunks requested by ItemCache.get() from the object cache,
# this significantly speeds up directory traversal and similar operations which
# tend to re-read the same chunks over and over.
# The capacity is kept low because increasing it does not provide any significant advantage,
# but makes LRUCache's square behaviour noticeable and consumes more memory.
self.chunks = LRUCache(capacity=10)
# Instrumentation
# Count of indirect items, i.e. data is cached in the object cache, not directly in this cache
self.indirect_items = 0
# Count of direct items, i.e. data is in self.fd
self.direct_items = 0
def get(self, inode):
offset = inode - self.offset
if offset < 0:
raise ValueError("ItemCache.get() called with an invalid inode number")
if self.meta[offset] == ord(b"I"):
_, chunk_id_relative_offset, chunk_offset = self.indirect_entry_struct.unpack_from(self.meta, offset)
chunk_id_offset = offset - chunk_id_relative_offset
# bytearray slices are bytearrays as well, explicitly convert to bytes()
chunk_id = bytes(self.meta[chunk_id_offset : chunk_id_offset + 32])
chunk = self.chunks.get(chunk_id)
if not chunk:
csize, chunk = next(self.decrypted_repository.get_many([chunk_id]))
self.chunks[chunk_id] = chunk
data = memoryview(chunk)[chunk_offset:]
unpacker = msgpack.Unpacker()
unpacker.feed(data)
return Item(internal_dict=next(unpacker))
elif self.meta[offset] == ord(b"S"):
fd_offset = int.from_bytes(self.meta[offset + 1 : offset + 9], "little")
self.fd.seek(fd_offset, io.SEEK_SET)
return Item(internal_dict=next(msgpack.Unpacker(self.fd, read_size=1024)))
else:
raise ValueError("Invalid entry type in self.meta")
def iter_archive_items(self, archive_item_ids, filter=None):
unpacker = msgpack.Unpacker()
# Current offset in the metadata stream, which consists of all metadata chunks glued together
stream_offset = 0
# Offset of the current chunk in the metadata stream
chunk_begin = 0
# Length of the chunk preceding the current chunk
last_chunk_length = 0
msgpacked_bytes = b""
write_offset = self.write_offset
meta = self.meta
pack_indirect_into = self.indirect_entry_struct.pack_into
for key, (csize, data) in zip(archive_item_ids, self.decrypted_repository.get_many(archive_item_ids)):
# Store the chunk ID in the meta-array
if write_offset + 32 >= len(meta):
self.meta = meta = meta + bytes(self.GROW_META_BY)
meta[write_offset : write_offset + 32] = key
current_id_offset = write_offset
write_offset += 32
chunk_begin += last_chunk_length
last_chunk_length = len(data)
unpacker.feed(data)
while True:
try:
item = unpacker.unpack()
need_more_data = False
except msgpack.OutOfData:
need_more_data = True
start = stream_offset - chunk_begin
# tell() is not helpful for the need_more_data case, but we know it is the remainder
# of the data in that case. in the other case, tell() works as expected.
length = (len(data) - start) if need_more_data else (unpacker.tell() - stream_offset)
msgpacked_bytes += data[start : start + length]
stream_offset += length
if need_more_data:
# Need more data, feed the next chunk
break
item = Item(internal_dict=item)
if filter and not filter(item):
msgpacked_bytes = b""
continue
current_item = msgpacked_bytes
current_item_length = len(current_item)
current_spans_chunks = stream_offset - current_item_length < chunk_begin
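# Illustrative numbers: if the current chunk starts at stream offset 1000 (chunk_begin) and
# this item's msgpacked bytes occupy stream offsets 980..1050, the item started before
# chunk_begin, i.e. it spans chunks and is cached directly in self.fd below.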
msgpacked_bytes = b""
if write_offset + 9 >= len(meta):
self.meta = meta = meta + bytes(self.GROW_META_BY)
# item entries in the meta-array come in two different flavours, both nine bytes long.
# (1) for items that span chunks:
#
# 'S' + 8 byte offset into the self.fd file, where the msgpacked item starts.
#
# (2) for items that are completely contained in one chunk, which usually is the great majority
# (about 700:1 for system backups)
#
# 'I' + 4 byte offset where the chunk ID is + 4 byte offset in the chunk
# where the msgpacked item starts
#
# The chunk ID offset is the number of bytes _back_ from the start of the entry, i.e.:
#
# |Chunk ID| .... |S1234abcd|
# ^------ offset ----------^
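# Illustrative numbers: for a chunk ID stored at meta offset 0 and an item starting 17 bytes
# into that chunk, the entry written at meta offset 32 is
# pack_indirect_into(meta, 32, b"I", 32, 17); get() later locates the ID at meta[0:32].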
if current_spans_chunks:
pos = self.fd.seek(0, io.SEEK_END)
self.fd.write(current_item)
meta[write_offset : write_offset + 9] = b"S" + pos.to_bytes(8, "little")
self.direct_items += 1
else:
item_offset = stream_offset - current_item_length - chunk_begin
pack_indirect_into(meta, write_offset, b"I", write_offset - current_id_offset, item_offset)
self.indirect_items += 1
inode = write_offset + self.offset
write_offset += 9
yield inode, item
self.write_offset = write_offset
class FuseBackend:
"""Virtual filesystem based on archive(s) to provide information to fuse"""
def __init__(self, manifest, args, decrypted_repository):
self._args = args
self.numeric_ids = args.numeric_ids
self._manifest = manifest
self.repo_objs = manifest.repo_objs
self.repository_uncached = manifest.repository
# Maps inode numbers to Item instances. This is used for synthetic inodes, i.e. file-system objects that are
# made up and are not contained in the archives. For example archive directories or intermediate directories
# not contained in archives.
self._items = {}
# cache up to <FILES> Items
self._inode_cache = LRUCache(capacity=FILES)
# inode_count is the current count of synthetic inodes, i.e. those in self._items
self.inode_count = 0
# Maps inode numbers to the inode number of the parent
self.parent = {}
# Maps inode numbers to a dictionary mapping byte directory entry names to their inode numbers,
# i.e. this contains all dirents of everything that is mounted. (It becomes really big).
self.contents = defaultdict(dict)
self.default_uid = os.getuid()
self.default_gid = os.getgid()
self.default_dir = None
# Archives to be loaded when first accessed, mapped by their placeholder inode
self.pending_archives = {}
self.cache = ItemCache(decrypted_repository)
self.allow_damaged_files = False
self.versions = False
self.uid_forced = None
self.gid_forced = None
self.umask = 0
def _create_filesystem(self):
self._create_dir(parent=1) # first call, create root dir (inode == 1)
self.versions_index = FuseVersionsIndex()
for archive in self._manifest.archives.list_considering(self._args):
if self.versions:
# process archives immediately
self._process_archive(archive.name)
else:
# lazily load archives, create archive placeholder inode
archive_inode = self._create_dir(parent=1, mtime=int(archive.ts.timestamp() * 1e9))
self.contents[1][os.fsencode(archive.name)] = archive_inode
self.pending_archives[archive_inode] = archive.name
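# Resulting tree (sketch): the root directory (inode 1) gets one entry per archive. In
# versions mode all archives are merged into one tree right away; otherwise each entry is a
# placeholder that is only expanded by _process_archive() when lookup()/opendir() first
# touches it, see check_pending_archive().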
def get_item(self, inode):
item = self._inode_cache.get(inode)
if item is not None:
return item
try:
# this is a cheap get-from-dictionary operation, no need to cache the result.
return self._items[inode]
except KeyError:
# while self.cache does some internal caching, it has still quite some overhead, so we cache the result.
item = self.cache.get(inode)
self._inode_cache[inode] = item
return item
def check_pending_archive(self, inode):
# Check if this is an archive we need to load
archive_name = self.pending_archives.pop(inode, None)
if archive_name is not None:
self._process_archive(archive_name, [os.fsencode(archive_name)])
def _allocate_inode(self):
self.inode_count += 1
return self.inode_count
def _create_dir(self, parent, mtime=None):
"""Create directory"""
ino = self._allocate_inode()
if mtime is not None:
self._items[ino] = Item(internal_dict=self.default_dir.as_dict())
self._items[ino].mtime = mtime
else:
self._items[ino] = self.default_dir
self.parent[ino] = parent
return ino
def find_inode(self, path, prefix=[]):
segments = prefix + path.split(b"/")
inode = 1
for segment in segments:
inode = self.contents[inode][segment]
return inode
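# e.g. find_inode(b"home/user/file", prefix=[b"archive1"]) walks
# contents[1][b"archive1"] -> b"home" -> b"user" -> b"file" and returns that inode,
# raising KeyError if any path segment is unknown.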
def _process_archive(self, archive_name, prefix=[]):
"""Build FUSE inode hierarchy from archive metadata"""
self.file_versions = {} # for versions mode: original path -> version
t0 = time.perf_counter()
archive = Archive(self._manifest, archive_name)
strip_components = self._args.strip_components
matcher = build_matcher(self._args.patterns, self._args.paths)
hlm = HardLinkManager(id_type=bytes, info_type=str) # hlid -> path
filter = build_filter(matcher, strip_components)
for item_inode, item in self.cache.iter_archive_items(archive.metadata.items, filter=filter):
if strip_components:
item.path = os.sep.join(item.path.split(os.sep)[strip_components:])
path = os.fsencode(item.path)
is_dir = stat.S_ISDIR(item.mode)
if is_dir:
try:
# This can happen if an archive was created with a command line like
# $ borg create ... dir1/file dir1
# In this case the code below will have created a default_dir inode for dir1 already.
inode = self.find_inode(path, prefix)
except KeyError:
pass
else:
self._items[inode] = item
continue
segments = prefix + path.split(b"/")
parent = 1
for segment in segments[:-1]:
parent = self._process_inner(segment, parent)
self._process_leaf(segments[-1], item, parent, prefix, is_dir, item_inode, hlm)
duration = time.perf_counter() - t0
logger.debug("fuse: _process_archive completed in %.1f s for archive %s", duration, archive.name)
def _process_leaf(self, name, item, parent, prefix, is_dir, item_inode, hlm):
path = item.path
del item.path # save some space
def file_version(item, path):
if "chunks" in item:
file_id = blake2b_128(path)
current_version, previous_id = self.versions_index.get(file_id, (0, None))
contents_id = blake2b_128(b"".join(chunk_id for chunk_id, _ in item.chunks))
if contents_id != previous_id:
current_version += 1
self.versions_index[file_id] = current_version, contents_id
return current_version
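# Consequence: a file whose chunk list is unchanged between archives keeps its version
# number, changed contents bump it by one, and items without chunks (e.g. directories,
# symlinks) yield None.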
def make_versioned_name(name, version, add_dir=False):
if add_dir:
# add intermediate directory with same name as filename
path_fname = name.rsplit(b"/", 1)
name += b"/" + path_fname[-1]
# keep original extension at end to avoid confusing tools
name, ext = os.path.splitext(name)
version_enc = os.fsencode(".%05d" % version)
return name + version_enc + ext
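# e.g. make_versioned_name(b"file.txt", 3) -> b"file.00003.txt", and
# make_versioned_name(b"home/user/file.txt", 3, add_dir=True) -> b"home/user/file.txt/file.00003.txt"
# (the original path becomes a directory that collects all versions of the file).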
if "hlid" in item:
link_target = hlm.retrieve(id=item.hlid, default=None)
if link_target is not None:
# Hard link was extracted previously, just link
link_target = os.fsencode(link_target)
if self.versions:
# adjust link target name with version
version = self.file_versions[link_target]
link_target = make_versioned_name(link_target, version, add_dir=True)
try:
inode = self.find_inode(link_target, prefix)
except KeyError:
logger.warning("Skipping broken hard link: %s -> %s", path, link_target)
return
item = self.get_item(inode)
item.nlink = item.get("nlink", 1) + 1
self._items[inode] = item
else:
inode = item_inode
self._items[inode] = item
# remember extracted item path, so that following hardlinks don't extract twice.
hlm.remember(id=item.hlid, info=path)
else:
inode = item_inode
if self.versions and not is_dir:
parent = self._process_inner(name, parent)
enc_path = os.fsencode(path)
version = file_version(item, enc_path)
if version is not None:
# regular file, with contents
name = make_versioned_name(name, version)
self.file_versions[enc_path] = version
self.parent[inode] = parent
if name:
self.contents[parent][name] = inode
def _process_inner(self, name, parent_inode):
dir = self.contents[parent_inode]
if name in dir:
inode = dir[name]
else:
inode = self._create_dir(parent_inode)
if name:
dir[name] = inode
return inode
class FuseOperations(llfuse.Operations, FuseBackend):
"""Export archive as a FUSE filesystem"""
def __init__(self, manifest, args, decrypted_repository):
llfuse.Operations.__init__(self)
FuseBackend.__init__(self, manifest, args, decrypted_repository)
self.decrypted_repository = decrypted_repository
data_cache_capacity = int(os.environ.get("BORG_MOUNT_DATA_CACHE_ENTRIES", os.cpu_count() or 1))
logger.debug("mount data cache capacity: %d chunks", data_cache_capacity)
self.data_cache = LRUCache(capacity=data_cache_capacity)
self._last_pos = LRUCache(capacity=FILES)
def sig_info_handler(self, sig_no, stack):
logger.debug(
"fuse: %d synth inodes, %d edges (%s)",
self.inode_count,
len(self.parent),
# getsizeof is the size of the dict itself; key and value are two small-ish integers,
# which are shared due to code structure (this has been verified).
format_file_size(sys.getsizeof(self.parent) + len(self.parent) * sys.getsizeof(self.inode_count)),
)
logger.debug("fuse: %d pending archives", len(self.pending_archives))
logger.debug(
"fuse: ItemCache %d entries (%d direct, %d indirect), meta-array size %s, direct items size %s",
self.cache.direct_items + self.cache.indirect_items,
self.cache.direct_items,
self.cache.indirect_items,
format_file_size(sys.getsizeof(self.cache.meta)),
format_file_size(os.stat(self.cache.fd.fileno()).st_size),
)
logger.debug(
"fuse: data cache: %d/%d entries, %s",
len(self.data_cache.items()),
self.data_cache._capacity,
format_file_size(sum(len(chunk) for key, chunk in self.data_cache.items())),
)
self.decrypted_repository.log_instrumentation()
def mount(self, mountpoint, mount_options, foreground=False):
"""Mount filesystem on *mountpoint* with *mount_options*."""
def pop_option(options, key, present, not_present, wanted_type, int_base=0):
assert isinstance(options, list) # we mutate this
for idx, option in enumerate(options):
if option == key:
options.pop(idx)
return present
if option.startswith(key + "="):
options.pop(idx)
value = option.split("=", 1)[1]
if wanted_type is bool:
v = value.lower()
if v in ("y", "yes", "true", "1"):
return True
if v in ("n", "no", "false", "0"):
return False
raise ValueError("unsupported value in option: %s" % option)
if wanted_type is int:
try:
return int(value, base=int_base)
except ValueError:
raise ValueError("unsupported value in option: %s" % option) from None
try:
return wanted_type(value)
except ValueError:
raise ValueError("unsupported value in option: %s" % option) from None
else:
return not_present
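# e.g. with options == ["ro", "uid=1000"], pop_option(options, "uid", None, None, int)
# removes "uid=1000" from the list and returns 1000; if the key is not present, options is
# left untouched and not_present is returned.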
# default_permissions enables permission checking by the kernel. Without
# this, any umask (or uid/gid) would not have an effect and this could
# cause security issues if used with allow_other mount option.
# When not using allow_other or allow_root, access is limited to the
# mounting user anyway.
options = ["fsname=borgfs", "ro", "default_permissions"]
if mount_options:
options.extend(mount_options.split(","))
if is_darwin:
# macFUSE supports a volname mount option to set what Finder displays on the desktop / in directory listings.
volname = pop_option(options, "volname", "", "", str)
# if the user did not specify it, we make something up,
# because otherwise it would be "macFUSE Volume 0 (Python)", #7690.
volname = volname or f"{os.path.basename(mountpoint)} (borgfs)"
options.append(f"volname={volname}")
ignore_permissions = pop_option(options, "ignore_permissions", True, False, bool)
if ignore_permissions:
# in case users have a use-case that requires NOT giving "default_permissions",
# this is enabled by the custom "ignore_permissions" mount option which just
# removes "default_permissions" again:
pop_option(options, "default_permissions", True, False, bool)
self.allow_damaged_files = pop_option(options, "allow_damaged_files", True, False, bool)
self.versions = pop_option(options, "versions", True, False, bool)
self.uid_forced = pop_option(options, "uid", None, None, int)
self.gid_forced = pop_option(options, "gid", None, None, int)
self.umask = pop_option(options, "umask", 0, 0, int, int_base=8) # umask is octal, e.g. 222 or 0222
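# e.g. umask=0222 (or umask=222) parses as 0o222; it is applied below to default_dir's mode
# and in _getattr() to every item's st_mode, clearing the write permission bits.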
dir_uid = self.uid_forced if self.uid_forced is not None else self.default_uid
dir_gid = self.gid_forced if self.gid_forced is not None else self.default_gid
dir_user = uid2user(dir_uid)
dir_group = gid2group(dir_gid)
assert isinstance(dir_user, str)
assert isinstance(dir_group, str)
dir_mode = 0o40755 & ~self.umask
self.default_dir = Item(
mode=dir_mode, mtime=int(time.time() * 1e9), user=dir_user, group=dir_group, uid=dir_uid, gid=dir_gid
)
self._create_filesystem()
llfuse.init(self, mountpoint, options)
if not foreground:
if isinstance(self.repository_uncached, RemoteRepository):
daemonize()
else:
with daemonizing() as (old_id, new_id):
# local repo: the locking process' PID is changing, migrate it:
logger.debug("fuse: mount local repo, going to background: migrating lock.")
self.repository_uncached.migrate_lock(old_id, new_id)
# If the file system crashes, we do not want to umount because in that
# case the mountpoint suddenly appears to become empty. This can have
# nasty consequences, imagine the user has e.g. an active rsync mirror
# job - seeing the mountpoint empty, rsync would delete everything in the
# mirror.
umount = False
try:
with signal_handler("SIGUSR1", self.sig_info_handler), signal_handler("SIGINFO", self.sig_info_handler):
signal = fuse_main()
# no crash and no signal (or it's ^C and we're in the foreground) -> umount request
umount = signal is None or (signal == SIGINT and foreground)
finally:
llfuse.close(umount)
@async_wrapper
def statfs(self, ctx=None):
stat_ = llfuse.StatvfsData()
stat_.f_bsize = 512 # Filesystem block size
stat_.f_frsize = 512 # Fragment size
stat_.f_blocks = 0 # Size of fs in f_frsize units
stat_.f_bfree = 0 # Number of free blocks
stat_.f_bavail = 0 # Number of free blocks for unprivileged users
stat_.f_files = 0 # Number of inodes
stat_.f_ffree = 0 # Number of free inodes
stat_.f_favail = 0 # Number of free inodes for unprivileged users
stat_.f_namemax = 255 # == NAME_MAX (depends on archive source OS / FS)
return stat_
def _getattr(self, inode, ctx=None):
item = self.get_item(inode)
entry = llfuse.EntryAttributes()
entry.st_ino = inode
entry.generation = 0
entry.entry_timeout = 300
entry.attr_timeout = 300
entry.st_mode = item.mode & ~self.umask
entry.st_nlink = item.get("nlink", 1)
entry.st_uid, entry.st_gid = get_item_uid_gid(
item,
numeric=self.numeric_ids,
uid_default=self.default_uid,
gid_default=self.default_gid,
uid_forced=self.uid_forced,
gid_forced=self.gid_forced,
)
entry.st_rdev = item.get("rdev", 0)
entry.st_size = item.get_size()
entry.st_blksize = 512
entry.st_blocks = (entry.st_size + entry.st_blksize - 1) // entry.st_blksize
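# e.g. st_size == 1000 yields st_blocks == 2 (size rounded up to whole 512-byte blocks).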
# note: older archives only have mtime (not atime nor ctime)
entry.st_mtime_ns = mtime_ns = item.mtime
entry.st_atime_ns = item.get("atime", mtime_ns)
entry.st_ctime_ns = item.get("ctime", mtime_ns)
entry.st_birthtime_ns = item.get("birthtime", mtime_ns)
return entry
@async_wrapper
def getattr(self, inode, ctx=None):
return self._getattr(inode, ctx=ctx)
@async_wrapper
def listxattr(self, inode, ctx=None):
item = self.get_item(inode)
return item.get("xattrs", {}).keys()
@async_wrapper
def getxattr(self, inode, name, ctx=None):
item = self.get_item(inode)
try:
return item.get("xattrs", {})[name] or b""
except KeyError:
raise llfuse.FUSEError(llfuse.ENOATTR) from None
@async_wrapper
def lookup(self, parent_inode, name, ctx=None):
self.check_pending_archive(parent_inode)
if name == b".":
inode = parent_inode
elif name == b"..":
inode = self.parent[parent_inode]
else:
inode = self.contents[parent_inode].get(name)
if not inode:
raise llfuse.FUSEError(errno.ENOENT)
return self._getattr(inode)
@async_wrapper
def open(self, inode, flags, ctx=None):
if not self.allow_damaged_files:
item = self.get_item(inode)
if "chunks_healthy" in item:
# Processed archive items don't carry the path anymore; for converting the inode
# to the path we'd either have to store the inverse of the current structure,
# or search the entire archive. So we just don't print it. It's easy to correlate anyway.
logger.warning(
"File has damaged (all-zero) chunks. Try running borg check --repair. "
"Mount with allow_damaged_files to read damaged files."
)
raise llfuse.FUSEError(errno.EIO)
return llfuse.FileInfo(fh=inode) if has_pyfuse3 else inode
@async_wrapper
def opendir(self, inode, ctx=None):
self.check_pending_archive(inode)
return inode
@async_wrapper
def read(self, fh, offset, size):
parts = []
item = self.get_item(fh)
# optimize for linear reads:
# we cache the chunk number and the in-file offset of the chunk in _last_pos[fh]
chunk_no, chunk_offset = self._last_pos.get(fh, (0, 0))
if chunk_offset > offset:
# this is not a linear read, so we lost track and need to start from beginning again...
chunk_no, chunk_offset = (0, 0)
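# Illustration: if the previous read on fh started in chunk 5, whose data begins at file
# offset 41943040, _last_pos[fh] == (5, 41943040); a subsequent read at an offset >= 41943040
# resumes scanning at chunk 5, while a backwards seek falls back to scanning from chunk 0.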
offset -= chunk_offset
chunks = item.chunks
# note: using index iteration to avoid frequently copying big (sub)lists by slicing
for idx in range(chunk_no, len(chunks)):
id, s = chunks[idx]
if s < offset:
offset -= s
chunk_offset += s
chunk_no += 1
continue
n = min(size, s - offset)
if id in self.data_cache:
data = self.data_cache[id]
if offset + n == len(data):
# evict fully read chunk from cache
del self.data_cache[id]
else:
_, data = self.repo_objs.parse(id, self.repository_uncached.get(id), ro_type=ROBJ_FILE_STREAM)
if offset + n < len(data):
# chunk was only partially read, cache it
self.data_cache[id] = data
parts.append(data[offset : offset + n])
offset = 0
size -= n
if not size:
if fh in self._last_pos:
self._last_pos.replace(fh, (chunk_no, chunk_offset))
else:
self._last_pos[fh] = (chunk_no, chunk_offset)
break
return b"".join(parts)
# note: a single method cannot be both a generator (llfuse: yields entries) and a
# coroutine (pyfuse3: async def using readdir_reply), hence the two readdir definitions below.
if has_pyfuse3:
async def readdir(self, fh, off, token): # type: ignore[misc]
entries = [(b".", fh), (b"..", self.parent[fh])]
entries.extend(self.contents[fh].items())
for i, (name, inode) in enumerate(entries[off:], off):
attrs = self._getattr(inode)
if not llfuse.readdir_reply(token, name, attrs, i + 1):
break
else:
def readdir(self, fh, off): # type: ignore[misc]
entries = [(b".", fh), (b"..", self.parent[fh])]
entries.extend(self.contents[fh].items())
for i, (name, inode) in enumerate(entries[off:], off):
attrs = self._getattr(inode)
yield name, attrs, i + 1
@async_wrapper
def readlink(self, inode, ctx=None):
item = self.get_item(inode)
return os.fsencode(item.target)