mirror of
https://git.code.sf.net/p/archivemail/code
synced 2024-12-21 15:22:59 +00:00
e1a6028332
The test suite uses functions that were introduced in Python 2.4 (e.g. set(), sorted()).
1771 lines
67 KiB
Python
Executable file
1771 lines
67 KiB
Python
Executable file
#! /usr/bin/env python
|
|
############################################################################
|
|
# Copyright (C) 2002 Paul Rodger <paul@paulrodger.com>
|
|
# (C) 2006-2011 Nikolaus Schulz <microschulz@web.de>
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program; if not, write to the Free Software
|
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
############################################################################
|
|
"""
|
|
Unit-test archivemail using 'PyUnit'.
|
|
|
|
TODO: add tests for:
|
|
* dotlock locks already existing
|
|
* archiving MH-format mailboxes
|
|
* a 3rd party process changing the mbox file being read
|
|
|
|
"""
|
|
|
|
import sys
|
|
|
|
def check_python_version():
    """Exit with status 1 unless the interpreter is Python 2.4 or newer.

    Interpreters so old that they lack sys.version_info are rejected too.
    The complaint, including sys.version, is written to standard output.
    """
    # Very old interpreters have no sys.version_info at all.
    version = getattr(sys, "version_info", None)
    if version is not None and tuple(version[:2]) >= (2, 4):
        return
    complaint = ("This test script requires python version 2.4 or later. "
                 + "Your version of python is:\n%s" % sys.version)
    sys.stdout.write(complaint + "\n")
    sys.exit(1)


# Define and run this before the remaining imports, because 'unittest'
# requires Python >= 2.1.
check_python_version()
|
|
|
|
import copy
|
|
import fcntl
|
|
import filecmp
|
|
import os
|
|
import re
|
|
import shutil
|
|
import stat
|
|
import tempfile
|
|
import time
|
|
import unittest
|
|
import gzip
|
|
import cStringIO
|
|
import rfc822
|
|
import mailbox
|
|
|
|
from types import ModuleType
|
|
archivemail = ModuleType("archivemail")
|
|
try:
|
|
module_fp = open("archivemail", "r")
|
|
except IOError:
|
|
print "The archivemail script should be in the current directory in order"
|
|
print "to be imported and tested. Sorry."
|
|
sys.exit(1)
|
|
exec module_fp in archivemail.__dict__
|
|
|
|
# We want to iterate over messages in a compressed archive mbox and verify
|
|
# them. This involves seeking in the mbox.  The gzip.GzipFile.seek() in
|
|
# Python 2.5 doesn't understand whence; this is Python bug #1355023, triggered
|
|
# by mailbox._PartialFile.seek(). The bug is still pending as of Python
|
|
# 2.5.2. To work around it, we subclass gzip.GzipFile.
|
|
#
|
|
# It should be noted that seeking backwards in a GzipFile is emulated by
|
|
# re-reading the entire file from the beginning, which is extremely
|
|
# inefficient and won't work with large files; but our test archives are all
|
|
# small, so it's okay.
|
|
|
|
class FixedGzipFile(gzip.GzipFile):
    """gzip.GzipFile whose seek() always accepts a whence argument."""

    def seek(self, offset, whence=0):
        """Seek to offset, interpreted according to whence.

        The request is first handed to gzip.GzipFile.seek() unchanged; if
        that implementation accepts whence, its result is returned (the new
        offset for Python >= 2.7).  If it rejects the extra argument with a
        TypeError, whence is emulated here: 0 is absolute, 1 is relative to
        self.offset, and anything else raises ValueError.
        """
        parent_seek = gzip.GzipFile.seek
        try:
            return parent_seek(self, offset, whence)
        except TypeError:
            # This GzipFile.seek() takes no whence; emulate it below.
            pass
        if whence == 1:
            # Turn the relative request into an absolute one.
            offset = self.offset + offset
        elif whence:
            raise ValueError('Seek from end not supported')
        return parent_seek(self, offset)
|
|
|
|
# Precision of os.utime() when restoring mbox timestamps.  The value is
# passed as the third argument of assertAlmostEqual() in
# TestArchiveMboxTimestamp.verify(), i.e. the number of decimal places to
# which the saved and the restored atime/mtime must agree.
utimes_precision = 5
|
|
|
|
class MessageIdFactory:
    """Callable producing a different message-id on every call.

    Uniqueness is only guaranteed among ids handed out by one instance:
    each id embeds a per-instance counter that starts at 1.
    """

    def __init__(self):
        # Number of ids handed out so far.
        self.seq = 0

    def __call__(self):
        """Return the next id, formatted as "<archivemailN@localhost>"."""
        current = self.seq + 1
        self.seq = current
        return "<archivemail%d@localhost>" % current


# Module-level instance: call make_msgid() to obtain a fresh message-id.
make_msgid = MessageIdFactory()
|
|
|
|
class IndexedMailboxDir:
    """An indexed mailbox directory, providing random message access by
    message-id.  Intended as a base class for a maildir and an mh subclass.

    Attributes:
        root         -- path of the freshly created mailbox directory
        msg_id_dict  -- maps message-id to the message file's path, as passed
                        to _add_to_index()
        deliveries   -- counter available to subclasses for numbering
                        delivered messages; starts at 0
    """

    def __init__(self, mdir_name):
        """Create the mailbox directory, using mdir_name as its name prefix.

        tempfile.tempdir must already be set (TestCaseInTempdir.setUp() does
        that), so the mailbox ends up inside the per-test temporary directory.
        """
        assert tempfile.tempdir
        self.root = tempfile.mkdtemp(prefix=mdir_name)
        self.msg_id_dict = {}
        self.deliveries = 0

    def _add_to_index(self, msg_text, fpath):
        """Add the given message to the index, for later random access.

        The index key is the value of the first line in msg_text starting
        with "Message-Id:" (case-insensitive).  An AssertionError is raised
        if there is no such line, if its value is empty, or if the message-id
        has been indexed before.
        """
        msg_id = None
        for line in msg_text.splitlines():
            if line.lower().startswith("message-id:"):
                msg_id = line.split(":", 1)[-1].strip()
                break
        assert msg_id, "message has no usable Message-Id"
        assert msg_id not in self.msg_id_dict, \
            "duplicate Message-Id %s" % msg_id
        self.msg_id_dict[msg_id] = fpath

    def get_all_filenames(self):
        """Return all relative pathnames of files in this mailbox."""
        return list(self.msg_id_dict.values())
|
|
|
|
class SimpleMaildir(IndexedMailboxDir):
    """Primitive Maildir class, just good enough for generating short-lived
    test maildirs."""

    def __init__(self, mdir_name='maildir'):
        """Create the maildir with its "cur", "tmp" and "new" subdirectories."""
        IndexedMailboxDir.__init__(self, mdir_name)
        for d in "cur", "tmp", "new":
            os.mkdir(os.path.join(self.root, d))

    def write(self, msg_str, new=True, flags=None):
        """Store a message with the given flags.

        msg_str -- complete message text; it must contain a Message-Id line,
                   because the message is indexed by it
        new     -- if true the message goes to "new/", otherwise to "cur/"
        flags   -- iterable of maildir flag letters; only allowed when new is
                   false, since files in "new/" carry no flags
        """
        if flags is None:
            flags = []
        assert not (new and flags)
        if new:
            subdir = "new"
        else:
            subdir = "cur"
        fname = self._mkname(new, flags)
        relpath = os.path.join(subdir, fname)
        path = os.path.join(self.root, relpath)
        assert not os.path.exists(path)
        f = open(path, "w")
        try:
            f.write(msg_str)
        finally:
            # Do not leak the handle if the write fails.
            f.close()
        self._add_to_index(msg_str, relpath)

    def _mkname(self, new, flags):
        """Generate a unique filename for a new message.

        The name is the running delivery counter; for messages outside
        "new/" the info suffix ":2," plus the sorted flag letters is
        appended.
        """
        validflags = 'DFPRST'
        for f in flags:
            assert f in validflags
        # This 'unique' name should be good enough, since nobody else
        # will ever write messages to this maildir folder.
        uniq = str(self.deliveries)
        self.deliveries += 1
        if new:
            return uniq
        # An empty flag list simply yields the bare ":2," suffix.
        return uniq + ':2,' + "".join(sorted(flags))

    def get_message_and_mbox_status(self, msgid):
        """For the Message-Id msgid, return the matching message in text
        format and its status, expressed as a set of mbox flags.

        Raises KeyError if msgid is unknown, or if the file carries a maildir
        flag that has no entry in the flag map below (D, P and T).
        """
        fpath = self.msg_id_dict[msgid]  # Barfs if not found
        # Only names with an info suffix carry flags.  Files in "new/" have
        # none, and must not have their whole path mistaken for flags.
        if ':2,' in fpath:
            mdir_flags = fpath.rsplit(':2,', 1)[-1]
        else:
            mdir_flags = ''
        # maildir flag -> mbox status flag
        flagmap = {
            'F': 'F',
            'R': 'A',
            'S': 'R'
        }
        mbox_flags = set([flagmap[x] for x in mdir_flags])
        if fpath.startswith("cur/"):
            mbox_flags.add('O')
        fp = open(os.path.join(self.root, fpath), "r")
        try:
            msg = fp.read()
        finally:
            fp.close()
        return msg, mbox_flags
|
|
|
|
|
|
class TestCaseInTempdir(unittest.TestCase):
    """Base class for testcases that need to create temporary files.

    Every testcase that creates temporary files should derive from this
    class rather than directly from unittest.TestCase.

    setUp()     creates a private temporary directory and makes it the
                default location for the tempfile module (tempfile.tempdir).

    tearDown()  removes that directory recursively and resets
                tempfile.tempdir.

    Subclasses overriding either method must call the base implementation.
    """
    # Path of the per-test temporary directory; None while none exists.
    temproot = None

    def setUp(self):
        """Create the temporary directory unless one is already in place."""
        if self.temproot:
            return
        # Nobody else may have redirected the tempfile module already.
        assert not tempfile.tempdir
        fresh_root = tempfile.mkdtemp(prefix="test-archivemail")
        self.temproot = fresh_root
        tempfile.tempdir = fresh_root

    def tearDown(self):
        """Delete the temporary directory and restore tempfile's default."""
        root = self.temproot
        assert tempfile.tempdir == root
        if root:
            shutil.rmtree(root)
        tempfile.tempdir = None
        self.temproot = None
