diff --git a/test_archivemail.py b/test_archivemail.py index a65f6cc..3615712 100755 --- a/test_archivemail.py +++ b/test_archivemail.py @@ -473,12 +473,81 @@ class TestParseIMAPUrl(unittest.TestCase): ########## acceptance testing ########### -class TestArchiveMbox(TestCaseInTempdir): +class TestArchive(TestCaseInTempdir): + """Base class defining helper functions for doing test archiving runs.""" + mbox = None + good_archive = good_mbox = None + + def verify(self): + assert(os.path.exists(self.mbox)) + if self.good_mbox is not None: + assertEqualContent(self.mbox, self.good_mbox) + else: + self.assertEqual(os.path.getsize(self.mbox), 0) + archive_name = self.mbox + "_archive" + if not archivemail.options.no_compress: + archive_name += ".gz" + iszipped = True + else: + assert(not os.path.exists(archive_name + ".gz")) + iszipped = False + if self.good_archive is not None: + assertEqualContent(archive_name, self.good_archive, iszipped) + else: + assert(not os.path.exists(archive_name)) + + def make_old_mbox(self, body=None, headers=None, messages=1, old_archive=False): + """Make an old mbox, optionally an existing archive, and make a reference + archive like it should look like after archivemail has run.""" + self.mbox = make_mbox(body, headers, 181*24, messages) + archive_changes = not (archivemail.options.dry_run or + archivemail.options.delete_old_mail) + mbox_no_changes = archivemail.options.dry_run or archivemail.options.copy_old_mail + if old_archive: + self.good_archive = make_old_archive(self.mbox) + if archive_changes: + append_file(self.mbox, self.good_archive) + if mbox_no_changes: + self.good_mbox = tempfile.mkstemp()[1] + shutil.copyfile(self.mbox, self.good_mbox) + return + if archive_changes: + self.good_archive = tempfile.mkstemp()[1] + shutil.copyfile(self.mbox, self.good_archive) + if mbox_no_changes: + self.good_mbox = self.good_archive + elif mbox_no_changes: + self.good_mbox = tempfile.mkstemp()[1] + shutil.copyfile(self.mbox, self.good_mbox) + + def make_mixed_mbox(self, body=None, headers=None, messages=1, old_archive=False): + """Make a mixed mbox, optionally an existing archive, a reference archive + like it should look like after archivemail has run, and a reference mbox + like it should remain after archivemail has run.""" + self.make_old_mbox(body, headers, messages=messages, old_archive=old_archive) + new_mbox_name = make_mbox(body, headers, 179*24, messages) + append_file(new_mbox_name, self.mbox) + if self.good_mbox is None: + self.good_mbox = new_mbox_name + else: + if self.good_mbox == self.good_archive: + self.good_mbox = tempfile.mkstemp()[1] + shutil.copyfile(self.mbox, self.good_mbox) + else: + append_file(new_mbox_name, self.good_mbox) + + def make_new_mbox(self, body=None, headers=None, messages=1, old_archive=False): + """Make a new mbox, optionally an exiting archive, and a reference mbox + like it should remain after archivemail has run.""" + self.mbox = make_mbox(body, headers, 179*24, messages) + self.good_mbox = tempfile.mkstemp()[1] + shutil.copyfile(self.mbox, self.good_mbox) + if old_archive: + self.good_archive = make_old_archive(self.mbox) + + +class TestArchiveMbox(TestArchive): """archiving should work based on the date of messages given""" - old_mbox = None - new_mbox = None - copy_name = None - mbox_name = None def setUp(self): self.oldopts = copy.copy(archivemail.options) @@ -487,31 +556,18 @@ class TestArchiveMbox(TestCaseInTempdir): def testOld(self): """archiving an old mailbox""" - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181)) - self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] - self.copy_name = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox_name, self.copy_name) - archivemail.archive(self.mbox_name) - assert(os.path.exists(self.mbox_name)) - self.assertEqual(os.path.getsize(self.mbox_name), 0) - new_mode = os.stat(self.mbox_name)[stat.ST_MODE] - self.assertEqual(self.mbox_mode, new_mode) - archive_name = self.mbox_name + "_archive.gz" - assertEqualContent(archive_name, self.copy_name, zipfirst=True) + self.make_old_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() def testOldFromInBody(self): """archiving an old mailbox with 'From ' in the body""" body = """This is a message with ^From at the start of a line From is on this line This is after the ^From line""" - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181), body=body) - self.copy_name = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox_name, self.copy_name) - archivemail.archive(self.mbox_name) - assert(os.path.exists(self.mbox_name)) - self.assertEqual(os.path.getsize(self.mbox_name), 0) - archive_name = self.mbox_name + "_archive.gz" - assertEqualContent(archive_name, self.copy_name, zipfirst=True) + self.make_old_mbox(messages=3, body=body) + archivemail.archive(self.mbox) + self.verify() def testDateSystem(self): """test that the --date option works as expected""" @@ -546,49 +602,21 @@ This is after the ^From line""" def testMixed(self): """archiving a mixed mailbox""" - self.new_mbox = make_mbox(messages=3, hours_old=(24 * 179)) - self.old_mbox = make_mbox(messages=3, hours_old=(24 * 181)) - self.mbox_name = tempfile.mkstemp()[1] - shutil.copyfile(self.new_mbox, self.mbox_name) - append_file(self.old_mbox, self.mbox_name) - archivemail.archive(self.mbox_name) - assertEqualContent(self.mbox_name, self.new_mbox) - archive_name = self.mbox_name + "_archive.gz" - assertEqualContent(archive_name, self.old_mbox, zipfirst=True) + self.make_mixed_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() def testNew(self): """archiving a new mailbox""" - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 179)) - self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] - self.copy_name = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox_name, self.copy_name) - archivemail.archive(self.mbox_name) - assertEqualContent(self.mbox_name, self.copy_name) - new_mode = os.stat(self.mbox_name)[stat.ST_MODE] - self.assertEqual(self.mbox_mode, new_mode) - archive_name = self.mbox_name + "_archive.gz" - assert(not os.path.exists(archive_name)) - + self.make_new_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() def testOldExisting(self): """archiving an old mailbox with an existing archive""" - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181)) - self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] - self.copy_name = tempfile.mkstemp()[1] - archive_name = self.mbox_name + "_archive.gz" - shutil.copyfile(self.mbox_name, self.copy_name) - fp1 = open(self.mbox_name, "r") - fp2 = gzip.GzipFile(archive_name, "w") - shutil.copyfileobj(fp1, fp2) # archive has 3 msgs - fp2.close() - fp1.close() - append_file(self.mbox_name, self.copy_name) # copy now has 6 msgs - archivemail.archive(self.mbox_name) - assert(os.path.exists(self.mbox_name)) - self.assertEqual(os.path.getsize(self.mbox_name), 0) - new_mode = os.stat(self.mbox_name)[stat.ST_MODE] - self.assertEqual(self.mbox_mode, new_mode) - assertEqualContent(archive_name, self.copy_name, zipfirst=True) + self.make_old_mbox(messages=3, old_archive=True) + archivemail.archive(self.mbox) + self.verify() def testOldWeirdHeaders(self): """archiving old mailboxes with weird headers""" @@ -631,19 +659,16 @@ This is after the ^From line""" 'Resent-Date' : '', }, ) - fd, self.mbox_name = tempfile.mkstemp() + fd, self.mbox = tempfile.mkstemp() fp = os.fdopen(fd, "w") for headers in weird_headers: msg_text = make_message(default_headers=headers) fp.write(msg_text*2) fp.close() - self.copy_name = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox_name, self.copy_name) - archivemail.archive(self.mbox_name) - assert(os.path.exists(self.mbox_name)) - self.assertEqual(os.path.getsize(self.mbox_name), 0) - archive_name = self.mbox_name + "_archive.gz" - assertEqualContent(archive_name, self.copy_name, zipfirst=True) + self.good_archive = tempfile.mkstemp()[1] + shutil.copyfile(self.mbox, self.good_archive) + archivemail.archive(self.mbox) + self.verify() def tearDown(self): archivemail.options = self.oldopts @@ -655,42 +680,24 @@ class TestArchiveMboxTimestamp(TestCaseInTempdir): def setUp(self): super(TestArchiveMboxTimestamp, self).setUp() archivemail.options.quiet = 1 + self.mbox_name = make_mbox(messages=3, hours_old=(24 * 180)) + self.mtime = os.path.getmtime(self.mbox_name) - 66 + self.atime = os.path.getatime(self.mbox_name) - 88 + os.utime(self.mbox_name, (self.atime, self.mtime)) def testNew(self): """mbox timestamps should not change after no archival""" - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 179)) - self.mtime = os.path.getmtime(self.mbox_name) - 66 - self.atime = os.path.getatime(self.mbox_name) - 88 - os.utime(self.mbox_name, (self.atime, self.mtime)) + archivemail.options.days_old_max = 181 archivemail.archive(self.mbox_name) - assert(os.path.exists(self.mbox_name)) - new_atime = os.path.getatime(self.mbox_name) - new_mtime = os.path.getmtime(self.mbox_name) - self.assertAlmostEqual(self.mtime, new_mtime, utimes_precision) - self.assertAlmostEqual(self.atime, new_atime, utimes_precision) - - def testMixed(self): - """mbox timestamps should not change after semi-archival""" - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181)) - self.mtime = os.path.getmtime(self.mbox_name) - 66 - self.atime = os.path.getatime(self.mbox_name) - 88 - os.utime(self.mbox_name, (self.atime, self.mtime)) - archive_name = self.mbox_name + "_archive.gz" - archivemail.archive(self.mbox_name) - assert(os.path.exists(self.mbox_name)) - new_atime = os.path.getatime(self.mbox_name) - new_mtime = os.path.getmtime(self.mbox_name) - self.assertAlmostEqual(self.mtime, new_mtime, utimes_precision) - self.assertAlmostEqual(self.atime, new_atime, utimes_precision) + self.verify() def testOld(self): """mbox timestamps should not change after archival""" - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181)) - self.mtime = os.path.getmtime(self.mbox_name) - 66 - self.atime = os.path.getatime(self.mbox_name) - 88 - os.utime(self.mbox_name, (self.atime, self.mtime)) - archive_name = self.mbox_name + "_archive.gz" + archivemail.options.days_old_max = 179 archivemail.archive(self.mbox_name) + self.verify() + + def verify(self): assert(os.path.exists(self.mbox_name)) new_atime = os.path.getatime(self.mbox_name) new_mtime = os.path.getmtime(self.mbox_name) @@ -699,6 +706,8 @@ class TestArchiveMboxTimestamp(TestCaseInTempdir): def tearDown(self): archivemail.options.quiet = 0 + archivemail.options.days_old_max = 180 + os.remove(self.mbox_name) super(TestArchiveMboxTimestamp, self).tearDown() @@ -767,7 +776,7 @@ class TestArchiveMboxSuffix(unittest.TestCase): archivemail.options.archive_suffix = self.old_suffix -class TestArchiveDryRun(TestCaseInTempdir): +class TestArchiveDryRun(TestArchive): """make sure the 'dry-run' option works""" def setUp(self): super(TestArchiveDryRun, self).setUp() @@ -776,13 +785,9 @@ class TestArchiveDryRun(TestCaseInTempdir): def testOld(self): """archiving an old mailbox with the 'dry-run' option""" - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181)) - self.copy_name = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox_name, self.copy_name) - archivemail.archive(self.mbox_name) - assertEqualContent(self.mbox_name, self.copy_name) - archive_name = self.mbox_name + "_archive.gz" - assert(not os.path.exists(archive_name)) + self.make_old_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() def tearDown(self): archivemail.options.dry_run = 0 @@ -790,13 +795,8 @@ class TestArchiveDryRun(TestCaseInTempdir): super(TestArchiveDryRun, self).tearDown() -class TestArchiveDelete(TestCaseInTempdir): +class TestArchiveDelete(TestArchive): """make sure the 'delete' option works""" - old_mbox = None - new_mbox = None - copy_name = None - mbox_name = None - def setUp(self): super(TestArchiveDelete, self).setUp() archivemail.options.quiet = 1 @@ -804,36 +804,21 @@ class TestArchiveDelete(TestCaseInTempdir): def testNew(self): """archiving a new mailbox with the 'delete' option""" - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 179)) - self.copy_name = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox_name, self.copy_name) - archivemail.archive(self.mbox_name) - assertEqualContent(self.mbox_name, self.copy_name) - archive_name = self.mbox_name + "_archive.gz" - assert(not os.path.exists(archive_name)) + self.make_new_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() def testMixed(self): """archiving a mixed mailbox with the 'delete' option""" - self.new_mbox = make_mbox(messages=3, hours_old=(24 * 179)) - self.old_mbox = make_mbox(messages=3, hours_old=(24 * 181)) - self.mbox_name = tempfile.mkstemp()[1] - shutil.copyfile(self.new_mbox, self.mbox_name) - append_file(self.old_mbox, self.mbox_name) - archivemail.archive(self.mbox_name) - assertEqualContent(self.mbox_name, self.new_mbox) - archive_name = self.mbox_name + "_archive.gz" - assert(not os.path.exists(archive_name)) + self.make_mixed_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() def testOld(self): """archiving an old mailbox with the 'delete' option""" - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181)) - self.copy_name = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox_name, self.copy_name) - archivemail.archive(self.mbox_name) - assert(os.path.exists(self.mbox_name)) - self.assertEqual(os.path.getsize(self.mbox_name), 0) - archive_name = self.mbox_name + "_archive.gz" - assert(not os.path.exists(archive_name)) + self.make_old_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() def tearDown(self): archivemail.options.delete_old_mail = 0 @@ -841,13 +826,8 @@ class TestArchiveDelete(TestCaseInTempdir): super(TestArchiveDelete, self).tearDown() -class TestArchiveCopy(TestCaseInTempdir): +class TestArchiveCopy(TestArchive): """make sure the 'copy' option works""" - old_mbox = None - new_mbox = None - mbox_backup_name = None - mbox_name = None - def setUp(self): super(TestArchiveCopy, self).setUp() archivemail.options.quiet = 1 @@ -855,38 +835,21 @@ class TestArchiveCopy(TestCaseInTempdir): def testNew(self): """archiving a new mailbox with the 'copy' option""" - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 179)) - self.mbox_backup_name = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox_name, self.mbox_backup_name) - archivemail.archive(self.mbox_name) - assertEqualContent(self.mbox_name, self.mbox_backup_name) - archive_name = self.mbox_name + "_archive.gz" - assert(not os.path.exists(archive_name)) - self.tearDown() + self.make_new_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() def testMixed(self): """archiving a mixed mailbox with the 'copy' option""" - self.new_mbox = make_mbox(messages=3, hours_old=(24 * 179)) - self.old_mbox = make_mbox(messages=3, hours_old=(24 * 181)) - self.mbox_name = tempfile.mkstemp()[1] - shutil.copyfile(self.new_mbox, self.mbox_name) - append_file(self.old_mbox, self.mbox_name) - self.mbox_backup_name = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox_name, self.mbox_backup_name) - archivemail.archive(self.mbox_name) - assertEqualContent(self.mbox_name, self.mbox_backup_name) - archive_name = self.mbox_name + "_archive.gz" - assertEqualContent(archive_name, self.old_mbox, zipfirst=True) + self.make_mixed_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() def testOld(self): """archiving an old mailbox with the 'copy' option""" - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181)) - self.mbox_backup_name = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox_name, self.mbox_backup_name) - archivemail.archive(self.mbox_name) - assertEqualContent(self.mbox_name, self.mbox_backup_name) - archive_name = self.mbox_name + "_archive.gz" - assertEqualContent(archive_name, self.mbox_name, zipfirst=True) + self.make_old_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() def tearDown(self): archivemail.options.copy_old_mail = 0 @@ -941,7 +904,7 @@ class TestArchiveMboxOutputDir(unittest.TestCase): archivemail.options.output_dir = None -class TestArchiveMboxUncompressed(TestCaseInTempdir): +class TestArchiveMboxUncompressed(TestArchive): """make sure that the 'no_compress' option works""" mbox_name = None new_mbox = None @@ -955,63 +918,27 @@ class TestArchiveMboxUncompressed(TestCaseInTempdir): def testOld(self): """archiving an old mailbox uncompressed""" - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181)) - self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] - self.copy_name = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox_name, self.copy_name) - archivemail.archive(self.mbox_name) - assert(os.path.exists(self.mbox_name)) - self.assertEqual(os.path.getsize(self.mbox_name), 0) - new_mode = os.stat(self.mbox_name)[stat.ST_MODE] - self.assertEqual(self.mbox_mode, new_mode) - archive_name = self.mbox_name + "_archive" - assertEqualContent(archive_name, self.copy_name) - assert(not os.path.exists(archive_name + ".gz")) + self.make_old_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() def testNew(self): """archiving a new mailbox uncompressed""" - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 179)) - self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] - self.copy_name = tempfile.mkstemp()[1] - shutil.copyfile(self.mbox_name, self.copy_name) - archivemail.archive(self.mbox_name) - assertEqualContent(self.mbox_name, self.copy_name) - new_mode = os.stat(self.mbox_name)[stat.ST_MODE] - self.assertEqual(self.mbox_mode, new_mode) - archive_name = self.mbox_name + "_archive" - assert(not os.path.exists(archive_name)) - assert(not os.path.exists(archive_name + ".gz")) + self.make_new_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() def testMixed(self): """archiving a mixed mailbox uncompressed""" - self.new_mbox = make_mbox(messages=3, hours_old=(24 * 179)) - self.old_mbox = make_mbox(messages=3, hours_old=(24 * 181)) - self.mbox_name = tempfile.mkstemp()[1] - shutil.copyfile(self.new_mbox, self.mbox_name) - append_file(self.old_mbox, self.mbox_name) - archivemail.archive(self.mbox_name) - assertEqualContent(self.mbox_name, self.new_mbox) - archive_name = self.mbox_name + "_archive" - assertEqualContent(archive_name, self.old_mbox) - assert(not os.path.exists(archive_name + ".gz")) + self.make_mixed_mbox(messages=3) + archivemail.archive(self.mbox) + self.verify() def testOldExists(self): """archiving an old mailbox uncopressed with an existing archive""" - self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181)) - self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] - self.copy_name = tempfile.mkstemp()[1] - archive_name = self.mbox_name + "_archive" - shutil.copyfile(self.mbox_name, self.copy_name) - shutil.copyfile(self.mbox_name, archive_name) # archive has 3 msgs - append_file(self.mbox_name, self.copy_name) # copy now has 6 msgs - archivemail.archive(self.mbox_name) - assert(os.path.exists(self.mbox_name)) - self.assertEqual(os.path.getsize(self.mbox_name), 0) - new_mode = os.stat(self.mbox_name)[stat.ST_MODE] - self.assertEqual(self.mbox_mode, new_mode) - archive_name = self.mbox_name + "_archive" - assertEqualContent(archive_name, self.copy_name) - assert(not os.path.exists(archive_name + ".gz")) + self.make_old_mbox(messages=3, old_archive=True) + archivemail.archive(self.mbox) + self.verify() def tearDown(self): archivemail.options.quiet = 0 @@ -1120,7 +1047,7 @@ def append_file(source, dest): assert(os.path.isfile(dest)) read = open(source, "r") write = open(dest, "a+") - write.writelines(read.readlines()) + shutil.copyfileobj(read,write) read.close() write.close() @@ -1136,6 +1063,31 @@ def make_mbox(body=None, headers=None, hours_old=0, messages=1): file.close() return name +def make_old_archive(mailbox_name): + """Make an mbox archive like if archivemail has already archived the given + mailbox in the past. Also make an uncompressed copy of this archive and + return its name.""" + old_archive_name = archivemail.make_archive_name(mailbox_name) + fd, archive_name = tempfile.mkstemp() + fp = os.fdopen(fd, "w") + if archivemail.options.no_compress: + oldfd = os.open(old_archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT) + oldfp = os.fdopen(oldfd, "w") + else: + old_archive_name += ".gz" + oldfd = os.open(old_archive_name, os.O_WRONLY|os.O_EXCL|os.O_CREAT) + oldrawfp = os.fdopen(oldfd, "w") + oldfp = gzip.GzipFile(fileobj=oldrawfp) + for count in range(3): + msg = make_message(hours_old=24*360) + oldfp.write(msg) + fp.write(msg) + oldfp.close() + fp.close() + if not archivemail.options.no_compress: + oldrawfp.close() + return archive_name + def assertEqualContent(firstfile, secondfile, zipfirst=False): """Verify that the two files exist and have identical content. If zipfirst is True, assume that firstfile is gzip-compressed."""