from collections import defaultdict import errno import io import llfuse import msgpack import os import stat import tempfile import time from attic.archive import Archive from attic.helpers import daemonize from attic.remote import cache_if_remote # Does this version of llfuse support ns precision? have_fuse_mtime_ns = hasattr(llfuse.EntryAttributes, 'st_mtime_ns') class ItemCache: def __init__(self): self.fd = tempfile.TemporaryFile() self.offset = 1000000 def add(self, item): pos = self.fd.seek(0, io.SEEK_END) self.fd.write(msgpack.packb(item)) return pos + self.offset def get(self, inode): self.fd.seek(inode - self.offset, io.SEEK_SET) return next(msgpack.Unpacker(self.fd)) class AtticOperations(llfuse.Operations): """Export Attic archive as a fuse filesystem """ def __init__(self, key, repository, manifest, archive): super(AtticOperations, self).__init__() self._inode_count = 0 self.key = key self.repository = cache_if_remote(repository) self.items = {} self.parent = {} self.contents = defaultdict(dict) self.default_dir = {b'mode': 0o40755, b'mtime': int(time.time() * 1e9), b'uid': os.getuid(), b'gid': os.getgid()} self.pending_archives = {} self.cache = ItemCache() if archive: self.process_archive(archive) else: # Create root inode self.parent[1] = self.allocate_inode() self.items[1] = self.default_dir for archive_name in manifest.archives: # Create archive placeholder inode archive_inode = self.allocate_inode() self.items[archive_inode] = self.default_dir self.parent[archive_inode] = 1 self.contents[1][os.fsencode(archive_name)] = archive_inode self.pending_archives[archive_inode] = Archive(repository, key, manifest, archive_name) def process_archive(self, archive, prefix=[]): """Build fuse inode hierarchy from archive metadata """ unpacker = msgpack.Unpacker() for key, chunk in zip(archive.metadata[b'items'], self.repository.get_many(archive.metadata[b'items'])): data = self.key.decrypt(key, chunk) unpacker.feed(data) for item in unpacker: segments = prefix + os.fsencode(os.path.normpath(item[b'path'])).split(b'/') del item[b'path'] num_segments = len(segments) parent = 1 for i, segment in enumerate(segments, 1): # Insert a default root inode if needed if self._inode_count == 0 and segment: archive_inode = self.allocate_inode() self.items[archive_inode] = self.default_dir self.parent[archive_inode] = parent # Leaf segment? if i == num_segments: if b'source' in item and stat.S_ISREG(item[b'mode']): inode = self._find_inode(item[b'source'], prefix) item = self.cache.get(inode) item[b'nlink'] = item.get(b'nlink', 1) + 1 self.items[inode] = item else: inode = self.cache.add(item) self.parent[inode] = parent if segment: self.contents[parent][segment] = inode elif segment in self.contents[parent]: parent = self.contents[parent][segment] else: inode = self.allocate_inode() self.items[inode] = self.default_dir self.parent[inode] = parent if segment: self.contents[parent][segment] = inode parent = inode def allocate_inode(self): self._inode_count += 1 return self._inode_count def statfs(self): stat_ = llfuse.StatvfsData() stat_.f_bsize = 512 stat_.f_frsize = 512 stat_.f_blocks = 0 stat_.f_bfree = 0 stat_.f_bavail = 0 stat_.f_files = 0 stat_.f_ffree = 0 stat_.f_favail = 0 return stat_ def get_item(self, inode): try: return self.items[inode] except KeyError: return self.cache.get(inode) def _find_inode(self, path, prefix=[]): segments = prefix + os.fsencode(os.path.normpath(path)).split(b'/') inode = 1 for segment in segments: inode = self.contents[inode][segment] return inode def getattr(self, inode): item = self.get_item(inode) size = 0 try: size = sum(size for _, size, _ in item[b'chunks']) except KeyError: pass entry = llfuse.EntryAttributes() entry.st_ino = inode entry.generation = 0 entry.entry_timeout = 300 entry.attr_timeout = 300 entry.st_mode = item[b'mode'] entry.st_nlink = item.get(b'nlink', 1) entry.st_uid = item[b'uid'] entry.st_gid = item[b'gid'] entry.st_rdev = item.get(b'rdev', 0) entry.st_size = size entry.st_blksize = 512 entry.st_blocks = 1 if have_fuse_mtime_ns: entry.st_atime_ns = item[b'mtime'] entry.st_mtime_ns = item[b'mtime'] entry.st_ctime_ns = item[b'mtime'] else: entry.st_atime = item[b'mtime'] / 1e9 entry.st_mtime = item[b'mtime'] / 1e9 entry.st_ctime = item[b'mtime'] / 1e9 return entry def listxattr(self, inode): item = self.get_item(inode) return item.get(b'xattrs', {}).keys() def getxattr(self, inode, name): item = self.get_item(inode) try: return item.get(b'xattrs', {})[name] except KeyError: raise llfuse.FUSEError(errno.ENODATA) def _load_pending_archive(self, inode): # Check if this is an archive we need to load archive = self.pending_archives.pop(inode, None) if archive: self.process_archive(archive, [os.fsencode(archive.name)]) def lookup(self, parent_inode, name): self._load_pending_archive(parent_inode) if name == b'.': inode = parent_inode elif name == b'..': inode = self.parent[parent_inode] else: inode = self.contents[parent_inode].get(name) if not inode: raise llfuse.FUSEError(errno.ENOENT) return self.getattr(inode) def open(self, inode, flags): return inode def opendir(self, inode): self._load_pending_archive(inode) return inode def read(self, fh, offset, size): parts = [] item = self.get_item(fh) for id, s, csize in item[b'chunks']: if s < offset: offset -= s continue n = min(size, s - offset) chunk = self.key.decrypt(id, self.repository.get(id)) parts.append(chunk[offset:offset+n]) offset = 0 size -= n if not size: break return b''.join(parts) def readdir(self, fh, off): entries = [(b'.', fh), (b'..', self.parent[fh])] entries.extend(self.contents[fh].items()) for i, (name, inode) in enumerate(entries[off:], off): yield name, self.getattr(inode), i + 1 def readlink(self, inode): item = self.get_item(inode) return os.fsencode(item[b'source']) def mount(self, mountpoint, extra_options, foreground=False): options = ['fsname=atticfs', 'ro'] if extra_options: options.extend(extra_options.split(',')) llfuse.init(self, mountpoint, options) if not foreground: daemonize() try: llfuse.main(single=True) except: llfuse.close() raise llfuse.close()