test suite: first shot at implementing maildir test cases

This commit is contained in:
Nikolaus Schulz 2008-11-21 10:48:41 +01:00
parent 78c4c6e3da
commit 0dfae37e04
1 changed files with 387 additions and 3 deletions

View File

@ -60,6 +60,8 @@ import unittest
import gzip import gzip
import cStringIO import cStringIO
import rfc822 import rfc822
import errno
import mailbox
try: try:
import archivemail import archivemail
@ -71,9 +73,183 @@ except ImportError:
print "Try renaming it from 'archivemail' to 'archivemail.py'." print "Try renaming it from 'archivemail' to 'archivemail.py'."
sys.exit(1) sys.exit(1)
# We want to iterate over messages in a compressed archive mbox and verify
them. This involves seeking in the mbox. The gzip.GzipFile.seek() in
# Python 2.5 doesn't understand whence; this is Python bug #1355023, triggered
# by mailbox._PartialFile.seek(). The bug is still pending as of Python
# 2.5.2. To work around it, we subclass gzip.GzipFile.
#
# It should be noted that seeking backwards in a GzipFile is emulated by
# re-reading the entire file from the beginning, which is extremely
# inefficient and won't work with large files; but our test archives are all
# small, so it's okay.
class FixedGzipFile(gzip.GzipFile):
    """GzipFile whose seek() also works when a whence argument is passed."""

    def seek(self, offset, whence=0):
        """Seek to `offset', interpreted according to `whence'.

        The call is handed to gzip.GzipFile.seek() unchanged first.  Only if
        that raises TypeError is the position computed here: whence 0 is
        absolute, whence 1 is relative to the current offset, and any other
        value raises ValueError.
        """
        try:
            gzip.GzipFile.seek(self, offset, whence)
        except TypeError:
            # The parent seek() did not take the whence argument; translate
            # the request into an absolute offset ourselves.
            if whence == 1:
                offset += self.offset
            elif whence:
                raise ValueError('Seek from end not supported')
            gzip.GzipFile.seek(self, offset)
# precision of os.utime() when restoring mbox timestamps # precision of os.utime() when restoring mbox timestamps
utimes_precision = 5 utimes_precision = 5
class MessageIdFactory:
    """Factory to create `unique' message-ids.

    Each call returns a message-id built from a counter that is incremented
    on every call, so ids from one factory instance never repeat.
    """

    def __init__(self):
        # Number of message-ids handed out so far.
        self.seq = 0

    def __call__(self):
        """Return the next message-id, e.g. "<archivemail1@localhost>"."""
        self.seq = self.seq + 1
        return "<archivemail%d@localhost>" % (self.seq,)


# Module-wide factory instance, called like a function.
make_msgid = MessageIdFactory()
class IndexedMailboxDir:
    """An indexed mailbox directory, providing random message access by
    message-id.  Base class for a maildir and an mh subclass."""

    def __init__(self, mdir_name):
        """Create a fresh temporary directory whose name starts with
        mdir_name and start with an empty index."""
        # Refuse to run unless tempfile.tempdir has been set.
        assert tempfile.tempdir
        self.root = tempfile.mkdtemp(prefix=mdir_name)
        # Maps message-id -> path of the message file relative to self.root
        self.msg_id_dict = {}
        # Delivery counter; subclasses derive file names from it
        self.deliveries = 0

    def _add_to_index(self, msg_text, fpath):
        """Add the given message to the index, for later random access.

        The index key is the value of the first line in msg_text that starts
        with "message-id:" (case-insensitive).  A missing or empty
        message-id, or one that is already indexed, trips an assertion.
        """
        msg_id = None
        for line in msg_text.split("\n"):
            if line.lower().startswith("message-id:"):
                msg_id = line.split(":", 1)[-1].strip()
                break
        # Fails both when no such line exists and when its value is empty
        assert msg_id
        assert msg_id not in self.msg_id_dict
        self.msg_id_dict[msg_id] = fpath

    def __len__(self):
        """Return the number of messages in this folder."""
        return len(self.msg_id_dict)

    def get_all_filenames(self):
        """Return all relative pathnames of files in this mailbox."""
        return self.msg_id_dict.values()

    def clear(self):
        """Remove all messages in this mailbox."""
        for relpath in self.msg_id_dict.values():
            target = os.path.join(self.root, relpath)
            try:
                os.remove(target)
            except OSError:
                # A file that is already gone is fine; anything else is not.
                if sys.exc_info()[1].errno != errno.ENOENT:
                    raise
        self.msg_id_dict.clear()
