1
0
Fork 0
mirror of https://github.com/borgbackup/borg.git synced 2025-02-13 01:45:56 +00:00

manifest: reorder methods, no other changes

This commit is contained in:
Thomas Waldmann 2024-09-15 01:47:25 +02:00
parent 77fc3fb884
commit 426d1c7dda
No known key found for this signature in database
GPG key ID: 243ACFA951F78E01

View file

@ -101,30 +101,19 @@ def finish(self, manifest):
manifest_archives = StableDict(self._get_raw_dict())
return manifest_archives
def count(self):
# return the count of archives in the repo
return len(list(self.ids()))
def exists(self, name):
# check if an archive with this name exists
assert isinstance(name, str)
def ids(self):
# yield the binary IDs of all archives
if not self.legacy:
return name in self.names()
try:
infos = list(self.repository.store_list("archives"))
except ObjectNotFound:
infos = []
for info in infos:
info = ItemInfo(*info) # RPC does not give us a NamedTuple
yield hex_to_bin(info.name)
else:
return name in self._archives
def exists_name_and_id(self, name, id):
# check if an archive with this name AND id exists
assert isinstance(name, str)
assert isinstance(id, bytes)
if not self.legacy:
for archive_info in self._infos():
if archive_info["name"] == name and archive_info["id"] == id:
return True
else:
return False
else:
raise NotImplementedError
for archive_info in self._archives.values():
yield archive_info["id"]
def _get_archive_meta(self, id: bytes) -> dict:
# get all metadata directly from the ArchiveItem in the repo.
@ -163,20 +152,6 @@ def _get_archive_meta(self, id: bytes) -> dict:
)
return metadata
def ids(self):
# yield the binary IDs of all archives
if not self.legacy:
try:
infos = list(self.repository.store_list("archives"))
except ObjectNotFound:
infos = []
for info in infos:
info = ItemInfo(*info) # RPC does not give us a NamedTuple
yield hex_to_bin(info.name)
else:
for archive_info in self._archives.values():
yield archive_info["id"]
def _infos(self):
# yield the infos of all archives
for id in self.ids():
@ -186,6 +161,36 @@ def _info_tuples(self):
for info in self._infos():
yield ArchiveInfo(name=info["name"], id=info["id"], ts=parse_timestamp(info["time"]))
def count(self):
# return the count of archives in the repo
return len(list(self.ids()))
def names(self):
# yield the names of all archives
for archive_info in self._infos():
yield archive_info["name"]
def exists(self, name):
# check if an archive with this name exists
assert isinstance(name, str)
if not self.legacy:
return name in self.names()
else:
return name in self._archives
def exists_name_and_id(self, name, id):
# check if an archive with this name AND id exists
assert isinstance(name, str)
assert isinstance(id, bytes)
if not self.legacy:
for archive_info in self._infos():
if archive_info["name"] == name and archive_info["id"] == id:
return True
else:
return False
else:
raise NotImplementedError
def _lookup_name(self, name, raw=False):
assert isinstance(name, str)
assert not self.legacy
@ -199,11 +204,6 @@ def _lookup_name(self, name, raw=False):
else:
raise KeyError(name)
def names(self):
# yield the names of all archives
for archive_info in self._infos():
yield archive_info["name"]
def get(self, name, raw=False):
assert isinstance(name, str)
if not self.legacy: