mirror of
https://github.com/borgbackup/borg.git
synced 2024-12-25 17:27:31 +00:00
Fix issue with object overwrites and sequential transactions
This commit is contained in:
parent
c791bf574b
commit
58b736b580
1 changed files with 30 additions and 12 deletions
|
@ -69,16 +69,6 @@ def open(self, path):
|
|||
self.lock_fd = open(os.path.join(path, 'README'), 'r+')
|
||||
fcntl.flock(self.lock_fd, fcntl.LOCK_EX)
|
||||
self.rollback()
|
||||
self.config = RawConfigParser()
|
||||
self.config.read(os.path.join(path, 'config'))
|
||||
if self.config.getint('store', 'version') != 1:
|
||||
raise Exception('%s Does not look like a darc store')
|
||||
next_band = self.config.getint('store', 'next_band')
|
||||
max_band_size = self.config.getint('store', 'max_band_size')
|
||||
bands_per_dir = self.config.getint('store', 'bands_per_dir')
|
||||
self.meta = dict(self.config.items('meta'))
|
||||
self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir)
|
||||
self.io.cleanup()
|
||||
|
||||
def read_dict(self, filename):
|
||||
with open(filename, 'rb') as fd:
|
||||
|
@ -142,16 +132,16 @@ def compact_bands(self):
|
|||
return
|
||||
self.io.close_band()
|
||||
def lookup(key):
|
||||
return key in self.index
|
||||
return self.index.get(key, (-1, -1))[0] == band
|
||||
bands = self.bands
|
||||
for band in self.compact:
|
||||
if bands[band] > 0:
|
||||
for key, data in self.io.iter_objects(band, lookup):
|
||||
new_band, offset = self.io.write(key, data)
|
||||
self.index[key] = new_band, offset
|
||||
bands[band] -= 1
|
||||
bands.setdefault(new_band, 0)
|
||||
bands[new_band] += 1
|
||||
bands[band] -= 1
|
||||
self.write_dict(os.path.join(self.path, 'delete'), tuple(self.compact))
|
||||
|
||||
def rollback(self):
|
||||
|
@ -178,6 +168,16 @@ def rollback(self):
|
|||
shutil.rmtree(os.path.join(self.path, 'txn.tmp'))
|
||||
self.index = NSIndex(os.path.join(self.path, 'index'))
|
||||
self.bands = self.read_dict(os.path.join(self.path, 'band'))
|
||||
self.config = RawConfigParser()
|
||||
self.config.read(os.path.join(self.path, 'config'))
|
||||
if self.config.getint('store', 'version') != 1:
|
||||
raise Exception('%s Does not look like a darc store')
|
||||
next_band = self.config.getint('store', 'next_band')
|
||||
max_band_size = self.config.getint('store', 'max_band_size')
|
||||
bands_per_dir = self.config.getint('store', 'bands_per_dir')
|
||||
self.meta = dict(self.config.items('meta'))
|
||||
self.io = BandIO(self.path, next_band, max_band_size, bands_per_dir)
|
||||
self.io.cleanup()
|
||||
self.txn_active = False
|
||||
|
||||
@deferrable
|
||||
|
@ -192,6 +192,12 @@ def get(self, id):
|
|||
def put(self, id, data):
|
||||
if not self.txn_active:
|
||||
self.begin_txn()
|
||||
try:
|
||||
band, _ = self.index[id]
|
||||
self.bands[band] -= 1
|
||||
self.compact.add(band)
|
||||
except KeyError:
|
||||
pass
|
||||
band, offset = self.io.write(id, data)
|
||||
self.bands.setdefault(band, 0)
|
||||
self.bands[band] += 1
|
||||
|
@ -228,6 +234,7 @@ def __init__(self, path, nextband, limit, bands_per_dir, capacity=100):
|
|||
def close(self):
|
||||
for band in self.fds.keys():
|
||||
self.fds.pop(band).close()
|
||||
self.fds = None # Just to make sure we're disabled
|
||||
|
||||
def cleanup(self):
|
||||
"""Delete band files left by aborted transactions
|
||||
|
@ -336,6 +343,17 @@ def test1(self):
|
|||
self.store.close()
|
||||
store2 = Store(os.path.join(self.tmppath, 'store'))
|
||||
|
||||
def test2(self):
|
||||
"""Test multiple sequential transactions
|
||||
"""
|
||||
self.store.put('00000000000000000000000000000000', 'foo')
|
||||
self.store.put('00000000000000000000000000000001', 'foo')
|
||||
self.store.commit()
|
||||
self.store.delete('00000000000000000000000000000000')
|
||||
self.store.put('00000000000000000000000000000001', 'bar')
|
||||
self.store.commit()
|
||||
self.assertEqual(self.store.get('00000000000000000000000000000001'), 'bar')
|
||||
|
||||
|
||||
def suite():
|
||||
return unittest.TestLoader().loadTestsFromTestCase(StoreTestCase)
|
||||
|
|
Loading…
Reference in a new issue