class SimpleMaildir(IndexedMailboxDir):
    """Primitive Maildir class, just good enough for generating short-lived
    test maildirs."""

    def __init__(self, mdir_name='maildir'):
        """Create the temporary maildir with its cur, tmp and new
        subdirectories."""
        IndexedMailboxDir.__init__(self, mdir_name)
        for d in ("cur", "tmp", "new"):
            os.mkdir(os.path.join(self.root, d))

    def write(self, msg_str, new=True, flags=()):
        """Store a message with the given flags.

        msg_str -- the complete message text; it is indexed by its
                   message-id after being written
        new     -- store the message in `new' if true, otherwise in `cur'
        flags   -- iterable of maildir flag characters (a string works, too);
                   flags may only be given for messages stored in `cur'
        """
        assert not (new and flags)
        if new:
            subdir = "new"
        else:
            subdir = "cur"
        fname = self._mkname(new, flags)
        relpath = os.path.join(subdir, fname)
        path = os.path.join(self.root, relpath)
        assert not os.path.exists(path)
        f = open(path, "w")
        try:
            f.write(msg_str)
        finally:
            # Don't leak the file object if writing fails
            f.close()
        self._add_to_index(msg_str, relpath)

    def remove(self):
        """Remove all files and directories that comprise this mailbox."""
        self.clear()
        for d in ("cur", "new", "tmp"):
            os.rmdir(os.path.join(self.root, d))
        os.rmdir(self.root)
        self.root = None

    def _mkname(self, new, flags):
        """Generate a unique filename for a new message.

        Messages for `new' get the bare unique name; messages for `cur' get
        the ':2,' info suffix followed by the sorted flags (possibly none).
        """
        validflags = 'DFPRST'
        for f in flags:
            assert f in validflags
        # This 'unique' name should be good enough, since nobody else
        # will ever write messages to this maildir folder.
        uniq = str(self.deliveries)
        self.deliveries += 1
        if new:
            return uniq
        return uniq + ':2,' + "".join(sorted(flags))

    def get_message_and_mbox_status(self, msgid):
        """For the Message-Id msgid, return the matching message in text
        format and its status, expressed as a set of mbox flags.

        Raises KeyError if msgid is not indexed, or if the message carries a
        maildir flag that has no entry in the flag map below.
        """
        fpath = self.msg_id_dict[msgid] # Barfs if not found
        subdir, fname = os.path.split(fpath)
        # Only filenames with an info part carry flags.  Messages in `new'
        # have none; without this check the whole path would be taken for
        # the flag string.
        if ':2,' in fname:
            mdir_flags = fname.rsplit(':2,', 1)[-1]
        else:
            mdir_flags = ''
        flagmap = {
            'F': 'F',
            'R': 'A',
            'S': 'R'
            }
        mbox_flags = set([flagmap[x] for x in mdir_flags])
        if subdir == "cur":
            mbox_flags.add('O')
        fp = open(os.path.join(self.root, fpath), "r")
        try:
            msg = fp.read()
        finally:
            fp.close()
        return msg, mbox_flags
class SimpleMHMailbox(IndexedMailboxDir):
    """Primitive MH mailbox class, just good enough for generating short-lived
    test mh mailboxes."""

    def __init__(self, mdir_name='mh'):
        """Create the temporary directory that holds the mh mailbox."""
        IndexedMailboxDir.__init__(self, mdir_name)

    def write(self, msg_str):
        """Store a message in the mailbox and add it to the index.

        The file name is the running delivery number, starting at 1.
        """
        self.deliveries += 1
        fname = str(self.deliveries)
        path = os.path.join(self.root, fname)
        # Was `fpath', a name that does not exist in this method
        assert not os.path.exists(path)
        f = open(path, "w")
        try:
            f.write(msg_str)
        finally:
            f.close()
        self._add_to_index(msg_str, fname)

    def remove(self):
        """Remove all files and directories that comprise this mailbox."""
        self.clear()
        os.rmdir(self.root)
        self.root = None

    def get_message(self, msgid):
        """For the Message-Id msgid, return the matching message in text
        format.  Raises KeyError if msgid is not indexed."""
        # Was `mid', a name that does not exist in this method
        fpath = self.msg_id_dict[msgid] # Barfs if not found
        fp = open(os.path.join(self.root, fpath), "r")
        try:
            msg_str = fp.read()
        finally:
            fp.close()
        return msg_str