|
|
|
|
|
|
############ Mbox Class testing ##############
|
|
|
|
class TestMboxDotlock(TestCaseInTempdir):
    """Dotlock handling of archivemail.Mbox."""

    def setUp(self):
        """Create a test mbox and wrap it in an archivemail.Mbox."""
        super(TestMboxDotlock, self).setUp()
        mbox_path = make_mbox()
        self.mbox_name = mbox_path
        self.mbox_mode = os.stat(mbox_path)[stat.ST_MODE]
        self.mbox = archivemail.Mbox(mbox_path)

    def testDotlock(self):
        """dotlock_lock/unlock should create/delete a lockfile"""
        lockfile = self.mbox_name + ".lock"
        self.mbox._dotlock_lock()
        assert os.path.isfile(lockfile)
        self.mbox._dotlock_unlock()
        assert not os.path.isfile(lockfile)

    def testDotlockingSucceedsUponEACCES(self):
        """A dotlock should silently be omitted upon EACCES."""
        read_and_search_only = stat.S_IRUSR | stat.S_IXUSR   # 0500
        archivemail.options.quiet = True
        parent_dir = os.path.dirname(self.mbox_name)
        # Without write permission on the directory no lockfile can be made.
        os.chmod(parent_dir, read_and_search_only)
        try:
            self.mbox._dotlock_lock()
            self.mbox._dotlock_unlock()
        finally:
            # Restore full owner access (0700) so tearDown() can clean up.
            os.chmod(parent_dir, stat.S_IRWXU)
            archivemail.options.quiet = False
|
|
|
|
class TestMboxPosixLock(TestCaseInTempdir):
    """POSIX (fcntl) advisory locking of archivemail.Mbox."""

    def setUp(self):
        """Create a test mbox and wrap it in an archivemail.Mbox."""
        super(TestMboxPosixLock, self).setUp()
        self.mbox_name = make_mbox()
        self.mbox = archivemail.Mbox(self.mbox_name)

    def testPosixLock(self):
        """posix_lock/unlock should create/delete an advisory lock"""

        # The following code snippet borrows heavily from the Python 2.5
        # mailbox unittest.
        # BEGIN robbery:

        # Fork off a subprocess that will lock the file for 2 seconds,
        # unlock it, and then exit.
        pid = os.fork()
        if pid == 0:
            # In the child, lock the mailbox.  Whatever happens, the child
            # must leave through os._exit(); otherwise an exception would
            # make it carry on running the test suite as a second process.
            child_status = 1
            try:
                self.mbox._posix_lock()
                time.sleep(2)
                self.mbox._posix_unlock()
                child_status = 0
            finally:
                os._exit(child_status)

        # In the parent, sleep a bit to give the child time to acquire
        # the lock.
        time.sleep(0.5)
        # The parent's file self.mbox.mbox_file shares fcntl locks with the
        # duplicated FD in the child; reopen it so we get a different file
        # table entry.
        mbox_fp = open(self.mbox_name, "r+")
        try:
            lock_nb = fcntl.LOCK_EX | fcntl.LOCK_NB
            fd = mbox_fp.fileno()
            try:
                self.assertRaises(IOError, fcntl.lockf, fd, lock_nb)
            finally:
                # Wait for child to exit.  Locking should now succeed.
                exited_pid, status = os.waitpid(pid, 0)

            fcntl.lockf(fd, lock_nb)
            fcntl.lockf(fd, fcntl.LOCK_UN)
        finally:
            mbox_fp.close()
        # A non-zero status means locking or unlocking failed in the child.
        self.assertEqual(status, 0)
        # END robbery
|
|
|
|
|
|
class TestMboxNext(TestCaseInTempdir):
    """Iterating over a mailbox with archivemail.Mbox.next()."""

    def setUp(self):
        """Create one mbox holding 18 messages and one holding none."""
        super(TestMboxNext, self).setUp()
        self.not_empty_name = make_mbox(messages=18)
        self.empty_name = make_mbox(messages=0)

    def testNextEmpty(self):
        """mbox.next() should return None on an empty mailbox"""
        empty = archivemail.Mbox(self.empty_name)
        first = empty.next()
        self.assertEqual(first, None)

    def testNextNotEmpty(self):
        """mbox.next() should return a message on a populated mailbox"""
        populated = archivemail.Mbox(self.not_empty_name)
        remaining = 18
        while remaining:
            message = populated.next()
            assert message
            remaining -= 1
        # All 18 messages consumed: the next call must signal the end.
        past_the_end = populated.next()
        self.assertEqual(past_the_end, None)
|
|
|
|
|
|
############ TempMbox Class testing ##############
|
|
|
|
class TestTempMboxWrite(TestCaseInTempdir):
    """Writing messages to an archivemail.TempMbox."""

    def setUp(self):
        super(TestTempMboxWrite, self).setUp()

    def testWrite(self):
        """mbox.write() should append messages to a mbox mailbox"""
        source_name = make_mbox(messages=3)
        source = archivemail.Mbox(source_name)
        target = archivemail.TempMbox()
        target_name = target.mbox_file_name
        # Copy all three messages across, one by one.
        for _ in range(3):
            message = source.next()
            target.write(message)
        source.close()
        target.close()
        # shallow=0: compare the file contents, not just the stat() data.
        assert filecmp.cmp(source_name, target_name, shallow=0)

    def testWriteNone(self):
        """calling mbox.write() with no message should raise AssertionError"""
        target = archivemail.TempMbox()
        self.assertRaises(AssertionError, target.write, None)
|
|
|
|
class TestTempMboxRemove(TestCaseInTempdir):
    """Removing an archivemail.TempMbox from disk."""

    def setUp(self):
        """Create a temporary mbox and remember its file name."""
        super(TestTempMboxRemove, self).setUp()
        temp_mbox = archivemail.TempMbox()
        self.mbox = temp_mbox
        self.mbox_name = temp_mbox.mbox_file_name

    def testMboxRemove(self):
        """remove() should delete a mbox mailbox"""
        path = self.mbox_name
        assert os.path.exists(path)
        self.mbox.remove()
        assert not os.path.exists(path)
|
|
|
|
|
|
|
|
########## options class testing #################
|
|
|
|
class TestOptionDefaults(unittest.TestCase):
    """Default values of the attributes of archivemail.options."""

    def _expect_default(self, option_name, expected):
        """Assert that archivemail.options.<option_name> equals expected."""
        actual = getattr(archivemail.options, option_name)
        self.assertEqual(actual, expected)

    def testVerbose(self):
        """verbose should be off by default"""
        self._expect_default("verbose", False)

    def testDaysOldMax(self):
        """default archival time should be 180 days"""
        self._expect_default("days_old_max", 180)

    def testQuiet(self):
        """quiet should be off by default"""
        self._expect_default("quiet", False)

    def testDeleteOldMail(self):
        """we should not delete old mail by default"""
        self._expect_default("delete_old_mail", False)

    def testNoCompress(self):
        """no-compression should be off by default"""
        self._expect_default("no_compress", False)

    def testIncludeFlagged(self):
        """we should not archive flagged messages by default"""
        self._expect_default("include_flagged", False)

    def testPreserveUnread(self):
        """we should not preserve unread messages by default"""
        self._expect_default("preserve_unread", False)
|
|
|
|
class TestOptionParser(unittest.TestCase):
    """Command line parsing by archivemail.options.parse_args().

    Each test parses the long and the short spelling of an option.  The
    option's attribute is reset between the two parses, so that the second
    check cannot pass merely because of the first parse.
    """

    def setUp(self):
        """Save a copy of the global options; tearDown() restores it."""
        self.oldopts = copy.copy(archivemail.options)

    def testOptionDate(self):
        """--date and -D options are parsed correctly"""
        date_formats = (
            "%Y-%m-%d",  # ISO format
            "%d %b %Y",  # Internet format
            "%d %B %Y",  # Internet format with full month names
        )
        date = time.strptime("2000-07-29", "%Y-%m-%d")
        unixdate = time.mktime(date)
        for df in date_formats:
            d = time.strftime(df, date)
            for opt in '-D', '--date=':
                archivemail.options.date_old_max = None
                archivemail.options.parse_args([opt+d], "")
                self.assertEqual(unixdate, archivemail.options.date_old_max)

    def testOptionPreserveUnread(self):
        """--preserve-unread option is parsed correctly"""
        archivemail.options.parse_args(["--preserve-unread"], "")
        assert archivemail.options.preserve_unread
        archivemail.options.preserve_unread = False
        archivemail.options.parse_args(["-u"], "")
        assert archivemail.options.preserve_unread

    def testOptionSuffix(self):
        """--suffix and -s options are parsed correctly"""
        for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"):
            archivemail.options.parse_args(["--suffix="+suffix], "")
            self.assertEqual(archivemail.options.archive_suffix, suffix)
            archivemail.options.archive_suffix = None
            archivemail.options.parse_args(["-s", suffix], "")
            self.assertEqual(archivemail.options.archive_suffix, suffix)

    def testOptionPrefix(self):
        """--prefix and -p options are parsed correctly"""
        for prefix in ("_static_", "_%B_%Y", "-%Y-%m-%d"):
            archivemail.options.parse_args(["--prefix="+prefix], "")
            self.assertEqual(archivemail.options.archive_prefix, prefix)
            archivemail.options.archive_prefix = None
            archivemail.options.parse_args(["-p", prefix], "")
            self.assertEqual(archivemail.options.archive_prefix, prefix)

    def testOptionArchivename(self):
        """--archive-name and -a options are parsed correctly"""
        for name in ("custom", ".withdot", "custom_%Y", "%Y/joe"):
            archivemail.options.parse_args(["--archive-name="+name], "")
            self.assertEqual(archivemail.options.archive_name, name)
            archivemail.options.archive_name = None
            archivemail.options.parse_args(["-a", name], "")
            self.assertEqual(archivemail.options.archive_name, name)

    def testOptionDryrun(self):
        """--dry-run option is parsed correctly"""
        archivemail.options.parse_args(["--dry-run"], "")
        assert archivemail.options.dry_run
        # Reset the option under test, so that "-n" has to set it again.
        archivemail.options.dry_run = False
        archivemail.options.parse_args(["-n"], "")
        assert archivemail.options.dry_run

    def testOptionDays(self):
        """--days and -d options are parsed correctly"""
        archivemail.options.parse_args(["--days=11"], "")
        self.assertEqual(archivemail.options.days_old_max, 11)
        archivemail.options.days_old_max = None
        archivemail.options.parse_args(["-d11"], "")
        self.assertEqual(archivemail.options.days_old_max, 11)

    def testOptionDelete(self):
        """--delete option is parsed correctly"""
        archivemail.options.parse_args(["--delete"], "")
        assert archivemail.options.delete_old_mail

    def testOptionCopy(self):
        """--copy option is parsed correctly"""
        archivemail.options.parse_args(["--copy"], "")
        assert archivemail.options.copy_old_mail

    def testOptionOutputdir(self):
        """--output-dir and -o options are parsed correctly"""
        for path in "/just/some/path", "relative/path":
            archivemail.options.parse_args(["--output-dir=%s" % path], "")
            self.assertEqual(archivemail.options.output_dir, path)
            archivemail.options.output_dir = None
            archivemail.options.parse_args(["-o%s" % path], "")
            self.assertEqual(archivemail.options.output_dir, path)

    def testOptionNocompress(self):
        """--no-compress option is parsed correctly"""
        archivemail.options.parse_args(["--no-compress"], "")
        assert archivemail.options.no_compress

    def testOptionSize(self):
        """--size and -S options are parsed correctly"""
        size = "666"
        archivemail.options.parse_args(["--size=%s" % size], "")
        self.assertEqual(archivemail.options.min_size, int(size))
        # Reset the option under test, so that "-S" has to set it again.
        archivemail.options.min_size = None
        archivemail.options.parse_args(["-S%s" % size], "")
        self.assertEqual(archivemail.options.min_size, int(size))

    def tearDown(self):
        """Put back the options saved by setUp()."""
        archivemail.options = self.oldopts
|
|
|
|
########## archivemail.is_older_than_days() unit testing #################
|
|
|
|
class TestIsTooOld(unittest.TestCase):
    """Boundary checks for archivemail.is_older_than_days()."""

    def _is_older(self, timestamp, day_limit):
        """Ask archivemail whether timestamp is older than day_limit days."""
        return archivemail.is_older_than_days(time_message=timestamp,
                                              max_days=day_limit)

    def testVeryOld(self):
        """with max_days=360, should be true for these dates > 1 year"""
        for years in range(1, 10):
            stamp = time.time() - (years * 365 * 24 * 60 * 60)
            assert self._is_older(stamp, 360)

    def testOld(self):
        """with max_days=14, should be true for these dates > 14 days"""
        for days in range(14, 360):
            stamp = time.time() - (days * 24 * 60 * 60)
            assert self._is_older(stamp, 14)

    def testJustOld(self):
        """with max_days=1, should be true for these dates >= 1 day"""
        # Walk from 25 hours ago up to exactly 24 hours ago, minute by minute.
        for minutes in range(0, 61):
            stamp = time.time() - (25 * 60 * 60) + (minutes * 60)
            assert self._is_older(stamp, 1)

    def testNotOld(self):
        """with max_days=9, should be false for these dates < 9 days"""
        for days in range(0, 9):
            stamp = time.time() - (days * 24 * 60 * 60)
            assert not self._is_older(stamp, 9)

    def testJustNotOld(self):
        """with max_days=1, should be false for these hours <= 1 day"""
        # Walk from 23 hours ago back to 23 hours 59 minutes ago.
        for minutes in range(0, 60):
            stamp = time.time() - (23 * 60 * 60) - (minutes * 60)
            assert not self._is_older(stamp, 1)

    def testFuture(self):
        """with max_days=1, should be false for times in the future"""
        for minutes in range(0, 60):
            stamp = time.time() + (minutes * 60)
            assert not self._is_older(stamp, 1)
|
|
|
|
########## archivemail.parse_imap_url() unit testing #################
|
|
|
|
class TestParseIMAPUrl(unittest.TestCase):
    """Parsing of IMAP URLs by archivemail.parse_imap_url().

    The url lists below hold (url, expected) pairs.  Judging by the values
    compared here, the parser returns a
    (username, password, server, port, folder) tuple.
    """

    def setUp(self):
        """Silence archivemail and make sure no password file is set."""
        archivemail.options.quiet = True
        archivemail.options.verbose = False
        archivemail.options.pwfile = None

    urls_withoutpass = [
        ('imap://user@example.org@imap.example.org/upperbox/lowerbox',
            ('user', None, 'example.org@imap.example.org', 143,
            'upperbox/lowerbox')),
        ('imap://"user@example.org"@imap.example.org/upperbox/lowerbox',
            ('user@example.org', None, 'imap.example.org', 143,
            'upperbox/lowerbox')),
        ('imap://user@example.org"@imap.example.org/upperbox/lowerbox',
            ('user', None, 'example.org"@imap.example.org', 143,
            'upperbox/lowerbox')),
        ('imaps://"user@example.org@imap.example.org/upperbox/lowerbox',
            ('"user', None, 'example.org@imap.example.org', 993,
            'upperbox/lowerbox')),
        ('imaps://"us\\"er@example.org"@imap.example.org/upperbox/lowerbox',
            ('us"er@example.org', None, 'imap.example.org', 993,
            'upperbox/lowerbox')),
        ('imaps://user\\@example.org@imap.example.org/upperbox/lowerbox',
            ('user\\', None, 'example.org@imap.example.org', 993,
            'upperbox/lowerbox'))
    ]
    urls_withpass = [
        ('imap://user@example.org:passwd@imap.example.org/upperbox/lowerbox',
            ('user@example.org', 'passwd', 'imap.example.org', 143,
            'upperbox/lowerbox')),
        ('imaps://"user@example.org:passwd@imap.example.org/upperbox/lowerbox',
            ('"user@example.org', "passwd", 'imap.example.org', 993,
            'upperbox/lowerbox')),
        ('imaps://u\\ser\\@example.org:"p@sswd"@imap.example.org/upperbox/lowerbox',
            ('u\\ser\\@example.org', 'p@sswd', 'imap.example.org', 993,
            'upperbox/lowerbox'))
    ]
    # These are invalid when the password's not stripped.  Only the URL of
    # each pair is used (see testUrlsWithPwfile); the second element is
    # never compared against a parse result.
    urls_onlywithpass = [
        ('imaps://"user@example.org":passwd@imap.example.org/upperbox/lowerbox',
            ('user@example.org', "passwd", 'imap.example.org',
            'upperbox/lowerbox'))
    ]

    def testUrlsWithoutPwfile(self):
        """Parse test urls with --pwfile option unset. This parses a password in
        the URL, if present."""
        archivemail.options.pwfile = None
        for url, expected in self.urls_withpass + self.urls_withoutpass:
            result = archivemail.parse_imap_url(url)
            self.assertEqual(result, expected)

    def testUrlsWithPwfile(self):
        """Parse test urls with --pwfile set. In this case the ':' character
        loses its meaning as a delimiter."""
        archivemail.options.pwfile = "whocares.txt"
        for mbstr in self.urls_onlywithpass:
            url = mbstr[0]
            self.assertRaises(archivemail.UnexpectedError,
                    archivemail.parse_imap_url, url)

    def testUrlsDefaultPorts(self):
        """If an IMAP URL does not specify a server port, the standard ports
        are used."""
        archivemail.options.pwfile = "doesnotexist.txt"
        self.assertEqual(143, archivemail.parse_imap_url("imap://user@host/box")[3])
        self.assertEqual(993, archivemail.parse_imap_url("imaps://user@host/box")[3])

    def testUrlsWithPassAndPortnumber(self):
        """IMAP URLs with an embedded password and a server port number are
        correctly parsed."""
        # Cover both URL schemes; an explicit port must override each
        # scheme's default port.
        self.assertEqual(1234, archivemail.parse_imap_url("imap://user:pass@host:1234/box")[3])
        self.assertEqual(1234, archivemail.parse_imap_url("imaps://user:pass@host:1234/box")[3])

    def tearDown(self):
        """Switch archivemail's output back on and clear the password file."""
        archivemail.options.quiet = False
        archivemail.options.verbose = False
        archivemail.options.pwfile = None
|
|
|
|
########## quoting and un-quoting of IMAP strings ##########
|
|
|
|
class TestIMAPQuoting(unittest.TestCase):
    """Round trip through archivemail.imap_quote() and imap_unquote()."""

    # (plain string, its expected quoted form) pairs
    stringlist = (
        ('{braces} and space', '"{braces} and space"'),
        ('\\backslash', '"\\\\backslash"'),
        ('with "quotes" inbetween', '"with \\"quotes\\" inbetween"'),
        ('ending with "quotes"', '"ending with \\"quotes\\""'),
        ('\\"backslash before quote', '"\\\\\\"backslash before quote"')
    )

    def testQuote(self):
        """Quoting each plain string yields its expected quoted form."""
        for pair in self.stringlist:
            plain, expected_quoted = pair
            self.assertEqual(archivemail.imap_quote(plain), expected_quoted)

    def testUnquote(self):
        """Unquoting each quoted form gives back the plain string."""
        for pair in self.stringlist:
            expected_plain, quoted = pair
            self.assertEqual(expected_plain, archivemail.imap_unquote(quoted))
|
|
|
|
|
|
########## Modified UTF-7 support functions ##########
|
|
|
|
class TestModUTF7(unittest.TestCase):
    """archivemail.mod_utf7_encode() and mod_utf7_decode()."""

    # (unicode text, its expected modified UTF-7 encoding) pairs
    goodpairs = (
        (u"A\N{NOT IDENTICAL TO}A.", "A&ImI-A."),
        (u"Hi Mom -\N{WHITE SMILING FACE}-!", "Hi Mom -&Jjo--!"),
        (u"~peter/mail/\u53f0\u5317/\u65e5\u672c\u8a9e",
         "~peter/mail/&U,BTFw-/&ZeVnLIqe-")
    )

    def testEncode(self):
        """Ensure that encoding text in modified UTF-7 works properly."""
        for decoded, encoded in self.goodpairs:
            produced = archivemail.mod_utf7_encode(decoded)
            self.assertEqual(produced, encoded)

    def testDecode(self):
        """Ensure that decoding modified UTF-7 to text works properly."""
        for decoded, encoded in self.goodpairs:
            produced = archivemail.mod_utf7_decode(encoded)
            self.assertEqual(produced, decoded)
