# borg/src/borg/manifest.py


import enum
import re
from collections import abc, namedtuple
from datetime import datetime, timedelta, timezone
from operator import attrgetter
from collections.abc import Sequence
from .logger import create_logger
logger = create_logger()
from .constants import *  # NOQA
from .helpers.datastruct import StableDict
from .helpers.parseformat import bin_to_hex
from .helpers.time import parse_timestamp, calculate_relative_offset, archive_ts_now
from .helpers.errors import Error
from .patterns import get_regex_from_pattern
from .repoobj import RepoObj


class MandatoryFeatureUnsupported(Error):
    """Unsupported repository feature(s) {}. A newer version of borg is required to access this repository."""

    exit_mcode = 25


class NoManifestError(Error):
    """Repository has no manifest."""

    exit_mcode = 26


ArchiveInfo = namedtuple("ArchiveInfo", "name id ts")

# timestamp is a replacement for ts, archive is an alias for name (see SortBySpec)
AI_HUMAN_SORT_KEYS = ["timestamp", "archive"] + list(ArchiveInfo._fields)
AI_HUMAN_SORT_KEYS.remove("ts")
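# For illustration: since ArchiveInfo._fields == ("name", "id", "ts"), this leaves
# AI_HUMAN_SORT_KEYS == ["timestamp", "archive", "name", "id"].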


def filter_archives_by_date(archives, older=None, newer=None, oldest=None, newest=None):
    def get_first_and_last_archive_ts(archives_list):
        timestamps = [x.ts for x in archives_list]
        return min(timestamps), max(timestamps)

    if not archives:
        return archives

    now = archive_ts_now()
    earliest_ts, latest_ts = get_first_and_last_archive_ts(archives)

    until_ts = calculate_relative_offset(older, now, earlier=True) if older is not None else latest_ts
    from_ts = calculate_relative_offset(newer, now, earlier=True) if newer is not None else earliest_ts
    archives = [x for x in archives if from_ts <= x.ts <= until_ts]

    if not archives:
        return archives

    earliest_ts, latest_ts = get_first_and_last_archive_ts(archives)

    if oldest:
        until_ts = calculate_relative_offset(oldest, earliest_ts, earlier=False)
        archives = [x for x in archives if x.ts <= until_ts]
    if newest:
        from_ts = calculate_relative_offset(newest, latest_ts, earlier=True)
        archives = [x for x in archives if x.ts >= from_ts]

    return archives
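# Illustrative usage (archive list and offsets made up; offset strings are whatever
# calculate_relative_offset() accepts, e.g. "7d" for seven days):
#     filter_archives_by_date(archives, newer="7d")   # archives from the last 7 days
#     filter_archives_by_date(archives, oldest="1d")  # archives within 1 day of the oldest one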


class Archives(abc.MutableMapping):
    """
    Nice wrapper around the archives dict, making sure only valid types/values get in
    and that we can deal with str keys and either str timestamps or datetime timestamps.
    """

    def __init__(self):
        # key: str archive name, value: dict('id': bytes_id, 'time': str_iso_ts)
        self._archives = {}

    def __len__(self):
        return len(self._archives)

    def __iter__(self):
        return iter(self._archives)

    def __getitem__(self, name):
        assert isinstance(name, str)
        values = self._archives.get(name)
        if values is None:
            raise KeyError
        ts = parse_timestamp(values["time"])
        return ArchiveInfo(name=name, id=values["id"], ts=ts)

    def __setitem__(self, name, info):
        assert isinstance(name, str)
        assert isinstance(info, tuple)
        id, ts = info
        assert isinstance(id, bytes)
        if isinstance(ts, datetime):
            ts = ts.isoformat(timespec="microseconds")
        assert isinstance(ts, str)
        self._archives[name] = {"id": id, "time": ts}
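    # Illustrative (made-up values): both timestamp forms are accepted, the datetime one
    # is converted to an isoformat str before storing:
    #     archives["foo"] = (b"\0" * 32, datetime.now(tz=timezone.utc))
    #     archives["foo"] = (b"\0" * 32, "2024-01-01T00:00:00.000000+00:00")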

    def __delitem__(self, name):
        assert isinstance(name, str)
        del self._archives[name]
    def list(
        self,
        *,
        consider_checkpoints=True,
        match=None,
        match_end=r"\Z",
        sort_by=(),
        reverse=False,
        first=None,
        last=None,
        older=None,
        newer=None,
        oldest=None,
        newest=None,
    ):
        """
        Return a list of ArchiveInfo instances according to the parameters.

        First, apply *match* (considering *match_end*), then filter by timestamp using *older* and *newer*.
        Next, filter using *oldest* and *newest*, then sort by the given *sort_by* keys.
        Finally, apply the *first* / *last* limits and possibly *reverse* the list.

        *sort_by* is a list of sort keys applied in reverse order.
        *newer* and *older* are relative time markers that indicate an offset from now.
        *newest* and *oldest* are relative time markers that indicate an offset from the
        newest/oldest archive's timestamp.

        Note: for better robustness, all filtering / limiting parameters must default to
        "do not limit / do not filter", so a FULL archive list is produced by a simple .list().
        Some callers EXPECT to iterate over all archives in a repo for correct operation.
        """
        if isinstance(sort_by, (str, bytes)):
            raise TypeError("sort_by must be a sequence of str")
        archives = self.values()
        regex = get_regex_from_pattern(match or "re:.*")
        regex = re.compile(regex + match_end)
        archives = [x for x in archives if regex.match(x.name) is not None]
        if any([oldest, newest, older, newer]):
            archives = filter_archives_by_date(archives, oldest=oldest, newest=newest, newer=newer, older=older)
        if not consider_checkpoints:
            archives = [x for x in archives if ".checkpoint" not in x.name]
        for sortkey in reversed(sort_by):
            archives.sort(key=attrgetter(sortkey))
        if first:
            archives = archives[:first]
        elif last:
            archives = archives[max(len(archives) - last, 0) :]
        if reverse:
            archives.reverse()
        return archives
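    # Illustrative call (made-up names): the three newest archives whose names start
    # with "host-", newest first:
    #     archives.list(match="re:host-.*", sort_by=["ts"], last=3, reverse=True)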
    def list_considering(self, args):
        """
        Get a list of archives, considering the --first/--last/-a/--match-archives/
        --sort-by/--consider-checkpoints cmdline args.
        """
        name = getattr(args, "name", None)
        consider_checkpoints = getattr(args, "consider_checkpoints", None)
        if name is not None:
            raise Error(
                "Giving a specific name is incompatible with options --first, --last, "
                "-a / --match-archives, and --consider-checkpoints."
            )
        return self.list(
            sort_by=args.sort_by.split(","),
            consider_checkpoints=consider_checkpoints,
            match=args.match_archives,
            first=getattr(args, "first", None),
            last=getattr(args, "last", None),
            older=getattr(args, "older", None),
            newer=getattr(args, "newer", None),
            oldest=getattr(args, "oldest", None),
            newest=getattr(args, "newest", None),
        )

    def set_raw_dict(self, d):
        """set the dict we get from the msgpack unpacker"""
        for k, v in d.items():
            assert isinstance(k, str)
            assert isinstance(v, dict) and "id" in v and "time" in v
            self._archives[k] = v

    def get_raw_dict(self):
        """get the dict we can give to the msgpack packer"""
        return self._archives
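    # Illustrative shape of the raw dict (see __init__ for the value layout):
    #     {"archive1": {"id": b"<32 byte id>", "time": "2024-01-01T00:00:00.000000+00:00"}, ...}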


class Manifest:
    @enum.unique
    class Operation(enum.Enum):
        # The comments here only roughly describe the scope of each feature. In the end, additions need to be
        # based on potential problems older clients could produce when accessing newer repositories and the
        # trade-offs of locking versions out or still allowing access. As all older versions and their exact
        # behaviours are known when introducing new features, sometimes this might not match the general
        # descriptions below.

        # The READ operation describes which features are needed to safely list and extract the archives in the
        # repository.
        READ = "read"
        # The CHECK operation is for all operations that either need to understand every detail
        # of the repository (for consistency checks and repairs) or are seldom used functions that just
        # should use the most restrictive feature set because more fine-grained compatibility tracking is
        # not needed.
        CHECK = "check"
        # The WRITE operation is for adding archives. Features here ensure that older clients don't add archives
        # in an old format, or are used to lock out clients that for other reasons can no longer safely add new
        # archives.
        WRITE = "write"
        # The DELETE operation is for all operations (like archive deletion) that need a 100% correct reference
        # count and need to be able to find all (directly and indirectly) referenced chunks of a given archive.
        DELETE = "delete"

    NO_OPERATION_CHECK: Sequence[Operation] = tuple()

    SUPPORTED_REPO_FEATURES: frozenset[str] = frozenset([])

    MANIFEST_ID = b"\0" * 32

    def __init__(self, key, repository, item_keys=None, ro_cls=RepoObj):
        self.archives = Archives()
        self.config = {}
        self.key = key
        self.repo_objs = ro_cls(key)
        self.repository = repository
        self.item_keys = frozenset(item_keys) if item_keys is not None else ITEM_KEYS
        self.timestamp = None

    @property
    def id_str(self):
        return bin_to_hex(self.id)

    @property
    def last_timestamp(self):
        return parse_timestamp(self.timestamp)

    @classmethod
    def load(cls, repository, operations, key=None, *, ro_cls=RepoObj):
        from .item import ManifestItem
        from .crypto.key import key_factory
        from .repository import Repository

        try:
            cdata = repository.get(cls.MANIFEST_ID)
        except Repository.ObjectNotFound:
            raise NoManifestError
        if not key:
            key = key_factory(repository, cdata, ro_cls=ro_cls)
        manifest = cls(key, repository, ro_cls=ro_cls)
        _, data = manifest.repo_objs.parse(cls.MANIFEST_ID, cdata, ro_type=ROBJ_MANIFEST)
        manifest_dict = key.unpack_manifest(data)
        m = ManifestItem(internal_dict=manifest_dict)
        manifest.id = manifest.repo_objs.id_hash(data)
        if m.get("version") not in (1, 2):
            raise ValueError("Invalid manifest version")
        manifest.archives.set_raw_dict(m.archives)
        manifest.timestamp = m.get("timestamp")
        manifest.config = m.config
        # valid item keys are whatever is known in the repo or every key we know
        manifest.item_keys = ITEM_KEYS
        manifest.item_keys |= frozenset(m.config.get("item_keys", []))  # new location of item_keys since borg2
        manifest.item_keys |= frozenset(m.get("item_keys", []))  # legacy: borg 1.x: item_keys not in config yet
        manifest.check_repository_compatibility(operations)
        return manifest
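    # Typical use (sketch; repository and key setup omitted):
    #     manifest = Manifest.load(repository, (Manifest.Operation.READ,))
    #     for info in manifest.archives.list(sort_by=["ts"]):
    #         ...  # info is an ArchiveInfo(name=..., id=..., ts=...)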

    def check_repository_compatibility(self, operations):
        for operation in operations:
            assert isinstance(operation, self.Operation)
            feature_flags = self.config.get("feature_flags", None)
            if feature_flags is None:
                return
            if operation.value not in feature_flags:
                continue
            requirements = feature_flags[operation.value]
            if "mandatory" in requirements:
                unsupported = set(requirements["mandatory"]) - self.SUPPORTED_REPO_FEATURES
                if unsupported:
                    raise MandatoryFeatureUnsupported(list(unsupported))
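    # For illustration ("some-feature" is a made-up flag name), a manifest config like
    #     {"feature_flags": {"read": {"mandatory": ["some-feature"]}}}
    # makes READ operations raise MandatoryFeatureUnsupported on clients whose
    # SUPPORTED_REPO_FEATURES does not contain "some-feature".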

    def get_all_mandatory_features(self):
        result = {}
        feature_flags = self.config.get("feature_flags", None)
        if feature_flags is None:
            return result
        for operation, requirements in feature_flags.items():
            if "mandatory" in requirements:
                result[operation] = set(requirements["mandatory"])
        return result

    def write(self):
        from .item import ManifestItem

        # self.timestamp needs to be strictly monotonically increasing. Clocks often are not set correctly.
        if self.timestamp is None:
            self.timestamp = datetime.now(tz=timezone.utc).isoformat(timespec="microseconds")
        else:
            incremented_ts = self.last_timestamp + timedelta(microseconds=1)
            now_ts = datetime.now(tz=timezone.utc)
            max_ts = max(incremented_ts, now_ts)
            self.timestamp = max_ts.isoformat(timespec="microseconds")
        # include checks for limits as enforced by the limited unpacker (used by load())
        assert len(self.archives) <= MAX_ARCHIVES
        assert all(len(name) <= 255 for name in self.archives)
        assert len(self.item_keys) <= 100
        self.config["item_keys"] = tuple(sorted(self.item_keys))
        manifest = ManifestItem(
            version=2,
            archives=StableDict(self.archives.get_raw_dict()),
            timestamp=self.timestamp,
            config=StableDict(self.config),
        )
        data = self.key.pack_metadata(manifest.as_dict())
        self.id = self.repo_objs.id_hash(data)
        self.repository.put(self.MANIFEST_ID, self.repo_objs.format(self.MANIFEST_ID, {}, data, ro_type=ROBJ_MANIFEST))
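    # Illustrative timestamp behaviour (made-up values): if the stored timestamp were
    # "2024-01-01T00:00:00.000000+00:00" and the wall clock went backwards, write()
    # would store "2024-01-01T00:00:00.000001+00:00" (last + 1 microsecond), keeping
    # the manifest timestamp strictly monotonically increasing.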