class TestCaseInTempdir(unittest.TestCase): class TestCaseInTempdir(unittest.TestCase):
"""Base class for testcases that need to create temporary files. """Base class for testcases that need to create temporary files.
@ -980,12 +1156,212 @@ class TestArchiveSize(unittest.TestCase):
archivemail.options.min_size = None archivemail.options.min_size = None
############# Test archiving maildirs ###############
class TestArchiveMailboxdir(TestCaseInTempdir):
    """Base class defining helper functions for doing test archive runs with
    maildirs."""
    maildir = None          # Maildir that will be processed by archivemail
    orig_maildir_obj = None # A backup copy of the maildir, a SimpleMaildir object
    remaining_msg = set()   # Filenames of maildir messages that should be preserved
    orig_archive = None     # An uncompressed copy of a pre-existing archive,
                            # if one exists
    orig_archive_size = None # Size in bytes of the pre-existing archive file
                             # as stored on disk, if one exists

    def setUp(self):
        super(TestArchiveMailboxdir, self).setUp()
        self.orig_maildir_obj = SimpleMaildir()

    def verify(self):
        """Check both the processed maildir and the resulting archive."""
        self._verify_remaining()
        self._verify_archive()

    def _verify_remaining(self):
        """Verify that the preserved messages weren't altered."""
        assert self.maildir
        # Compare maildir with backup object.
        dcmp = filecmp.dircmp(self.maildir, self.orig_maildir_obj.root)
        # Top-level has only directories cur, new, tmp and must be unchanged.
        self.assertEqual(dcmp.left_list, dcmp.right_list)
        found = set()
        for d in dcmp.common_dirs:
            dcmp2 = dcmp.subdirs[d]
            # We need to verify three things.
            # 1. directory is a subset of the original...
            assert not dcmp2.left_only
            # 2. all common files are identical...
            self.assertEqual(dcmp2.common_files, dcmp2.same_files)
            found = found.union([os.path.join(d, x) for x in dcmp2.common_files])
        # 3. exactly the `new' messages (recorded in self.remaining_msg)
        #    were preserved.
        self.assertEqual(found, self.remaining_msg)

    def _verify_archive(self):
        """Verify the archive correctness.

        If nothing was to be archived, the archive must either be identical
        to the pre-existing one or not exist at all.  Otherwise every message
        in the newly written part of the archive must match a message of the
        original maildir, and their number must equal the number of messages
        that were to be archived.
        """
        number_archived = len(self.orig_maildir_obj) - len(self.remaining_msg)
        # TODO: currently make_archive_name does not include the .gz suffix.
        # Is this something that should be fixed?
        archive = archivemail.make_archive_name(self.maildir)
        if archivemail.options.no_compress:
            iszipped = False
        else:
            archive += '.gz'
            iszipped = True
        if number_archived == 0:
            if self.orig_archive:
                assertEqualContent(archive, self.orig_archive, iszipped)
            else:
                assert not os.path.exists(archive)
            return
        fp_new = fp_archive = tmp_archive_name = None
        try:
            if self.orig_archive:
                # Brute force: split archive in old and new part and verify the
                # parts separately.  (Of course this destroys the archive.)
                # The split is made at a byte offset, so the data is moved
                # through binary-mode files.
                fp_archive = open(archive, "r+b")
                fp_archive.seek(self.orig_archive_size)
                fd, tmp_archive_name = tempfile.mkstemp()
                fp_new = os.fdopen(fd, "wb")
                shutil.copyfileobj(fp_archive, fp_new)
                fp_new.close()
                fp_archive.truncate(self.orig_archive_size)
                fp_archive.close()
                assertEqualContent(archive, self.orig_archive, iszipped)
                new_archive = tmp_archive_name
            else:
                new_archive = archive
            if archivemail.options.no_compress:
                fp_archive = open(new_archive, "r")
            else:
                fp_archive = FixedGzipFile(new_archive, "r")
            mb = mailbox.UnixMailbox(fp_archive)
            found = 0
            for msg in mb:
                self.verify_maildir_has_msg(self.orig_maildir_obj, msg)
                found += 1
            self.assertEqual(found, number_archived)
        finally:
            # Close all files first; the temporary file is only removed once
            # nothing has it open anymore.
            if fp_archive is not None:
                fp_archive.close()
            if fp_new is not None:
                fp_new.close()
            if tmp_archive_name:
                os.remove(tmp_archive_name)

    def verify_maildir_has_msg(self, maildir, msg):
        """Assert that the given maildir has a copy of the rfc822 message.

        The mbox flags found in the Status and X-Status headers must equal
        the flags derived from the maildir message, and the message text
        without those two headers must equal the maildir message text.
        """
        mid = msg['Message-Id']  # Complains if there is no message-id
        mdir_msg_str, mdir_flags = \
                maildir.get_message_and_mbox_status(mid)
        mbox_flags = set(msg.get('status', '') + msg.get('x-status', ''))
        self.assertEqual(mdir_flags, mbox_flags)
        # Drop the status headers; the maildir message has none of them.
        headers = "".join([h for h in msg.headers
                           if msg.isheader(h) not in ('status', 'x-status')])
        msg.rewindbody()
        # Discard last mbox LF which is not part of the message.
        body = msg.fp.read()[:-1]
        msg_str = headers + os.linesep + body
        self.assertEqual(mdir_msg_str, msg_str)

    def add_messages(self, body=None, headers=None, hours_old=0, messages=1):
        """Write `messages' generated messages of the given age to the cur
        directory of the original maildir."""
        for _ in range(messages):
            msg = make_message(body, default_headers=headers, mkfrom=False,
                               hours_old=hours_old)
            self.orig_maildir_obj.write(msg, new=False)

    def make_maildir(self, mkold, mknew, body=None, headers=None, messages=1,
                     make_old_archive=False):
        """Populate the original maildir and copy it to self.maildir.

        mknew            -- add `messages' messages that are 179 days old;
                            these are recorded as the ones to be preserved
        mkold            -- add `messages' messages that are 181 days old
        make_old_archive -- additionally create a pre-existing archive and
                            remember a plain copy of it and its size
        """
        if mknew:
            self.add_messages(body, headers, 179*24, messages)
        # Whatever is in the maildir at this point has to survive the run.
        self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames())
        if mkold:
            self.add_messages(body, headers, 181*24, messages)
        self.maildir = copy_maildir(self.orig_maildir_obj.root)
        if make_old_archive:
            archive = archivemail.make_archive_name(self.maildir)
            self.orig_archive = make_archive_and_plain_copy(archive)
            # FIXME: .gz extension handling is a mess II
            if not archivemail.options.no_compress:
                archive += '.gz'
            self.orig_archive_size = os.path.getsize(archive)