|
|
|
|
|
|
########## acceptance testing ###########
|
|
|
|
class TestArchive(TestCaseInTempdir):
    """Base class defining helper functions for doing test archiving runs."""
    mbox = None         # mbox file that will be processed by archivemail
    good_archive = None # Uncompressed reference archive file to verify the
                        # archive after processing
    good_mbox = None    # Reference mbox file to verify the mbox after processing

    def _copy_mbox_to_tempfile(self):
        """Copy self.mbox to a new temporary file and return that file's name.

        mkstemp() hands back an open descriptor; it is closed right away
        because only the file name is needed.
        """
        fd, name = tempfile.mkstemp()
        os.close(fd)
        shutil.copyfile(self.mbox, name)
        return name

    def verify(self):
        """Compare the mbox and the archive with their reference files.

        With good_mbox unset the mbox must have been emptied; with
        good_archive unset no archive may exist.  The archive is looked for
        at self.mbox + "_archive", with ".gz" appended unless the
        no_compress option is set.
        """
        assert os.path.exists(self.mbox)
        if self.good_mbox is not None:
            assertEqualContent(self.mbox, self.good_mbox)
        else:
            self.assertEqual(os.path.getsize(self.mbox), 0)
        archive_name = self.mbox + "_archive"
        if not archivemail.options.no_compress:
            archive_name += ".gz"
            iszipped = True
        else:
            assert not os.path.exists(archive_name + ".gz")
            iszipped = False
        if self.good_archive is not None:
            assertEqualContent(archive_name, self.good_archive, iszipped)
        else:
            assert not os.path.exists(archive_name)

    def make_old_mbox(self, body=None, headers=None, messages=1, make_old_archive=False):
        """Prepare for a test run with an old mbox by making an old mbox,
        optionally an existing archive, and a reference archive to verify the
        archive after archivemail has run."""
        # 181*24 is presumably the message age in hours, i.e. just past the
        # default limit of 180 days.
        self.mbox = make_mbox(body, headers, 181*24, messages)
        archive_does_change = not (archivemail.options.dry_run or
                archivemail.options.delete_old_mail)
        mbox_does_not_change = archivemail.options.dry_run or \
                archivemail.options.copy_old_mail
        if make_old_archive:
            archive = archivemail.make_archive_name(self.mbox)
            self.good_archive = make_archive_and_plain_copy(archive)
            if archive_does_change:
                append_file(self.mbox, self.good_archive)
        elif archive_does_change:
            self.good_archive = self._copy_mbox_to_tempfile()
        if mbox_does_not_change:
            if archive_does_change and not make_old_archive:
                # The reference archive is an exact copy of the mbox already.
                self.good_mbox = self.good_archive
            else:
                self.good_mbox = self._copy_mbox_to_tempfile()

    def make_mixed_mbox(self, body=None, headers=None, messages=1, make_old_archive=False):
        """Prepare for a test run with a mixed mbox by making a mixed mbox,
        optionally an existing archive, a reference archive to verify the
        archive after archivemail has run, and likewise a reference mbox to
        verify the mbox."""
        self.make_old_mbox(body, headers, messages=messages, make_old_archive=make_old_archive)
        new_mbox_name = make_mbox(body, headers, 179*24, messages)
        append_file(new_mbox_name, self.mbox)
        if self.good_mbox is None:
            self.good_mbox = new_mbox_name
        else:
            if self.good_mbox == self.good_archive:
                # The reference mbox must not share a file with the reference
                # archive any longer; take a separate copy of the mixed mbox.
                self.good_mbox = self._copy_mbox_to_tempfile()
            else:
                append_file(new_mbox_name, self.good_mbox)

    def make_new_mbox(self, body=None, headers=None, messages=1, make_old_archive=False):
        """Prepare for a test run with a new mbox by making a new mbox,
        optionally an existing archive, and a reference mbox to verify the mbox
        after archivemail has run."""
        self.mbox = make_mbox(body, headers, 179*24, messages)
        self.good_mbox = self._copy_mbox_to_tempfile()
        if make_old_archive:
            archive = archivemail.make_archive_name(self.mbox)
            self.good_archive = make_archive_and_plain_copy(archive)
|
|
|
|
|
|
class TestArchiveMbox(TestArchive):
|
|
"""archiving should work based on the date of messages given"""
|
|
|
|
def setUp(self):
|
|
self.oldopts = copy.copy(archivemail.options)
|
|
archivemail.options.quiet = True
|
|
super(TestArchiveMbox, self).setUp()
|
|
|
|
def testOld(self):
|
|
"""archiving an old mailbox"""
|
|
self.make_old_mbox(messages=3)
|
|
archivemail.archive(self.mbox)
|
|
self.verify()
|
|
|
|
def testOldFromInBody(self):
|
|
"""archiving an old mailbox with 'From ' in the body"""
|
|
body = """This is a message with ^From at the start of a line
|
|
From is on this line
|
|
This is after the ^From line"""
|
|
self.make_old_mbox(messages=3, body=body)
|
|
archivemail.archive(self.mbox)
|
|
self.verify()
|
|
|
|
def testDateSystem(self):
|
|
"""test that the --date option works as expected"""
|
|
test_headers = (
|
|
{
|
|
'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
|
|
'Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
|
|
},
|
|
{
|
|
'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
|
|
'Date' : None,
|
|
},
|
|
{
|
|
'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
|
|
'Date' : None,
|
|
'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
|
|
},
|
|
{
|
|
'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
|
|
'Date' : None,
|
|
'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
|
|
},
|
|
)
|
|
for headers in test_headers:
|
|
msg = make_message(default_headers=headers, wantobj=True)
|
|
date = time.strptime("2000-07-29", "%Y-%m-%d")
|
|
archivemail.options.date_old_max = time.mktime(date)
|
|
assert archivemail.should_archive(msg)
|
|
date = time.strptime("2000-07-27", "%Y-%m-%d")
|
|
archivemail.options.date_old_max = time.mktime(date)
|
|
assert not archivemail.should_archive(msg)
|
|
|
|
def testMixed(self):
|
|
"""archiving a mixed mailbox"""
|
|
self.make_mixed_mbox(messages=3)
|
|
archivemail.archive(self.mbox)
|
|
self.verify()
|
|
|
|
def testNew(self):
|
|
"""archiving a new mailbox"""
|
|
self.make_new_mbox(messages=3)
|
|
archivemail.archive(self.mbox)
|
|
self.verify()
|
|
|
|
def testOldExisting(self):
|
|
"""archiving an old mailbox with an existing archive"""
|
|
self.make_old_mbox(messages=3, make_old_archive=True)
|
|
archivemail.archive(self.mbox)
|
|
self.verify()
|
|
|
|
def testOldWeirdHeaders(self):
|
|
"""archiving old mailboxes with weird headers"""
|
|
weird_headers = (
|
|
{ # we should archive because of the date on the 'From_' line
|
|
'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
|
|
'Date' : 'Friskhdfkjkh, 28 Jul 2002 1line noise6:11:36 +1000',
|
|
},
|
|
{ # we should archive because of the date on the 'From_' line
|
|
'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
|
|
'Date' : None,
|
|
},
|
|
{ # we should archive because of the date in 'Delivery-date'
|
|
'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
|
|
'Date' : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000',
|
|
'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
|
|
},
|
|
{ # we should archive because of the date in 'Delivery-date'
|
|
'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
|
|
'Date' : None,
|
|
'Delivery-date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
|
|
},
|
|
{ # we should archive because of the date in 'Resent-Date'
|
|
'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
|
|
'Date' : 'Frcorruptioni, 28 Jul 20line noise00 16:6 +1000',
|
|
'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
|
|
},
|
|
{ # we should archive because of the date in 'Resent-Date'
|
|
'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2030',
|
|
'Date' : None,
|
|
'Resent-Date' : 'Fri, 28 Jul 2000 16:11:36 +1000',
|
|
},
|
|
{ # completely blank dates were crashing < version 0.4.7
|
|
'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
|
|
'Date' : '',
|
|
},
|
|
{ # completely blank dates were crashing < version 0.4.7
|
|
'From_' : 'sender@dummy.domain Fri Jul 28 16:11:36 2000',
|
|
'Date' : '',
|
|
'Resent-Date' : '',
|
|
},
|
|
)
|
|
fd, self.mbox = tempfile.mkstemp()
|
|
fp = os.fdopen(fd, "w")
|
|
for headers in weird_headers:
|
|
msg_text = make_message(default_headers=headers)
|
|
fp.write(msg_text*2)
|
|
fp.close()
|
|
self.good_archive = tempfile.mkstemp()[1]
|
|
shutil.copyfile(self.mbox, self.good_archive)
|
|
archivemail.archive(self.mbox)
|
|
self.verify()
|
|
|
|
def tearDown(self):
    """Reinstall the options object kept in self.oldopts as
    archivemail.options, then run the parent class cleanup."""
    saved_options = self.oldopts
    archivemail.options = saved_options
    super(TestArchiveMbox, self).tearDown()
|
|
|
|
|
|
class TestArchiveMboxTimestamp(TestCaseInTempdir):
    """original mbox timestamps should always be preserved"""

    def setUp(self):
        """Create a three-message mbox and move its timestamps back
        (mtime by 66 seconds, atime by 88 seconds) via os.utime()."""
        super(TestArchiveMboxTimestamp, self).setUp()
        archivemail.options.quiet = True
        mbox_path = make_mbox(messages=3, hours_old=(24 * 180))
        self.mbox_name = mbox_path
        stamps = os.stat(mbox_path)
        self.mtime = stamps.st_mtime - 66
        self.atime = stamps.st_atime - 88
        os.utime(mbox_path, (self.atime, self.mtime))

    def tearDown(self):
        """Reset the touched options, delete the mbox and run the parent
        cleanup."""
        options = archivemail.options
        options.quiet = False
        options.days_old_max = 180
        os.remove(self.mbox_name)
        super(TestArchiveMboxTimestamp, self).tearDown()

    def verify(self):
        """Assert that the mbox still exists and that its mtime and atime
        match the values recorded in setUp (to utimes_precision places)."""
        mbox_path = self.mbox_name
        assert os.path.exists(mbox_path)
        stamps = os.stat(mbox_path)
        self.assertAlmostEqual(self.mtime, stamps.st_mtime, utimes_precision)
        self.assertAlmostEqual(self.atime, stamps.st_atime, utimes_precision)

    def testNew(self):
        """mbox timestamps should not change after no archival"""
        # The messages are 180 days old, so a limit of 181 archives nothing.
        archivemail.options.days_old_max = 181
        archivemail.archive(self.mbox_name)
        self.verify()

    def testOld(self):
        """mbox timestamps should not change after archival"""
        # The messages are 180 days old, so a limit of 179 archives them.
        archivemail.options.days_old_max = 179
        archivemail.archive(self.mbox_name)
        self.verify()
