From 06eab6a22871c17605b4ed37904299979f3e9268 Mon Sep 17 00:00:00 2001
From: Thomas Waldmann
Date: Thu, 8 Sep 2022 19:38:18 +0200
Subject: [PATCH] RepositoryCache: cache complete and meta-only chunks separately

---
 src/borg/remote.py           | 28 +++++++++------
 src/borg/testsuite/remote.py | 68 ++++++++++++++++++++++++++++--------
 2 files changed, 71 insertions(+), 25 deletions(-)

diff --git a/src/borg/remote.py b/src/borg/remote.py
index b21e37495..de0002ba2 100644
--- a/src/borg/remote.py
+++ b/src/borg/remote.py
@@ -1191,6 +1191,12 @@ def query_size_limit(self):
         available_space = shutil.disk_usage(self.basedir).free
         self.size_limit = int(min(available_space * 0.25, 2**31))
 
+    def prefixed_key(self, key, complete):
+        # prefix the key with another byte telling whether it refers to a complete chunk
+        # or a metadata-only chunk without data (see also the read_data param).
+        prefix = b"\x01" if complete else b"\x00"
+        return prefix + key
+
     def key_filename(self, key):
         return os.path.join(self.basedir, bin_to_hex(key))
 
@@ -1204,12 +1210,13 @@ def backoff(self):
             os.unlink(file)
             self.evictions += 1
 
-    def add_entry(self, key, data, cache):
+    def add_entry(self, key, data, cache, complete):
         transformed = self.transform(key, data)
         if not cache:
             return transformed
         packed = self.pack(transformed)
-        file = self.key_filename(key)
+        pkey = self.prefixed_key(key, complete=complete)
+        file = self.key_filename(pkey)
         try:
             with open(file, "wb") as fd:
                 fd.write(packed)
@@ -1225,7 +1232,7 @@ def add_entry(self, key, data, cache):
                 raise
         else:
             self.size += len(packed)
-            self.cache.add(key)
+            self.cache.add(pkey)
             if self.size > self.size_limit:
                 self.backoff()
         return transformed
@@ -1253,27 +1260,28 @@ def close(self):
     def get_many(self, keys, read_data=True, cache=True):
         # TODO: this currently always requests the full chunk from self.repository (read_data=True).
         # It could use different cache keys depending on read_data and cache full vs. meta-only chunks.
-        unknown_keys = [key for key in keys if key not in self.cache]
-        repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys, read_data=True))
+        unknown_keys = [key for key in keys if self.prefixed_key(key, complete=read_data) not in self.cache]
+        repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys, read_data=read_data))
         for key in keys:
-            if key in self.cache:
-                file = self.key_filename(key)
+            pkey = self.prefixed_key(key, complete=read_data)
+            if pkey in self.cache:
+                file = self.key_filename(pkey)
                 with open(file, "rb") as fd:
                     self.hits += 1
                     yield self.unpack(fd.read())
             else:
                 for key_, data in repository_iterator:
                     if key_ == key:
-                        transformed = self.add_entry(key, data, cache)
+                        transformed = self.add_entry(key, data, cache, complete=read_data)
                         self.misses += 1
                         yield transformed
                         break
                 else:
                     # slow path: eviction during this get_many removed this key from the cache
                     t0 = time.perf_counter()
-                    data = self.repository.get(key, read_data=True)
+                    data = self.repository.get(key, read_data=read_data)
                     self.slow_lat += time.perf_counter() - t0
-                    transformed = self.add_entry(key, data, cache)
+                    transformed = self.add_entry(key, data, cache, complete=read_data)
                     self.slow_misses += 1
                     yield transformed
         # Consume any pending requests
diff --git a/src/borg/testsuite/remote.py b/src/borg/testsuite/remote.py
index 35f2b6df3..b5cb2e642 100644
--- a/src/borg/testsuite/remote.py
+++ b/src/borg/testsuite/remote.py
@@ -12,6 +12,7 @@
 from ..helpers import IntegrityError
 from ..repoobj import RepoObj
 from .hashindex import H
+from .repository import fchunk, pdchunk
 from .key import TestKey
 
 
@@ -74,9 +75,9 @@ class TestRepositoryCache:
     def repository(self, tmpdir):
         self.repository_location = os.path.join(str(tmpdir), "repository")
         with Repository(self.repository_location, exclusive=True, create=True) as repository:
-            repository.put(H(1), b"1234")
-            repository.put(H(2), b"5678")
-            repository.put(H(3), bytes(100))
+            repository.put(H(1), fchunk(b"1234"))
+            repository.put(H(2), fchunk(b"5678"))
+            repository.put(H(3), fchunk(bytes(100)))
             yield repository
 
     @pytest.fixture
@@ -85,19 +86,55 @@ def cache(self, repository):
         return RepositoryCache(repository)
 
     def test_simple(self, cache: RepositoryCache):
         # Single get()s are not cached, since they are used for unique objects like archives.
-        assert cache.get(H(1)) == b"1234"
+        assert pdchunk(cache.get(H(1))) == b"1234"
         assert cache.misses == 1
         assert cache.hits == 0
 
-        assert list(cache.get_many([H(1)])) == [b"1234"]
+        assert [pdchunk(ch) for ch in cache.get_many([H(1)])] == [b"1234"]
         assert cache.misses == 2
         assert cache.hits == 0
 
-        assert list(cache.get_many([H(1)])) == [b"1234"]
+        assert [pdchunk(ch) for ch in cache.get_many([H(1)])] == [b"1234"]
         assert cache.misses == 2
         assert cache.hits == 1
 
-        assert cache.get(H(1)) == b"1234"
+        assert pdchunk(cache.get(H(1))) == b"1234"
+        assert cache.misses == 2
+        assert cache.hits == 2
+
+    def test_meta(self, cache: RepositoryCache):
+        # same as test_simple, but not reading the chunk data (metadata only).
+        # Single get()s are not cached, since they are used for unique objects like archives.
+        assert pdchunk(cache.get(H(1), read_data=False)) == b""
+        assert cache.misses == 1
+        assert cache.hits == 0
+
+        assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""]
+        assert cache.misses == 2
+        assert cache.hits == 0
+
+        assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""]
+        assert cache.misses == 2
+        assert cache.hits == 1
+
+        assert pdchunk(cache.get(H(1), read_data=False)) == b""
+        assert cache.misses == 2
+        assert cache.hits == 2
+
+    def test_mixed(self, cache: RepositoryCache):
+        assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""]
+        assert cache.misses == 1
+        assert cache.hits == 0
+
+        assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=True)] == [b"1234"]
+        assert cache.misses == 2
+        assert cache.hits == 0
+
+        assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""]
+        assert cache.misses == 2
+        assert cache.hits == 1
+
+        assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=True)] == [b"1234"]
         assert cache.misses == 2
         assert cache.hits == 2
 
@@ -105,11 +142,11 @@ def test_backoff(self, cache: RepositoryCache):
         def query_size_limit():
             cache.size_limit = 0
 
-        assert list(cache.get_many([H(1), H(2)])) == [b"1234", b"5678"]
+        assert [pdchunk(ch) for ch in cache.get_many([H(1), H(2)])] == [b"1234", b"5678"]
         assert cache.misses == 2
         assert cache.evictions == 0
         iterator = cache.get_many([H(1), H(3), H(2)])
-        assert next(iterator) == b"1234"
+        assert pdchunk(next(iterator)) == b"1234"
 
         # Force cache to back off
         qsl = cache.query_size_limit
@@ -120,11 +157,11 @@ def query_size_limit():
         assert cache.evictions == 2
         assert H(1) not in cache.cache
         assert H(2) not in cache.cache
-        assert next(iterator) == bytes(100)
+        assert pdchunk(next(iterator)) == bytes(100)
         assert cache.slow_misses == 0
         # Since H(2) was in the cache when we called get_many(), but has
         # been evicted during iterating the generator, it will be a slow miss.
-        assert next(iterator) == b"5678"
+        assert pdchunk(next(iterator)) == b"5678"
         assert cache.slow_misses == 1
 
     def test_enospc(self, cache: RepositoryCache):
@@ -145,16 +182,16 @@ def truncate(self, n=None):
                 pass
 
         iterator = cache.get_many([H(1), H(2), H(3)])
-        assert next(iterator) == b"1234"
+        assert pdchunk(next(iterator)) == b"1234"
 
         with patch("builtins.open", enospc_open):
-            assert next(iterator) == b"5678"
+            assert pdchunk(next(iterator)) == b"5678"
             assert cache.enospc == 1
             # We didn't patch query_size_limit which would set size_limit to some low
             # value, so nothing was actually evicted.
             assert cache.evictions == 0
 
-        assert next(iterator) == bytes(100)
+        assert pdchunk(next(iterator)) == bytes(100)
 
     @pytest.fixture
     def key(self, repository, monkeypatch):
@@ -193,7 +230,8 @@ def test_cache_corruption(self, decrypted_cache: RepositoryCache, H1, H2, H3):
         iterator = decrypted_cache.get_many([H1, H2, H3])
         assert next(iterator) == (4, b"1234")
 
-        with open(decrypted_cache.key_filename(H2), "a+b") as fd:
+        pkey = decrypted_cache.prefixed_key(H2, complete=True)
+        with open(decrypted_cache.key_filename(pkey), "a+b") as fd:
             fd.seek(-1, io.SEEK_END)
             corrupted = (int.from_bytes(fd.read(), "little") ^ 2).to_bytes(1, "little")
             fd.seek(-1, io.SEEK_END)
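
Why the prefix byte matters: a cache entry written for a meta-only request must never
satisfy a later request for the complete chunk, and vice versa; test_mixed above
exercises exactly that. The following standalone sketch models the scheme in plain
Python. It is toy code, not part of the patch or of borg: prefixed_key mirrors the
method added above, while the cache dict, the get() helper, and its payload tuples
are invented stand-ins for RepositoryCache, its on-disk cache, and the repo objects.

    def prefixed_key(key: bytes, complete: bool) -> bytes:
        # b"\x01" marks a complete chunk, b"\x00" a metadata-only chunk,
        # mirroring RepositoryCache.prefixed_key() in the patch.
        return (b"\x01" if complete else b"\x00") + key

    cache = {}  # prefixed key -> payload; stands in for the on-disk cache

    def get(key: bytes, read_data: bool = True):
        pkey = prefixed_key(key, complete=read_data)
        if pkey in cache:
            return cache[pkey]  # hit: only when the cached variant matches the request
        # miss: fetch from the (hypothetical) repository, cache under the prefixed key
        payload = (b"meta", b"data" if read_data else b"")
        cache[pkey] = payload
        return payload

    key = bytes(32)  # a 256-bit chunk id, like H(1) in the tests
    assert get(key, read_data=False) == (b"meta", b"")     # caches the meta-only variant
    assert get(key, read_data=True) == (b"meta", b"data")  # distinct entry, no false hit
    assert len(cache) == 2  # both variants coexist under different one-byte prefixes

Without the prefix, the first call would have stored the data-less payload under the
bare key, and the second call would have "hit" that entry, returning a chunk without
data where complete data was expected.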