Added support for archive checkpoints

This commit is contained in:
Jonas Borgström 2011-09-10 17:19:02 +02:00
parent f56b6bf4cc
commit 55d049c74c
3 changed files with 43 additions and 16 deletions

View File

@ -6,6 +6,7 @@ import os
import socket
import stat
import sys
import time
from cStringIO import StringIO
from xattr import xattr, XATTR_NOFOLLOW
@ -26,7 +27,11 @@ class Archive(object):
class DoesNotExist(Exception):
pass
def __init__(self, store, key, manifest, name=None, cache=None):
class AlreadyExists(Exception):
pass
def __init__(self, store, key, manifest, name, cache=None, create=False, checkpoint_interval=300):
self.key = key
self.store = store
self.cache = cache
@ -35,11 +40,23 @@ class Archive(object):
self.items_ids = []
self.hard_links = {}
self.stats = Statistics()
if name:
self.name = name
self.checkpoint_interval = checkpoint_interval
if create:
if name in manifest.archives:
raise self.AlreadyExists
self.last_checkpoint = time.time()
i = 0
while True:
self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '')
if not self.checkpoint_name in manifest.archives:
break
i += 1
else:
try:
info = self.manifest.archives[name]
except KeyError:
raise Archive.DoesNotExist
raise self.DoesNotExist
self.load(info['id'])
def load(self, id):
@ -79,6 +96,10 @@ class Archive(object):
def add_item(self, item):
self.items.write(msgpack.packb(item))
now = time.time()
if now - self.last_checkpoint > self.checkpoint_interval:
self.last_checkpoint = now
self.write_checkpoint()
if self.items.tell() > ITEMS_BUFFER:
self.flush_items()
@ -98,9 +119,15 @@ class Archive(object):
else:
self.items.write(chunks[-1])
def save(self, name, cache):
def write_checkpoint(self):
self.save(self.checkpoint_name)
del self.manifest.archives[self.checkpoint_name]
self.cache.chunk_decref(self.id)
def save(self, name=None):
name = name or self.name
if name in self.manifest.archives:
raise ValueError('Archive %s already exists' % name)
raise self.AlreadyExists(name)
self.flush_items(flush=True)
metadata = {
'version': 1,
@ -113,11 +140,11 @@ class Archive(object):
}
data = msgpack.packb(metadata)
self.id = self.key.id_hash(data)
cache.add_chunk(self.id, data, self.stats)
self.cache.add_chunk(self.id, data, self.stats)
self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']}
self.manifest.write()
self.store.commit()
cache.commit()
self.cache.commit()
def calc_stats(self, cache):
# This function is a bit evil since it abuses the cache to calculate
@ -347,7 +374,5 @@ class Archive(object):
@staticmethod
def list_archives(store, key, manifest, cache=None):
for name, info in manifest.archives.items():
archive = Archive(store, key, manifest, cache=cache)
archive.load(info['id'])
yield archive
yield Archive(store, key, manifest, name, cache=cache)

View File

@ -56,11 +56,9 @@ class Archiver(object):
store = self.open_store(args.archive)
key = Key(store)
manifest = Manifest(store, key)
if args.archive.archive in manifest.archives:
self.print_error('Archive already exists')
return self.exit_code
cache = Cache(store, key, manifest)
archive = Archive(store, key, manifest, cache=cache)
archive = Archive(store, key, manifest, args.archive.archive, cache=cache,
create=True, checkpoint_interval=args.checkpoint_interval)
# Add darc cache dir to inode_skip list
skip_inodes = set()
try:
@ -77,7 +75,7 @@ class Archiver(object):
pass
for path in args.paths:
self._process(archive, cache, args.patterns, skip_inodes, path)
archive.save(args.archive.archive, cache)
archive.save()
if args.stats:
t = datetime.now()
diff = t - t0
@ -310,6 +308,9 @@ class Archiver(object):
subparser.add_argument('-e', '--exclude', dest='patterns',
type=ExcludePattern, action='append',
help='Include condition')
subparser.add_argument('-c', '--checkpoint-interval', dest='checkpoint_interval',
type=int, default=300, metavar='SECONDS',
help='Write checkpointe ever SECONDS seconds (Default: 300)')
subparser.add_argument('archive', metavar='ARCHIVE',
type=location_validator(archive=True),
help='Archive to create')

View File

@ -115,7 +115,8 @@ class Test(unittest.TestCase):
def test_corrupted_store(self):
self.create_src_archive('test')
self.darc('verify', self.store_path + '::test')
fd = open(os.path.join(self.tmpdir, 'store', 'data', '0', '2'), 'r+')
name = sorted(os.listdir(os.path.join(self.tmpdir, 'store', 'data', '0')), reverse=True)[0]
fd = open(os.path.join(self.tmpdir, 'store', 'data', '0', name), 'r+')
fd.seek(100)
fd.write('X')
fd.close()