|
|
|
|
|
|
class TestArchiveMboxAll(unittest.TestCase):
    """Check should_archive() on message objects while the 'archive_all'
    option is switched on."""

    def setUp(self):
        """Enable quiet mode and the archive_all option."""
        options = archivemail.options
        options.quiet = True
        options.archive_all = True

    def tearDown(self):
        """Switch quiet mode and archive_all off again."""
        options = archivemail.options
        options.quiet = False
        options.archive_all = False

    def testNew(self):
        """new messages should be archived with --all"""
        age_in_hours = 24*179
        self.msg = make_message(hours_old=age_in_hours, wantobj=True)
        verdict = archivemail.should_archive(self.msg)
        assert verdict

    def testOld(self):
        """old messages should be archived with --all"""
        age_in_hours = 24*181
        self.msg = make_message(hours_old=age_in_hours, wantobj=True)
        verdict = archivemail.should_archive(self.msg)
        assert verdict
|
|
|
|
class TestArchiveMboxPreserveUnread(unittest.TestCase):
    """make sure the 'preserve_unread' option works"""

    def setUp(self):
        """Enable preserve_unread and build one message object that is
        24*181 hours old."""
        options = archivemail.options
        options.quiet = True
        options.preserve_unread = True
        self.msg = make_message(hours_old=24*181, wantobj=True)

    def tearDown(self):
        """Switch quiet mode and preserve_unread off again."""
        options = archivemail.options
        options.quiet = False
        options.preserve_unread = False

    def testOldRead(self):
        """old read messages should be archived with --preserve-unread"""
        self.msg["Status"] = "RO"
        verdict = archivemail.should_archive(self.msg)
        assert verdict

    def testOldUnread(self):
        """old unread messages should not be archived with --preserve-unread"""
        self.msg["Status"] = "O"
        verdict = archivemail.should_archive(self.msg)
        assert not verdict
|
|
|
|
|
|
class TestArchiveMboxSuffix(unittest.TestCase):
    """make sure the 'suffix' option works"""

    def setUp(self):
        """Silence archivemail for the duration of the test."""
        archivemail.options.quiet = True

    def testSuffix(self):
        """archiving with specified --suffix arguments"""
        mbox_name = "foobar"
        for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"):
            archivemail.options.archive_suffix = suffix
            # Take the age limit from the live options, exactly like the
            # prefix and archive-name tests in this file, instead of
            # assuming that it is 180 days.
            days_old_max = archivemail.options.days_old_max
            # The expected suffix is the strftime() expansion of the option
            # at "now minus days_old_max days".
            parsed_suffix_time = time.time() - days_old_max*24*60*60
            parsed_suffix = time.strftime(suffix,
                    time.localtime(parsed_suffix_time))
            archive_name = mbox_name + parsed_suffix
            self.assertEqual(archive_name,
                    archivemail.make_archive_name(mbox_name))

    def tearDown(self):
        """Reset quiet mode and clear the archive suffix."""
        archivemail.options.quiet = False
        archivemail.options.archive_suffix = None
|
|
|
|
class TestArchiveMboxPrefix(unittest.TestCase):
    """make sure the 'prefix' option works"""

    def setUp(self):
        """Silence archivemail for the duration of the test."""
        archivemail.options.quiet = True

    def tearDown(self):
        """Reset quiet mode and clear the archive prefix."""
        options = archivemail.options
        options.quiet = False
        options.archive_prefix = None

    def testPrefix(self):
        """archiving with specified --prefix arguments"""
        prefixes = ("_static_", "_%B_%Y", "-%Y-%m-%d", "%Y/%m/")
        mailboxes = ("foobar", "/tmp/foobar", "schnorchz/foobar")
        for archive_prefix in prefixes:
            archivemail.options.archive_prefix = archive_prefix
            for mbox_name in mailboxes:
                archive_dir, archive_base = os.path.split(mbox_name)
                days = archivemail.options.days_old_max
                cutoff = time.localtime(time.time() - days*24*60*60)
                # The expanded prefix goes in front of the basename only;
                # the directory part of the mailbox path is kept.
                expanded = time.strftime(archive_prefix, cutoff)
                expected = os.path.join(archive_dir, expanded + archive_base)
                self.assertEqual(expected,
                        archivemail.make_archive_name(mbox_name))
|
|
|
|
class TestArchiveName(unittest.TestCase):
    """Check make_archive_name() when the 'archive_name' option is set."""

    def setUp(self):
        """Silence archivemail for the duration of the test."""
        archivemail.options.quiet = True

    def tearDown(self):
        """Reset quiet mode and clear the archive name."""
        options = archivemail.options
        options.quiet = False
        options.archive_name = None

    def testArchiveName(self):
        """test the --archive-name option"""
        mbox = "foobar"
        for template in ("custom", ".withdot", "custom_%Y", "%Y/joe"):
            archivemail.options.archive_name = template
            days = archivemail.options.days_old_max
            cutoff = time.localtime(time.time() - days*24*60*60)
            # The option is a strftime() template; expand it by hand.
            expected = time.strftime(template, cutoff)
            self.assertEqual(archivemail.make_archive_name(mbox), expected)
|
|
|
|
class TestArchiveAffixes(unittest.TestCase):
    """Check how archive name prefix, suffix and the default suffix
    combine in make_archive_name()."""

    def setUp(self):
        """Define the mailbox name and the affixes used by the tests."""
        archivemail.options.quiet = True
        self.archive_suffix = "+schronk&borsz"
        self.archive_prefix = "wurbl+"
        self.mbox = "harbsch"

    def tearDown(self):
        """Clear both affix options and reset quiet mode."""
        options = archivemail.options
        options.archive_prefix = None
        options.archive_suffix = None
        options.quiet = False

    def testDefaultPrefix(self):
        """if no archive name affix is specified, the default archive suffix is appended"""
        expected = self.mbox + archivemail.options.archive_default_suffix
        self.assertEqual(archivemail.make_archive_name(self.mbox), expected)

    def testPrefixKillsDefaultSuffix(self):
        """if an archive name prefix is specified, the default archive suffix is not appended"""
        archivemail.options.archive_prefix = self.archive_prefix
        expected = self.archive_prefix + self.mbox
        self.assertEqual(archivemail.make_archive_name(self.mbox), expected)

    def testPrefixAndSuffix(self):
        """specifying both an archive name prefix and suffix works"""
        options = archivemail.options
        options.archive_prefix = self.archive_prefix
        options.archive_suffix = self.archive_suffix
        expected = self.archive_prefix + self.mbox + self.archive_suffix
        self.assertEqual(archivemail.make_archive_name(self.mbox), expected)
|
|
|
|
class TestArchiveHiddenMbox(unittest.TestCase):
    """Check archive names generated for a mailbox whose name starts with
    a dot."""

    def setUp(self):
        """Pick a dot-prefixed mailbox name and silence archivemail."""
        archivemail.options.quiet = True
        self.mbox = ".upper.lower"

    def tearDown(self):
        """Reset quiet mode and clear the archive prefix."""
        options = archivemail.options
        options.quiet = False
        options.archive_prefix = None

    def testHiddenMbox(self):
        """leading dots are stripped from the archive name when no prefix is added"""
        visible_name = self.mbox.lstrip('.')
        expected = visible_name + archivemail.options.archive_default_suffix
        self.assertEqual(archivemail.make_archive_name(self.mbox), expected)

    def testHiddenMboxPrefixedArchive(self):
        """no dots are stripped from the archive name when a prefix is added"""
        prefix = ".hidden_"
        archivemail.options.archive_prefix = prefix
        expected = prefix + self.mbox
        self.assertEqual(archivemail.make_archive_name(self.mbox), expected)
|
|
|
|
class TestArchiveDryRun(TestArchive):
    """make sure the 'dry-run' option works"""

    def setUp(self):
        """Run the parent setUp, then enable quiet mode and dry_run."""
        super(TestArchiveDryRun, self).setUp()
        options = archivemail.options
        options.quiet = True
        options.dry_run = True

    def tearDown(self):
        """Switch dry_run and quiet mode off, then run the parent cleanup."""
        options = archivemail.options
        options.dry_run = False
        options.quiet = False
        super(TestArchiveDryRun, self).tearDown()

    def testOld(self):
        """archiving an old mailbox with the 'dry-run' option"""
        self.make_old_mbox(messages=3)
        mailbox_path = self.mbox
        archivemail.archive(mailbox_path)
        self.verify()
|
|
|
|
|
|
class TestArchiveDelete(TestArchive):
    """make sure the 'delete' option works"""

    def setUp(self):
        """Run the parent setUp, then enable quiet mode and
        delete_old_mail."""
        super(TestArchiveDelete, self).setUp()
        options = archivemail.options
        options.quiet = True
        options.delete_old_mail = True

    def tearDown(self):
        """Switch delete_old_mail and quiet mode off, then run the parent
        cleanup."""
        options = archivemail.options
        options.delete_old_mail = False
        options.quiet = False
        super(TestArchiveDelete, self).tearDown()

    def testNew(self):
        """archiving a new mailbox with the 'delete' option"""
        self.make_new_mbox(messages=3)
        mailbox_path = self.mbox
        archivemail.archive(mailbox_path)
        self.verify()

    def testMixed(self):
        """archiving a mixed mailbox with the 'delete' option"""
        self.make_mixed_mbox(messages=3)
        mailbox_path = self.mbox
        archivemail.archive(mailbox_path)
        self.verify()

    def testOld(self):
        """archiving an old mailbox with the 'delete' option"""
        self.make_old_mbox(messages=3)
        mailbox_path = self.mbox
        archivemail.archive(mailbox_path)
        self.verify()
|
|
|
|
|
|
class TestArchiveCopy(TestArchive):
    """make sure the 'copy' option works"""

    def setUp(self):
        """Run the parent setUp, then enable quiet mode and copy_old_mail."""
        super(TestArchiveCopy, self).setUp()
        options = archivemail.options
        options.quiet = True
        options.copy_old_mail = True

    def tearDown(self):
        """Switch copy_old_mail and quiet mode off, then run the parent
        cleanup."""
        options = archivemail.options
        options.copy_old_mail = False
        options.quiet = False
        super(TestArchiveCopy, self).tearDown()

    def testNew(self):
        """archiving a new mailbox with the 'copy' option"""
        self.make_new_mbox(messages=3)
        mailbox_path = self.mbox
        archivemail.archive(mailbox_path)
        self.verify()

    def testMixed(self):
        """archiving a mixed mailbox with the 'copy' option"""
        self.make_mixed_mbox(messages=3)
        mailbox_path = self.mbox
        archivemail.archive(mailbox_path)
        self.verify()

    def testOld(self):
        """archiving an old mailbox with the 'copy' option"""
        self.make_old_mbox(messages=3)
        mailbox_path = self.mbox
        archivemail.archive(mailbox_path)
        self.verify()
