diff --git a/test_archivemail.py b/test_archivemail.py index 01f2c01..4fa827b 100755 --- a/test_archivemail.py +++ b/test_archivemail.py @@ -58,6 +58,7 @@ import stat import tempfile import time import unittest +import gzip try: import archivemail @@ -413,11 +414,7 @@ class TestArchiveMbox(TestCaseInTempdir): new_mode = os.stat(self.mbox_name)[stat.ST_MODE] self.assertEqual(self.mbox_mode, new_mode) archive_name = self.mbox_name + "_archive.gz" - assert(os.path.exists(archive_name)) - self.assertEqual(os.system("gzip -d %s" % archive_name), 0) - archive_name = self.mbox_name + "_archive" - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.copy_name, shallow=0)) + assertEqualContent(archive_name, self.copy_name, zipfirst=True) def testOldFromInBody(self): """archiving an old mailbox with 'From ' in the body""" @@ -431,11 +428,7 @@ This is after the ^From line""" assert(os.path.exists(self.mbox_name)) self.assertEqual(os.path.getsize(self.mbox_name), 0) archive_name = self.mbox_name + "_archive.gz" - assert(os.path.exists(archive_name)) - self.assertEqual(os.system("gzip -d %s" % archive_name), 0) - archive_name = self.mbox_name + "_archive" - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.copy_name, shallow=0)) + assertEqualContent(archive_name, self.copy_name, zipfirst=True) def testDateSystem(self): """test that the --date option works as expected""" @@ -470,11 +463,7 @@ This is after the ^From line""" assert(os.path.exists(self.mbox_name)) self.assertEqual(os.path.getsize(self.mbox_name), 0) archive_name = self.mbox_name + "_archive.gz" - assert(os.path.exists(archive_name)) - self.assertEqual(os.system("gzip -d %s" % archive_name), 0) - archive_name = self.mbox_name + "_archive" - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.copy_name, shallow=0)) + assertEqualContent(archive_name, self.copy_name, zipfirst=True) for option in ('--date=2000-07-27', '-D2000-07-27', '--date="27 Jul 2000"', '--date="27 July 2000"'): self.mbox_name = make_mbox(messages=3, headers=headers) @@ -482,8 +471,7 @@ This is after the ^From line""" shutil.copyfile(self.mbox_name, self.copy_name) run = "./archivemail.py -q %s %s" % (option, self.mbox_name) self.assertEqual(os.system(run), 0) - assert(os.path.exists(self.mbox_name)) - assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0)) + assertEqualContent(self.mbox_name, self.copy_name) archive_name = self.mbox_name + "_archive.gz" assert(not os.path.exists(archive_name)) @@ -502,14 +490,9 @@ This is after the ^From line""" self.assertEqual(os.system(run), 0) else: sys.exit(1) - assert(os.path.exists(self.mbox_name)) - assert(filecmp.cmp(self.new_mbox, self.mbox_name, shallow=0)) + assertEqualContent(self.mbox_name, self.new_mbox) archive_name = self.mbox_name + "_archive.gz" - assert(os.path.exists(archive_name)) - self.assertEqual(os.system("gzip -d %s" % archive_name), 0) - archive_name = self.mbox_name + "_archive" - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.old_mbox, shallow=0)) + assertEqualContent(archive_name, self.old_mbox, zipfirst=True) def testNew(self): """archiving a new mailbox""" @@ -525,8 +508,7 @@ This is after the ^From line""" self.assertEqual(os.system(run), 0) else: sys.exit(1) - assert(os.path.exists(self.mbox_name)) - assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0)) + assertEqualContent(self.mbox_name, self.copy_name) new_mode = os.stat(self.mbox_name)[stat.ST_MODE] self.assertEqual(self.mbox_mode, new_mode) archive_name = self.mbox_name + "_archive.gz" @@ -556,11 +538,7 @@ This is after the ^From line""" new_mode = os.stat(self.mbox_name)[stat.ST_MODE] self.assertEqual(self.mbox_mode, new_mode) archive_name = self.mbox_name + "_archive.gz" - assert(os.path.exists(archive_name)) - self.assertEqual(os.system("gzip -d %s" % archive_name), 0) - archive_name = self.mbox_name + "_archive" - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.copy_name, shallow=0)) + assertEqualContent(archive_name, self.copy_name, zipfirst=True) def testOldWeirdHeaders(self): """archiving old mailboxes with weird headers""" @@ -611,11 +589,7 @@ This is after the ^From line""" assert(os.path.exists(self.mbox_name)) self.assertEqual(os.path.getsize(self.mbox_name), 0) archive_name = self.mbox_name + "_archive.gz" - assert(os.path.exists(archive_name)) - self.assertEqual(os.system("gzip -d %s" % archive_name), 0) - archive_name = self.mbox_name + "_archive" - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.copy_name, shallow=0)) + assertEqualContent(archive_name, self.copy_name, zipfirst=True) def tearDown(self): archivemail.options.quiet = 0 @@ -759,11 +733,7 @@ class TestArchiveMboxPreserveStatus(TestCaseInTempdir): assert(os.path.exists(self.mbox_name)) self.assertEqual(os.path.getsize(self.mbox_name), 0) archive_name = self.mbox_name + "_archive.gz" - assert(os.path.exists(archive_name)) - self.assertEqual(os.system("gzip -d %s" % archive_name), 0) - archive_name = self.mbox_name + "_archive" - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.copy_name, shallow=0)) + assertEqualContent(archive_name, self.copy_name, zipfirst=True) def testOldUnread(self): """archiving an unread mailbox should not create an archive""" @@ -782,8 +752,7 @@ class TestArchiveMboxPreserveStatus(TestCaseInTempdir): self.assertEqual(os.system(run), 0) else: sys.exit(1) - assert(os.path.exists(self.mbox_name)) - assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0)) + assertEqualContent(self.mbox_name, self.copy_name) archive_name = self.mbox_name + "_archive.gz" assert(not os.path.exists(archive_name)) @@ -829,11 +798,7 @@ class TestArchiveMboxSuffix(TestCaseInTempdir): time.localtime(parsed_suffix_time)) archive_name = self.mbox_name + parsed_suffix + ".gz" - assert(os.path.exists(archive_name)) - self.assertEqual(os.system("gzip -d %s" % archive_name), 0) - archive_name = re.sub("\.gz$", "", archive_name) - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.copy_name, shallow=0)) + assertEqualContent(archive_name, self.copy_name, zipfirst=True) os.remove(archive_name) def tearDown(self): @@ -865,8 +830,7 @@ class TestArchiveDryRun(TestCaseInTempdir): self.assertEqual(os.system(run), 0) else: sys.exit(1) - assert(os.path.exists(self.mbox_name)) - assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0)) + assertEqualContent(self.mbox_name, self.copy_name) archive_name = self.mbox_name + "_archive.gz" assert(not os.path.exists(archive_name)) @@ -902,11 +866,7 @@ class TestArchiveDays(TestCaseInTempdir): assert(os.path.exists(self.mbox_name)) self.assertEqual(os.path.getsize(self.mbox_name), 0) archive_name = self.mbox_name + "_archive.gz" - assert(os.path.exists(archive_name)) - self.assertEqual(os.system("gzip -d %s" % archive_name), 0) - archive_name = self.mbox_name + "_archive" - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.copy_name, shallow=0)) + assertEqualContent(archive_name, self.copy_name, zipfirst=True) def testNew(self): """specifying the 'days' option on a newer mailbox""" @@ -925,8 +885,7 @@ class TestArchiveDays(TestCaseInTempdir): self.assertEqual(os.system(run), 0) else: sys.exit(1) - assert(os.path.exists(self.mbox_name)) - assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0)) + assertEqualContent(self.mbox_name, self.copy_name) archive_name = self.mbox_name + "_archive.gz" assert(not os.path.exists(archive_name)) @@ -961,8 +920,7 @@ class TestArchiveDelete(TestCaseInTempdir): self.assertEqual(os.system(run), 0) else: sys.exit(1) - assert(os.path.exists(self.mbox_name)) - assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0)) + assertEqualContent(self.mbox_name, self.copy_name) archive_name = self.mbox_name + "_archive.gz" assert(not os.path.exists(archive_name)) @@ -981,8 +939,7 @@ class TestArchiveDelete(TestCaseInTempdir): self.assertEqual(os.system(run), 0) else: sys.exit(1) - assert(os.path.exists(self.mbox_name)) - assert(filecmp.cmp(self.new_mbox, self.mbox_name, shallow=0)) + assertEqualContent(self.mbox_name, self.new_mbox) archive_name = self.mbox_name + "_archive.gz" assert(not os.path.exists(archive_name)) @@ -1035,11 +992,8 @@ class TestArchiveCopy(TestCaseInTempdir): self.assertEqual(os.system(run), 0) else: sys.exit(1) - assert(os.path.exists(self.mbox_name)) - # mbox must not have changed: - assert(filecmp.cmp(self.mbox_name, self.mbox_backup_name, shallow=0)) + assertEqualContent(self.mbox_name, self.mbox_backup_name) archive_name = self.mbox_name + "_archive.gz" - # There is no archive. assert(not os.path.exists(archive_name)) def testMixed(self): @@ -1059,16 +1013,9 @@ class TestArchiveCopy(TestCaseInTempdir): self.assertEqual(os.system(run), 0) else: sys.exit(1) - assert(os.path.exists(self.mbox_name)) - # mbox must not have changed: - assert(filecmp.cmp(self.mbox_backup_name, self.mbox_name, shallow=0)) - # archive has the old messages. + assertEqualContent(self.mbox_name, self.mbox_backup_name) archive_name = self.mbox_name + "_archive.gz" - assert(os.path.exists(archive_name)) - self.assertEqual(os.system("gzip -d %s" % archive_name), 0) - archive_name = self.mbox_name + "_archive" - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.old_mbox, shallow=0)) + assertEqualContent(archive_name, self.old_mbox, zipfirst=True) def testOld(self): """archiving an old mailbox with the 'copy' option""" @@ -1083,16 +1030,9 @@ class TestArchiveCopy(TestCaseInTempdir): self.assertEqual(os.system(run), 0) else: sys.exit(1) - assert(os.path.exists(self.mbox_name)) - # mbox must not have changed: - assert(filecmp.cmp(self.mbox_backup_name, self.mbox_name, shallow=0)) - # archive has the old messages. + assertEqualContent(self.mbox_name, self.mbox_backup_name) archive_name = self.mbox_name + "_archive.gz" - assert(os.path.exists(archive_name)) - self.assertEqual(os.system("gzip -d %s" % archive_name), 0) - archive_name = self.mbox_name + "_archive" - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.mbox_name, shallow=0)) + assertEqualContent(archive_name, self.mbox_name, zipfirst=True) def tearDown(self): archivemail.options.copy_old_mail = 0 @@ -1121,8 +1061,7 @@ class TestArchiveMboxFlagged(TestCaseInTempdir): self.assertEqual(os.system(run), 0) else: sys.exit(1) - assert(os.path.exists(self.mbox_name)) - assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0)) + assertEqualContent(self.mbox_name, self.copy_name) archive_name = self.mbox_name + "_archive.gz" assert(not os.path.exists(archive_name)) @@ -1142,8 +1081,7 @@ class TestArchiveMboxFlagged(TestCaseInTempdir): self.assertEqual(os.system(run), 0) else: sys.exit(1) - assert(os.path.exists(self.mbox_name)) - assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0)) + assertEqualContent(self.mbox_name, self.copy_name) archive_name = self.mbox_name + "_archive.gz" assert(not os.path.exists(archive_name)) @@ -1166,11 +1104,7 @@ class TestArchiveMboxFlagged(TestCaseInTempdir): assert(os.path.exists(self.mbox_name)) self.assertEqual(os.path.getsize(self.mbox_name), 0) archive_name = self.mbox_name + "_archive.gz" - assert(os.path.exists(archive_name)) - self.assertEqual(os.system("gzip -d %s" % archive_name), 0) - archive_name = self.mbox_name + "_archive" - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.copy_name, shallow=0)) + assertEqualContent(archive_name, self.copy_name, zipfirst=True) def tearDown(self): archivemail.options.include_flagged = 0 @@ -1208,11 +1142,7 @@ class TestArchiveMboxOutputDir(TestCaseInTempdir): self.assertEqual(os.path.getsize(self.mbox_name), 0) archive_name = self.dir_name + "/" + \ os.path.basename(self.mbox_name) + "_archive.gz" - assert(os.path.exists(archive_name)) - self.assertEqual(os.system("gzip -d %s" % archive_name), 0) - archive_name = re.sub(".gz$", "", archive_name) - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.copy_name, shallow=0)) + assertEqualContent(archive_name, self.copy_name, zipfirst=True) def tearDown(self): archivemail.options.quiet = 0 @@ -1252,8 +1182,7 @@ class TestArchiveMboxUncompressed(TestCaseInTempdir): new_mode = os.stat(self.mbox_name)[stat.ST_MODE] self.assertEqual(self.mbox_mode, new_mode) archive_name = self.mbox_name + "_archive" - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.copy_name, shallow=0)) + assertEqualContent(archive_name, self.copy_name) assert(not os.path.exists(archive_name + ".gz")) def testNew(self): @@ -1271,8 +1200,7 @@ class TestArchiveMboxUncompressed(TestCaseInTempdir): self.assertEqual(os.system(run), 0) else: sys.exit(1) - assert(os.path.exists(self.mbox_name)) - assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0)) + assertEqualContent(self.mbox_name, self.copy_name) new_mode = os.stat(self.mbox_name)[stat.ST_MODE] self.assertEqual(self.mbox_mode, new_mode) archive_name = self.mbox_name + "_archive" @@ -1295,11 +1223,9 @@ class TestArchiveMboxUncompressed(TestCaseInTempdir): self.assertEqual(os.system(run), 0) else: sys.exit(1) - assert(os.path.exists(self.mbox_name)) - assert(filecmp.cmp(self.new_mbox, self.mbox_name, shallow=0)) + assertEqualContent(self.mbox_name, self.new_mbox) archive_name = self.mbox_name + "_archive" - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.old_mbox, shallow=0)) + assertEqualContent(archive_name, self.old_mbox) assert(not os.path.exists(archive_name + ".gz")) def testOldExists(self): @@ -1325,8 +1251,7 @@ class TestArchiveMboxUncompressed(TestCaseInTempdir): new_mode = os.stat(self.mbox_name)[stat.ST_MODE] self.assertEqual(self.mbox_mode, new_mode) archive_name = self.mbox_name + "_archive" - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.copy_name, shallow=0)) + assertEqualContent(archive_name, self.copy_name) assert(not os.path.exists(archive_name + ".gz")) def tearDown(self): @@ -1364,11 +1289,7 @@ class TestArchiveSize(TestCaseInTempdir): assert(os.path.exists(self.mbox_name)) self.assertEqual(os.path.getsize(self.mbox_name), 0) archive_name = self.mbox_name + "_archive.gz" - assert(os.path.exists(archive_name)) - self.assertEqual(os.system("gzip -d %s" % archive_name), 0) - archive_name = self.mbox_name + "_archive" - assert(os.path.exists(archive_name)) - assert(filecmp.cmp(archive_name, self.copy_name, shallow=0)) + assertEqualContent(archive_name, self.copy_name, zipfirst=True) def testBigger(self): """giving a size argument bigger than the message""" @@ -1390,8 +1311,7 @@ class TestArchiveSize(TestCaseInTempdir): self.assertEqual(os.system(run), 0) else: sys.exit(1) - assert(os.path.exists(self.mbox_name)) - assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0)) + assertEqualContent(self.mbox_name, self.copy_name) archive_name = self.mbox_name + "_archive.gz" assert(not os.path.exists(archive_name)) @@ -1506,6 +1426,32 @@ def make_mbox(body=None, headers=None, hours_old=0, messages=1): file.close() return name +def assertEqualContent(firstfile, secondfile, zipfirst=False): + """Verify that the two files exist and have identical content. If zipfirst + is True, assume that firstfile is gzip-compressed.""" + assert(os.path.exists(firstfile)) + assert(os.path.exists(secondfile)) + if zipfirst: + try: + fp1 = gzip.GzipFile(firstfile, "r") + fp2 = open(secondfile, "r") + assert(cmp_fileobj(fp1, fp2)) + finally: + fp1.close() + fp2.close() + else: + assert(filecmp.cmp(firstfile, secondfile, shallow=0)) + +def cmp_fileobj(fp1, fp2): + """Return if reading the fileobjects yields identical content.""" + bufsize = 8192 + while True: + b1 = fp1.read(bufsize) + b2 = fp2.read(bufsize) + if b1 != b2: + return False + if not b1: + return True if __name__ == "__main__": unittest.main()