class TestEmptyMaildir(TestCaseInTempdir):
    """Archive run on a maildir that holds no messages at all."""

    def setUp(self):
        super(TestEmptyMaildir, self).setUp()
        # Silence archivemail for the duration of the test
        archivemail.options.quiet = True

    def testEmpty(self):
        """Archiving an empty maildir should not result in an archive."""
        self.mdir = SimpleMaildir()
        archivemail.archive(self.mdir.root)
        archive_name = self.mdir.root + '_archive.gz'
        assert not os.path.exists(archive_name)

    def tearDown(self):
        super(TestEmptyMaildir, self).tearDown()
        # Undo the option change made in setUp()
        archivemail.options.quiet = False
class TestMaildir(TestArchiveMailboxdir):
    """Archive runs on maildirs holding 181-day-old messages, 179-day-old
    messages, or both."""

    def setUp(self):
        super(TestMaildir, self).setUp()
        archivemail.options.quiet = True

    def tearDown(self):
        archivemail.options.quiet = False
        super(TestMaildir, self).tearDown()

    def _archive_and_verify(self, mkold, mknew, **kwargs):
        # Common sequence of all tests below: build a maildir with three
        # messages per requested kind, archive it, check the outcome.
        self.make_maildir(mkold, mknew, messages=3, **kwargs)
        archivemail.archive(self.maildir)
        self.verify()

    def testOld(self):
        # Only 181-day-old messages
        self._archive_and_verify(True, False)

    def testNew(self):
        # Only 179-day-old messages
        self._archive_and_verify(False, True)

    def testMixed(self):
        # Messages of both ages
        self._archive_and_verify(True, True)

    def testMixedExisting(self):
        # Messages of both ages, with an archive that already exists
        self._archive_and_verify(True, True, make_old_archive=True)