|
|
|
|
|
|
class TestArchiveMboxFlagged(unittest.TestCase):
    """make sure the 'include_flagged' option works"""

    def setUp(self):
        """Start every test with include_flagged off and quiet mode on."""
        archivemail.options.include_flagged = False
        archivemail.options.quiet = True

    def testOld(self):
        """by default, old flagged messages should not be archived"""
        msg = make_message(default_headers={"X-Status": "F"},
                hours_old=24*181, wantobj=True)
        assert not archivemail.should_archive(msg)

    def testIncludeFlaggedNew(self):
        """new flagged messages should not be archived with include_flagged"""
        # The option has to be switched on here; setUp leaves it off, and
        # without this line the test would only repeat the default-settings
        # case with a younger message.
        archivemail.options.include_flagged = True
        msg = make_message(default_headers={"X-Status": "F"},
                hours_old=24*179, wantobj=True)
        assert not archivemail.should_archive(msg)

    def testIncludeFlaggedOld(self):
        """old flagged messages should be archived with include_flagged"""
        archivemail.options.include_flagged = True
        msg = make_message(default_headers={"X-Status": "F"},
                hours_old=24*181, wantobj=True)
        assert archivemail.should_archive(msg)

    def tearDown(self):
        """Switch include_flagged and quiet mode off again."""
        archivemail.options.include_flagged = False
        archivemail.options.quiet = False
|
|
|
|
|
|
class TestArchiveMboxOutputDir(unittest.TestCase):
    """make sure that the 'output-dir' option works"""

    def setUp(self):
        """Silence archivemail for the duration of the test."""
        archivemail.options.quiet = True

    def tearDown(self):
        """Reset quiet mode and clear the output directory."""
        options = archivemail.options
        options.quiet = False
        options.output_dir = None

    def testOld(self):
        """archiving an old mailbox with a specified output dir"""
        for output_dir in ("/just/a/path", "relative/path"):
            archivemail.options.output_dir = output_dir
            archive_path = archivemail.make_archive_name("/tmp/mbox")
            # The directory part of the archive name must be the option
            # value, not the directory of the mailbox.
            self.assertEqual(output_dir, os.path.dirname(archive_path))
|
|
|
|
|
|
class TestArchiveMboxUncompressed(TestArchive):
    """make sure that the 'no_compress' option works"""
    mbox_name = None
    new_mbox = None
    old_mbox = None
    copy_name = None

    def setUp(self):
        """Enable quiet mode and no_compress, then run the parent setUp
        (the options are set before the parent runs)."""
        options = archivemail.options
        options.quiet = True
        options.no_compress = True
        super(TestArchiveMboxUncompressed, self).setUp()

    def tearDown(self):
        """Switch quiet mode and no_compress off, then run the parent
        cleanup."""
        options = archivemail.options
        options.quiet = False
        options.no_compress = False
        super(TestArchiveMboxUncompressed, self).tearDown()

    def testOld(self):
        """archiving an old mailbox uncompressed"""
        self.make_old_mbox(messages=3)
        mailbox_path = self.mbox
        archivemail.archive(mailbox_path)
        self.verify()

    def testNew(self):
        """archiving a new mailbox uncompressed"""
        self.make_new_mbox(messages=3)
        mailbox_path = self.mbox
        archivemail.archive(mailbox_path)
        self.verify()

    def testMixed(self):
        """archiving a mixed mailbox uncompressed"""
        self.make_mixed_mbox(messages=3)
        mailbox_path = self.mbox
        archivemail.archive(mailbox_path)
        self.verify()

    def testOldExists(self):
        """archiving an old mailbox uncompressed with an existing archive"""
        self.make_old_mbox(messages=3, make_old_archive=True)
        mailbox_path = self.mbox
        archivemail.archive(mailbox_path)
        self.verify()
|
|
|
|
|
|
class TestArchiveSize(unittest.TestCase):
    """check that the 'size' argument works"""

    def setUp(self):
        """Build one message that is 24*181 hours old; remember both its
        text length and the parsed rfc822 message object."""
        archivemail.options.quiet = True
        raw_message = make_message(hours_old=24*181)
        self.msg_size = len(raw_message)
        self.msg = rfc822.Message(cStringIO.StringIO(raw_message))

    def tearDown(self):
        """Reset quiet mode and clear the size limit."""
        options = archivemail.options
        options.quiet = False
        options.min_size = None

    def testSmaller(self):
        """giving a size argument smaller than the message"""
        archivemail.options.min_size = self.msg_size - 1
        verdict = archivemail.should_archive(self.msg)
        assert verdict

    def testBigger(self):
        """giving a size argument bigger than the message"""
        archivemail.options.min_size = self.msg_size + 1
        verdict = archivemail.should_archive(self.msg)
        assert not verdict
|
|
|
|
|
|
class TestXIMAPMessage(TestArchive):
    """Test if IMAP pseudo messages in mboxes are properly handled."""

    def setUp(self):
        """Run the parent setUp, then silence archivemail."""
        super(TestXIMAPMessage, self).setUp()
        archivemail.options.quiet = True

    def testXIMAPMbox(self):
        """IMAP pseudo messages in an mbox are always preserved."""
        # Expected remaining mailbox: only the message with the X-IMAP header.
        self.good_mbox = make_mbox(hours_old=181*24, headers={'X-IMAP': 'dummytext'},
                messages=1)
        # Expected archive: the three ordinary messages.
        self.good_archive = make_mbox(hours_old=181*24, messages=3)
        # mkstemp() hands back an open descriptor together with the name;
        # close it right away instead of dropping it, because the file is
        # only ever accessed by name from here on.
        fd, self.mbox = tempfile.mkstemp()
        os.close(fd)
        # Mailbox under test = pseudo message followed by the ordinary ones.
        shutil.copyfile(self.good_mbox, self.mbox)
        append_file(self.good_archive, self.mbox)
        archivemail.archive(self.mbox)
        self.verify()

    def tearDown(self):
        """Run the parent cleanup, then reset quiet mode."""
        super(TestXIMAPMessage, self).tearDown()
        archivemail.options.quiet = False
|
|
|
|
|
|
############# Test archiving maildirs ###############
|
|
|
|
class TestArchiveMailboxdir(TestCaseInTempdir):
    """Base class defining helper functions for doing test archive runs with
    maildirs."""
    maildir = None          # Maildir that will be processed by archivemail
    orig_maildir_obj = None # A backup copy of the maildir, a SimpleMaildir object
    remaining_msg = set()   # Filenames of maildir messages that should be preserved
                            # (rebound per instance in make_maildir, never
                            # mutated in place)
    number_archived = 0     # Number of messages that get archived
    orig_archive = None     # An uncompressed copy of a pre-existing archive,
                            # if one exists

    def setUp(self):
        """Run the parent setUp, then create the SimpleMaildir that serves
        as the untouched reference copy."""
        super(TestArchiveMailboxdir, self).setUp()
        self.orig_maildir_obj = SimpleMaildir()

    def verify(self):
        """Check the processed maildir first, then the archive."""
        self._verify_remaining()
        self._verify_archive()

    def _verify_remaining(self):
        """Verify that the preserved messages weren't altered."""
        assert self.maildir
        # Compare maildir with backup object.
        dcmp = filecmp.dircmp(self.maildir, self.orig_maildir_obj.root)
        # Top-level has only directories cur, new, tmp and must be unchanged.
        self.assertEqual(dcmp.left_list, dcmp.right_list)
        found = set()
        for d in dcmp.common_dirs:
            dcmp2 = dcmp.subdirs[d]
            # We need to verify three things.
            # 1. directory is a subset of the original...
            assert not dcmp2.left_only
            # 2. all common files are identical...
            self.assertEqual(dcmp2.common_files, dcmp2.same_files)
            found = found.union([os.path.join(d, x) for x in dcmp2.common_files])
        # 3. exactly the `new' messages (recorded in self.remaining_msg)
        #    were preserved.
        self.assertEqual(found, self.remaining_msg)

    def _verify_archive(self):
        """Verify the archive correctness.

        When no message is expected to be archived, the archive must either
        not exist or still have the content of self.orig_archive.  Otherwise
        every message in the newly written part of the archive must have a
        counterpart in the reference maildir, and the number of such messages
        must equal self.number_archived.
        """
        # TODO: currently make_archive_name does not include the .gz suffix.
        # Is this something that should be fixed?
        archive = archivemail.make_archive_name(self.maildir)
        if archivemail.options.no_compress:
            iszipped = False
        else:
            archive += '.gz'
            iszipped = True
        if self.number_archived == 0:
            if self.orig_archive:
                assertEqualContent(archive, self.orig_archive, iszipped)
            else:
                assert not os.path.exists(archive)
            return
        fp_new = fp_archive = tmp_archive_name = None
        try:
            if self.orig_archive:
                # Brute force: split archive in old and new part and verify the
                # parts separately.  (Of course this destroys the archive.)
                # Both handles are binary because the archive may hold gzip
                # data, which must be copied byte for byte.
                fp_archive = open(archive, "r+b")
                fp_archive.seek(self.orig_archive_size)
                fd, tmp_archive_name = tempfile.mkstemp()
                fp_new = os.fdopen(fd, "wb")
                shutil.copyfileobj(fp_archive, fp_new)
                fp_new.close()
                fp_archive.truncate(self.orig_archive_size)
                fp_archive.close()
                assertEqualContent(archive, self.orig_archive, iszipped)
                new_archive = tmp_archive_name
            else:
                new_archive = archive
            if archivemail.options.no_compress:
                fp_archive = open(new_archive, "r")
            else:
                fp_archive = FixedGzipFile(new_archive, "r")
            mb = mailbox.UnixMailbox(fp_archive)
            found = 0
            for msg in mb:
                self.verify_maildir_has_msg(self.orig_maildir_obj, msg)
                found += 1
            self.assertEqual(found, self.number_archived)
        finally:
            # Close the handles before unlinking the temporary file, so the
            # file is never removed while it is still open.
            if fp_new is not None:
                fp_new.close()
            if fp_archive is not None:
                fp_archive.close()
            if tmp_archive_name:
                os.remove(tmp_archive_name)

    def verify_maildir_has_msg(self, maildir, msg):
        """Assert that the given maildir has a copy of the rfc822 message.

        maildir -- object offering get_message_and_mbox_status(message_id)
        msg     -- message object read from the archive mbox
        """
        mid = msg['Message-Id']  # Complains if there is no message-id
        mdir_msg_str, mdir_flags = \
                maildir.get_message_and_mbox_status(mid)
        mbox_flags = set(msg.get('status', '') + msg.get('x-status', ''))
        self.assertEqual(mdir_flags, mbox_flags)

        # Rebuild the message text without the Status/X-Status header lines;
        # those were compared as flags above.
        headers = [h for h in msg.headers
                   if msg.isheader(h) not in ('status', 'x-status')]
        headers = "".join(headers)
        msg.rewindbody()
        # Discard last mbox LF which is not part of the message.
        body = msg.fp.read()[:-1]
        msg_str = headers + os.linesep + body
        self.assertEqual(mdir_msg_str, msg_str)

    def add_messages(self, body=None, headers=None, hours_old=0, messages=1):
        """Write `messages' generated messages into the reference maildir.

        body, headers and hours_old are handed on to make_message(); the
        messages are written with new=False.
        """
        for _ in range(messages):
            msg = make_message(body, default_headers=headers, mkfrom=False,
                    hours_old=hours_old)
            self.orig_maildir_obj.write(msg, new=False)

    def make_maildir(self, mkold, mknew, body=None, headers=None, messages=1,
            make_old_archive=False):
        """Populate the reference maildir and create the copy to be archived.

        mkold            -- add `messages' messages that are 181*24 hours old
        mknew            -- add `messages' messages that are 179*24 hours old
        body, headers    -- handed on to add_messages()
        make_old_archive -- also create a pre-existing archive for the copy

        Records the expected outcome in self.number_archived and
        self.remaining_msg according to the current archivemail options, and
        stores the path of the working copy in self.maildir.
        """
        mailbox_does_change = not (archivemail.options.dry_run or
                archivemail.options.copy_old_mail)
        archive_does_change = not (archivemail.options.dry_run or
                archivemail.options.delete_old_mail)
        if mknew:
            self.add_messages(body, headers, 179*24, messages)
            if archive_does_change and archivemail.options.archive_all:
                self.number_archived += messages
        if mailbox_does_change:
            # Only what exists before the old messages are added stays.
            self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames())
        if mkold:
            self.add_messages(body, headers, 181*24, messages)
            if archive_does_change:
                self.number_archived += messages
        if not mailbox_does_change:
            # The mailbox is left alone, so every message stays.
            self.remaining_msg = set(self.orig_maildir_obj.get_all_filenames())
        self.maildir = copy_maildir(self.orig_maildir_obj.root)
        if make_old_archive:
            archive = archivemail.make_archive_name(self.maildir)
            self.orig_archive = make_archive_and_plain_copy(archive)
            # FIXME: .gz extension handling is a mess II
            if not archivemail.options.no_compress:
                archive += '.gz'
            self.orig_archive_size = os.path.getsize(archive)
