import abc
import argparse
import base64
import hashlib
import json
import os
import os.path
import re
import shlex
import stat
import uuid
from typing import Dict, Set, Tuple, ClassVar, Any, TYPE_CHECKING, Literal
from binascii import hexlify
from collections import Counter, OrderedDict
from datetime import datetime, timezone
from functools import partial
from string import Formatter

from ..logger import create_logger

logger = create_logger()

from .errors import Error
from .fs import get_keys_dir, make_path_safe
from .msgpack import Timestamp
from .time import OutputTimestamp, format_time, safe_timestamp
from .. import __version__ as borg_version
from .. import __version_tuple__ as borg_version_tuple
from ..constants import *  # NOQA

if TYPE_CHECKING:
    from ..item import ItemDiff


def bin_to_hex(binary):
    return hexlify(binary).decode("ascii")


def safe_decode(s, coding="utf-8", errors="surrogateescape"):
    """decode bytes to str, with round-tripping "invalid" bytes"""
    if s is None:
        return None
    return s.decode(coding, errors)


def safe_encode(s, coding="utf-8", errors="surrogateescape"):
    """encode str to bytes, with round-tripping "invalid" bytes"""
    if s is None:
        return None
    return s.encode(coding, errors)


def remove_surrogates(s, errors="replace"):
    """Replace surrogates generated by fsdecode with '?'"""
    return s.encode("utf-8", errors).decode("utf-8")


def binary_to_json(key, value):
    assert isinstance(key, str)
    assert isinstance(value, bytes)
    return {key + "_b64": base64.b64encode(value).decode("ascii")}


def text_to_json(key, value):
    """
    Return a dict made from key/value that can be fed safely into a JSON encoder.

    JSON can only contain pure, valid unicode (but not: unicode with surrogate escapes).

    But sometimes we have to deal with such values and we do it like this:

    - <key>: value as pure unicode text (surrogate escapes, if any, replaced by ?)
    - <key>_b64: value as base64 encoded binary representation (only set if value has surrogate-escapes)
    """
    coding = "utf-8"
    assert isinstance(key, str)
    assert isinstance(value, str)  # str might contain surrogate escapes
    data = {}
    try:
        value.encode(coding, errors="strict")  # check if pure unicode
    except UnicodeEncodeError:
        # value has surrogate escape sequences
        data[key] = remove_surrogates(value)
        value_bytes = value.encode(coding, errors="surrogateescape")
        data.update(binary_to_json(key, value_bytes))
    else:
        # value is pure unicode
        data[key] = value
        # we do not give the b64 representation, not needed
    return data
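
# Illustrative usage (doctest-style; the second value contains a surrogate
# escape, so both the replaced text and a _b64 form are emitted):
#   >>> text_to_json("path", "abc")
#   {'path': 'abc'}
#   >>> text_to_json("path", "abc\udcff")
#   {'path': 'abc?', 'path_b64': 'YWJj/w=='}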


def join_cmd(argv, rs=False):
    cmd = shlex.join(argv)
    return remove_surrogates(cmd) if rs else cmd


def eval_escapes(s):
    """Evaluate literal escape sequences in a string (e.g. `\\n` -> `\n`)."""
    return s.encode("ascii", "backslashreplace").decode("unicode-escape")


def decode_dict(d, keys, encoding="utf-8", errors="surrogateescape"):
    for key in keys:
        if isinstance(d.get(key), bytes):
            d[key] = d[key].decode(encoding, errors)
    return d


def positive_int_validator(value):
    """argparse type for positive integers"""
    int_value = int(value)
    if int_value <= 0:
        raise argparse.ArgumentTypeError("A positive integer is required: %s" % value)
    return int_value


def interval(s):
    """Convert a string representing a valid interval to a number of hours."""
    multiplier = {"H": 1, "d": 24, "w": 24 * 7, "m": 24 * 31, "y": 24 * 365}

    if s.endswith(tuple(multiplier.keys())):
        number = s[:-1]
        suffix = s[-1]
    else:
        # range suffixes in ascending multiplier order
        ranges = [k for k, v in sorted(multiplier.items(), key=lambda t: t[1])]
        raise argparse.ArgumentTypeError(f'Unexpected interval time unit "{s[-1]}": expected one of {ranges!r}')

    try:
        hours = int(number) * multiplier[suffix]
    except ValueError:
        hours = -1

    if hours <= 0:
        raise argparse.ArgumentTypeError('Unexpected interval number "%s": expected an integer greater than 0' % number)

    return hours
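
# Illustrative usage (doctest-style):
#   >>> interval("2d")
#   48
#   >>> interval("1w")
#   168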


def ChunkerParams(s):
    params = s.strip().split(",")
    count = len(params)
    if count == 0:
        raise argparse.ArgumentTypeError("no chunker params given")
    algo = params[0].lower()
    if algo == CH_FAIL and count == 3:
        block_size = int(params[1])
        fail_map = str(params[2])
        return algo, block_size, fail_map
    if algo == CH_FIXED and 2 <= count <= 3:  # fixed, block_size[, header_size]
        block_size = int(params[1])
        header_size = int(params[2]) if count == 3 else 0
        if block_size < 64:
            # we are only disallowing the most extreme cases of abuse here - this does NOT imply
            # that cutting chunks of the minimum allowed size is efficient concerning storage
            # or in-memory chunk management.
            # choose the block (chunk) size wisely: if you have a lot of data and you cut
            # it into very small chunks, you are asking for trouble!
            raise argparse.ArgumentTypeError("block_size must not be less than 64 Bytes")
        if block_size > MAX_DATA_SIZE or header_size > MAX_DATA_SIZE:
            raise argparse.ArgumentTypeError(
                "block_size and header_size must not exceed MAX_DATA_SIZE [%d]" % MAX_DATA_SIZE
            )
        return algo, block_size, header_size
    if algo == "default" and count == 1:  # default
        return CHUNKER_PARAMS
    # this must stay last as it deals with old-style compat mode (no algorithm, 4 params, buzhash):
    if (algo == CH_BUZHASH and count == 5) or count == 4:  # [buzhash, ]chunk_min, chunk_max, chunk_mask, window_size
        chunk_min, chunk_max, chunk_mask, window_size = (int(p) for p in params[count - 4 :])
        if not (chunk_min <= chunk_mask <= chunk_max):
            raise argparse.ArgumentTypeError("required: chunk_min <= chunk_mask <= chunk_max")
        if chunk_min < 6:
            # see comment in 'fixed' algo check
            raise argparse.ArgumentTypeError(
                "min. chunk size exponent must not be less than 6 (2^6 = 64B min. chunk size)"
            )
        if chunk_max > 23:
            raise argparse.ArgumentTypeError(
                "max. chunk size exponent must not be more than 23 (2^23 = 8MiB max. chunk size)"
            )
        return CH_BUZHASH, chunk_min, chunk_max, chunk_mask, window_size
    raise argparse.ArgumentTypeError("invalid chunker params")
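
# Illustrative usage (doctest-style; CH_BUZHASH/CH_FIXED come from borg's
# constants, the buzhash numbers are size exponents / mask bits and the fixed
# number is a byte size):
#   >>> ChunkerParams("buzhash,19,23,21,4095")
#   ('buzhash', 19, 23, 21, 4095)
#   >>> ChunkerParams("fixed,4194304")
#   ('fixed', 4194304, 0)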


def FilesCacheMode(s):
    ENTRIES_MAP = dict(ctime="c", mtime="m", size="s", inode="i", rechunk="r", disabled="d")
    VALID_MODES = ("cis", "ims", "cs", "ms", "cr", "mr", "d", "s")  # letters in alpha order
    entries = set(s.strip().split(","))
    if not entries <= set(ENTRIES_MAP):
        raise argparse.ArgumentTypeError(
            "cache mode must be a comma-separated list of: %s" % ",".join(sorted(ENTRIES_MAP))
        )
    short_entries = {ENTRIES_MAP[entry] for entry in entries}
    mode = "".join(sorted(short_entries))
    if mode not in VALID_MODES:
        raise argparse.ArgumentTypeError("cache mode short must be one of: %s" % ",".join(VALID_MODES))
    return mode
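
# Illustrative usage (doctest-style):
#   >>> FilesCacheMode("ctime,size,inode")
#   'cis'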


def partial_format(format, mapping):
    """
    Apply format.format_map(mapping) while preserving unknown keys

    Does not support attribute access, indexing and ![rsa] conversions
    """
    for key, value in mapping.items():
        key = re.escape(key)
        format = re.sub(
            rf"(?<!\{{)((\{{{key}\}})|(\{{{key}:[^\}}]*\}}))", lambda match: match.group(1).format_map(mapping), format
        )
    return format
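
# Illustrative usage (doctest-style; unknown placeholders survive untouched):
#   >>> partial_format("{hostname} {unknown}", {"hostname": "foo"})
#   'foo {unknown}'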


class DatetimeWrapper:
    def __init__(self, dt):
        self.dt = dt

    def __format__(self, format_spec):
        if format_spec == "":
            format_spec = ISO_FORMAT_NO_USECS
        return self.dt.__format__(format_spec)


class PlaceholderError(Error):
    """Formatting Error: "{}".format({}): {}({})"""


class InvalidPlaceholder(PlaceholderError):
    """Invalid placeholder "{}" in string: {}"""


def format_line(format, data):
    for _, key, _, conversion in Formatter().parse(format):
        if not key:
            continue
        if conversion or key not in data:
            raise InvalidPlaceholder(key, format)
    try:
        return format.format_map(data)
    except Exception as e:
        raise PlaceholderError(format, data, e.__class__.__name__, str(e))


def _replace_placeholders(text, overrides={}):
    """Replace placeholders in text with their values."""
    from ..platform import fqdn, hostname, getosusername

    current_time = datetime.now(timezone.utc)
    data = {
        "pid": os.getpid(),
        "fqdn": fqdn,
        "reverse-fqdn": ".".join(reversed(fqdn.split("."))),
        "hostname": hostname,
        "now": DatetimeWrapper(current_time.astimezone()),
        "utcnow": DatetimeWrapper(current_time),
        "user": getosusername(),
        "uuid4": str(uuid.uuid4()),
        "borgversion": borg_version,
        "borgmajor": "%d" % borg_version_tuple[:1],
        "borgminor": "%d.%d" % borg_version_tuple[:2],
        "borgpatch": "%d.%d.%d" % borg_version_tuple[:3],
        **overrides,
    }
    return format_line(text, data)


class PlaceholderReplacer:
    def __init__(self):
        self.reset()

    def override(self, key, value):
        self.overrides[key] = value

    def reset(self):
        self.overrides = {}

    def __call__(self, text, overrides=None):
        ovr = {}
        ovr.update(self.overrides)
        ovr.update(overrides or {})
        return _replace_placeholders(text, overrides=ovr)


replace_placeholders = PlaceholderReplacer()
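
# Illustrative usage (output is host- and time-dependent, the values below are
# made up):
#   >>> replace_placeholders("{hostname}-{now:%Y-%m-%d}")  # doctest: +SKIP
#   'myhost-2024-01-31'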


def SortBySpec(text):
    from ..manifest import AI_HUMAN_SORT_KEYS

    for token in text.split(","):
        if token not in AI_HUMAN_SORT_KEYS:
            raise argparse.ArgumentTypeError("Invalid sort key: %s" % token)
    return text.replace("timestamp", "ts")


def format_file_size(v, precision=2, sign=False, iec=False):
    """Format file size into a human friendly format"""
    fn = sizeof_fmt_iec if iec else sizeof_fmt_decimal
    return fn(v, suffix="B", sep=" ", precision=precision, sign=sign)
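
# Illustrative usage (doctest-style):
#   >>> format_file_size(1234567)
#   '1.23 MB'
#   >>> format_file_size(1234567, iec=True)
#   '1.18 MiB'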


class FileSize(int):
    def __new__(cls, value, iec=False):
        obj = int.__new__(cls, value)
        obj.iec = iec
        return obj

    def __format__(self, format_spec):
        return format_file_size(int(self), iec=self.iec).__format__(format_spec)


def parse_file_size(s):
    """Return int from file size (1234, 55G, 1.7T)."""
    if not s:
        return int(s)  # will raise
    suffix = s[-1]
    power = 1000
    try:
        factor = {"K": power, "M": power**2, "G": power**3, "T": power**4, "P": power**5}[suffix]
        s = s[:-1]
    except KeyError:
        factor = 1
    return int(float(s) * factor)
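
# Illustrative usage (doctest-style):
#   >>> parse_file_size("55G")
#   55000000000
#   >>> parse_file_size("1.7T")
#   1700000000000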


def parse_storage_quota(storage_quota):
    parsed = parse_file_size(storage_quota)
    if parsed < parse_file_size("10M"):
        raise argparse.ArgumentTypeError("quota is too small (%s). At least 10M are required." % storage_quota)
    return parsed


def sizeof_fmt(num, suffix="B", units=None, power=None, sep="", precision=2, sign=False):
    sign = "+" if sign and num > 0 else ""
    fmt = "{0:{1}.{2}f}{3}{4}{5}"
    prec = 0
    for unit in units[:-1]:
        if abs(round(num, precision)) < power:
            break
        num /= float(power)
        prec = precision
    else:
        unit = units[-1]
    return fmt.format(num, sign, prec, sep, unit, suffix)


def sizeof_fmt_iec(num, suffix="B", sep="", precision=2, sign=False):
    return sizeof_fmt(
        num,
        suffix=suffix,
        sep=sep,
        precision=precision,
        sign=sign,
        units=["", "Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi", "Yi"],
        power=1024,
    )


def sizeof_fmt_decimal(num, suffix="B", sep="", precision=2, sign=False):
    return sizeof_fmt(
        num,
        suffix=suffix,
        sep=sep,
        precision=precision,
        sign=sign,
        units=["", "k", "M", "G", "T", "P", "E", "Z", "Y"],
        power=1000,
    )


def format_archive(archive):
    return "%-36s %s [%s]" % (archive.name, format_time(archive.ts), bin_to_hex(archive.id))


def parse_stringified_list(s):
    items = re.split(" *, *", s)
    return [item for item in items if item != ""]


class Location:
    """Object representing a repository location"""

    # user must not contain "@", ":" or "/".
    # Quoting adduser error message:
    # "To avoid problems, the username should consist only of letters, digits,
    # underscores, periods, at signs and dashes, and not start with a dash
    # (as defined by IEEE Std 1003.1-2001)."
    # We use "@" as separator between username and hostname, so we must
    # disallow it within the pure username part.
    optional_user_re = r"""
        (?:(?P<user>[^@:/]+)@)?
    """

    # path must not contain :: (it ends at :: or string end), but may contain single colons.
    # to avoid ambiguities with other regexes, it must also not start with ":" nor with "//" nor with "ssh://".
    local_path_re = r"""
        (?!(:|//|ssh://|socket://))                 # not starting with ":" or // or ssh:// or socket://
        (?P<path>([^:]|(:(?!:)))+)                  # any chars, but no "::"
    """

    # file_path must not contain :: (it ends at :: or string end), but may contain single colons.
    # it may start with an optional server name, followed by a / that is part of the path.
    file_path_re = r"""
        (?P<path>(([^/]*)/([^:]|(:(?!:)))+))        # start opt. servername, then /, then any chars, but no "::"
    """

    # abs_path must not contain :: (it ends at :: or string end), but may contain single colons.
    # it must start with a / and that slash is part of the path.
    abs_path_re = r"""
        (?P<path>(/([^:]|(:(?!:)))+))               # start with /, then any chars, but no "::"
    """

    # host NAME, or host IP ADDRESS (v4 or v6, v6 must be in square brackets)
    host_re = r"""
        (?P<host>(
            (?!\[)[^:/]+(?<!\])     # hostname or v4 addr, not containing : or / (does not match v6 addr: no brackets!)
            |
            \[[0-9a-fA-F:.]+\])     # ipv6 address in brackets
        )
    """

    # regexes for misc. kinds of supported location specifiers:
    ssh_re = re.compile(
        r"""
        (?P<proto>ssh)://                           # ssh://
        """
        + optional_user_re
        + host_re
        + r"""                                      # user@ (optional), host name or address
        (?::(?P<port>\d+))?                         # :port (optional)
        """
        + abs_path_re,
        re.VERBOSE,
    )  # path

    socket_re = re.compile(
        r"""
        (?P<proto>socket)://                        # socket://
        """
        + abs_path_re,
        re.VERBOSE,
    )  # path

    file_re = re.compile(
        r"""
        (?P<proto>file)://                          # file://
        """
        + file_path_re,
        re.VERBOSE,
    )  # servername/path or path

    local_re = re.compile(local_path_re, re.VERBOSE)  # local path

    win_file_re = re.compile(
        r"""
        (?:file://)?                                # optional file protocol
        (?P<path>
            (?:[a-zA-Z]:)?                          # Drive letter followed by a colon (optional)
            (?:[^:]+)                               # Anything which does not contain a :, at least one char
        )
        """,
        re.VERBOSE,
    )

    def __init__(self, text="", overrides={}, other=False):
        self.repo_env_var = "BORG_OTHER_REPO" if other else "BORG_REPO"
        self.valid = False
        self.proto = None
        self.user = None
        self._host = None
        self.port = None
        self.path = None
        self.raw = None
        self.processed = None
        self.parse(text, overrides)

    def parse(self, text, overrides={}):
        if not text:
            # we did not get a text to parse, so we try to fetch from the environment
            text = os.environ.get(self.repo_env_var)
            if text is None:
                return

        self.raw = text  # as given by user, might contain placeholders
        self.processed = replace_placeholders(self.raw, overrides)  # after placeholder replacement
        valid = self._parse(self.processed)
        if valid:
            self.valid = True
        else:
            raise ValueError('Invalid location format: "%s"' % self.processed)

    def _parse(self, text):
        def normpath_special(p):
            # avoid that normpath strips away our relative path hack and even makes p absolute
            relative = p.startswith("/./")
            p = os.path.normpath(p)
            return ("/." + p) if relative else p

        m = self.ssh_re.match(text)
        if m:
            self.proto = m.group("proto")
            self.user = m.group("user")
            self._host = m.group("host")
            self.port = m.group("port") and int(m.group("port")) or None
            self.path = normpath_special(m.group("path"))
            return True
        m = self.file_re.match(text)
        if m:
            self.proto = m.group("proto")
            self.path = normpath_special(m.group("path"))
            return True
        m = self.socket_re.match(text)
        if m:
            self.proto = m.group("proto")
            self.path = normpath_special(m.group("path"))
            return True
        m = self.local_re.match(text)
        if m:
            self.proto = "file"
            self.path = normpath_special(m.group("path"))
            return True
        return False

    def __str__(self):
        items = [
            "proto=%r" % self.proto,
            "user=%r" % self.user,
            "host=%r" % self.host,
            "port=%r" % self.port,
            "path=%r" % self.path,
        ]
        return ", ".join(items)

    def to_key_filename(self):
        name = re.sub(r"[^\w]", "_", self.path).strip("_")
        if self.proto not in ("file", "socket"):
            name = re.sub(r"[^\w]", "_", self.host) + "__" + name
        if len(name) > 100:
            # Limit file names to some reasonable length. Most file systems
            # limit them to 255 [unit of choice]; due to variations in unicode
            # handling we truncate to 100 *characters*.
            name = name[:100]
        return os.path.join(get_keys_dir(), name)

    def __repr__(self):
        return "Location(%s)" % self

    @property
    def host(self):
        # strip square brackets used for IPv6 addrs
        if self._host is not None:
            return self._host.lstrip("[").rstrip("]")

    def canonical_path(self):
        if self.proto in ("file", "socket"):
            return self.path
        else:
            if self.path and self.path.startswith("~"):
                path = "/" + self.path  # /~/x = path x relative to home dir
            elif self.path and not self.path.startswith("/"):
                path = "/./" + self.path  # /./x = path x relative to cwd
            else:
                path = self.path
            return "ssh://{}{}{}{}".format(
                f"{self.user}@" if self.user else "",
                self._host,  # needed for ipv6 addrs
                f":{self.port}" if self.port else "",
                path,
            )

    def with_timestamp(self, timestamp):
        # note: this only affects the repository URL/path, not the archive name!
        return Location(
            self.raw,
            overrides={
                "now": DatetimeWrapper(timestamp),
                "utcnow": DatetimeWrapper(timestamp.astimezone(timezone.utc)),
            },
        )
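
# Illustrative usage (doctest-style):
#   >>> loc = Location("ssh://user@host:2222/path/to/repo")
#   >>> loc.proto, loc.user, loc.host, loc.port, loc.path
#   ('ssh', 'user', 'host', 2222, '/path/to/repo')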


def location_validator(proto=None, other=False):
    def validator(text):
        try:
            loc = Location(text, other=other)
        except ValueError as err:
            raise argparse.ArgumentTypeError(str(err)) from None
        if proto is not None and loc.proto != proto:
            if proto == "file":
                raise argparse.ArgumentTypeError('"%s": Repository must be local' % text)
            else:
                raise argparse.ArgumentTypeError('"%s": Repository must be remote' % text)
        return loc

    return validator


def relative_time_marker_validator(text: str):
    time_marker_regex = r"^\d+[md]$"
    match = re.compile(time_marker_regex).search(text)
    if not match:
        raise argparse.ArgumentTypeError(f"Invalid relative time marker used: {text}")
    else:
        return text


def text_validator(*, name, max_length, min_length=0, invalid_ctrl_chars="\0", invalid_chars="", no_blanks=False):
    def validator(text):
        assert isinstance(text, str)
        if len(text) < min_length:
            raise argparse.ArgumentTypeError(f'Invalid {name}: "{text}" [length < {min_length}]')
        if len(text) > max_length:
            raise argparse.ArgumentTypeError(f'Invalid {name}: "{text}" [length > {max_length}]')
        if invalid_ctrl_chars and re.search(f"[{re.escape(invalid_ctrl_chars)}]", text):
            raise argparse.ArgumentTypeError(f'Invalid {name}: "{text}" [invalid control chars detected]')
        if invalid_chars and re.search(f"[{re.escape(invalid_chars)}]", text):
            raise argparse.ArgumentTypeError(
                f'Invalid {name}: "{text}" [invalid chars detected matching "{invalid_chars}"]'
            )
        if no_blanks and (text.startswith(" ") or text.endswith(" ")):
            raise argparse.ArgumentTypeError(f'Invalid {name}: "{text}" [leading or trailing blanks detected]')
        try:
            text.encode("utf-8", errors="strict")
        except UnicodeEncodeError:
            # looks like text contains surrogate-escapes
            raise argparse.ArgumentTypeError(f'Invalid {name}: "{text}" [contains non-unicode characters]')
        return text

    return validator


comment_validator = text_validator(name="comment", max_length=10000)


def archivename_validator(text):
    # we make sure that the archive name can be used as directory name (for borg mount)
    MAX_PATH = 260  # Windows default. Since Win10, there is a registry setting LongPathsEnabled to get more.
    MAX_DIRNAME = MAX_PATH - len("12345678.123")
    SAFETY_MARGIN = 48  # borgfs path: mountpoint / archivename / dir / dir / ... / file
    MAX_ARCHIVENAME = MAX_DIRNAME - SAFETY_MARGIN
    invalid_ctrl_chars = "".join(chr(i) for i in range(32))
    # note: ":" is also an invalid path char on windows, but we can not blacklist it,
    # because e.g. our {now} placeholder creates ISO-8601 like output like 2022-12-10T20:47:42 .
    invalid_chars = r"/" + r"\"<|>?*"  # posix + windows
    validate_text = text_validator(
        name="archive name",
        min_length=1,
        max_length=MAX_ARCHIVENAME,
        invalid_ctrl_chars=invalid_ctrl_chars,
        invalid_chars=invalid_chars,
        no_blanks=True,
    )
    return validate_text(text)
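
# Illustrative usage (doctest-style; a name like "bad/name" would raise
# argparse.ArgumentTypeError, since "/" is in the invalid character set):
#   >>> archivename_validator("my-machine-2024-01-31")
#   'my-machine-2024-01-31'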


class BaseFormatter(metaclass=abc.ABCMeta):
    format: str
    static_data: Dict[str, Any]
    FIXED_KEYS: ClassVar[Dict[str, str]] = {
        # Formatting aids
        "LF": "\n",
        "SPACE": " ",
        "TAB": "\t",
        "CR": "\r",
        "NUL": "\0",
        "NEWLINE": "\n",
        "NL": "\n",  # \n is automatically converted to os.linesep on write
    }
    KEY_DESCRIPTIONS: ClassVar[Dict[str, str]] = {
        "NEWLINE": "OS dependent line separator",
        "NL": "alias of NEWLINE",
        "NUL": "NUL character for creating print0 / xargs -0 like output",
        "SPACE": "space character",
        "TAB": "tab character",
        "CR": "carriage return character",
        "LF": "line feed character",
    }
    KEY_GROUPS: ClassVar[Tuple[Tuple[str, ...], ...]] = (("NEWLINE", "NL", "NUL", "SPACE", "TAB", "CR", "LF"),)

    def __init__(self, format: str, static: Dict[str, Any]) -> None:
        self.format = partial_format(format, static)
        self.static_data = static

    @abc.abstractmethod
    def get_item_data(self, item, jsonline=False) -> dict:
        raise NotImplementedError

    def format_item(self, item, jsonline=False, sort=False):
        data = self.get_item_data(item, jsonline)
        return (
            f"{json.dumps(data, cls=BorgJsonEncoder, sort_keys=sort)}\n" if jsonline else self.format.format_map(data)
        )

    @classmethod
    def keys_help(cls):
        help = []
        keys: Set[str] = set()
        keys.update(cls.KEY_DESCRIPTIONS.keys())
        keys.update(key for group in cls.KEY_GROUPS for key in group)

        for group in cls.KEY_GROUPS:
            for key in group:
                keys.remove(key)
                text = "- " + key
                if key in cls.KEY_DESCRIPTIONS:
                    text += ": " + cls.KEY_DESCRIPTIONS[key]
                help.append(text)
            help.append("")
        assert not keys, str(keys)
        return "\n".join(help)


class ArchiveFormatter(BaseFormatter):
    KEY_DESCRIPTIONS = {
        "archive": "archive name",
        "name": 'alias of "archive"',
        "comment": "archive comment",
        # *start* is the key used by borg-info for this timestamp, this makes the formats more compatible
        "start": "time (start) of creation of the archive",
        "time": 'alias of "start"',
        "end": "time (end) of creation of the archive",
        "command_line": "command line which was used to create the archive",
        "id": "internal ID of the archive",
        "hostname": "hostname of host on which this archive was created",
        "username": "username of user who created this archive",
        "size": "size of this archive (data plus metadata, not considering compression and deduplication)",
        "nfiles": "count of files in this archive",
    }
    KEY_GROUPS = (
        ("archive", "name", "comment", "id"),
        ("start", "time", "end", "command_line"),
        ("hostname", "username"),
        ("size", "nfiles"),
    )

    def __init__(self, format, repository, manifest, key, *, iec=False):
        static_data = {}  # here could be stuff on repo level, above archive level
        static_data.update(self.FIXED_KEYS)
        super().__init__(format, static_data)
        self.repository = repository
        self.manifest = manifest
        self.key = key
        self.name = None
        self.id = None
        self._archive = None
        self.iec = iec
        self.format_keys = {f[1] for f in Formatter().parse(format)}
        self.call_keys = {
            "hostname": partial(self.get_meta, "hostname", ""),
            "username": partial(self.get_meta, "username", ""),
            "comment": partial(self.get_meta, "comment", ""),
            "command_line": partial(self.get_meta, "command_line", ""),
            "size": partial(self.get_meta, "size", 0),
            "nfiles": partial(self.get_meta, "nfiles", 0),
            "end": self.get_ts_end,
        }
        self.used_call_keys = set(self.call_keys) & self.format_keys

    def get_item_data(self, archive_info, jsonline=False):
        self.name = archive_info.name
        self.id = archive_info.id
        item_data = {}
        item_data.update({} if jsonline else self.static_data)
        item_data.update(
            {
                "name": archive_info.name,
                "archive": archive_info.name,
                "id": bin_to_hex(archive_info.id),
                "time": self.format_time(archive_info.ts),
                "start": self.format_time(archive_info.ts),
            }
        )
        for key in self.used_call_keys:
            item_data[key] = self.call_keys[key]()

        # Note: name and comment are validated, should never contain surrogate escapes.
        # But unsure whether hostname, username, command_line could contain surrogate escapes, play safe:
        for key in "hostname", "username", "command_line":
            if key in item_data:
                item_data.update(text_to_json(key, item_data[key]))
        return item_data

    @property
    def archive(self):
        """lazy load / update loaded archive"""
        if self._archive is None or self._archive.id != self.id:
            from ..archive import Archive

            self._archive = Archive(self.manifest, self.name, iec=self.iec)
        return self._archive

    def get_meta(self, key, default=None):
        return self.archive.metadata.get(key, default)

    def get_ts_end(self):
        return self.format_time(self.archive.ts_end)

    def format_time(self, ts):
        return OutputTimestamp(ts)


class ItemFormatter(BaseFormatter):
    # we provide the hash algos from python stdlib (except shake_*) and additionally xxh64.
    # shake_* is not provided because it uses an incompatible .digest() method to support variable length.
    hash_algorithms = set(hashlib.algorithms_guaranteed).union({"xxh64"}).difference({"shake_128", "shake_256"})
    KEY_DESCRIPTIONS = {
        "type": "file type (file, dir, symlink, ...)",
        "mode": "file mode (as in stat)",
        "uid": "user id of file owner",
        "gid": "group id of file owner",
        "user": "user name of file owner",
        "group": "group name of file owner",
        "path": "file path",
        "target": "link target for symlinks",
        "hlid": "hard link identity (same if hardlinking same fs object)",
        "flags": "file flags",
        "extra": 'prepends {target} with " -> " for soft links and " link to " for hard links',
        "size": "file size",
        "dsize": "deduplicated size",
        "num_chunks": "number of chunks in this file",
        "unique_chunks": "number of unique chunks in this file",
        "mtime": "file modification time",
        "ctime": "file change time",
        "atime": "file access time",
        "isomtime": "file modification time (ISO 8601 format)",
        "isoctime": "file change time (ISO 8601 format)",
        "isoatime": "file access time (ISO 8601 format)",
        "xxh64": "XXH64 checksum of this file (note: this is NOT a cryptographic hash!)",
        "health": 'either "healthy" (file ok) or "broken" (if file has all-zero replacement chunks)',
        "archiveid": "internal ID of the archive",
        "archivename": "name of the archive",
    }
    KEY_GROUPS = (
        ("type", "mode", "uid", "gid", "user", "group", "path", "target", "hlid", "flags"),
        ("size", "dsize", "num_chunks", "unique_chunks"),
        ("mtime", "ctime", "atime", "isomtime", "isoctime", "isoatime"),
        tuple(sorted(hash_algorithms)),
        ("archiveid", "archivename", "extra"),
        ("health",),
    )

    KEYS_REQUIRING_CACHE = ("dsize", "unique_chunks")

    @classmethod
    def format_needs_cache(cls, format):
        format_keys = {f[1] for f in Formatter().parse(format)}
        return any(key in cls.KEYS_REQUIRING_CACHE for key in format_keys)

    def __init__(self, archive, format):
        from ..checksums import StreamingXXH64

        static_data = {"archivename": archive.name, "archiveid": archive.fpr}
        static_data.update(self.FIXED_KEYS)
        super().__init__(format, static_data)
        self.xxh64 = StreamingXXH64
        self.archive = archive
        self.format_keys = {f[1] for f in Formatter().parse(format)}
        self.call_keys = {
            "size": self.calculate_size,
            "dsize": partial(self.sum_unique_chunks_metadata, lambda chunk: chunk.size),
            "num_chunks": self.calculate_num_chunks,
            "unique_chunks": partial(self.sum_unique_chunks_metadata, lambda chunk: 1),
            "isomtime": partial(self.format_iso_time, "mtime"),
            "isoctime": partial(self.format_iso_time, "ctime"),
            "isoatime": partial(self.format_iso_time, "atime"),
            "mtime": partial(self.format_time, "mtime"),
            "ctime": partial(self.format_time, "ctime"),
            "atime": partial(self.format_time, "atime"),
        }
        for hash_function in self.hash_algorithms:
            self.call_keys[hash_function] = partial(self.hash_item, hash_function)
        self.used_call_keys = set(self.call_keys) & self.format_keys

    def get_item_data(self, item, jsonline=False):
        item_data = {}
        item_data.update({} if jsonline else self.static_data)

        item_data.update(text_to_json("path", item.path))
        target = item.get("target", "")
        item_data.update(text_to_json("target", target))
        if not jsonline:
            item_data["extra"] = "" if not target else f" -> {item_data['target']}"

        hlid = item.get("hlid")
        hlid = bin_to_hex(hlid) if hlid else ""
        item_data["hlid"] = hlid

        mode = stat.filemode(item.mode)
        item_type = mode[0]
        item_data["type"] = item_type
        item_data["mode"] = mode

        item_data["uid"] = item.get("uid")  # int or None
        item_data["gid"] = item.get("gid")  # int or None
        item_data.update(text_to_json("user", item.get("user", str(item_data["uid"]))))
        item_data.update(text_to_json("group", item.get("group", str(item_data["gid"]))))

        if jsonline:
            item_data["healthy"] = "chunks_healthy" not in item
        else:
            item_data["health"] = "broken" if "chunks_healthy" in item else "healthy"
        item_data["flags"] = item.get("bsdflags")  # int if flags known, else (if flags unknown) None
        for key in self.used_call_keys:
            item_data[key] = self.call_keys[key](item)
        return item_data

    def sum_unique_chunks_metadata(self, metadata_func, item):
        """
        sum unique chunks metadata, a unique chunk is a chunk which is referenced globally as often as it is in the
        item

        item: The item to sum its unique chunks' metadata
        metadata_func: A function that takes a parameter of type ChunkIndexEntry and returns a number, used to return
                       the metadata needed from the chunk
        """
        chunk_index = self.archive.cache.chunks
        chunks = item.get("chunks", [])
        chunks_counter = Counter(c.id for c in chunks)
        return sum(metadata_func(c) for c in chunks if chunk_index[c.id].refcount == chunks_counter[c.id])

    def calculate_num_chunks(self, item):
        return len(item.get("chunks", []))

    def calculate_size(self, item):
        # note: does not support hardlink slaves, they will be size 0
        return item.get_size()

    def hash_item(self, hash_function, item):
        if "chunks" not in item:
            return ""
        if hash_function == "xxh64":
            hash = self.xxh64()
        elif hash_function in self.hash_algorithms:
            hash = hashlib.new(hash_function)
        for data in self.archive.pipeline.fetch_many([c.id for c in item.chunks]):
            hash.update(data)
        return hash.hexdigest()

    def format_time(self, key, item):
        return OutputTimestamp(safe_timestamp(item.get(key) or item.mtime))

    def format_iso_time(self, key, item):
        return self.format_time(key, item).isoformat()
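
# Illustrative usage (sketch only: assumes an Archive instance `archive` and an
# Item `item` as borg's list command provides them; the output line is made up):
#   >>> fmt = ItemFormatter(archive, "{mode} {user:6} {size:8} {mtime} {path}{extra}{NL}")  # doctest: +SKIP
#   >>> print(fmt.format_item(item), end="")  # doctest: +SKIP
#   -rw-rw-r-- user       1024 Mon, 2024-01-01 10:00:00 some/file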


class DiffFormatter(BaseFormatter):
    KEY_DESCRIPTIONS = {
        "path": "archived file path",
        "change": "all available changes",
        "content": "file content change",
        "mode": "file mode change",
        "type": "file type change",
        "owner": "file owner (user/group) change",
        "user": "file user change",
        "group": "file group change",
        "link": "file link change",
        "directory": "file directory change",
        "blkdev": "file block device change",
        "chrdev": "file character device change",
        "fifo": "file fifo change",
        "mtime": "file modification time change",
        "ctime": "file change time change",
        "isomtime": "file modification time change (ISO 8601)",
        "isoctime": "file change time change (ISO 8601)",
    }
    KEY_GROUPS = (
        ("path", "change"),
        ("content", "mode", "type", "owner", "group", "user"),
        ("link", "directory", "blkdev", "chrdev", "fifo"),
        ("mtime", "ctime", "isomtime", "isoctime"),
    )
    METADATA = ("mode", "type", "owner", "group", "user", "mtime", "ctime")

    def __init__(self, format, content_only=False):
        static_data = {}
        static_data.update(self.FIXED_KEYS)
        super().__init__(format or "{content}{link}{directory}{blkdev}{chrdev}{fifo} {path}{NL}", static_data)
        self.content_only = content_only
        self.format_keys = {f[1] for f in Formatter().parse(format)}
        self.call_keys = {
            "content": self.format_content,
            "mode": self.format_mode,
            "type": partial(self.format_mode, filetype=True),
            "owner": partial(self.format_owner),
            "group": partial(self.format_owner, spec="group"),
            "user": partial(self.format_owner, spec="user"),
            "link": partial(self.format_other, "link"),
            "directory": partial(self.format_other, "directory"),
            "blkdev": partial(self.format_other, "blkdev"),
            "chrdev": partial(self.format_other, "chrdev"),
            "fifo": partial(self.format_other, "fifo"),
            "mtime": partial(self.format_time, "mtime"),
            "ctime": partial(self.format_time, "ctime"),
            "isomtime": partial(self.format_iso_time, "mtime"),
            "isoctime": partial(self.format_iso_time, "ctime"),
        }
        self.used_call_keys = set(self.call_keys) & self.format_keys
        if self.content_only:
            self.used_call_keys -= set(self.METADATA)

    def get_item_data(self, item: "ItemDiff", jsonline=False) -> dict:
        diff_data = {}
        for key in self.used_call_keys:
            diff_data[key] = self.call_keys[key](item)

        change = []
        for key in self.call_keys:
            if key in ("isomtime", "isoctime"):
                continue
            if self.content_only and key in self.METADATA:
                continue
            change.append(self.call_keys[key](item))
        diff_data["change"] = " ".join([v for v in change if v])
        diff_data["path"] = item.path
        diff_data.update({} if jsonline else self.static_data)
        return diff_data

    def format_other(self, key, diff: "ItemDiff"):
        change = diff.changes().get(key)
        return f"{change.diff_type}".ljust(27) if change else ""  # 27 is the length of the content change

    def format_mode(self, diff: "ItemDiff", filetype=False):
        change = diff.type() if filetype else diff.mode()
        return f"[{change.diff_data['item1']} -> {change.diff_data['item2']}]" if change else ""

    def format_owner(self, diff: "ItemDiff", spec: Literal["owner", "user", "group"] = "owner"):
        if spec == "user":
            change = diff.user()
            return f"[{change.diff_data['item1']} -> {change.diff_data['item2']}]" if change else ""
        if spec == "group":
            change = diff.group()
            return f"[{change.diff_data['item1']} -> {change.diff_data['item2']}]" if change else ""
        if spec != "owner":
            raise ValueError(f"Invalid owner spec: {spec}")
        change = diff.owner()
        if change:
            return "[{}:{} -> {}:{}]".format(
                change.diff_data["item1"][0],
                change.diff_data["item1"][1],
                change.diff_data["item2"][0],
                change.diff_data["item2"][1],
            )
        return ""

    def format_content(self, diff: "ItemDiff"):
        change = diff.content()
        if change:
            if change.diff_type == "added":
                return "{}: {:>20}".format(change.diff_type, format_file_size(change.diff_data["added"]))
            if change.diff_type == "removed":
                return "{}: {:>18}".format(change.diff_type, format_file_size(change.diff_data["removed"]))
            if "added" not in change.diff_data and "removed" not in change.diff_data:
                return "modified: (can't get size)"
            return "{}: {:>8} {:>8}".format(
                change.diff_type,
                format_file_size(change.diff_data["added"], precision=1, sign=True),
                format_file_size(-change.diff_data["removed"], precision=1, sign=True),
            )
        return ""

    def format_time(self, key, diff: "ItemDiff"):
        change = diff.changes().get(key)
        return f"[{key}: {change.diff_data['item1']} -> {change.diff_data['item2']}]" if change else ""

    def format_iso_time(self, key, diff: "ItemDiff"):
        change = diff.changes().get(key)
        return (
            f"[{key}: {change.diff_data['item1'].isoformat()} -> {change.diff_data['item2'].isoformat()}]"
            if change
            else ""
        )


def file_status(mode):
    if stat.S_ISREG(mode):
        return "A"
    elif stat.S_ISDIR(mode):
        return "d"
    elif stat.S_ISBLK(mode):
        return "b"
    elif stat.S_ISCHR(mode):
        return "c"
    elif stat.S_ISLNK(mode):
        return "s"
    elif stat.S_ISFIFO(mode):
        return "f"
    return "?"


def clean_lines(lines, lstrip=None, rstrip=None, remove_empty=True, remove_comments=True):
    """
    clean lines (usually read from a config file):

    1. strip whitespace (left and right), 2. remove empty lines, 3. remove comments.

    note: only "pure comment lines" are supported, no support for "trailing comments".

    :param lines: input line iterator (e.g. list or open text file) that gives unclean input lines
    :param lstrip: lstrip call arguments or False, if lstripping is not desired
    :param rstrip: rstrip call arguments or False, if rstripping is not desired
    :param remove_comments: remove comment lines (lines starting with "#")
    :param remove_empty: remove empty lines
    :return: yields processed lines
    """
    for line in lines:
        if lstrip is not False:
            line = line.lstrip(lstrip)
        if rstrip is not False:
            line = line.rstrip(rstrip)
        if remove_empty and not line:
            continue
        if remove_comments and line.startswith("#"):
            continue
        yield line
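
# Illustrative usage (doctest-style):
#   >>> list(clean_lines(["# comment\n", "  keep me  \n", "\n"]))
#   ['keep me']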


def swidth_slice(string, max_width):
    """
    Return a slice of *max_width* cells from *string*.

    Negative *max_width* means from the end of string.

    *max_width* is in units of character cells (or "columns").
    Latin characters are usually one cell wide, many CJK characters are two cells wide.
    """
    from ..platform import swidth

    reverse = max_width < 0
    max_width = abs(max_width)
    if reverse:
        string = reversed(string)
    current_swidth = 0
    result = []
    for character in string:
        current_swidth += swidth(character)
        if current_swidth > max_width:
            break
        result.append(character)
    if reverse:
        result.reverse()
    return "".join(result)


def ellipsis_truncate(msg, space):
    """
    shorten a long string by replacing its middle part with an ellipsis, e.g.:
    this_is_a_very_long_string -------> this_is..._string
    """
    from ..platform import swidth

    ellipsis_width = swidth("...")
    msg_width = swidth(msg)
    if space < 8:
        # if there is very little space, just show ...
        return "..." + " " * (space - ellipsis_width)
    if space < ellipsis_width + msg_width:
        return f"{swidth_slice(msg, space // 2 - ellipsis_width)}...{swidth_slice(msg, -space // 2)}"
    return msg + " " * (space - msg_width)


class BorgJsonEncoder(json.JSONEncoder):
    def default(self, o):
        from ..repository import Repository
        from ..remote import RemoteRepository
        from ..archive import Archive
        from ..cache import LocalCache, AdHocCache

        if isinstance(o, Repository) or isinstance(o, RemoteRepository):
            return {"id": bin_to_hex(o.id), "location": o._location.canonical_path()}
        if isinstance(o, Archive):
            return o.info()
        if isinstance(o, LocalCache):
            return {"path": o.path, "stats": o.stats()}
        if isinstance(o, AdHocCache):
            return {"stats": o.stats()}
        if callable(getattr(o, "to_json", None)):
            return o.to_json()
        return super().default(o)


def basic_json_data(manifest, *, cache=None, extra=None):
    key = manifest.key
    data = extra or {}
    data.update({"repository": BorgJsonEncoder().default(manifest.repository), "encryption": {"mode": key.ARG_NAME}})
    data["repository"]["last_modified"] = OutputTimestamp(manifest.last_timestamp)
    if key.NAME.startswith("key file"):
        data["encryption"]["keyfile"] = key.find_key()
    if cache:
        data["cache"] = cache
    return data


def json_dump(obj):
    """Dump using BorgJsonEncoder."""
    return json.dumps(obj, sort_keys=True, indent=4, cls=BorgJsonEncoder)


def json_print(obj):
    print(json_dump(obj))


def prepare_dump_dict(d):
    def decode_bytes(value):
        # this should somehow be reversible later, but usual strings should
        # look nice and chunk ids should mostly show in hex. Use a special
        # inband signaling character (ASCII DEL) to distinguish between
        # decoded and hex mode.
        if not value.startswith(b"\x7f"):
            try:
                value = value.decode()
                return value
            except UnicodeDecodeError:
                pass
        return "\u007f" + bin_to_hex(value)

    def decode_tuple(t):
        res = []
        for value in t:
            if isinstance(value, dict):
                value = decode(value)
            elif isinstance(value, tuple) or isinstance(value, list):
                value = decode_tuple(value)
            elif isinstance(value, bytes):
                value = decode_bytes(value)
            res.append(value)
        return res

    def decode(d):
        res = OrderedDict()
        for key, value in d.items():
            if isinstance(value, dict):
                value = decode(value)
            elif isinstance(value, (tuple, list)):
                value = decode_tuple(value)
            elif isinstance(value, bytes):
                value = decode_bytes(value)
            elif isinstance(value, Timestamp):
                value = value.to_unix_nano()
            if isinstance(key, bytes):
                key = key.decode()
            res[key] = value
        return res

    return decode(d)


class Highlander(argparse.Action):
    """make sure some option is only given once"""

    def __init__(self, *args, **kwargs):
        self.__called = False
        super().__init__(*args, **kwargs)

    def __call__(self, parser, namespace, values, option_string=None):
        if self.__called:
            raise argparse.ArgumentError(self, "There can be only one.")
        self.__called = True
        setattr(namespace, self.dest, values)


class MakePathSafeAction(Highlander):
    def __call__(self, parser, namespace, path, option_string=None):
        try:
            sanitized_path = make_path_safe(path)
        except ValueError as e:
            raise argparse.ArgumentError(self, e)
        if sanitized_path == ".":
            raise argparse.ArgumentError(self, f"{path!r} is not a valid file name")
        setattr(namespace, self.dest, sanitized_path)