From 35a9f14982ba752e26084cadfbef231ac96b1259 Mon Sep 17 00:00:00 2001 From: Paul Rodger Date: Mon, 8 Apr 2002 13:39:03 +0000 Subject: [PATCH] We now preserve the last-accessed and last-modified timestamps correctly. Fixed a bug where lockfiles were being created that were not world-readable. Made archivemail work better when used as a python module so it can integrate better with unittest. Renamed unittest script 'test_archivemail' instead of 'archivemail_test' and added about 20 more tests. --- CHANGELOG | 8 + TODO | 4 +- archivemail.py | 138 ++++++++------ archivemail_test.py | 41 ----- test_archivemail.py | 424 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 521 insertions(+), 94 deletions(-) delete mode 100755 archivemail_test.py create mode 100755 test_archivemail.py diff --git a/CHANGELOG b/CHANGELOG index c19ebba..554a470 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,11 @@ +Version 0.3.0 - ??? + * We now preserve the last-accessed and last-modified timestamps correctly + * Fixed a bug where lockfiles were being created that were not + world-readable + * Made archivemail work better when used as a python module so it can + integrate better with unittest. + * Budled a unit testing script for archivemail. + Version 0.2.1 - 4 April 2002 * Since we might not have a parse-able 'Date-Received' or 'Date' field, use 5 different ways to guess the date of a message. diff --git a/TODO b/TODO index 35a9431..630e310 100644 --- a/TODO +++ b/TODO @@ -1,9 +1,9 @@ Goals for next minor release (0.2.2): ------------------------------------- -* Test exclusive locking works with another test process -* Perserve atime of original mailbox properly * Finish man page +* If a mailbox has a mode of 660, preserve it. + (Especially in /var/spool/mail with groupid of 'mail') Goals for next major release (0.3.0): ------------------------------------- diff --git a/archivemail.py b/archivemail.py index 3eabe4e..965735a 100755 --- a/archivemail.py +++ b/archivemail.py @@ -16,12 +16,19 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ############################################################################ - """ Archive and compress old mail in mbox or maildir-format mailboxes. Website: http://archivemail.sourceforge.net/ """ +# global administrivia +__version__ = "archivemail v0.3.0" +__cvs_id__ = "$Id$" +__copyright__ = """Copyright (C) 2002 Paul Rodger +This is free software; see the source for copying conditions. There is NO +warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.""" + + import sys def check_python_version(): @@ -51,15 +58,6 @@ import string import tempfile import time -# global administrivia -__version__ = "archivemail v0.2.1" -__cvs_id__ = "$Id$" -__copyright__ = """Copyright (C) 2002 Paul Rodger -This is free software; see the source for copying conditions. There is NO -warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.""" - -_stale = None # list of files to delete on abnormal exit - ############## class definitions ############### class Stats: @@ -83,7 +81,8 @@ class Stats: assert(final_archive_name) self.__start_time = time.time() self.__mailbox_name = mailbox_name - self.__archive_name = final_archive_name + _options.compressor_extension + self.__archive_name = final_archive_name + \ + _options.compressor_extension() def another_message(self): """Add one to the internal count of total messages processed""" @@ -138,8 +137,7 @@ class StaleFiles: class Options: """Class to store runtime options, including defaults""" archive_suffix = "_archive" - compressor = None - compressor_extension = None + compressor = "gzip" days_old_max = 180 delete_old_mail = 0 dry_run = 0 @@ -171,6 +169,8 @@ class Options: "warn-duplicate"]) except getopt.error, msg: user_error(msg) + + chosen_compressor = None for o, a in opts: if o == '--delete': self.delete_old_mail = 1 @@ -195,25 +195,22 @@ class Options: print __version__ + "\n\n" + __copyright__ sys.exit(0) if o in ('-z', '--gzip'): - if (self.compressor): + if (chosen_compressor): user_error("conflicting compression options") self.compressor = "gzip" + chosen_compressor = 1 if o in ('-Z', '--compress'): - if (self.compressor): + if (chosen_compressor): user_error("conflicting compression options") self.compressor = "compress" + chosen_compressor = 1 if o in ('-I', '--bzip2'): - if (self.compressor): + if (chosen_compressor): user_error("conflicting compression options") self.compressor = "bzip2" + chosen_compressor = 1 if not self.compressor: self.compressor = "gzip" - extensions = { - "compress" : ".Z", - "gzip" : ".gz", - "bzip2" : ".bz2", - } - self.compressor_extension = extensions[self.compressor] return args def sanity_check(self): @@ -233,6 +230,15 @@ class Options: if (self.days_old_max >= 10000): user_error("argument to -d must be less than 10000") + def compressor_extension(self): + extensions = { + "compress" : ".Z", + "gzip" : ".gz", + "bzip2" : ".bz2", + } + return extensions[self.compressor] + + class Mbox(mailbox.PortableUnixMailbox): """Class that allows read/write access to a 'mbox' mailbox. @@ -240,18 +246,23 @@ class Mbox(mailbox.PortableUnixMailbox): """ mbox_file = None # file handle for the mbox file + original_atime = None # last-accessed timestamp + original_mtime = None # last-modified timestamp - def __init__(self, path_name): + def __init__(self, path, mode="r"): """Constructor for opening an existing 'mbox' mailbox. Extends constructor for mailbox.PortableUnixMailbox() - Arguments: - path_name -- file name of the 'mbox' file to be opened + Named Arguments: + path -- file name of the 'mbox' file to be opened + mode -- mode to open the file in (default is read-only) """ - assert(path_name) + assert(path) try: - self.mbox_file = open(path_name, "r") + self.original_atime = os.path.getatime(path) + self.original_mtime = os.path.getmtime(path) + self.mbox_file = open(path, mode) except IOError, msg: unexpected_error(msg) mailbox.PortableUnixMailbox.__init__(self, self.mbox_file) @@ -266,6 +277,8 @@ class Mbox(mailbox.PortableUnixMailbox): """ assert(msg) + assert(self.mbox_file) + vprint("saving message to file '%s'" % self.mbox_file.name) unix_from = msg.unixfrom if not unix_from: @@ -300,6 +313,14 @@ class Mbox(mailbox.PortableUnixMailbox): vprint("closing file '%s'" % self.mbox_file.name) self.mbox_file.close() + def reset_time_stamps(self): + """Set the file timestamps to the original value""" + assert(self.original_atime) + assert(self.original_mtime) + assert(self.mbox_file.name) + os.utime(self.mbox_file.name, (self.original_atime, \ + self.original_mtime)) + def exclusive_lock(self): """Set an advisory lock on the 'mbox' mailbox""" vprint("obtaining exclusive lock on file '%s'" % self.mbox_file.name) @@ -322,9 +343,11 @@ class Mbox(mailbox.PortableUnixMailbox): unexpected_error("Giving up waiting for procmail lock '%s'" % lock_name) vprint("writing lockfile '%s'" % lock_name) + old_umask = os.umask(022) # is this dodgy? lock = open(lock_name, "w") _stale.procmail_lock = lock_name lock.close() + old_umask = os.umask(old_umask) def procmail_unlock(self): """Delete the procmail lockfile on the 'mbox' mailbox""" @@ -342,11 +365,9 @@ class Mbox(mailbox.PortableUnixMailbox): completely deleted.""" assert(os.path.isfile(self.mbox_file.name)) vprint("turning '%s' into a zero-length file" % self.mbox_file.name) - atime = os.path.getatime(self.mbox_file.name) mtime = os.path.getmtime(self.mbox_file.name) blank_file = open(self.mbox_file.name, "w") blank_file.close() - os.utime(self.mbox_file.name, (atime, mtime)) # to original timestamps class RetainMbox(Mbox): @@ -377,8 +398,14 @@ class RetainMbox(Mbox): """Overwrite the original mailbox with this temporary mailbox.""" assert(self.__final_name) self.close() + + # make sure that the retained mailbox has the same timestamps and + # permission as the original mailbox atime = os.path.getatime(self.__final_name) mtime = os.path.getmtime(self.__final_name) + mode = os.stat(self.__final_name)[stat.ST_MODE] + os.chmod(self.mbox_file.name, mode) + vprint("renaming '%s' to '%s'" % (self.mbox_file.name, self.__final_name)) os.rename(self.mbox_file.name, self.__final_name) os.utime(self.__final_name, (atime, mtime)) # reset to original timestamps @@ -414,7 +441,7 @@ class ArchiveMbox(Mbox): """ assert(final_name) compressor = _options.compressor - compressedfilename = final_name + _options.compressor_extension + compressedfilename = final_name + _options.compressor_extension() if os.path.isfile(final_name): unexpected_error("""There is already a file named '%s'! @@ -445,13 +472,14 @@ manually, and try running me again.""" % final_name) self.close() compressor = _options.compressor compressed_archive_name = self.mbox_file.name + \ - _options.compressor_extension + _options.compressor_extension() compress = compressor + " " + self.mbox_file.name vprint("running compressor: '%s'" % compress) _stale.compressed_archive = compressed_archive_name system_or_die(compress) _stale.archive = None - compressed_final_name = self.__final_name + _options.compressor_extension + compressed_final_name = self.__final_name + \ + _options.compressor_extension() vprint("renaming '%s' to '%s'" % (compressed_archive_name, compressed_final_name)) os.rename(compressed_archive_name, compressed_final_name) @@ -478,6 +506,7 @@ class IdentityCache: # global class instances _options = Options() # the run-time options object +_stale = StaleFiles() # remember what we have to delete on abnormal exit def main(args = sys.argv[1:]): @@ -519,17 +548,6 @@ Website: http://archivemail.sourceforge.net/ """ % \ sys.exit(1) _options.sanity_check() - os.umask(077) # saves setting permissions on mailboxes/tempfiles - - # Make sure we clean up nicely - we don't want to leave stale procmail - # lockfiles about if something bad happens to us. This is quite - # important, even though procmail will delete stale files after a while. - _stale = StaleFiles() # remember what we have to delete - atexit.register(clean_up) # delete stale files on exceptions/normal exit - signal.signal(signal.SIGHUP, clean_up_signal) # signal 1 - # SIGINT (signal 2) is handled as a python exception - signal.signal(signal.SIGQUIT, clean_up_signal) # signal 3 - signal.signal(signal.SIGTERM, clean_up_signal) # signal 15 for mailbox_path in args: archive(mailbox_path) @@ -599,7 +617,7 @@ def guess_delivery_time(message): if date: try: time_message = time.mktime(date) - assert(time_message, 'time.mktime() returned false') + assert(time_message) vprint("using valid time found from '%s' header" % header) return time_message except (ValueError, OverflowError): pass @@ -612,7 +630,7 @@ def guess_delivery_time(message): if date: try: time_message = time.mktime(date) - assert(time_message, 'time.mktime() returned false') + assert(time_message) vprint("using valid time found from unix 'From_' header") return time_message except (ValueError, OverflowError): pass @@ -634,7 +652,7 @@ def guess_delivery_time(message): return time_message -def is_too_old(time_message): +def is_too_old(time_message, max_days): """Return true if a message is too old (and should be archived), false otherwise. @@ -643,12 +661,14 @@ def is_too_old(time_message): since the epoch """ - assert(time_message) + assert(time_message > 0) + assert(max_days >= 1) + time_now = time.time() if time_message > time_now: vprint("warning: message has date in the future") return 0 - secs_old_max = (_options.days_old_max * 24 * 60 * 60) + secs_old_max = (max_days * 24 * 60 * 60) days_old = (time_now - time_message) / 24 / 60 / 60 vprint("message is %.2f days old" % days_old) if ((time_message + secs_old_max) < time_now): @@ -670,6 +690,9 @@ def archive(mailbox_name): """ assert(mailbox_name) + set_signal_handlers() + os.umask(077) # saves setting permissions on mailboxes/tempfiles + final_archive_name = mailbox_name + _options.archive_suffix if _options.output_dir: final_archive_name = os.path.join(_options.output_dir, @@ -730,7 +753,7 @@ def _archive_mbox(mailbox_name, final_archive_name): archive = None retain = None stats = Stats(mailbox_name, final_archive_name) - original = Mbox(mailbox_name) + original = Mbox(path=mailbox_name) cache = IdentityCache(mailbox_name) original.procmail_lock() @@ -742,7 +765,7 @@ def _archive_mbox(mailbox_name, final_archive_name): if _options.warn_duplicates: cache.warn_if_dupe(msg) time_message = guess_delivery_time(msg) - if is_too_old(time_message): + if is_too_old(time_message, _options.days_old_max): stats.another_archived() if _options.delete_old_mail: vprint("decision: delete message") @@ -762,6 +785,7 @@ def _archive_mbox(mailbox_name, final_archive_name): vprint("finished reading messages") original.exclusive_unlock() original.close() + original.reset_time_stamps() if not _options.dry_run: if retain: retain.close() if archive: archive.close() @@ -772,6 +796,7 @@ def _archive_mbox(mailbox_name, final_archive_name): else: # nothing was retained - everything was deleted original.leave_empty() + original.reset_time_stamps() elif archive: archive.finalise() if retain: @@ -779,6 +804,7 @@ def _archive_mbox(mailbox_name, final_archive_name): else: # nothing was retained - everything was deleted original.leave_empty() + original.reset_time_stamps() else: # There was nothing to archive if retain: @@ -816,7 +842,7 @@ def _archive_dir(mailbox_name, final_archive_name, type): if _options.warn_duplicates: cache.warn_if_dupe(msg) time_message = guess_delivery_time(msg) - if is_too_old(time_message): + if is_too_old(time_message, _options.days_old_max): stats.another_archived() if _options.delete_old_mail: vprint("decision: delete message") @@ -868,6 +894,16 @@ def choose_temp_dir(mailbox_name): return temp_dir +def set_signal_handlers(): + # Make sure we clean up nicely - we don't want to leave stale procmail + # lockfiles about if something bad happens to us. This is quite + # important, even though procmail will delete stale files after a while. + atexit.register(clean_up) # delete stale files on exceptions/normal exit + signal.signal(signal.SIGHUP, clean_up_signal) # signal 1 + # SIGINT (signal 2) is handled as a python exception + signal.signal(signal.SIGQUIT, clean_up_signal) # signal 3 + signal.signal(signal.SIGTERM, clean_up_signal) # signal 15 + def clean_up(): """Delete stale files -- to be registered with atexit.register()""" vprint("cleaning up ...") diff --git a/archivemail_test.py b/archivemail_test.py deleted file mode 100755 index a412dae..0000000 --- a/archivemail_test.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python - -import archivemail -import os -import tempfile -import unittest - -class TempfileTestCase(unittest.TestCase): - def setUp(self): - self.output_dir = tempfile.mktemp() - os.mkdir(self.output_dir) - self.sub_dir = tempfile.mktemp() - os.mkdir(self.sub_dir) - - def testCurrentDir(self): - archivemail._options.output_dir = None - dir = archivemail.choose_temp_dir("dummy") - self.assertEqual(dir, os.curdir) - - def testSubDir(self): - archivemail._options.output_dir = None - dir = archivemail.choose_temp_dir(os.path.join(self.sub_dir, "dummy")) - self.assertEqual(dir, self.sub_dir) - - def testOutputDir(self): - archivemail._options.output_dir = self.output_dir - dir = archivemail.choose_temp_dir("dummy") - self.assertEqual(dir, self.output_dir) - - def testSubDirOutputDir(self): - archivemail._options.output_dir = self.output_dir - dir = archivemail.choose_temp_dir(os.path.join(self.sub_dir, "dummy")) - self.assertEqual(dir, self.output_dir) - - def tearDown(self): - os.rmdir(self.output_dir) - os.rmdir(self.sub_dir) - - -if __name__ == "__main__": - unittest.main() diff --git a/test_archivemail.py b/test_archivemail.py new file mode 100755 index 0000000..df1677b --- /dev/null +++ b/test_archivemail.py @@ -0,0 +1,424 @@ +#!/usr/bin/env python +############################################################################ +# Copyright (C) 2002 Paul Rodger +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +############################################################################ +""" +Test archivemail works correctly using 'pyunit'. +""" + +import fcntl +import filecmp +import os +import shutil +import stat +import tempfile +import time +import unittest + +import archivemail + +__version__ = """$Id$""" + +############ Mbox Class testing ############## + +class TestMboxIsEmpty(unittest.TestCase): + def setUp(self): + self.empty_name = make_mbox(messages=0) + self.not_empty_name = make_mbox(messages=1) + + def testEmpty(self): + mbox = archivemail.Mbox(self.empty_name) + assert(mbox.is_empty()) + + def testNotEmpty(self): + mbox = archivemail.Mbox(self.not_empty_name) + assert(not mbox.is_empty()) + + def tearDown(self): + if os.path.exists(self.empty_name): + os.remove(self.empty_name) + if os.path.exists(self.not_empty_name): + os.remove(self.not_empty_name) + + +class TestMboxLeaveEmpty(unittest.TestCase): + def setUp(self): + self.mbox_name = make_mbox() + self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] + self.mbox = archivemail.Mbox(self.mbox_name) + + def testLeaveEmpty(self): + self.mbox.leave_empty() + assert(os.path.isfile(self.mbox_name)) + self.assertEqual(os.path.getsize(self.mbox_name), 0) + new_mode = os.stat(self.mbox_name)[stat.ST_MODE] + self.assertEqual(new_mode, self.mbox_mode) + + def tearDown(self): + if os.path.exists(self.mbox_name): + os.remove(self.mbox_name) + + +class TestMboxProcmailLock(unittest.TestCase): + def setUp(self): + self.mbox_name = make_mbox() + self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] + self.mbox = archivemail.Mbox(self.mbox_name) + + def testProcmailLock(self): + lock = self.mbox_name + ".lock" + self.mbox.procmail_lock() + assert(os.path.isfile(lock)) + assert(is_world_readable(lock)) + self.mbox.procmail_unlock() + assert(not os.path.isfile(lock)) + + # TODO: add a test where the lock already exists + + def tearDown(self): + if os.path.exists(self.mbox_name): + os.remove(self.mbox_name) + + +class TestMboxRemove(unittest.TestCase): + def setUp(self): + self.mbox_name = make_mbox() + self.mbox = archivemail.Mbox(self.mbox_name) + + def testProcmailLock(self): + assert(os.path.exists(self.mbox_name)) + self.mbox.remove() + assert(not os.path.exists(self.mbox_name)) + + def tearDown(self): + if os.path.exists(self.mbox_name): + os.remove(self.mbox_name) + + +class TestMboxExclusiveLock(unittest.TestCase): + def setUp(self): + self.mbox_name = make_mbox() + self.mbox = archivemail.Mbox(self.mbox_name) + + def testExclusiveLock(self): + self.mbox.exclusive_lock() + file = open(self.mbox_name, "r+") + lock_nb = fcntl.LOCK_EX | fcntl.LOCK_NB + self.assertRaises(IOError, fcntl.flock, file, lock_nb) + + self.mbox.exclusive_unlock() + fcntl.flock(file, lock_nb) + fcntl.flock(file, fcntl.LOCK_UN) + + def tearDown(self): + if os.path.exists(self.mbox_name): + os.remove(self.mbox_name) + + +class TestMboxNext(unittest.TestCase): + def setUp(self): + self.not_empty_name = make_mbox(messages=18) + self.empty_name = make_mbox(messages=0) + + def testNextEmpty(self): + mbox = archivemail.Mbox(self.empty_name) + msg = mbox.next() + self.assertEqual(msg, None) + + def testNextNotEmpty(self): + mbox = archivemail.Mbox(self.not_empty_name) + for count in range(18): + msg = mbox.next() + assert(msg) + msg = mbox.next() + self.assertEqual(msg, None) + + def tearDown(self): + if os.path.exists(self.not_empty_name): + os.remove(self.not_empty_name) + if os.path.exists(self.empty_name): + os.remove(self.empty_name) + + +class TestMboxWrite(unittest.TestCase): + def setUp(self): + self.mbox_read = make_mbox(messages=3) + self.mbox_write = make_mbox(messages=0) + + def testWrite(self): + read = archivemail.Mbox(self.mbox_read) + write = archivemail.Mbox(self.mbox_write, mode="w") + for count in range(3): + msg = read.next() + write.write(msg) + read.close() + write.close() + assert(filecmp.cmp(self.mbox_read, self.mbox_write)) + + def testWriteNone(self): + write = archivemail.Mbox(self.mbox_write, mode="w") + self.assertRaises(AssertionError, write.write, None) + + def tearDown(self): + if os.path.exists(self.mbox_write): + os.remove(self.mbox_write) + if os.path.exists(self.mbox_read): + os.remove(self.mbox_read) + + +########## generic routine testing ################# + + +class TestIsTooOld(unittest.TestCase): + def testOld(self): + time_msg = time.time() - (15 * 24 * 60 * 60) # 15 days old + assert(archivemail.is_too_old(time_message=time_msg, max_days=14)) + + def testJustOld(self): + time_msg = time.time() - (25 * 60 * 60) # 25 hours old + assert(archivemail.is_too_old(time_message=time_msg, max_days=1)) + + def testNotOld(self): + time_msg = time.time() - (8 * 24 * 60 * 60) # 8 days old + assert(not archivemail.is_too_old(time_message=time_msg, max_days=9)) + + def testJustNotOld(self): + time_msg = time.time() - (23 * 60 * 60) # 23 hours old + assert(not archivemail.is_too_old(time_message=time_msg, max_days=1)) + + def testFuture(self): + time_msg = time.time() + (1 * 24 * 60 * 60) # tomorrow + assert(not archivemail.is_too_old(time_message=time_msg, max_days=1)) + + +class TestChooseTempDir(unittest.TestCase): + def setUp(self): + self.output_dir = tempfile.mktemp() + os.mkdir(self.output_dir) + self.sub_dir = tempfile.mktemp() + os.mkdir(self.sub_dir) + + def testCurrentDir(self): + archivemail._options.output_dir = None + dir = archivemail.choose_temp_dir("dummy") + self.assertEqual(dir, os.curdir) + + def testSubDir(self): + archivemail._options.output_dir = None + dir = archivemail.choose_temp_dir(os.path.join(self.sub_dir, "dummy")) + self.assertEqual(dir, self.sub_dir) + + def testOutputDir(self): + archivemail._options.output_dir = self.output_dir + dir = archivemail.choose_temp_dir("dummy") + self.assertEqual(dir, self.output_dir) + + def testSubDirOutputDir(self): + archivemail._options.output_dir = self.output_dir + dir = archivemail.choose_temp_dir(os.path.join(self.sub_dir, "dummy")) + self.assertEqual(dir, self.output_dir) + + def tearDown(self): + os.rmdir(self.output_dir) + os.rmdir(self.sub_dir) + + +########## proper archival testing ########### + +class TestArchiveMboxTimestampNew(unittest.TestCase): + def setUp(self): + self.mbox_name = make_mbox(messages=3, hours_old=(24 * 179)) + self.mtime = os.path.getmtime(self.mbox_name) - 66 + self.atime = os.path.getatime(self.mbox_name) - 88 + os.utime(self.mbox_name, (self.atime, self.mtime)) + archivemail._options.quiet = 1 + + def testTime(self): + archivemail._options.compressor = "gzip" + archivemail.archive(self.mbox_name) + assert(os.path.exists(self.mbox_name)) + new_atime = os.path.getatime(self.mbox_name) + new_mtime = os.path.getmtime(self.mbox_name) + self.assertEqual(self.mtime, new_mtime) + self.assertEqual(self.atime, new_atime) + + def tearDown(self): + if os.path.exists(self.mbox_name): + os.remove(self.mbox_name) + archivemail._options.quiet = 0 + + +class TestArchiveMboxTimestampOld(unittest.TestCase): + def setUp(self): + self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181)) + self.mtime = os.path.getmtime(self.mbox_name) - 66 + self.atime = os.path.getatime(self.mbox_name) - 88 + os.utime(self.mbox_name, (self.atime, self.mtime)) + archivemail._options.quiet = 1 + + def testTime(self): + archivemail._options.compressor = "gzip" + archive_name = self.mbox_name + "_archive.gz" + archivemail.archive(self.mbox_name) + assert(os.path.exists(self.mbox_name)) + new_atime = os.path.getatime(self.mbox_name) + new_mtime = os.path.getmtime(self.mbox_name) + self.assertEqual(self.mtime, new_mtime) + self.assertEqual(self.atime, new_atime) + + def tearDown(self): + if os.path.exists(self.mbox_name): + os.remove(self.mbox_name) + for ext in (".gz", ".bz2", ".Z"): + if os.path.exists(self.mbox_name + ext): + os.remove(self.mbox_name + ext) + archivemail._options.quiet = 0 + + +class TestArchiveMboxOld(unittest.TestCase): + def setUp(self): + self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181)) + self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] + self.copy_name = tempfile.mktemp() + shutil.copyfile(self.mbox_name, self.copy_name) + archivemail._options.quiet = 1 + + def testArchiveOldGzip(self): + archivemail._options.compressor = "gzip" + archivemail.archive(self.mbox_name) + assert(os.path.exists(self.mbox_name)) + self.assertEqual(os.path.getsize(self.mbox_name), 0) + new_mode = os.stat(self.mbox_name)[stat.ST_MODE] + self.assertEqual(self.mbox_mode, new_mode) + + archive_name = self.mbox_name + "_archive.gz" + assert(os.path.exists(archive_name)) + os.system("gzip -d " + archive_name) + + archive_name = self.mbox_name + "_archive" + assert(os.path.exists(archive_name)) + assert(filecmp.cmp(archive_name, self.copy_name)) + self.tearDown() + self.setUp() + + def testArchiveOldBzip2(self): + archivemail._options.compressor = "bzip2" + archivemail.archive(self.mbox_name) + assert(os.path.exists(self.mbox_name)) + self.assertEqual(os.path.getsize(self.mbox_name), 0) + new_mode = os.stat(self.mbox_name)[stat.ST_MODE] + self.assertEqual(self.mbox_mode, new_mode) + + archive_name = self.mbox_name + "_archive.bz2" + assert(os.path.exists(archive_name)) + os.system("bzip2 -d " + archive_name) + + archive_name = self.mbox_name + "_archive" + assert(os.path.exists(archive_name)) + assert(filecmp.cmp(archive_name, self.copy_name)) + self.tearDown() + self.setUp() + + def testArchiveOldCompress(self): + archivemail._options.compressor = "compress" + archivemail.archive(self.mbox_name) + assert(os.path.exists(self.mbox_name)) + self.assertEqual(os.path.getsize(self.mbox_name), 0) + new_mode = os.stat(self.mbox_name)[stat.ST_MODE] + self.assertEqual(self.mbox_mode, new_mode) + + archive_name = self.mbox_name + "_archive.Z" + assert(os.path.exists(archive_name)) + os.system("compress -d " + archive_name) + + archive_name = self.mbox_name + "_archive" + assert(os.path.exists(archive_name)) + assert(filecmp.cmp(archive_name, self.copy_name)) + self.tearDown() + self.setUp() + + def tearDown(self): + if os.path.exists(self.mbox_name): + os.remove(self.mbox_name) + for ext in (".gz", ".bz2", ".Z"): + if os.path.exists(self.mbox_name + ext): + os.remove(self.mbox_name + ext) + if os.path.exists(self.copy_name): + os.remove(self.copy_name) + archivemail._options.quiet = 0 + + +class TestArchiveMboxNew(unittest.TestCase): + def setUp(self): + archivemail._options.quiet = 1 + self.mbox_name = make_mbox(messages=3, hours_old=(24 * 179)) + self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE] + self.copy_name = tempfile.mktemp() + shutil.copyfile(self.mbox_name, self.copy_name) + + def testArchiveNew(self): + archivemail.archive(self.mbox_name) + assert(os.path.exists(self.mbox_name)) + assert(filecmp.cmp(self.mbox_name, self.copy_name)) + new_mode = os.stat(self.mbox_name)[stat.ST_MODE] + self.assertEqual(self.mbox_mode, new_mode) + + archive_name = self.mbox_name + "_archive.gz" + assert(not os.path.exists(archive_name)) + + def tearDown(self): + archivemail._options.quiet = 0 + if os.path.exists(self.mbox_name): + os.remove(self.mbox_name) + if os.path.exists(self.copy_name): + os.remove(self.copy_name) + + +########## helper routines ############ + +def make_message(hours_old=0): + time_message = time.time() - (60 * 60 * hours_old) + time_string = time.asctime(time.localtime(time_message)) + + return """From sender@domain %s +From: sender@domain +To: receipient@domain +Subject: This is a dummy message +Date: %s + +This is the message body. +It's very exciting. + + +""" % (time_string, time_string) + +def make_mbox(messages=1, hours_old=0): + name = tempfile.mktemp() + file = open(name, "w") + for count in range(messages): + file.write(make_message(hours_old=hours_old)) + file.close() + return name + +def is_world_readable(path): + """Return true if the path is world-readable, false otherwise""" + assert(path) + return (os.stat(path)[stat.ST_MODE] & stat.S_IROTH) + + +if __name__ == "__main__": + unittest.main()