From 0dfae37e04477a1ddd88f7da149171400c90fa57 Mon Sep 17 00:00:00 2001 From: Nikolaus Schulz Date: Fri, 21 Nov 2008 10:48:41 +0100 Subject: [PATCH] test suite: first shot at implementing maildir test cases --- test_archivemail.py | 390 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 387 insertions(+), 3 deletions(-) diff --git a/test_archivemail.py b/test_archivemail.py index c83d1bf..5186de0 100755 --- a/test_archivemail.py +++ b/test_archivemail.py @@ -60,6 +60,8 @@ import unittest import gzip import cStringIO import rfc822 +import errno +import mailbox try: import archivemail @@ -71,9 +73,183 @@ except ImportError: print "Try renaming it from 'archivemail' to 'archivemail.py'." sys.exit(1) +# We want to iterate over messages in a compressed archive mbox and verify +# them. This involves seeking in the mbox. The gzip.Gzipfile.seek() in +# Python 2.5 doesn't understand whence; this is Python bug #1355023, triggered +# by mailbox._PartialFile.seek(). The bug is still pending as of Python +# 2.5.2. To work around it, we subclass gzip.GzipFile. +# +# It should be noted that seeking backwards in a GzipFile is emulated by +# re-reading the entire file from the beginning, which is extremely +# inefficient and won't work with large files; but our test archives are all +# small, so it's okay. + +class FixedGzipFile(gzip.GzipFile): + """GzipFile with seek method accepting whence parameter.""" + def seek(self, offset, whence=0): + try: + gzip.GzipFile.seek(self, offset, whence) + except TypeError: + if whence: + if whence == 1: + offset = self.offset + offset + else: + raise ValueError('Seek from end not supported') + gzip.GzipFile.seek(self, offset) + # precision of os.utime() when restoring mbox timestamps utimes_precision = 5 +class MessageIdFactory: + """Factory to create `uniqe' message-ids.""" + def __init__(self): + self.seq = 0 + def __call__(self): + self.seq += 1 + return "" % self.seq + +make_msgid = MessageIdFactory() + +class IndexedMailboxDir: + """An indexed mailbox directory, providing random message access by + message-id. Base class for a maildir and an mh subclass.""" + + def __init__(self, mdir_name): + assert tempfile.tempdir + self.root = tempfile.mkdtemp(prefix=mdir_name) + self.msg_id_dict = {} + self.deliveries = 0 + + def _add_to_index(self, msg_text, fpath): + """Add the given message to the index, for later random access.""" + # Extract the message-id as index key + msg_id = None + fp = cStringIO.StringIO(msg_text) + while True: + line = fp.readline() + # line empty means we didn't find a message-id + assert line + if line.lower().startswith("message-id:"): + msg_id = line.split(":", 1)[-1].strip() + assert msg_id + break + assert not self.msg_id_dict.has_key(msg_id) + self.msg_id_dict[msg_id] = fpath + + def __len__(self): + """Return the number of messages in this folder.""" + return len(self.msg_id_dict) + + def get_all_filenames(self): + """Return all relative pathnames of files in this mailbox.""" + return self.msg_id_dict.values() + + def clear(self): + """Remove all messages in this mailbox.""" + for relpath in self.msg_id_dict.values(): + try: os.remove(os.path.join(self.root, relpath)) + except OSError, e: + if e.errno != errno.ENOENT: raise + self.msg_id_dict.clear() + +class SimpleMaildir(IndexedMailboxDir): + """Primitive Maildir class, just good enough for generating short-lived + test maildirs.""" + + def __init__(self, mdir_name='maildir'): + IndexedMailboxDir.__init__(self, mdir_name) + for d in "cur", "tmp", "new": + os.mkdir(os.path.join(self.root, d)) + + def write(self, msg_str, new=True, flags=[]): + """Store a message with the given flags.""" + assert not (new and flags) + if new: + subdir = "new" + else: + subdir = "cur" + fname = self._mkname(new, flags) + relpath = os.path.join(subdir, fname) + path = os.path.join(self.root, relpath) + assert not os.path.exists(path) + f = open(path, "w") + f.write(msg_str) + f.close() + self._add_to_index(msg_str, relpath) + + def remove(self): + """Remove all files and directories that comprise this mailbox.""" + self.clear() + for d in "cur", "new", "tmp": + os.rmdir(os.path.join(self.root, d)) + os.rmdir(self.root) + self.root = None + + def _mkname(self, new, flags): + """Generate a unique filename for a new message.""" + validflags = 'DFPRST' + for f in flags: + assert f in validflags + # This 'unique' name should be good enough, since nobody else + # will ever write messages to this maildir folder. + uniq = str(self.deliveries) + self.deliveries += 1 + if new: + return uniq + if not flags: + return uniq + ':2,' + finfo = "".join(sorted(flags)) + return uniq + ':2,' + finfo + + def get_message_and_mbox_status(self, msgid): + """For the Message-Id msgid, return the matching message in text + format and its status, expressed as a set of mbox flags.""" + fpath = self.msg_id_dict[msgid] # Barfs if not found + mdir_flags = fpath.rsplit('2,', 1)[-1] + flagmap = { + 'F': 'F', + 'R': 'A', + 'S': 'R' + } + mbox_flags = set([flagmap[x] for x in mdir_flags]) + if fpath.startswith("cur/"): + mbox_flags.add('O') + fp = open(os.path.join(self.root, fpath), "r") + msg = fp.read() + fp.close() + return msg, mbox_flags + + +class SimpleMHMailbox(IndexedMailboxDir): + """Primitive MH mailbox class, just good enough for generating short-lived + test mh mailboxes.""" + + def __init__(self, mdir_name='mh'): + IndexedMailboxDir.__init__(self, mdir_name) + + def write(self, msg_str): + self.deliveries += 1 + fname = str(self.deliveries) + path = os.path.join(self.root, fname) + assert not os.path.exists(fpath) + f = open(path, "w") + f.write(msg_str) + f.close() + self._add_to_index(msg_str, fname) + + def remove(self): + self.clear() + os.rmdir(self.root) + self.root = None + + def get_message(self, msgid): + """For the Message-Id msgid, return the matching message in text + format.""" + fpath = self.msg_id_dict[mid] # Barfs if not found + fp = open(os.path.join(self.root, fpath), "r") + msg_str = fp.read() + fp.close() + return msg_str class TestCaseInTempdir(unittest.TestCase): """Base class for testcases that need to create temporary files. @@ -980,12 +1156,212 @@ class TestArchiveSize(unittest.TestCase): archivemail.options.min_size = None +############# Test archiving maildirs ############### + +class TestArchiveMailboxdir(TestCaseInTempdir): + """Base class defining helper functions for doing test archive runs with + maildirs.""" + maildir = None # Maildir that will be processed by archivemail + orig_maildir_obj = None # A backup copy of the maildir, a SimpleMaildir object + remaining_msg = set() # Filenames of maildir messages that should be preserved + orig_archive = None # An uncompressed copy of a pre-existing archive, + # if one exists + + def setUp(self): + super(TestArchiveMailboxdir, self).setUp() + self.orig_maildir_obj = SimpleMaildir() + + def verify(self): + self._verify_remaining() + self._verify_archive() + + def _verify_remaining(self): + """Verify that the preserved messages weren't altered.""" + assert self.maildir + # Compare maildir with backup object. + dcmp = filecmp.dircmp(self.maildir, self.orig_maildir_obj.root) + # Top-level has only directories cur, new, tmp and must be unchanged. + self.assertEqual(dcmp.left_list, dcmp.right_list) + found = set() + for d in dcmp.common_dirs: + dcmp2 = dcmp.subdirs[d] + # We need to verify three things. + # 1. directory is a subset of the original... + assert not dcmp2.left_only + # 2. all common files are identical... + self.assertEqual(dcmp2.common_files, dcmp2.same_files) + found = found.union([os.path.join(d, x) for x in dcmp2.common_files]) + # 3. exactly the `new' messages (recorded in self.remaining_msg) + # were preserved. + self.assertEqual(found, self.remaining_msg) + + def _verify_archive(self): + """Verify the archive correctness.""" + number_archived = len(self.orig_maildir_obj) - len(self.remaining_msg) + # TODO: currently make_archive_name does not include the .gz suffix. + # Is this something that should be fixed? + archive = archivemail.make_archive_name(self.maildir) + if archivemail.options.no_compress: + iszipped = False + else: + archive += '.gz' + iszipped = True + if number_archived == 0: + if self.orig_archive: + assertEqualContent(archive, self.orig_archive, iszipped) + else: + assert not os.path.exists(archive) + return + fp_new = fp_archive = tmp_archive_name = None + try: + if self.orig_archive: + new_size = os.path.getsize(archive) + # Brute force: split archive in old and new part and verify the + # parts separately. (Of course this destroys the archive.) + fp_archive = open(archive, "r+") + fp_archive.seek(self.orig_archive_size) + fd, tmp_archive_name = tempfile.mkstemp() + fp_new = os.fdopen(fd, "w") + shutil.copyfileobj(fp_archive, fp_new) + fp_new.close() + fp_archive.truncate(self.orig_archive_size) + fp_archive.close() + assertEqualContent(archive, self.orig_archive, iszipped) + new_archive = tmp_archive_name + else: + new_archive = archive + if archivemail.options.no_compress: + fp_archive = open(new_archive, "r") + else: + fp_archive = FixedGzipFile(new_archive, "r") + mb = mailbox.UnixMailbox(fp_archive) + found = 0 + for msg in mb: + self.verify_maildir_has_msg(self.orig_maildir_obj, msg) + found += 1 + self.assertEqual(found, number_archived) + finally: + if tmp_archive_name: + os.remove(tmp_archive_name) + if fp_new is not None: + fp_new.close() + if fp_archive is not None: + fp_archive.close() + + def verify_maildir_has_msg(self, maildir, msg): + """Assert that the given maildir has a copy of the rfc822 message.""" + mid = msg['Message-Id'] # Complains if there is no message-id + mdir_msg_str, mdir_flags = \ + maildir.get_message_and_mbox_status(mid) + mbox_flags = set(msg.get('status', '') + msg.get('x-status', '')) + self.assertEqual(mdir_flags, mbox_flags) + + headers = filter(lambda h: msg.isheader(h) not in ('status', 'x-status'), + msg.headers) + headers = "".join(headers) + msg.rewindbody() + # Discard last mbox LF which is not part of the message. + body = msg.fp.read()[:-1] + msg_str = headers + os.linesep + body + self.assertEqual(mdir_msg_str, msg_str) + + def add_messages(self, body=None, headers=None, hours_old=0, messages=1): + for count in range(messages): + msg = make_message(body, default_headers=headers, mkfrom=False, + hours_old=hours_old) + self.orig_maildir_obj.write(msg, new=False) + + def make_maildir(self, mkold, mknew, body=None, headers=None, messages=1, + make_old_archive=False): + if mknew: + self.add_messages(body, headers, 179*24, messages) + self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames()) + if mkold: + self.add_messages(body, headers, 181*24, messages) + self.maildir = copy_maildir(self.orig_maildir_obj.root) + if make_old_archive: + archive = archivemail.make_archive_name(self.maildir) + self.orig_archive = make_archive_and_plain_copy(archive) + # FIXME: .gz extension handling is a mess II + if not archivemail.options.no_compress: + archive += '.gz' + self.orig_archive_size = os.path.getsize(archive) + +class TestEmptyMaildir(TestCaseInTempdir): + def setUp(self): + super(TestEmptyMaildir, self).setUp() + archivemail.options.quiet = True + + def testEmpty(self): + """Archiving an empty maildir should not result in an archive.""" + self.mdir = SimpleMaildir() + archivemail.archive(self.mdir.root) + assert not os.path.exists(self.mdir.root + '_archive.gz') + + def tearDown(self): + super(TestEmptyMaildir, self).tearDown() + archivemail.options.quiet = False + +class TestMaildir(TestArchiveMailboxdir): + def setUp(self): + super(TestMaildir, self).setUp() + archivemail.options.quiet = True + + def testOld(self): + self.make_maildir(True, False, messages=3) + archivemail.archive(self.maildir) + self.verify() + + def testNew(self): + self.make_maildir(False, True, messages=3) + archivemail.archive(self.maildir) + self.verify() + + def testMixed(self): + self.make_maildir(True, True, messages=3) + archivemail.archive(self.maildir) + self.verify() + + def testMixedExisting(self): + self.make_maildir(True, True, messages=3, make_old_archive=True) + archivemail.archive(self.maildir) + self.verify() + + def tearDown(self): + archivemail.options.quiet = False + super(TestMaildir, self).tearDown() + + +class TestMaildirPreserveUnread(TestArchiveMailboxdir): + def setUp(self): + super(TestMaildirPreserveUnread, self).setUp() + archivemail.options.quiet = True + archivemail.options.preserve_unread = True + + def testOldRead(self): + """--preserve-unread archives all old read messages in a maildir.""" + # XXX + smd = self.orig_maildir_obj = SimpleMaildir("orig") + for count in range(3): + msg = make_message(hours_old=24*181) + smd.write(msg, new=False, flags='S') + self.maildir = copy_maildir(smd.root) + archivemail.archive(self.maildir) + self.verify() + + def tearDown(self): + archivemail.options.quiet = False + archivemail.options.preserve_unread = False + super(TestMaildirPreserveUnread, self).tearDown() + + ########## helper routines ############ -def make_message(body=None, default_headers={}, hours_old=None, wantobj=False): +def make_message(body=None, default_headers={}, hours_old=None, mkfrom=False, wantobj=False): headers = copy.copy(default_headers) if not headers: headers = {} + headers['Message-Id'] = make_msgid() if not headers.has_key('Date'): time_message = time.time() - (60 * 60 * hours_old) headers['Date'] = time.asctime(time.localtime(time_message)) @@ -995,7 +1371,7 @@ def make_message(body=None, default_headers={}, hours_old=None, wantobj=False): headers['To'] = "receipient@dummy.domain" if not headers.has_key('Subject'): headers['Subject'] = "This is the subject" - if not headers.has_key('From_'): + if mkfrom and not headers.has_key('From_'): headers['From_'] = "%s %s" % (headers['From'], headers['Date']) if not body: body = "This is the message body" @@ -1030,7 +1406,7 @@ def make_mbox(body=None, headers=None, hours_old=0, messages=1): file = os.fdopen(fd, "w") for count in range(messages): msg = make_message(body=body, default_headers=headers, - hours_old=hours_old) + mkfrom=True, hours_old=hours_old) file.write(msg) file.close() return name @@ -1059,6 +1435,14 @@ def make_archive_and_plain_copy(archive_name): rawfp.close() return copy_name +def copy_maildir(maildir, prefix="tmp"): + """Create a copy of the given maildir and return the absolute path of the + new direcory.""" + newdir = tempfile.mkdtemp(prefix=prefix) + for d in "cur", "new", "tmp": + shutil.copytree(os.path.join(maildir, d), os.path.join(newdir, d)) + return newdir + def assertEqualContent(firstfile, secondfile, zippedfirst=False): """Verify that the two files exist and have identical content. If zippedfirst is True, assume that firstfile is gzip-compressed."""