borg/src/borg/manifest.py

324 lines
13 KiB
Python

import enum
import re
from collections import abc, namedtuple
from datetime import datetime, timedelta, timezone
from operator import attrgetter
from collections.abc import Sequence
from .logger import create_logger
logger = create_logger()
from .constants import * # NOQA
from .helpers.datastruct import StableDict
from .helpers.parseformat import bin_to_hex
from .helpers.time import parse_timestamp, calculate_relative_offset, archive_ts_now
from .helpers.errors import Error
from .patterns import get_regex_from_pattern
from .repoobj import RepoObj
class MandatoryFeatureUnsupported(Error):
"""Unsupported repository feature(s) {}. A newer version of borg is required to access this repository."""
exit_mcode = 25
class NoManifestError(Error):
"""Repository has no manifest."""
exit_mcode = 26
ArchiveInfo = namedtuple("ArchiveInfo", "name id ts")
# timestamp is a replacement for ts, archive is an alias for name (see SortBySpec)
AI_HUMAN_SORT_KEYS = ["timestamp", "archive"] + list(ArchiveInfo._fields)
AI_HUMAN_SORT_KEYS.remove("ts")
def filter_archives_by_date(archives, older=None, newer=None, oldest=None, newest=None):
def get_first_and_last_archive_ts(archives_list):
timestamps = [x.ts for x in archives_list]
return min(timestamps), max(timestamps)
if not archives:
return archives
now = archive_ts_now()
earliest_ts, latest_ts = get_first_and_last_archive_ts(archives)
until_ts = calculate_relative_offset(older, now, earlier=True) if older is not None else latest_ts
from_ts = calculate_relative_offset(newer, now, earlier=True) if newer is not None else earliest_ts
archives = [x for x in archives if from_ts <= x.ts <= until_ts]
if not archives:
return archives
earliest_ts, latest_ts = get_first_and_last_archive_ts(archives)
if oldest:
until_ts = calculate_relative_offset(oldest, earliest_ts, earlier=False)
archives = [x for x in archives if x.ts <= until_ts]
if newest:
from_ts = calculate_relative_offset(newest, latest_ts, earlier=True)
archives = [x for x in archives if x.ts >= from_ts]
return archives
class Archives(abc.MutableMapping):
"""
Nice wrapper around the archives dict, making sure only valid types/values get in
and we can deal with str keys (and it internally encodes to byte keys) and either
str timestamps or datetime timestamps.
"""
def __init__(self):
# key: str archive name, value: dict('id': bytes_id, 'time': str_iso_ts)
self._archives = {}
def __len__(self):
return len(self._archives)
def __iter__(self):
return iter(self._archives)
def __getitem__(self, name):
assert isinstance(name, str)
values = self._archives.get(name)
if values is None:
raise KeyError
ts = parse_timestamp(values["time"])
return ArchiveInfo(name=name, id=values["id"], ts=ts)
def __setitem__(self, name, info):
assert isinstance(name, str)
assert isinstance(info, tuple)
id, ts = info
assert isinstance(id, bytes)
if isinstance(ts, datetime):
ts = ts.isoformat(timespec="microseconds")
assert isinstance(ts, str)
self._archives[name] = {"id": id, "time": ts}
def __delitem__(self, name):
assert isinstance(name, str)
del self._archives[name]
def list(
self,
*,
consider_checkpoints=True,
match=None,
match_end=r"\Z",
sort_by=(),
reverse=False,
first=None,
last=None,
older=None,
newer=None,
oldest=None,
newest=None,
):
"""
Return list of ArchiveInfo instances according to the parameters.
First match *match* (considering *match_end*), then filter by timestamp considering *older* and *newer*.
Second, follow with a filter considering *oldest* and *newest*, then sort by the given *sort_by* argument.
Apply *first* and *last* filters, and then possibly *reverse* the list.
*sort_by* is a list of sort keys applied in reverse order.
*newer* and *older* are relative time markers that indicate offset from now.
*newest* and *oldest* are relative time markers that indicate offset from newest/oldest archive's timestamp.
Note: for better robustness, all filtering / limiting parameters must default to
"not limit / not filter", so a FULL archive list is produced by a simple .list().
some callers EXPECT to iterate over all archives in a repo for correct operation.
"""
if isinstance(sort_by, (str, bytes)):
raise TypeError("sort_by must be a sequence of str")
archives = self.values()
regex = get_regex_from_pattern(match or "re:.*")
regex = re.compile(regex + match_end)
archives = [x for x in archives if regex.match(x.name) is not None]
if any([oldest, newest, older, newer]):
archives = filter_archives_by_date(archives, oldest=oldest, newest=newest, newer=newer, older=older)
if not consider_checkpoints:
archives = [x for x in archives if ".checkpoint" not in x.name]
for sortkey in reversed(sort_by):
archives.sort(key=attrgetter(sortkey))
if first:
archives = archives[:first]
elif last:
archives = archives[max(len(archives) - last, 0) :]
if reverse:
archives.reverse()
return archives
def list_considering(self, args):
"""
get a list of archives, considering --first/last/prefix/match-archives/sort/consider-checkpoints cmdline args
"""
name = getattr(args, "name", None)
consider_checkpoints = getattr(args, "consider_checkpoints", None)
if name is not None:
raise Error(
"Giving a specific name is incompatible with options --first, --last, "
"-a / --match-archives, and --consider-checkpoints."
)
return self.list(
sort_by=args.sort_by.split(","),
consider_checkpoints=consider_checkpoints,
match=args.match_archives,
first=getattr(args, "first", None),
last=getattr(args, "last", None),
older=getattr(args, "older", None),
newer=getattr(args, "newer", None),
oldest=getattr(args, "oldest", None),
newest=getattr(args, "newest", None),
)
def set_raw_dict(self, d):
"""set the dict we get from the msgpack unpacker"""
for k, v in d.items():
assert isinstance(k, str)
assert isinstance(v, dict) and "id" in v and "time" in v
self._archives[k] = v
def get_raw_dict(self):
"""get the dict we can give to the msgpack packer"""
return self._archives
class Manifest:
@enum.unique
class Operation(enum.Enum):
# The comments here only roughly describe the scope of each feature. In the end, additions need to be
# based on potential problems older clients could produce when accessing newer repositories and the
# trade-offs of locking version out or still allowing access. As all older versions and their exact
# behaviours are known when introducing new features sometimes this might not match the general descriptions
# below.
# The READ operation describes which features are needed to list and extract the archives safely in the
# repository.
READ = "read"
# The CHECK operation is for all operations that need either to understand every detail
# of the repository (for consistency checks and repairs) or are seldom used functions that just
# should use the most restrictive feature set because more fine grained compatibility tracking is
# not needed.
CHECK = "check"
# The WRITE operation is for adding archives. Features here ensure that older clients don't add archives
# in an old format, or is used to lock out clients that for other reasons can no longer safely add new
# archives.
WRITE = "write"
# The DELETE operation is for all operations (like archive deletion) that need a 100% correct reference
# count and the need to be able to find all (directly and indirectly) referenced chunks of a given archive.
DELETE = "delete"
NO_OPERATION_CHECK: Sequence[Operation] = tuple()
SUPPORTED_REPO_FEATURES: frozenset[str] = frozenset([])
MANIFEST_ID = b"\0" * 32
def __init__(self, key, repository, item_keys=None, ro_cls=RepoObj):
self.archives = Archives()
self.config = {}
self.key = key
self.repo_objs = ro_cls(key)
self.repository = repository
self.item_keys = frozenset(item_keys) if item_keys is not None else ITEM_KEYS
self.timestamp = None
@property
def id_str(self):
return bin_to_hex(self.id)
@property
def last_timestamp(self):
return parse_timestamp(self.timestamp)
@classmethod
def load(cls, repository, operations, key=None, *, ro_cls=RepoObj):
from .item import ManifestItem
from .crypto.key import key_factory
from .repository import Repository
try:
cdata = repository.get(cls.MANIFEST_ID)
except Repository.ObjectNotFound:
raise NoManifestError
if not key:
key = key_factory(repository, cdata, ro_cls=ro_cls)
manifest = cls(key, repository, ro_cls=ro_cls)
_, data = manifest.repo_objs.parse(cls.MANIFEST_ID, cdata, ro_type=ROBJ_MANIFEST)
manifest_dict = key.unpack_manifest(data)
m = ManifestItem(internal_dict=manifest_dict)
manifest.id = manifest.repo_objs.id_hash(data)
if m.get("version") not in (1, 2):
raise ValueError("Invalid manifest version")
manifest.archives.set_raw_dict(m.archives)
manifest.timestamp = m.get("timestamp")
manifest.config = m.config
# valid item keys are whatever is known in the repo or every key we know
manifest.item_keys = ITEM_KEYS
manifest.item_keys |= frozenset(m.config.get("item_keys", [])) # new location of item_keys since borg2
manifest.item_keys |= frozenset(m.get("item_keys", [])) # legacy: borg 1.x: item_keys not in config yet
manifest.check_repository_compatibility(operations)
return manifest
def check_repository_compatibility(self, operations):
for operation in operations:
assert isinstance(operation, self.Operation)
feature_flags = self.config.get("feature_flags", None)
if feature_flags is None:
return
if operation.value not in feature_flags:
continue
requirements = feature_flags[operation.value]
if "mandatory" in requirements:
unsupported = set(requirements["mandatory"]) - self.SUPPORTED_REPO_FEATURES
if unsupported:
raise MandatoryFeatureUnsupported(list(unsupported))
def get_all_mandatory_features(self):
result = {}
feature_flags = self.config.get("feature_flags", None)
if feature_flags is None:
return result
for operation, requirements in feature_flags.items():
if "mandatory" in requirements:
result[operation] = set(requirements["mandatory"])
return result
def write(self):
from .item import ManifestItem
# self.timestamp needs to be strictly monotonically increasing. Clocks often are not set correctly
if self.timestamp is None:
self.timestamp = datetime.now(tz=timezone.utc).isoformat(timespec="microseconds")
else:
incremented_ts = self.last_timestamp + timedelta(microseconds=1)
now_ts = datetime.now(tz=timezone.utc)
max_ts = max(incremented_ts, now_ts)
self.timestamp = max_ts.isoformat(timespec="microseconds")
# include checks for limits as enforced by limited unpacker (used by load())
assert len(self.archives) <= MAX_ARCHIVES
assert all(len(name) <= 255 for name in self.archives)
assert len(self.item_keys) <= 100
self.config["item_keys"] = tuple(sorted(self.item_keys))
manifest = ManifestItem(
version=2,
archives=StableDict(self.archives.get_raw_dict()),
timestamp=self.timestamp,
config=StableDict(self.config),
)
data = self.key.pack_metadata(manifest.as_dict())
self.id = self.repo_objs.id_hash(data)
self.repository.put(self.MANIFEST_ID, self.repo_objs.format(self.MANIFEST_ID, {}, data, ro_type=ROBJ_MANIFEST))