repo.list() yielding IDs in on-disk order

This commit is contained in:
Thomas Waldmann 2016-09-23 21:45:01 +02:00
parent 27bc73c23e
commit 90111363ba
3 changed files with 66 additions and 1 deletions

View File

@ -62,6 +62,7 @@ class RepositoryServer: # pragma: no cover
'destroy', 'destroy',
'get', 'get',
'list', 'list',
'scan',
'negotiate', 'negotiate',
'open', 'open',
'put', 'put',
@ -467,6 +468,9 @@ This problem will go away as soon as the server has been upgraded to 1.0.7+.
def list(self, limit=None, marker=None): def list(self, limit=None, marker=None):
return self.call('list', limit, marker) return self.call('list', limit, marker)
def scan(self, limit=None, marker=None):
    """
    Dispatch a 'scan' call with the given limit and marker through self.call()
    and return whatever that call returns.

    :param limit: passed through unchanged as the first call argument
    :param marker: passed through unchanged as the second call argument
    """
    return self.call('scan', limit, marker)
def get(self, id_): def get(self, id_):
for resp in self.get_many([id_]): for resp in self.get_many([id_]):
return resp return resp

View File

@ -2,7 +2,7 @@ import errno
import os import os
import shutil import shutil
import struct import struct
from binascii import unhexlify from binascii import hexlify, unhexlify
from collections import defaultdict from collections import defaultdict
from configparser import ConfigParser from configparser import ConfigParser
from datetime import datetime from datetime import datetime
@ -750,10 +750,53 @@ class Repository:
return id in self.index return id in self.index
def list(self, limit=None, marker=None): def list(self, limit=None, marker=None):
"""
list <limit> IDs starting from after id <marker> - in index (pseudo-random) order.
"""
if not self.index: if not self.index:
self.index = self.open_index(self.get_transaction_id()) self.index = self.open_index(self.get_transaction_id())
return [id_ for id_, _ in islice(self.index.iteritems(marker=marker), limit)] return [id_ for id_, _ in islice(self.index.iteritems(marker=marker), limit)]
def scan(self, limit=None, marker=None):
    """
    list <limit> IDs starting from after id <marker> - in on-disk order, so that a client
    fetching data in this order does linear reads and reuses stuff from disk cache.

    We rely on repository.check() having run already (either now or some time before) and that:

    - if we are called from a borg check command, self.index is a valid, fresh, in-sync repo index.
    - if we are called from elsewhere, either self.index or the on-disk index is valid and in-sync.
    - the repository segments are valid (no CRC errors).
      if we encounter CRC errors in segment entry headers, rest of segment is skipped.

    :param limit: maximum number of IDs to return (must be > 0); None means no limit
    :param marker: ID after which to resume; its (segment, offset) position is looked up
                   in self.index. None means start at the very beginning.
    :return: list of IDs of objects that are TAG_PUT entries and whose on-disk position
             matches the position recorded in self.index
    :raises ValueError: if limit is given and is less than 1
    """
    if limit is not None and limit < 1:
        raise ValueError('please use limit > 0 or limit = None')
    if not self.index:
        transaction_id = self.get_transaction_id()
        self.index = self.open_index(transaction_id)
    # smallest valid seg is <uint32> 0, smallest valid offs is <uint32> 8,
    # so (0, 0) lies before every real object position and skips nothing
    marker_segment, marker_offset = (0, 0) if marker is None else self.index[marker]
    result = []
    for segment, _filename in self.io.segment_iterator():
        if segment < marker_segment:
            # whole segment lies before the marker position
            continue
        obj_iterator = self.io.iter_objects(segment, read_data=False, include_data=False)
        while True:
            try:
                tag, id_, offset, _size = next(obj_iterator)
            except (StopIteration, IntegrityError):
                # either end-of-segment or an error - we can not seek to objects at
                # higher offsets than one that has an error in the header fields
                break
            if segment == marker_segment and offset <= marker_offset:
                # still at or before the marker object inside the marker's segment
                continue
            if tag == TAG_PUT and (segment, offset) == self.index.get(id_):
                # we have found an existing and current object
                result.append(id_)
                if len(result) == limit:
                    return result
    return result
def get(self, id_): def get(self, id_):
if not self.index: if not self.index:
self.index = self.open_index(self.get_transaction_id()) self.index = self.open_index(self.get_transaction_id())

View File

@ -133,6 +133,7 @@ class RepositoryTestCase(RepositoryTestCaseBase):
def test_list(self): def test_list(self):
for x in range(100): for x in range(100):
self.repository.put(H(x), b'SOMEDATA') self.repository.put(H(x), b'SOMEDATA')
self.repository.commit()
all = self.repository.list() all = self.repository.list()
self.assert_equal(len(all), 100) self.assert_equal(len(all), 100)
first_half = self.repository.list(limit=50) first_half = self.repository.list(limit=50)
@ -143,6 +144,23 @@ class RepositoryTestCase(RepositoryTestCaseBase):
self.assert_equal(second_half, all[50:]) self.assert_equal(second_half, all[50:])
self.assert_equal(len(self.repository.list(limit=50)), 50) self.assert_equal(len(self.repository.list(limit=50)), 50)
def test_scan(self):
    """scan() must return every ID, honour limit and marker, and keep on-disk order."""
    for x in range(100):
        self.repository.put(H(x), b'SOMEDATA')
    self.repository.commit()
    all_ids = self.repository.scan()
    assert len(all_ids) == 100
    first_half = self.repository.scan(limit=50)
    assert len(first_half) == 50
    assert first_half == all_ids[:50]
    # resuming after the last ID of the first half must yield exactly the rest
    second_half = self.repository.scan(marker=first_half[-1])
    assert len(second_half) == 50
    assert second_half == all_ids[50:]
    assert len(self.repository.scan(limit=50)) == 50
    # check result order == on-disk order; the objects were put as H(0) .. H(99)
    for x, id_ in enumerate(all_ids):
        assert id_ == H(x)
def test_max_data_size(self): def test_max_data_size(self):
max_data = b'x' * MAX_DATA_SIZE max_data = b'x' * MAX_DATA_SIZE
self.repository.put(H(0), max_data) self.repository.put(H(0), max_data)