|
|
|
|
class TestEmptyMaildir(TestCaseInTempdir):
    """Check archiving of a maildir that holds no messages."""

    def setUp(self):
        """Run the parent setUp, then silence archivemail."""
        super(TestEmptyMaildir, self).setUp()
        archivemail.options.quiet = True

    def tearDown(self):
        """Run the parent cleanup, then reset quiet mode."""
        super(TestEmptyMaildir, self).tearDown()
        archivemail.options.quiet = False

    def testEmpty(self):
        """Archiving an empty maildir should not result in an archive."""
        self.mdir = SimpleMaildir()
        maildir_root = self.mdir.root
        archivemail.archive(maildir_root)
        unexpected_archive = self.mdir.root + '_archive.gz'
        assert not os.path.exists(unexpected_archive)
|
|
|
|
class TestMaildir(TestArchiveMailboxdir):
    """Archive maildirs holding old, new and mixed messages under the
    current option settings (only quiet mode is changed here)."""

    def setUp(self):
        """Run the parent setUp, then silence archivemail."""
        super(TestMaildir, self).setUp()
        archivemail.options.quiet = True

    def tearDown(self):
        """Reset quiet mode, then run the parent cleanup."""
        archivemail.options.quiet = False
        super(TestMaildir, self).tearDown()

    def testOld(self):
        """archiving a maildir with three old messages"""
        self.make_maildir(True, False, messages=3)
        maildir_path = self.maildir
        archivemail.archive(maildir_path)
        self.verify()

    def testNew(self):
        """archiving a maildir with three new messages"""
        self.make_maildir(False, True, messages=3)
        maildir_path = self.maildir
        archivemail.archive(maildir_path)
        self.verify()

    def testMixed(self):
        """archiving a maildir with three old and three new messages"""
        self.make_maildir(True, True, messages=3)
        maildir_path = self.maildir
        archivemail.archive(maildir_path)
        self.verify()

    def testMixedExisting(self):
        """archiving a mixed maildir with an already existing archive"""
        self.make_maildir(True, True, messages=3, make_old_archive=True)
        maildir_path = self.maildir
        archivemail.archive(maildir_path)
        self.verify()
|
|
|
|
|
|
class TestMaildirPreserveUnread(TestCaseInTempdir):
    """Test if the preserve_unread option works with maildirs."""

    def setUp(self):
        """Run the parent setUp, then enable quiet mode and
        preserve_unread."""
        super(TestMaildirPreserveUnread, self).setUp()
        options = archivemail.options
        options.quiet = True
        options.preserve_unread = True

    def tearDown(self):
        """Switch quiet mode and preserve_unread off, then run the parent
        cleanup."""
        options = archivemail.options
        options.quiet = False
        options.preserve_unread = False
        super(TestMaildirPreserveUnread, self).tearDown()

    def testOldRead(self):
        """--preserve-unread archives old read messages in a maildir."""
        scratch_maildir = SimpleMaildir("orig")
        message_text = make_message(hours_old=24*181)
        # Written with flags='S'.
        scratch_maildir.write(message_text, new=False, flags='S')
        reader = mailbox.Maildir(scratch_maildir.root)
        first_message = reader.next()
        assert archivemail.should_archive(first_message)

    def testOldUnread(self):
        """--preserve-unread preserves old unread messages in a maildir."""
        scratch_maildir = SimpleMaildir("orig")
        message_text = make_message(hours_old=24*181)
        # Written without any flags argument.
        scratch_maildir.write(message_text, new=False)
        reader = mailbox.Maildir(scratch_maildir.root)
        first_message = reader.next()
        assert not archivemail.should_archive(first_message)
|
|
|
|
class TestMaildirAll(TestArchiveMailboxdir):
    """Check should_archive() on maildir messages while the 'archive_all'
    option is switched on."""

    def setUp(self):
        """Run the parent setUp, then enable quiet mode and archive_all."""
        super(TestMaildirAll, self).setUp()
        archivemail.options.quiet = True
        archivemail.options.archive_all = True

    def testNew(self):
        """New maildir messages should be archived with --all"""
        # 179 days is the "new" age used throughout this file; the message
        # ages of this test and testOld used to be swapped.
        self.add_messages(hours_old=24*179)
        md = mailbox.Maildir(self.orig_maildir_obj.root)
        msg_obj = md.next()
        assert archivemail.should_archive(msg_obj)

    def testOld(self):
        """Old maildir messages should be archived with --all"""
        # 181 days is the "old" age used throughout this file.
        self.add_messages(hours_old=24*181)
        md = mailbox.Maildir(self.orig_maildir_obj.root)
        msg_obj = md.next()
        assert archivemail.should_archive(msg_obj)

    def tearDown(self):
        """Run the parent cleanup, then switch quiet mode and archive_all
        off again."""
        super(TestMaildirAll, self).tearDown()
        archivemail.options.quiet = False
        archivemail.options.archive_all = False
|
|
|
|
class TestMaildirDryRun(TestArchiveMailboxdir):
    """Check the 'dry_run' option on maildirs."""

    def setUp(self):
        """Run the parent setUp, then enable quiet mode and dry_run."""
        super(TestMaildirDryRun, self).setUp()
        options = archivemail.options
        options.quiet = True
        options.dry_run = True

    def tearDown(self):
        """Run the parent cleanup, then switch quiet mode and dry_run off."""
        super(TestMaildirDryRun, self).tearDown()
        options = archivemail.options
        options.quiet = False
        options.dry_run = False

    def testOld(self):
        """archiving an old maildir mailbox with the 'dry-run' option"""
        self.make_maildir(True, False)
        maildir_path = self.maildir
        archivemail.archive(maildir_path)
        self.verify()
|
|
|
|
class TestMaildirDelete(TestArchiveMailboxdir):
    """Check the 'delete_old_mail' option on maildirs."""

    def setUp(self):
        """Run the parent setUp, then enable quiet mode and
        delete_old_mail."""
        super(TestMaildirDelete, self).setUp()
        options = archivemail.options
        options.quiet = True
        options.delete_old_mail = True

    def tearDown(self):
        """Run the parent cleanup, then switch quiet mode and
        delete_old_mail off."""
        super(TestMaildirDelete, self).tearDown()
        options = archivemail.options
        options.quiet = False
        options.delete_old_mail = False

    def testOld(self):
        """archiving an old maildir mailbox with the 'delete' option"""
        self.make_maildir(True, False)
        maildir_path = self.maildir
        archivemail.archive(maildir_path)
        self.verify()

    def testNew(self):
        """archiving a new maildir mailbox with the 'delete' option"""
        self.make_maildir(False, True)
        maildir_path = self.maildir
        archivemail.archive(maildir_path)
        self.verify()
