mirror of
https://github.com/borgbackup/borg.git
synced 2024-12-26 01:37:20 +00:00
RepositoryCache: cache complete and meta-only chunks separately
This commit is contained in:
parent
106abbe4d9
commit
06eab6a228
2 changed files with 71 additions and 25 deletions
|
@ -1191,6 +1191,12 @@ def query_size_limit(self):
|
||||||
available_space = shutil.disk_usage(self.basedir).free
|
available_space = shutil.disk_usage(self.basedir).free
|
||||||
self.size_limit = int(min(available_space * 0.25, 2**31))
|
self.size_limit = int(min(available_space * 0.25, 2**31))
|
||||||
|
|
||||||
|
def prefixed_key(self, key, complete):
|
||||||
|
# just prefix another byte telling whether this key refers to a complete chunk
|
||||||
|
# or a without-data-metadata-only chunk (see also read_data param).
|
||||||
|
prefix = b"\x01" if complete else b"\x00"
|
||||||
|
return prefix + key
|
||||||
|
|
||||||
def key_filename(self, key):
|
def key_filename(self, key):
|
||||||
return os.path.join(self.basedir, bin_to_hex(key))
|
return os.path.join(self.basedir, bin_to_hex(key))
|
||||||
|
|
||||||
|
@ -1204,12 +1210,13 @@ def backoff(self):
|
||||||
os.unlink(file)
|
os.unlink(file)
|
||||||
self.evictions += 1
|
self.evictions += 1
|
||||||
|
|
||||||
def add_entry(self, key, data, cache):
|
def add_entry(self, key, data, cache, complete):
|
||||||
transformed = self.transform(key, data)
|
transformed = self.transform(key, data)
|
||||||
if not cache:
|
if not cache:
|
||||||
return transformed
|
return transformed
|
||||||
packed = self.pack(transformed)
|
packed = self.pack(transformed)
|
||||||
file = self.key_filename(key)
|
pkey = self.prefixed_key(key, complete=complete)
|
||||||
|
file = self.key_filename(pkey)
|
||||||
try:
|
try:
|
||||||
with open(file, "wb") as fd:
|
with open(file, "wb") as fd:
|
||||||
fd.write(packed)
|
fd.write(packed)
|
||||||
|
@ -1225,7 +1232,7 @@ def add_entry(self, key, data, cache):
|
||||||
raise
|
raise
|
||||||
else:
|
else:
|
||||||
self.size += len(packed)
|
self.size += len(packed)
|
||||||
self.cache.add(key)
|
self.cache.add(pkey)
|
||||||
if self.size > self.size_limit:
|
if self.size > self.size_limit:
|
||||||
self.backoff()
|
self.backoff()
|
||||||
return transformed
|
return transformed
|
||||||
|
@ -1253,27 +1260,28 @@ def close(self):
|
||||||
def get_many(self, keys, read_data=True, cache=True):
|
def get_many(self, keys, read_data=True, cache=True):
|
||||||
# TODO: this currently always requests the full chunk from self.repository (read_data=True).
|
# TODO: this currently always requests the full chunk from self.repository (read_data=True).
|
||||||
# It could use different cache keys depending on read_data and cache full vs. meta-only chunks.
|
# It could use different cache keys depending on read_data and cache full vs. meta-only chunks.
|
||||||
unknown_keys = [key for key in keys if key not in self.cache]
|
unknown_keys = [key for key in keys if self.prefixed_key(key, complete=read_data) not in self.cache]
|
||||||
repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys, read_data=True))
|
repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys, read_data=read_data))
|
||||||
for key in keys:
|
for key in keys:
|
||||||
if key in self.cache:
|
pkey = self.prefixed_key(key, complete=read_data)
|
||||||
file = self.key_filename(key)
|
if pkey in self.cache:
|
||||||
|
file = self.key_filename(pkey)
|
||||||
with open(file, "rb") as fd:
|
with open(file, "rb") as fd:
|
||||||
self.hits += 1
|
self.hits += 1
|
||||||
yield self.unpack(fd.read())
|
yield self.unpack(fd.read())
|
||||||
else:
|
else:
|
||||||
for key_, data in repository_iterator:
|
for key_, data in repository_iterator:
|
||||||
if key_ == key:
|
if key_ == key:
|
||||||
transformed = self.add_entry(key, data, cache)
|
transformed = self.add_entry(key, data, cache, complete=read_data)
|
||||||
self.misses += 1
|
self.misses += 1
|
||||||
yield transformed
|
yield transformed
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
# slow path: eviction during this get_many removed this key from the cache
|
# slow path: eviction during this get_many removed this key from the cache
|
||||||
t0 = time.perf_counter()
|
t0 = time.perf_counter()
|
||||||
data = self.repository.get(key, read_data=True)
|
data = self.repository.get(key, read_data=read_data)
|
||||||
self.slow_lat += time.perf_counter() - t0
|
self.slow_lat += time.perf_counter() - t0
|
||||||
transformed = self.add_entry(key, data, cache)
|
transformed = self.add_entry(key, data, cache, complete=read_data)
|
||||||
self.slow_misses += 1
|
self.slow_misses += 1
|
||||||
yield transformed
|
yield transformed
|
||||||
# Consume any pending requests
|
# Consume any pending requests
|
||||||
|
|
|
@ -12,6 +12,7 @@
|
||||||
from ..helpers import IntegrityError
|
from ..helpers import IntegrityError
|
||||||
from ..repoobj import RepoObj
|
from ..repoobj import RepoObj
|
||||||
from .hashindex import H
|
from .hashindex import H
|
||||||
|
from .repository import fchunk, pdchunk
|
||||||
from .key import TestKey
|
from .key import TestKey
|
||||||
|
|
||||||
|
|
||||||
|
@ -74,9 +75,9 @@ class TestRepositoryCache:
|
||||||
def repository(self, tmpdir):
|
def repository(self, tmpdir):
|
||||||
self.repository_location = os.path.join(str(tmpdir), "repository")
|
self.repository_location = os.path.join(str(tmpdir), "repository")
|
||||||
with Repository(self.repository_location, exclusive=True, create=True) as repository:
|
with Repository(self.repository_location, exclusive=True, create=True) as repository:
|
||||||
repository.put(H(1), b"1234")
|
repository.put(H(1), fchunk(b"1234"))
|
||||||
repository.put(H(2), b"5678")
|
repository.put(H(2), fchunk(b"5678"))
|
||||||
repository.put(H(3), bytes(100))
|
repository.put(H(3), fchunk(bytes(100)))
|
||||||
yield repository
|
yield repository
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
|
@ -85,19 +86,55 @@ def cache(self, repository):
|
||||||
|
|
||||||
def test_simple(self, cache: RepositoryCache):
|
def test_simple(self, cache: RepositoryCache):
|
||||||
# Single get()s are not cached, since they are used for unique objects like archives.
|
# Single get()s are not cached, since they are used for unique objects like archives.
|
||||||
assert cache.get(H(1)) == b"1234"
|
assert pdchunk(cache.get(H(1))) == b"1234"
|
||||||
assert cache.misses == 1
|
assert cache.misses == 1
|
||||||
assert cache.hits == 0
|
assert cache.hits == 0
|
||||||
|
|
||||||
assert list(cache.get_many([H(1)])) == [b"1234"]
|
assert [pdchunk(ch) for ch in cache.get_many([H(1)])] == [b"1234"]
|
||||||
assert cache.misses == 2
|
assert cache.misses == 2
|
||||||
assert cache.hits == 0
|
assert cache.hits == 0
|
||||||
|
|
||||||
assert list(cache.get_many([H(1)])) == [b"1234"]
|
assert [pdchunk(ch) for ch in cache.get_many([H(1)])] == [b"1234"]
|
||||||
assert cache.misses == 2
|
assert cache.misses == 2
|
||||||
assert cache.hits == 1
|
assert cache.hits == 1
|
||||||
|
|
||||||
assert cache.get(H(1)) == b"1234"
|
assert pdchunk(cache.get(H(1))) == b"1234"
|
||||||
|
assert cache.misses == 2
|
||||||
|
assert cache.hits == 2
|
||||||
|
|
||||||
|
def test_meta(self, cache: RepositoryCache):
|
||||||
|
# same as test_simple, but not reading the chunk data (metadata only).
|
||||||
|
# Single get()s are not cached, since they are used for unique objects like archives.
|
||||||
|
assert pdchunk(cache.get(H(1), read_data=False)) == b""
|
||||||
|
assert cache.misses == 1
|
||||||
|
assert cache.hits == 0
|
||||||
|
|
||||||
|
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""]
|
||||||
|
assert cache.misses == 2
|
||||||
|
assert cache.hits == 0
|
||||||
|
|
||||||
|
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""]
|
||||||
|
assert cache.misses == 2
|
||||||
|
assert cache.hits == 1
|
||||||
|
|
||||||
|
assert pdchunk(cache.get(H(1), read_data=False)) == b""
|
||||||
|
assert cache.misses == 2
|
||||||
|
assert cache.hits == 2
|
||||||
|
|
||||||
|
def test_mixed(self, cache: RepositoryCache):
|
||||||
|
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""]
|
||||||
|
assert cache.misses == 1
|
||||||
|
assert cache.hits == 0
|
||||||
|
|
||||||
|
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=True)] == [b"1234"]
|
||||||
|
assert cache.misses == 2
|
||||||
|
assert cache.hits == 0
|
||||||
|
|
||||||
|
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=False)] == [b""]
|
||||||
|
assert cache.misses == 2
|
||||||
|
assert cache.hits == 1
|
||||||
|
|
||||||
|
assert [pdchunk(ch) for ch in cache.get_many([H(1)], read_data=True)] == [b"1234"]
|
||||||
assert cache.misses == 2
|
assert cache.misses == 2
|
||||||
assert cache.hits == 2
|
assert cache.hits == 2
|
||||||
|
|
||||||
|
@ -105,11 +142,11 @@ def test_backoff(self, cache: RepositoryCache):
|
||||||
def query_size_limit():
|
def query_size_limit():
|
||||||
cache.size_limit = 0
|
cache.size_limit = 0
|
||||||
|
|
||||||
assert list(cache.get_many([H(1), H(2)])) == [b"1234", b"5678"]
|
assert [pdchunk(ch) for ch in cache.get_many([H(1), H(2)])] == [b"1234", b"5678"]
|
||||||
assert cache.misses == 2
|
assert cache.misses == 2
|
||||||
assert cache.evictions == 0
|
assert cache.evictions == 0
|
||||||
iterator = cache.get_many([H(1), H(3), H(2)])
|
iterator = cache.get_many([H(1), H(3), H(2)])
|
||||||
assert next(iterator) == b"1234"
|
assert pdchunk(next(iterator)) == b"1234"
|
||||||
|
|
||||||
# Force cache to back off
|
# Force cache to back off
|
||||||
qsl = cache.query_size_limit
|
qsl = cache.query_size_limit
|
||||||
|
@ -120,11 +157,11 @@ def query_size_limit():
|
||||||
assert cache.evictions == 2
|
assert cache.evictions == 2
|
||||||
assert H(1) not in cache.cache
|
assert H(1) not in cache.cache
|
||||||
assert H(2) not in cache.cache
|
assert H(2) not in cache.cache
|
||||||
assert next(iterator) == bytes(100)
|
assert pdchunk(next(iterator)) == bytes(100)
|
||||||
assert cache.slow_misses == 0
|
assert cache.slow_misses == 0
|
||||||
# Since H(2) was in the cache when we called get_many(), but has
|
# Since H(2) was in the cache when we called get_many(), but has
|
||||||
# been evicted during iterating the generator, it will be a slow miss.
|
# been evicted during iterating the generator, it will be a slow miss.
|
||||||
assert next(iterator) == b"5678"
|
assert pdchunk(next(iterator)) == b"5678"
|
||||||
assert cache.slow_misses == 1
|
assert cache.slow_misses == 1
|
||||||
|
|
||||||
def test_enospc(self, cache: RepositoryCache):
|
def test_enospc(self, cache: RepositoryCache):
|
||||||
|
@ -145,16 +182,16 @@ def truncate(self, n=None):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
iterator = cache.get_many([H(1), H(2), H(3)])
|
iterator = cache.get_many([H(1), H(2), H(3)])
|
||||||
assert next(iterator) == b"1234"
|
assert pdchunk(next(iterator)) == b"1234"
|
||||||
|
|
||||||
with patch("builtins.open", enospc_open):
|
with patch("builtins.open", enospc_open):
|
||||||
assert next(iterator) == b"5678"
|
assert pdchunk(next(iterator)) == b"5678"
|
||||||
assert cache.enospc == 1
|
assert cache.enospc == 1
|
||||||
# We didn't patch query_size_limit which would set size_limit to some low
|
# We didn't patch query_size_limit which would set size_limit to some low
|
||||||
# value, so nothing was actually evicted.
|
# value, so nothing was actually evicted.
|
||||||
assert cache.evictions == 0
|
assert cache.evictions == 0
|
||||||
|
|
||||||
assert next(iterator) == bytes(100)
|
assert pdchunk(next(iterator)) == bytes(100)
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def key(self, repository, monkeypatch):
|
def key(self, repository, monkeypatch):
|
||||||
|
@ -193,7 +230,8 @@ def test_cache_corruption(self, decrypted_cache: RepositoryCache, H1, H2, H3):
|
||||||
iterator = decrypted_cache.get_many([H1, H2, H3])
|
iterator = decrypted_cache.get_many([H1, H2, H3])
|
||||||
assert next(iterator) == (4, b"1234")
|
assert next(iterator) == (4, b"1234")
|
||||||
|
|
||||||
with open(decrypted_cache.key_filename(H2), "a+b") as fd:
|
pkey = decrypted_cache.prefixed_key(H2, complete=True)
|
||||||
|
with open(decrypted_cache.key_filename(pkey), "a+b") as fd:
|
||||||
fd.seek(-1, io.SEEK_END)
|
fd.seek(-1, io.SEEK_END)
|
||||||
corrupted = (int.from_bytes(fd.read(), "little") ^ 2).to_bytes(1, "little")
|
corrupted = (int.from_bytes(fd.read(), "little") ^ 2).to_bytes(1, "little")
|
||||||
fd.seek(-1, io.SEEK_END)
|
fd.seek(-1, io.SEEK_END)
|
||||||
|
|
Loading…
Reference in a new issue