test suite: define and use assertEqualContent() to compare files
This eliminates a lot of copy-and-paste code, and switches from os.system("gzip <...>") to gzip.GzipFile.
This commit is contained in:
parent
9d9f13a440
commit
8c6f4b99c1
|
@ -58,6 +58,7 @@ import stat
|
|||
import tempfile
|
||||
import time
|
||||
import unittest
|
||||
import gzip
|
||||
|
||||
try:
|
||||
import archivemail
|
||||
|
@ -413,11 +414,7 @@ class TestArchiveMbox(TestCaseInTempdir):
|
|||
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||
self.assertEqual(self.mbox_mode, new_mode)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(archive_name, self.copy_name, zipfirst=True)
|
||||
|
||||
def testOldFromInBody(self):
|
||||
"""archiving an old mailbox with 'From ' in the body"""
|
||||
|
@ -431,11 +428,7 @@ This is after the ^From line"""
|
|||
assert(os.path.exists(self.mbox_name))
|
||||
self.assertEqual(os.path.getsize(self.mbox_name), 0)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(archive_name, self.copy_name, zipfirst=True)
|
||||
|
||||
def testDateSystem(self):
|
||||
"""test that the --date option works as expected"""
|
||||
|
@ -470,11 +463,7 @@ This is after the ^From line"""
|
|||
assert(os.path.exists(self.mbox_name))
|
||||
self.assertEqual(os.path.getsize(self.mbox_name), 0)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(archive_name, self.copy_name, zipfirst=True)
|
||||
for option in ('--date=2000-07-27', '-D2000-07-27',
|
||||
'--date="27 Jul 2000"', '--date="27 July 2000"'):
|
||||
self.mbox_name = make_mbox(messages=3, headers=headers)
|
||||
|
@ -482,8 +471,7 @@ This is after the ^From line"""
|
|||
shutil.copyfile(self.mbox_name, self.copy_name)
|
||||
run = "./archivemail.py -q %s %s" % (option, self.mbox_name)
|
||||
self.assertEqual(os.system(run), 0)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(self.mbox_name, self.copy_name)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(not os.path.exists(archive_name))
|
||||
|
||||
|
@ -502,14 +490,9 @@ This is after the ^From line"""
|
|||
self.assertEqual(os.system(run), 0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
assert(filecmp.cmp(self.new_mbox, self.mbox_name, shallow=0))
|
||||
assertEqualContent(self.mbox_name, self.new_mbox)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.old_mbox, shallow=0))
|
||||
assertEqualContent(archive_name, self.old_mbox, zipfirst=True)
|
||||
|
||||
def testNew(self):
|
||||
"""archiving a new mailbox"""
|
||||
|
@ -525,8 +508,7 @@ This is after the ^From line"""
|
|||
self.assertEqual(os.system(run), 0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(self.mbox_name, self.copy_name)
|
||||
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||
self.assertEqual(self.mbox_mode, new_mode)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
|
@ -556,11 +538,7 @@ This is after the ^From line"""
|
|||
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||
self.assertEqual(self.mbox_mode, new_mode)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(archive_name, self.copy_name, zipfirst=True)
|
||||
|
||||
def testOldWeirdHeaders(self):
|
||||
"""archiving old mailboxes with weird headers"""
|
||||
|
@ -611,11 +589,7 @@ This is after the ^From line"""
|
|||
assert(os.path.exists(self.mbox_name))
|
||||
self.assertEqual(os.path.getsize(self.mbox_name), 0)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(archive_name, self.copy_name, zipfirst=True)
|
||||
|
||||
def tearDown(self):
|
||||
archivemail.options.quiet = 0
|
||||
|
@ -759,11 +733,7 @@ class TestArchiveMboxPreserveStatus(TestCaseInTempdir):
|
|||
assert(os.path.exists(self.mbox_name))
|
||||
self.assertEqual(os.path.getsize(self.mbox_name), 0)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(archive_name, self.copy_name, zipfirst=True)
|
||||
|
||||
def testOldUnread(self):
|
||||
"""archiving an unread mailbox should not create an archive"""
|
||||
|
@ -782,8 +752,7 @@ class TestArchiveMboxPreserveStatus(TestCaseInTempdir):
|
|||
self.assertEqual(os.system(run), 0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(self.mbox_name, self.copy_name)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(not os.path.exists(archive_name))
|
||||
|
||||
|
@ -829,11 +798,7 @@ class TestArchiveMboxSuffix(TestCaseInTempdir):
|
|||
time.localtime(parsed_suffix_time))
|
||||
|
||||
archive_name = self.mbox_name + parsed_suffix + ".gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
archive_name = re.sub("\.gz$", "", archive_name)
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(archive_name, self.copy_name, zipfirst=True)
|
||||
os.remove(archive_name)
|
||||
|
||||
def tearDown(self):
|
||||
|
@ -865,8 +830,7 @@ class TestArchiveDryRun(TestCaseInTempdir):
|
|||
self.assertEqual(os.system(run), 0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(self.mbox_name, self.copy_name)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(not os.path.exists(archive_name))
|
||||
|
||||
|
@ -902,11 +866,7 @@ class TestArchiveDays(TestCaseInTempdir):
|
|||
assert(os.path.exists(self.mbox_name))
|
||||
self.assertEqual(os.path.getsize(self.mbox_name), 0)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(archive_name, self.copy_name, zipfirst=True)
|
||||
|
||||
def testNew(self):
|
||||
"""specifying the 'days' option on a newer mailbox"""
|
||||
|
@ -925,8 +885,7 @@ class TestArchiveDays(TestCaseInTempdir):
|
|||
self.assertEqual(os.system(run), 0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(self.mbox_name, self.copy_name)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(not os.path.exists(archive_name))
|
||||
|
||||
|
@ -961,8 +920,7 @@ class TestArchiveDelete(TestCaseInTempdir):
|
|||
self.assertEqual(os.system(run), 0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(self.mbox_name, self.copy_name)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(not os.path.exists(archive_name))
|
||||
|
||||
|
@ -981,8 +939,7 @@ class TestArchiveDelete(TestCaseInTempdir):
|
|||
self.assertEqual(os.system(run), 0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
assert(filecmp.cmp(self.new_mbox, self.mbox_name, shallow=0))
|
||||
assertEqualContent(self.mbox_name, self.new_mbox)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(not os.path.exists(archive_name))
|
||||
|
||||
|
@ -1035,11 +992,8 @@ class TestArchiveCopy(TestCaseInTempdir):
|
|||
self.assertEqual(os.system(run), 0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
# mbox must not have changed:
|
||||
assert(filecmp.cmp(self.mbox_name, self.mbox_backup_name, shallow=0))
|
||||
assertEqualContent(self.mbox_name, self.mbox_backup_name)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
# There is no archive.
|
||||
assert(not os.path.exists(archive_name))
|
||||
|
||||
def testMixed(self):
|
||||
|
@ -1059,16 +1013,9 @@ class TestArchiveCopy(TestCaseInTempdir):
|
|||
self.assertEqual(os.system(run), 0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
# mbox must not have changed:
|
||||
assert(filecmp.cmp(self.mbox_backup_name, self.mbox_name, shallow=0))
|
||||
# archive has the old messages.
|
||||
assertEqualContent(self.mbox_name, self.mbox_backup_name)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.old_mbox, shallow=0))
|
||||
assertEqualContent(archive_name, self.old_mbox, zipfirst=True)
|
||||
|
||||
def testOld(self):
|
||||
"""archiving an old mailbox with the 'copy' option"""
|
||||
|
@ -1083,16 +1030,9 @@ class TestArchiveCopy(TestCaseInTempdir):
|
|||
self.assertEqual(os.system(run), 0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
# mbox must not have changed:
|
||||
assert(filecmp.cmp(self.mbox_backup_name, self.mbox_name, shallow=0))
|
||||
# archive has the old messages.
|
||||
assertEqualContent(self.mbox_name, self.mbox_backup_name)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.mbox_name, shallow=0))
|
||||
assertEqualContent(archive_name, self.mbox_name, zipfirst=True)
|
||||
|
||||
def tearDown(self):
|
||||
archivemail.options.copy_old_mail = 0
|
||||
|
@ -1121,8 +1061,7 @@ class TestArchiveMboxFlagged(TestCaseInTempdir):
|
|||
self.assertEqual(os.system(run), 0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(self.mbox_name, self.copy_name)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(not os.path.exists(archive_name))
|
||||
|
||||
|
@ -1142,8 +1081,7 @@ class TestArchiveMboxFlagged(TestCaseInTempdir):
|
|||
self.assertEqual(os.system(run), 0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(self.mbox_name, self.copy_name)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(not os.path.exists(archive_name))
|
||||
|
||||
|
@ -1166,11 +1104,7 @@ class TestArchiveMboxFlagged(TestCaseInTempdir):
|
|||
assert(os.path.exists(self.mbox_name))
|
||||
self.assertEqual(os.path.getsize(self.mbox_name), 0)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(archive_name, self.copy_name, zipfirst=True)
|
||||
|
||||
def tearDown(self):
|
||||
archivemail.options.include_flagged = 0
|
||||
|
@ -1208,11 +1142,7 @@ class TestArchiveMboxOutputDir(TestCaseInTempdir):
|
|||
self.assertEqual(os.path.getsize(self.mbox_name), 0)
|
||||
archive_name = self.dir_name + "/" + \
|
||||
os.path.basename(self.mbox_name) + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
archive_name = re.sub(".gz$", "", archive_name)
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(archive_name, self.copy_name, zipfirst=True)
|
||||
|
||||
def tearDown(self):
|
||||
archivemail.options.quiet = 0
|
||||
|
@ -1252,8 +1182,7 @@ class TestArchiveMboxUncompressed(TestCaseInTempdir):
|
|||
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||
self.assertEqual(self.mbox_mode, new_mode)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(archive_name, self.copy_name)
|
||||
assert(not os.path.exists(archive_name + ".gz"))
|
||||
|
||||
def testNew(self):
|
||||
|
@ -1271,8 +1200,7 @@ class TestArchiveMboxUncompressed(TestCaseInTempdir):
|
|||
self.assertEqual(os.system(run), 0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(self.mbox_name, self.copy_name)
|
||||
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||
self.assertEqual(self.mbox_mode, new_mode)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
|
@ -1295,11 +1223,9 @@ class TestArchiveMboxUncompressed(TestCaseInTempdir):
|
|||
self.assertEqual(os.system(run), 0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
assert(filecmp.cmp(self.new_mbox, self.mbox_name, shallow=0))
|
||||
assertEqualContent(self.mbox_name, self.new_mbox)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.old_mbox, shallow=0))
|
||||
assertEqualContent(archive_name, self.old_mbox)
|
||||
assert(not os.path.exists(archive_name + ".gz"))
|
||||
|
||||
def testOldExists(self):
|
||||
|
@ -1325,8 +1251,7 @@ class TestArchiveMboxUncompressed(TestCaseInTempdir):
|
|||
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||
self.assertEqual(self.mbox_mode, new_mode)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(archive_name, self.copy_name)
|
||||
assert(not os.path.exists(archive_name + ".gz"))
|
||||
|
||||
def tearDown(self):
|
||||
|
@ -1364,11 +1289,7 @@ class TestArchiveSize(TestCaseInTempdir):
|
|||
assert(os.path.exists(self.mbox_name))
|
||||
self.assertEqual(os.path.getsize(self.mbox_name), 0)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(archive_name, self.copy_name, zipfirst=True)
|
||||
|
||||
def testBigger(self):
|
||||
"""giving a size argument bigger than the message"""
|
||||
|
@ -1390,8 +1311,7 @@ class TestArchiveSize(TestCaseInTempdir):
|
|||
self.assertEqual(os.system(run), 0)
|
||||
else:
|
||||
sys.exit(1)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
|
||||
assertEqualContent(self.mbox_name, self.copy_name)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(not os.path.exists(archive_name))
|
||||
|
||||
|
@ -1506,6 +1426,32 @@ def make_mbox(body=None, headers=None, hours_old=0, messages=1):
|
|||
file.close()
|
||||
return name
|
||||
|
||||
def assertEqualContent(firstfile, secondfile, zipfirst=False):
|
||||
"""Verify that the two files exist and have identical content. If zipfirst
|
||||
is True, assume that firstfile is gzip-compressed."""
|
||||
assert(os.path.exists(firstfile))
|
||||
assert(os.path.exists(secondfile))
|
||||
if zipfirst:
|
||||
try:
|
||||
fp1 = gzip.GzipFile(firstfile, "r")
|
||||
fp2 = open(secondfile, "r")
|
||||
assert(cmp_fileobj(fp1, fp2))
|
||||
finally:
|
||||
fp1.close()
|
||||
fp2.close()
|
||||
else:
|
||||
assert(filecmp.cmp(firstfile, secondfile, shallow=0))
|
||||
|
||||
def cmp_fileobj(fp1, fp2):
|
||||
"""Return if reading the fileobjects yields identical content."""
|
||||
bufsize = 8192
|
||||
while True:
|
||||
b1 = fp1.read(bufsize)
|
||||
b2 = fp2.read(bufsize)
|
||||
if b1 != b2:
|
||||
return False
|
||||
if not b1:
|
||||
return True
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
Loading…
Reference in New Issue