from contextlib import suppress from io import TextIOWrapper from . import abc class SpecLoaderAdapter: """ Adapt a package spec to adapt the underlying loader. """ def __init__(self, spec, adapter=lambda spec: spec.loader): self.spec = spec self.loader = adapter(spec) def __getattr__(self, name): return getattr(self.spec, name) class TraversableResourcesLoader: """ Adapt a loader to provide TraversableResources. """ def __init__(self, spec): self.spec = spec def get_resource_reader(self, name): return CompatibilityFiles(self.spec)._native() def _io_wrapper(file, mode='r', *args, **kwargs): if mode == 'r': return TextIOWrapper(file, *args, **kwargs) elif mode == 'rb': return file raise ValueError( "Invalid mode value '{}', only 'r' and 'rb' are supported".format(mode) ) class CompatibilityFiles: """ Adapter for an existing or non-existent resource reader to provide a compatibility .files(). """ class SpecPath(abc.Traversable): """ Path tied to a module spec. Can be read and exposes the resource reader children. """ def __init__(self, spec, reader): self._spec = spec self._reader = reader def iterdir(self): if not self._reader: return iter(()) return iter( CompatibilityFiles.ChildPath(self._reader, path) for path in self._reader.contents() ) def is_file(self): return False is_dir = is_file def joinpath(self, other): if not self._reader: return CompatibilityFiles.OrphanPath(other) return CompatibilityFiles.ChildPath(self._reader, other) @property def name(self): return self._spec.name def open(self, mode='r', *args, **kwargs): return _io_wrapper(self._reader.open_resource(None), mode, *args, **kwargs) class ChildPath(abc.Traversable): """ Path tied to a resource reader child. Can be read but doesn't expose any meaningful children. """ def __init__(self, reader, name): self._reader = reader self._name = name def iterdir(self): return iter(()) def is_file(self): return self._reader.is_resource(self.name) def is_dir(self): return not self.is_file() def joinpath(self, other): return CompatibilityFiles.OrphanPath(self.name, other) @property def name(self): return self._name def open(self, mode='r', *args, **kwargs): return _io_wrapper( self._reader.open_resource(self.name), mode, *args, **kwargs ) class OrphanPath(abc.Traversable): """ Orphan path, not tied to a module spec or resource reader. Can't be read and doesn't expose any meaningful children. """ def __init__(self, *path_parts): if len(path_parts) < 1: raise ValueError('Need at least one path part to construct a path') self._path = path_parts def iterdir(self): return iter(()) def is_file(self): return False is_dir = is_file def joinpath(self, other): return CompatibilityFiles.OrphanPath(*self._path, other) @property def name(self): return self._path[-1] def open(self, mode='r', *args, **kwargs): raise FileNotFoundError("Can't open orphan path") def __init__(self, spec): self.spec = spec @property def _reader(self): with suppress(AttributeError): return self.spec.loader.get_resource_reader(self.spec.name) def _native(self): """ Return the native reader if it supports files(). """ reader = self._reader return reader if hasattr(reader, 'files') else self def __getattr__(self, attr): return getattr(self._reader, attr) def files(self): return CompatibilityFiles.SpecPath(self.spec, self._reader) def wrap_spec(package): """ Construct a package spec with traversable compatibility on the spec/loader/reader. """ return SpecLoaderAdapter(package.__spec__, TraversableResourcesLoader)