From 55d049c74cd00495ee2ca2683714b3f42229f0b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Borgstr=C3=B6m?= Date: Sat, 10 Sep 2011 17:19:02 +0200 Subject: [PATCH] Added support for archive checkpoints --- darc/archive.py | 45 +++++++++++++++++++++++++++++++++++---------- darc/archiver.py | 11 ++++++----- darc/test.py | 3 ++- 3 files changed, 43 insertions(+), 16 deletions(-) diff --git a/darc/archive.py b/darc/archive.py index d4c6c0e86..13fe4f26a 100644 --- a/darc/archive.py +++ b/darc/archive.py @@ -6,6 +6,7 @@ import os import socket import stat import sys +import time from cStringIO import StringIO from xattr import xattr, XATTR_NOFOLLOW @@ -26,7 +27,11 @@ class Archive(object): class DoesNotExist(Exception): pass - def __init__(self, store, key, manifest, name=None, cache=None): + class AlreadyExists(Exception): + pass + + + def __init__(self, store, key, manifest, name, cache=None, create=False, checkpoint_interval=300): self.key = key self.store = store self.cache = cache @@ -35,11 +40,23 @@ class Archive(object): self.items_ids = [] self.hard_links = {} self.stats = Statistics() - if name: + self.name = name + self.checkpoint_interval = checkpoint_interval + if create: + if name in manifest.archives: + raise self.AlreadyExists + self.last_checkpoint = time.time() + i = 0 + while True: + self.checkpoint_name = '%s.checkpoint%s' % (name, i and ('.%d' % i) or '') + if not self.checkpoint_name in manifest.archives: + break + i += 1 + else: try: info = self.manifest.archives[name] except KeyError: - raise Archive.DoesNotExist + raise self.DoesNotExist self.load(info['id']) def load(self, id): @@ -79,6 +96,10 @@ class Archive(object): def add_item(self, item): self.items.write(msgpack.packb(item)) + now = time.time() + if now - self.last_checkpoint > self.checkpoint_interval: + self.last_checkpoint = now + self.write_checkpoint() if self.items.tell() > ITEMS_BUFFER: self.flush_items() @@ -98,9 +119,15 @@ class Archive(object): else: self.items.write(chunks[-1]) - def save(self, name, cache): + def write_checkpoint(self): + self.save(self.checkpoint_name) + del self.manifest.archives[self.checkpoint_name] + self.cache.chunk_decref(self.id) + + def save(self, name=None): + name = name or self.name if name in self.manifest.archives: - raise ValueError('Archive %s already exists' % name) + raise self.AlreadyExists(name) self.flush_items(flush=True) metadata = { 'version': 1, @@ -113,11 +140,11 @@ class Archive(object): } data = msgpack.packb(metadata) self.id = self.key.id_hash(data) - cache.add_chunk(self.id, data, self.stats) + self.cache.add_chunk(self.id, data, self.stats) self.manifest.archives[name] = {'id': self.id, 'time': metadata['time']} self.manifest.write() self.store.commit() - cache.commit() + self.cache.commit() def calc_stats(self, cache): # This function is a bit evil since it abuses the cache to calculate @@ -347,7 +374,5 @@ class Archive(object): @staticmethod def list_archives(store, key, manifest, cache=None): for name, info in manifest.archives.items(): - archive = Archive(store, key, manifest, cache=cache) - archive.load(info['id']) - yield archive + yield Archive(store, key, manifest, name, cache=cache) diff --git a/darc/archiver.py b/darc/archiver.py index 024fde574..c62923965 100644 --- a/darc/archiver.py +++ b/darc/archiver.py @@ -56,11 +56,9 @@ class Archiver(object): store = self.open_store(args.archive) key = Key(store) manifest = Manifest(store, key) - if args.archive.archive in manifest.archives: - self.print_error('Archive already exists') - return self.exit_code cache = Cache(store, key, manifest) - archive = Archive(store, key, manifest, cache=cache) + archive = Archive(store, key, manifest, args.archive.archive, cache=cache, + create=True, checkpoint_interval=args.checkpoint_interval) # Add darc cache dir to inode_skip list skip_inodes = set() try: @@ -77,7 +75,7 @@ class Archiver(object): pass for path in args.paths: self._process(archive, cache, args.patterns, skip_inodes, path) - archive.save(args.archive.archive, cache) + archive.save() if args.stats: t = datetime.now() diff = t - t0 @@ -310,6 +308,9 @@ class Archiver(object): subparser.add_argument('-e', '--exclude', dest='patterns', type=ExcludePattern, action='append', help='Include condition') + subparser.add_argument('-c', '--checkpoint-interval', dest='checkpoint_interval', + type=int, default=300, metavar='SECONDS', + help='Write checkpointe ever SECONDS seconds (Default: 300)') subparser.add_argument('archive', metavar='ARCHIVE', type=location_validator(archive=True), help='Archive to create') diff --git a/darc/test.py b/darc/test.py index 7632a370d..38ce48018 100644 --- a/darc/test.py +++ b/darc/test.py @@ -115,7 +115,8 @@ class Test(unittest.TestCase): def test_corrupted_store(self): self.create_src_archive('test') self.darc('verify', self.store_path + '::test') - fd = open(os.path.join(self.tmpdir, 'store', 'data', '0', '2'), 'r+') + name = sorted(os.listdir(os.path.join(self.tmpdir, 'store', 'data', '0')), reverse=True)[0] + fd = open(os.path.join(self.tmpdir, 'store', 'data', '0', name), 'r+') fd.seek(100) fd.write('X') fd.close()