1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2025-02-08 15:37:09 +00:00

implement --match-archives tags:TAG1,TAG2,...

also: reduce code duplication
This commit is contained in:
Thomas Waldmann 2024-09-26 22:36:20 +02:00
parent 2a01d29cda
commit d30fa6f28e
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01

View file

@ -33,7 +33,7 @@ class NoManifestError(Error):
exit_mcode = 26 exit_mcode = 26
ArchiveInfo = namedtuple("ArchiveInfo", "name id ts") ArchiveInfo = namedtuple("ArchiveInfo", "name id ts tags", defaults=[()])
# timestamp is a replacement for ts, archive is an alias for name (see SortBySpec) # timestamp is a replacement for ts, archive is an alias for name (see SortBySpec)
AI_HUMAN_SORT_KEYS = ["timestamp", "archive"] + list(ArchiveInfo._fields) AI_HUMAN_SORT_KEYS = ["timestamp", "archive"] + list(ArchiveInfo._fields)
@ -129,6 +129,7 @@ def _get_archive_meta(self, id: bytes) -> dict:
time="1970-01-01T00:00:00.000000", time="1970-01-01T00:00:00.000000",
# new: # new:
exists=False, # we have the pointer, but the repo does not have an archive item exists=False, # we have the pointer, but the repo does not have an archive item
tags=(),
) )
else: else:
_, data = self.manifest.repo_objs.parse(id, cdata, ro_type=ROBJ_ARCHIVE_META) _, data = self.manifest.repo_objs.parse(id, cdata, ro_type=ROBJ_ARCHIVE_META)
@ -149,6 +150,7 @@ def _get_archive_meta(self, id: bytes) -> dict:
size=archive_item.size, size=archive_item.size,
nfiles=archive_item.nfiles, nfiles=archive_item.nfiles,
comment=archive_item.comment, # not always present? comment=archive_item.comment, # not always present?
tags=tuple(sorted(getattr(archive_item, "tags", []))), # must be hashable
) )
return metadata return metadata
@ -159,7 +161,26 @@ def _infos(self):
def _info_tuples(self): def _info_tuples(self):
for info in self._infos(): for info in self._infos():
yield ArchiveInfo(name=info["name"], id=info["id"], ts=parse_timestamp(info["time"])) yield ArchiveInfo(name=info["name"], id=info["id"], ts=parse_timestamp(info["time"]), tags=info["tags"])
def _matching_info_tuples(self, match, match_end):
archive_infos = self._info_tuples()
if match is None:
archive_infos = list(archive_infos)
elif match.startswith("aid:"): # do a match on the archive ID (prefix)
wanted_id = match.removeprefix("aid:")
archive_infos = [x for x in archive_infos if bin_to_hex(x.id).startswith(wanted_id)]
if len(archive_infos) != 1:
raise CommandError("archive ID based match needs to match precisely one archive ID")
elif match.startswith("tags:"):
wanted_tags = match.removeprefix("tags:")
wanted_tags = [tag for tag in wanted_tags.split(",") if tag] # remove empty tags
archive_infos = [x for x in archive_infos if set(x.tags) >= set(wanted_tags)]
else: # do a match on the name
regex = get_regex_from_pattern(match)
regex = re.compile(regex + match_end)
archive_infos = [x for x in archive_infos if regex.match(x.name) is not None]
return archive_infos
def count(self): def count(self):
# return the count of archives in the repo # return the count of archives in the repo
@ -211,7 +232,9 @@ def _lookup_name(self, name, raw=False):
if archive_info["exists"] and archive_info["name"] == name: if archive_info["exists"] and archive_info["name"] == name:
if not raw: if not raw:
ts = parse_timestamp(archive_info["time"]) ts = parse_timestamp(archive_info["time"])
return ArchiveInfo(name=archive_info["name"], id=archive_info["id"], ts=ts) return ArchiveInfo(
name=archive_info["name"], id=archive_info["id"], ts=ts, tags=archive_info["tags"]
)
else: else:
return archive_info return archive_info
else: else:
@ -243,7 +266,9 @@ def get_by_id(self, id, raw=False):
if archive_info["exists"]: if archive_info["exists"]:
if not raw: if not raw:
ts = parse_timestamp(archive_info["time"]) ts = parse_timestamp(archive_info["time"])
archive_info = ArchiveInfo(name=archive_info["name"], id=archive_info["id"], ts=ts) archive_info = ArchiveInfo(
name=archive_info["name"], id=archive_info["id"], ts=ts, tags=archive_info["tags"]
)
return archive_info return archive_info
else: else:
for name, values in self._archives.items(): for name, values in self._archives.items():
@ -311,18 +336,7 @@ def list(
if isinstance(sort_by, (str, bytes)): if isinstance(sort_by, (str, bytes)):
raise TypeError("sort_by must be a sequence of str") raise TypeError("sort_by must be a sequence of str")
archive_infos = self._info_tuples() archive_infos = self._matching_info_tuples(match, match_end)
if match is None:
archive_infos = list(archive_infos)
elif match.startswith("aid:"): # do a match on the archive ID (prefix)
wanted_id = match.removeprefix("aid:")
archive_infos = [x for x in archive_infos if bin_to_hex(x.id).startswith(wanted_id)]
if len(archive_infos) != 1:
raise CommandError("archive ID based match needs to match precisely one archive ID")
else: # do a match on the name
regex = get_regex_from_pattern(match)
regex = re.compile(regex + match_end)
archive_infos = [x for x in archive_infos if regex.match(x.name) is not None]
if any([oldest, newest, older, newer]): if any([oldest, newest, older, newer]):
archive_infos = filter_archives_by_date( archive_infos = filter_archives_by_date(
@ -361,14 +375,7 @@ def list_considering(self, args):
def get_one(self, match, *, match_end=r"\Z"): def get_one(self, match, *, match_end=r"\Z"):
"""get exactly one archive matching <match>""" """get exactly one archive matching <match>"""
assert match is not None assert match is not None
archive_infos = self._info_tuples() archive_infos = self._matching_info_tuples(match, match_end)
if match.startswith("aid:"): # do a match on the archive ID (prefix)
wanted_id = match.removeprefix("aid:")
archive_infos = [i for i in archive_infos if bin_to_hex(i.id).startswith(wanted_id)]
else: # do a match on the name
regex = get_regex_from_pattern(match)
regex = re.compile(regex + match_end)
archive_infos = [i for i in archive_infos if regex.match(i.name) is not None]
if len(archive_infos) != 1: if len(archive_infos) != 1:
raise CommandError(f"{match} needed to match precisely one archive, but matched {len(archive_infos)}.") raise CommandError(f"{match} needed to match precisely one archive, but matched {len(archive_infos)}.")
return archive_infos[0] return archive_infos[0]