|
|
|
|
class TestMaildirCopy(TestArchiveMailboxdir):
    """Check the 'copy_old_mail' option on maildirs."""

    def setUp(self):
        """Run the parent setUp, then enable quiet mode and copy_old_mail."""
        super(TestMaildirCopy, self).setUp()
        options = archivemail.options
        options.quiet = True
        options.copy_old_mail = True

    def tearDown(self):
        """Run the parent cleanup, then switch quiet mode and copy_old_mail
        off."""
        super(TestMaildirCopy, self).tearDown()
        options = archivemail.options
        options.quiet = False
        options.copy_old_mail = False

    def testOld(self):
        """archiving an old maildir mailbox with the 'copy' option"""
        self.make_maildir(True, False)
        maildir_path = self.maildir
        archivemail.archive(maildir_path)
        self.verify()

    def testNew(self):
        """archiving a new maildir mailbox with the 'copy' option"""
        self.make_maildir(False, True)
        maildir_path = self.maildir
        archivemail.archive(maildir_path)
        self.verify()
|
|
|
|
class TestArchiveMaildirFlagged(TestCaseInTempdir):
    """make sure the 'include_flagged' option works with maildir messages"""

    def setUp(self):
        """Run the parent setUp; start with include_flagged off and quiet
        mode on."""
        super(TestArchiveMaildirFlagged, self).setUp()
        archivemail.options.include_flagged = False
        archivemail.options.quiet = True

    def testOld(self):
        """by default, old flagged maildir messages should not be archived"""
        smd = SimpleMaildir("orig")
        msg = make_message(hours_old=24*181)
        smd.write(msg, new=False, flags='F')
        md = mailbox.Maildir(smd.root)
        msg_obj = md.next()
        assert not archivemail.should_archive(msg_obj)

    def testIncludeFlaggedNew(self):
        """new flagged maildir messages should not be archived with include_flagged"""
        # The option has to be switched on here; setUp leaves it off, and
        # without this line the test would only repeat the default-settings
        # case with a younger message.
        archivemail.options.include_flagged = True
        smd = SimpleMaildir("orig")
        msg = make_message(hours_old=24*179)
        smd.write(msg, new=False, flags='F')
        md = mailbox.Maildir(smd.root)
        msg_obj = md.next()
        assert not archivemail.should_archive(msg_obj)

    def testIncludeFlaggedOld(self):
        """old flagged maildir messages should be archived with include_flagged"""
        archivemail.options.include_flagged = True
        smd = SimpleMaildir("orig")
        msg = make_message(hours_old=24*181)
        smd.write(msg, new=False, flags='F')
        md = mailbox.Maildir(smd.root)
        msg_obj = md.next()
        assert archivemail.should_archive(msg_obj)

    def tearDown(self):
        """Run the parent cleanup, then switch include_flagged and quiet
        mode off."""
        super(TestArchiveMaildirFlagged, self).tearDown()
        archivemail.options.include_flagged = False
        archivemail.options.quiet = False
|
|
|
|
class TestArchiveMaildirSize(TestCaseInTempdir):
    """check that the 'size' argument works with maildir messages"""

    def setUp(self):
        """Write one message that is 24*181 hours old into a fresh maildir
        and keep both its text length and the message object read back."""
        super(TestArchiveMaildirSize, self).setUp()
        archivemail.options.quiet = True
        message_text = make_message(hours_old=24*181)
        self.msg_size = len(message_text)
        scratch_maildir = SimpleMaildir("orig")
        scratch_maildir.write(message_text, new=False)
        reader = mailbox.Maildir(scratch_maildir.root)
        self.msg_obj = reader.next()

    def tearDown(self):
        """Run the parent cleanup, then reset quiet mode and clear the size
        limit."""
        super(TestArchiveMaildirSize, self).tearDown()
        options = archivemail.options
        options.quiet = False
        options.min_size = None

    def testSmaller(self):
        """giving a size argument smaller than the maildir message"""
        archivemail.options.min_size = self.msg_size - 1
        verdict = archivemail.should_archive(self.msg_obj)
        assert verdict

    def testBigger(self):
        """giving a size argument bigger than the maildir message"""
        archivemail.options.min_size = self.msg_size + 1
        verdict = archivemail.should_archive(self.msg_obj)
        assert not verdict
|
|
|
|
########## helper routines ############
|
|
|
|
def make_message(body=None, default_headers=None, hours_old=None, mkfrom=False, wantobj=False):
    """Build a test message and return it as text or as a message object.

    body            -- message body; a stock sentence is used if empty
    default_headers -- dict of headers to start from (it is copied, never
                       modified).  A header whose value is None is left out
                       of the message.  The pseudo header 'From_' becomes
                       the mbox "From " separator line.
    hours_old       -- age used for a generated 'Date' header when none is
                       given in default_headers; None counts as 0
    mkfrom          -- if true and no 'From_' was given, derive one from the
                       'From' and 'Date' headers
    wantobj         -- if true return an rfc822.Message, else the raw string

    A fresh 'Message-Id' from make_msgid() is always set, replacing any
    value passed in.
    """
    headers = copy.copy(default_headers)
    if not headers:
        headers = {}
    headers['Message-Id'] = make_msgid()
    if 'Date' not in headers:
        # hours_old defaults to None, which cannot be multiplied.
        time_message = time.time() - (60 * 60 * (hours_old or 0))
        headers['Date'] = time.asctime(time.localtime(time_message))
    if 'From' not in headers:
        headers['From'] = "sender@dummy.domain"
    if 'To' not in headers:
        headers['To'] = "receipient@dummy.domain"
    if 'Subject' not in headers:
        headers['Subject'] = "This is the subject"
    if mkfrom and 'From_' not in headers:
        headers['From_'] = "%s %s" % (headers['From'], headers['Date'])
    if not body:
        body = "This is the message body"

    parts = []
    if 'From_' in headers:
        # The separator line has to come first and is not a real header.
        parts.append("From %s\n" % headers['From_'])
        del headers['From_']
    for key in headers:
        if headers[key] is not None:
            parts.append("%s: %s\n" % (key, headers[key]))
    parts.append("\n\n" + body + "\n\n")
    msg = "".join(parts)
    if not wantobj:
        return msg
    fp = cStringIO.StringIO(msg)
    return rfc822.Message(fp)
|
|
|
|
def append_file(source, dest):
    """appends the file named 'source' to the file named 'dest'

    Both names must refer to existing regular files (checked with assert).
    The handles are closed even if the copy fails.
    """
    assert os.path.isfile(source)
    assert os.path.isfile(dest)
    source_fp = open(source, "r")
    try:
        dest_fp = open(dest, "a")
        try:
            shutil.copyfileobj(source_fp, dest_fp)
        finally:
            dest_fp.close()
    finally:
        source_fp.close()
|
|
|
|
|
|
def make_mbox(body=None, headers=None, hours_old=0, messages=1):
    """Write `messages' generated messages to a new temporary file and
    return the name of that file.

    body, headers and hours_old are handed on to make_message(), which is
    called with mkfrom=True.  tempfile.tempdir must already be set (checked
    with assert).
    """
    assert tempfile.tempdir
    fd, name = tempfile.mkstemp()
    mbox_fp = os.fdopen(fd, "w")
    try:
        for _ in range(messages):
            msg = make_message(body=body, default_headers=headers,
                    mkfrom=True, hours_old=hours_old)
            mbox_fp.write(msg)
    finally:
        # Do not leave the handle open if message generation fails.
        mbox_fp.close()
    return name
|
|
|
|
def make_archive_and_plain_copy(archive_name):
    """Make an mbox archive of the given name like archivemail may have
    created it.  Also make an uncompressed copy of this archive and return its
    name.

    Unless the no_compress option is set, ".gz" is appended to archive_name
    and the archive is gzip-compressed.  The archive file must not exist yet
    (it is created with O_EXCL).  Three messages, each 24*360 hours old, are
    written to both files.
    """
    compress = not archivemail.options.no_compress
    copy_fd, copy_name = tempfile.mkstemp()
    copy_fp = os.fdopen(copy_fd, "w")
    rawfp = fp = None
    try:
        if compress:
            archive_name += ".gz"
        fd = os.open(archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT)
        if compress:
            # The compressed stream is binary data, so the underlying handle
            # is opened in binary mode.
            rawfp = os.fdopen(fd, "wb")
            fp = gzip.GzipFile(fileobj=rawfp, mode="wb")
        else:
            fp = os.fdopen(fd, "w")
        for _ in range(3):
            msg = make_message(hours_old=24*360)
            fp.write(msg)
            copy_fp.write(msg)
    finally:
        # Close the gzip layer before the raw handle beneath it.
        if fp is not None:
            fp.close()
        copy_fp.close()
        if rawfp is not None:
            rawfp.close()
    return copy_name
|
|
|
|
def copy_maildir(maildir, prefix="tmp"):
    """Duplicate the cur, new and tmp subdirectories of the given maildir
    into a fresh directory created by tempfile.mkdtemp(prefix=prefix), and
    return the path of that new directory."""
    target_root = tempfile.mkdtemp(prefix=prefix)
    for subdir in ("cur", "new", "tmp"):
        source_path = os.path.join(maildir, subdir)
        target_path = os.path.join(target_root, subdir)
        shutil.copytree(source_path, target_path)
    return target_root
|
|
|
|
def assertEqualContent(firstfile, secondfile, zippedfirst=False):
    """Verify that the two files exist and have identical content.  If
    zippedfirst is True, assume that firstfile is gzip-compressed and compare
    its decompressed content with secondfile.

    Raises AssertionError if a file is missing or the contents differ.
    """
    assert os.path.exists(firstfile)
    assert os.path.exists(secondfile)
    if not zippedfirst:
        assert filecmp.cmp(firstfile, secondfile, shallow=False)
        return
    # Pre-bind both names so that the cleanup below cannot fail with a
    # NameError (and hide the real error) when one of the opens raises.
    fp1 = fp2 = None
    try:
        fp1 = gzip.GzipFile(firstfile, "r")
        # Binary mode: the gzip side delivers raw bytes as well.
        fp2 = open(secondfile, "rb")
        assert cmp_fileobj(fp1, fp2)
    finally:
        if fp1 is not None:
            fp1.close()
        if fp2 is not None:
            fp2.close()
|
|
|
|
def cmp_fileobj(fp1, fp2):
    """Read both file objects in 8192-byte steps and return True if every
    pair of chunks is equal up to and including the empty chunk at the end,
    False as soon as a pair differs."""
    chunk_size = 8192
    chunk1 = fp1.read(chunk_size)
    chunk2 = fp2.read(chunk_size)
    while chunk1 == chunk2:
        if not chunk1:
            # Both sides are exhausted at the same point.
            return True
        chunk1 = fp1.read(chunk_size)
        chunk2 = fp2.read(chunk_size)
    return False
|
|
|
|
# Run every test case of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|