class TestMaildirPreserveUnread(TestArchiveMailboxdir):
    """Archive runs on maildirs with the preserve_unread option switched on."""

    def setUp(self):
        super(TestMaildirPreserveUnread, self).setUp()
        options = archivemail.options
        options.quiet = True
        options.preserve_unread = True

    def testOldRead(self):
        """--preserve-unread archives all old read messages in a maildir."""
        # XXX: this replaces the SimpleMaildir that the inherited setUp()
        # has already created.
        smd = SimpleMaildir("orig")
        self.orig_maildir_obj = smd
        for _ in range(3):
            # Three 181-day-old messages in `cur', each carrying the S flag
            smd.write(make_message(hours_old=24*181), new=False, flags='S')
        self.maildir = copy_maildir(smd.root)
        archivemail.archive(self.maildir)
        self.verify()

    def tearDown(self):
        options = archivemail.options
        options.quiet = False
        options.preserve_unread = False
        super(TestMaildirPreserveUnread, self).tearDown()
########## helper routines ############ ########## helper routines ############
def make_message(body=None, default_headers={}, hours_old=None, wantobj=False): def make_message(body=None, default_headers={}, hours_old=None, mkfrom=False, wantobj=False):
headers = copy.copy(default_headers) headers = copy.copy(default_headers)
if not headers: if not headers:
headers = {} headers = {}
headers['Message-Id'] = make_msgid()
if not headers.has_key('Date'): if not headers.has_key('Date'):
time_message = time.time() - (60 * 60 * hours_old) time_message = time.time() - (60 * 60 * hours_old)
headers['Date'] = time.asctime(time.localtime(time_message)) headers['Date'] = time.asctime(time.localtime(time_message))
@ -995,7 +1371,7 @@ def make_message(body=None, default_headers={}, hours_old=None, wantobj=False):
headers['To'] = "receipient@dummy.domain" headers['To'] = "receipient@dummy.domain"
if not headers.has_key('Subject'): if not headers.has_key('Subject'):
headers['Subject'] = "This is the subject" headers['Subject'] = "This is the subject"
if not headers.has_key('From_'): if mkfrom and not headers.has_key('From_'):
headers['From_'] = "%s %s" % (headers['From'], headers['Date']) headers['From_'] = "%s %s" % (headers['From'], headers['Date'])
if not body: if not body:
body = "This is the message body" body = "This is the message body"
@ -1030,7 +1406,7 @@ def make_mbox(body=None, headers=None, hours_old=0, messages=1):
file = os.fdopen(fd, "w") file = os.fdopen(fd, "w")
for count in range(messages): for count in range(messages):
msg = make_message(body=body, default_headers=headers, msg = make_message(body=body, default_headers=headers,
hours_old=hours_old) mkfrom=True, hours_old=hours_old)
file.write(msg) file.write(msg)
file.close() file.close()
return name return name
@ -1059,6 +1435,14 @@ def make_archive_and_plain_copy(archive_name):
rawfp.close() rawfp.close()
return copy_name return copy_name
def copy_maildir(maildir, prefix="tmp"):
    """Copy the cur, new and tmp subdirectories of the given maildir into a
    freshly created temporary directory and return the path of that new
    directory.  The new directory's name starts with prefix."""
    newdir = tempfile.mkdtemp(prefix=prefix)
    for subdir in ("cur", "new", "tmp"):
        source = os.path.join(maildir, subdir)
        target = os.path.join(newdir, subdir)
        shutil.copytree(source, target)
    return newdir
def assertEqualContent(firstfile, secondfile, zippedfirst=False): def assertEqualContent(firstfile, secondfile, zippedfirst=False):
"""Verify that the two files exist and have identical content. If zippedfirst """Verify that the two files exist and have identical content. If zippedfirst
is True, assume that firstfile is gzip-compressed.""" is True, assume that firstfile is gzip-compressed."""