We now preserve the last-accessed and last-modified timestamps correctly.
Fixed a bug where lockfiles were being created that were not world-readable. Made archivemail work better when used as a python module so it can integrate better with unittest. Renamed unittest script 'test_archivemail' instead of 'archivemail_test' and added about 20 more tests.
This commit is contained in:
parent
92e86986e5
commit
35a9f14982
|
@ -1,3 +1,11 @@
|
||||||
|
Version 0.3.0 - ???
|
||||||
|
* We now preserve the last-accessed and last-modified timestamps correctly
|
||||||
|
* Fixed a bug where lockfiles were being created that were not
|
||||||
|
world-readable
|
||||||
|
* Made archivemail work better when used as a python module so it can
|
||||||
|
integrate better with unittest.
|
||||||
|
* Budled a unit testing script for archivemail.
|
||||||
|
|
||||||
Version 0.2.1 - 4 April 2002
|
Version 0.2.1 - 4 April 2002
|
||||||
* Since we might not have a parse-able 'Date-Received' or 'Date' field,
|
* Since we might not have a parse-able 'Date-Received' or 'Date' field,
|
||||||
use 5 different ways to guess the date of a message.
|
use 5 different ways to guess the date of a message.
|
||||||
|
|
4
TODO
4
TODO
|
@ -1,9 +1,9 @@
|
||||||
|
|
||||||
Goals for next minor release (0.2.2):
|
Goals for next minor release (0.2.2):
|
||||||
-------------------------------------
|
-------------------------------------
|
||||||
* Test exclusive locking works with another test process
|
|
||||||
* Perserve atime of original mailbox properly
|
|
||||||
* Finish man page
|
* Finish man page
|
||||||
|
* If a mailbox has a mode of 660, preserve it.
|
||||||
|
(Especially in /var/spool/mail with groupid of 'mail')
|
||||||
|
|
||||||
Goals for next major release (0.3.0):
|
Goals for next major release (0.3.0):
|
||||||
-------------------------------------
|
-------------------------------------
|
||||||
|
|
138
archivemail.py
138
archivemail.py
|
@ -16,12 +16,19 @@
|
||||||
# along with this program; if not, write to the Free Software
|
# along with this program; if not, write to the Free Software
|
||||||
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||||
############################################################################
|
############################################################################
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Archive and compress old mail in mbox or maildir-format mailboxes.
|
Archive and compress old mail in mbox or maildir-format mailboxes.
|
||||||
Website: http://archivemail.sourceforge.net/
|
Website: http://archivemail.sourceforge.net/
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# global administrivia
|
||||||
|
__version__ = "archivemail v0.3.0"
|
||||||
|
__cvs_id__ = "$Id$"
|
||||||
|
__copyright__ = """Copyright (C) 2002 Paul Rodger <paul@paulrodger.com>
|
||||||
|
This is free software; see the source for copying conditions. There is NO
|
||||||
|
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE."""
|
||||||
|
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
def check_python_version():
|
def check_python_version():
|
||||||
|
@ -51,15 +58,6 @@ import string
|
||||||
import tempfile
|
import tempfile
|
||||||
import time
|
import time
|
||||||
|
|
||||||
# global administrivia
|
|
||||||
__version__ = "archivemail v0.2.1"
|
|
||||||
__cvs_id__ = "$Id$"
|
|
||||||
__copyright__ = """Copyright (C) 2002 Paul Rodger <paul@paulrodger.com>
|
|
||||||
This is free software; see the source for copying conditions. There is NO
|
|
||||||
warranty; not even for MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE."""
|
|
||||||
|
|
||||||
_stale = None # list of files to delete on abnormal exit
|
|
||||||
|
|
||||||
############## class definitions ###############
|
############## class definitions ###############
|
||||||
|
|
||||||
class Stats:
|
class Stats:
|
||||||
|
@ -83,7 +81,8 @@ class Stats:
|
||||||
assert(final_archive_name)
|
assert(final_archive_name)
|
||||||
self.__start_time = time.time()
|
self.__start_time = time.time()
|
||||||
self.__mailbox_name = mailbox_name
|
self.__mailbox_name = mailbox_name
|
||||||
self.__archive_name = final_archive_name + _options.compressor_extension
|
self.__archive_name = final_archive_name + \
|
||||||
|
_options.compressor_extension()
|
||||||
|
|
||||||
def another_message(self):
|
def another_message(self):
|
||||||
"""Add one to the internal count of total messages processed"""
|
"""Add one to the internal count of total messages processed"""
|
||||||
|
@ -138,8 +137,7 @@ class StaleFiles:
|
||||||
class Options:
|
class Options:
|
||||||
"""Class to store runtime options, including defaults"""
|
"""Class to store runtime options, including defaults"""
|
||||||
archive_suffix = "_archive"
|
archive_suffix = "_archive"
|
||||||
compressor = None
|
compressor = "gzip"
|
||||||
compressor_extension = None
|
|
||||||
days_old_max = 180
|
days_old_max = 180
|
||||||
delete_old_mail = 0
|
delete_old_mail = 0
|
||||||
dry_run = 0
|
dry_run = 0
|
||||||
|
@ -171,6 +169,8 @@ class Options:
|
||||||
"warn-duplicate"])
|
"warn-duplicate"])
|
||||||
except getopt.error, msg:
|
except getopt.error, msg:
|
||||||
user_error(msg)
|
user_error(msg)
|
||||||
|
|
||||||
|
chosen_compressor = None
|
||||||
for o, a in opts:
|
for o, a in opts:
|
||||||
if o == '--delete':
|
if o == '--delete':
|
||||||
self.delete_old_mail = 1
|
self.delete_old_mail = 1
|
||||||
|
@ -195,25 +195,22 @@ class Options:
|
||||||
print __version__ + "\n\n" + __copyright__
|
print __version__ + "\n\n" + __copyright__
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
if o in ('-z', '--gzip'):
|
if o in ('-z', '--gzip'):
|
||||||
if (self.compressor):
|
if (chosen_compressor):
|
||||||
user_error("conflicting compression options")
|
user_error("conflicting compression options")
|
||||||
self.compressor = "gzip"
|
self.compressor = "gzip"
|
||||||
|
chosen_compressor = 1
|
||||||
if o in ('-Z', '--compress'):
|
if o in ('-Z', '--compress'):
|
||||||
if (self.compressor):
|
if (chosen_compressor):
|
||||||
user_error("conflicting compression options")
|
user_error("conflicting compression options")
|
||||||
self.compressor = "compress"
|
self.compressor = "compress"
|
||||||
|
chosen_compressor = 1
|
||||||
if o in ('-I', '--bzip2'):
|
if o in ('-I', '--bzip2'):
|
||||||
if (self.compressor):
|
if (chosen_compressor):
|
||||||
user_error("conflicting compression options")
|
user_error("conflicting compression options")
|
||||||
self.compressor = "bzip2"
|
self.compressor = "bzip2"
|
||||||
|
chosen_compressor = 1
|
||||||
if not self.compressor:
|
if not self.compressor:
|
||||||
self.compressor = "gzip"
|
self.compressor = "gzip"
|
||||||
extensions = {
|
|
||||||
"compress" : ".Z",
|
|
||||||
"gzip" : ".gz",
|
|
||||||
"bzip2" : ".bz2",
|
|
||||||
}
|
|
||||||
self.compressor_extension = extensions[self.compressor]
|
|
||||||
return args
|
return args
|
||||||
|
|
||||||
def sanity_check(self):
|
def sanity_check(self):
|
||||||
|
@ -233,6 +230,15 @@ class Options:
|
||||||
if (self.days_old_max >= 10000):
|
if (self.days_old_max >= 10000):
|
||||||
user_error("argument to -d must be less than 10000")
|
user_error("argument to -d must be less than 10000")
|
||||||
|
|
||||||
|
def compressor_extension(self):
|
||||||
|
extensions = {
|
||||||
|
"compress" : ".Z",
|
||||||
|
"gzip" : ".gz",
|
||||||
|
"bzip2" : ".bz2",
|
||||||
|
}
|
||||||
|
return extensions[self.compressor]
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Mbox(mailbox.PortableUnixMailbox):
|
class Mbox(mailbox.PortableUnixMailbox):
|
||||||
"""Class that allows read/write access to a 'mbox' mailbox.
|
"""Class that allows read/write access to a 'mbox' mailbox.
|
||||||
|
@ -240,18 +246,23 @@ class Mbox(mailbox.PortableUnixMailbox):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
mbox_file = None # file handle for the mbox file
|
mbox_file = None # file handle for the mbox file
|
||||||
|
original_atime = None # last-accessed timestamp
|
||||||
|
original_mtime = None # last-modified timestamp
|
||||||
|
|
||||||
def __init__(self, path_name):
|
def __init__(self, path, mode="r"):
|
||||||
"""Constructor for opening an existing 'mbox' mailbox.
|
"""Constructor for opening an existing 'mbox' mailbox.
|
||||||
Extends constructor for mailbox.PortableUnixMailbox()
|
Extends constructor for mailbox.PortableUnixMailbox()
|
||||||
|
|
||||||
Arguments:
|
Named Arguments:
|
||||||
path_name -- file name of the 'mbox' file to be opened
|
path -- file name of the 'mbox' file to be opened
|
||||||
|
mode -- mode to open the file in (default is read-only)
|
||||||
|
|
||||||
"""
|
"""
|
||||||
assert(path_name)
|
assert(path)
|
||||||
try:
|
try:
|
||||||
self.mbox_file = open(path_name, "r")
|
self.original_atime = os.path.getatime(path)
|
||||||
|
self.original_mtime = os.path.getmtime(path)
|
||||||
|
self.mbox_file = open(path, mode)
|
||||||
except IOError, msg:
|
except IOError, msg:
|
||||||
unexpected_error(msg)
|
unexpected_error(msg)
|
||||||
mailbox.PortableUnixMailbox.__init__(self, self.mbox_file)
|
mailbox.PortableUnixMailbox.__init__(self, self.mbox_file)
|
||||||
|
@ -266,6 +277,8 @@ class Mbox(mailbox.PortableUnixMailbox):
|
||||||
|
|
||||||
"""
|
"""
|
||||||
assert(msg)
|
assert(msg)
|
||||||
|
assert(self.mbox_file)
|
||||||
|
|
||||||
vprint("saving message to file '%s'" % self.mbox_file.name)
|
vprint("saving message to file '%s'" % self.mbox_file.name)
|
||||||
unix_from = msg.unixfrom
|
unix_from = msg.unixfrom
|
||||||
if not unix_from:
|
if not unix_from:
|
||||||
|
@ -300,6 +313,14 @@ class Mbox(mailbox.PortableUnixMailbox):
|
||||||
vprint("closing file '%s'" % self.mbox_file.name)
|
vprint("closing file '%s'" % self.mbox_file.name)
|
||||||
self.mbox_file.close()
|
self.mbox_file.close()
|
||||||
|
|
||||||
|
def reset_time_stamps(self):
|
||||||
|
"""Set the file timestamps to the original value"""
|
||||||
|
assert(self.original_atime)
|
||||||
|
assert(self.original_mtime)
|
||||||
|
assert(self.mbox_file.name)
|
||||||
|
os.utime(self.mbox_file.name, (self.original_atime, \
|
||||||
|
self.original_mtime))
|
||||||
|
|
||||||
def exclusive_lock(self):
|
def exclusive_lock(self):
|
||||||
"""Set an advisory lock on the 'mbox' mailbox"""
|
"""Set an advisory lock on the 'mbox' mailbox"""
|
||||||
vprint("obtaining exclusive lock on file '%s'" % self.mbox_file.name)
|
vprint("obtaining exclusive lock on file '%s'" % self.mbox_file.name)
|
||||||
|
@ -322,9 +343,11 @@ class Mbox(mailbox.PortableUnixMailbox):
|
||||||
unexpected_error("Giving up waiting for procmail lock '%s'"
|
unexpected_error("Giving up waiting for procmail lock '%s'"
|
||||||
% lock_name)
|
% lock_name)
|
||||||
vprint("writing lockfile '%s'" % lock_name)
|
vprint("writing lockfile '%s'" % lock_name)
|
||||||
|
old_umask = os.umask(022) # is this dodgy?
|
||||||
lock = open(lock_name, "w")
|
lock = open(lock_name, "w")
|
||||||
_stale.procmail_lock = lock_name
|
_stale.procmail_lock = lock_name
|
||||||
lock.close()
|
lock.close()
|
||||||
|
old_umask = os.umask(old_umask)
|
||||||
|
|
||||||
def procmail_unlock(self):
|
def procmail_unlock(self):
|
||||||
"""Delete the procmail lockfile on the 'mbox' mailbox"""
|
"""Delete the procmail lockfile on the 'mbox' mailbox"""
|
||||||
|
@ -342,11 +365,9 @@ class Mbox(mailbox.PortableUnixMailbox):
|
||||||
completely deleted."""
|
completely deleted."""
|
||||||
assert(os.path.isfile(self.mbox_file.name))
|
assert(os.path.isfile(self.mbox_file.name))
|
||||||
vprint("turning '%s' into a zero-length file" % self.mbox_file.name)
|
vprint("turning '%s' into a zero-length file" % self.mbox_file.name)
|
||||||
atime = os.path.getatime(self.mbox_file.name)
|
|
||||||
mtime = os.path.getmtime(self.mbox_file.name)
|
mtime = os.path.getmtime(self.mbox_file.name)
|
||||||
blank_file = open(self.mbox_file.name, "w")
|
blank_file = open(self.mbox_file.name, "w")
|
||||||
blank_file.close()
|
blank_file.close()
|
||||||
os.utime(self.mbox_file.name, (atime, mtime)) # to original timestamps
|
|
||||||
|
|
||||||
|
|
||||||
class RetainMbox(Mbox):
|
class RetainMbox(Mbox):
|
||||||
|
@ -377,8 +398,14 @@ class RetainMbox(Mbox):
|
||||||
"""Overwrite the original mailbox with this temporary mailbox."""
|
"""Overwrite the original mailbox with this temporary mailbox."""
|
||||||
assert(self.__final_name)
|
assert(self.__final_name)
|
||||||
self.close()
|
self.close()
|
||||||
|
|
||||||
|
# make sure that the retained mailbox has the same timestamps and
|
||||||
|
# permission as the original mailbox
|
||||||
atime = os.path.getatime(self.__final_name)
|
atime = os.path.getatime(self.__final_name)
|
||||||
mtime = os.path.getmtime(self.__final_name)
|
mtime = os.path.getmtime(self.__final_name)
|
||||||
|
mode = os.stat(self.__final_name)[stat.ST_MODE]
|
||||||
|
os.chmod(self.mbox_file.name, mode)
|
||||||
|
|
||||||
vprint("renaming '%s' to '%s'" % (self.mbox_file.name, self.__final_name))
|
vprint("renaming '%s' to '%s'" % (self.mbox_file.name, self.__final_name))
|
||||||
os.rename(self.mbox_file.name, self.__final_name)
|
os.rename(self.mbox_file.name, self.__final_name)
|
||||||
os.utime(self.__final_name, (atime, mtime)) # reset to original timestamps
|
os.utime(self.__final_name, (atime, mtime)) # reset to original timestamps
|
||||||
|
@ -414,7 +441,7 @@ class ArchiveMbox(Mbox):
|
||||||
"""
|
"""
|
||||||
assert(final_name)
|
assert(final_name)
|
||||||
compressor = _options.compressor
|
compressor = _options.compressor
|
||||||
compressedfilename = final_name + _options.compressor_extension
|
compressedfilename = final_name + _options.compressor_extension()
|
||||||
|
|
||||||
if os.path.isfile(final_name):
|
if os.path.isfile(final_name):
|
||||||
unexpected_error("""There is already a file named '%s'!
|
unexpected_error("""There is already a file named '%s'!
|
||||||
|
@ -445,13 +472,14 @@ manually, and try running me again.""" % final_name)
|
||||||
self.close()
|
self.close()
|
||||||
compressor = _options.compressor
|
compressor = _options.compressor
|
||||||
compressed_archive_name = self.mbox_file.name + \
|
compressed_archive_name = self.mbox_file.name + \
|
||||||
_options.compressor_extension
|
_options.compressor_extension()
|
||||||
compress = compressor + " " + self.mbox_file.name
|
compress = compressor + " " + self.mbox_file.name
|
||||||
vprint("running compressor: '%s'" % compress)
|
vprint("running compressor: '%s'" % compress)
|
||||||
_stale.compressed_archive = compressed_archive_name
|
_stale.compressed_archive = compressed_archive_name
|
||||||
system_or_die(compress)
|
system_or_die(compress)
|
||||||
_stale.archive = None
|
_stale.archive = None
|
||||||
compressed_final_name = self.__final_name + _options.compressor_extension
|
compressed_final_name = self.__final_name + \
|
||||||
|
_options.compressor_extension()
|
||||||
vprint("renaming '%s' to '%s'" % (compressed_archive_name,
|
vprint("renaming '%s' to '%s'" % (compressed_archive_name,
|
||||||
compressed_final_name))
|
compressed_final_name))
|
||||||
os.rename(compressed_archive_name, compressed_final_name)
|
os.rename(compressed_archive_name, compressed_final_name)
|
||||||
|
@ -478,6 +506,7 @@ class IdentityCache:
|
||||||
|
|
||||||
# global class instances
|
# global class instances
|
||||||
_options = Options() # the run-time options object
|
_options = Options() # the run-time options object
|
||||||
|
_stale = StaleFiles() # remember what we have to delete on abnormal exit
|
||||||
|
|
||||||
|
|
||||||
def main(args = sys.argv[1:]):
|
def main(args = sys.argv[1:]):
|
||||||
|
@ -519,17 +548,6 @@ Website: http://archivemail.sourceforge.net/ """ % \
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
_options.sanity_check()
|
_options.sanity_check()
|
||||||
os.umask(077) # saves setting permissions on mailboxes/tempfiles
|
|
||||||
|
|
||||||
# Make sure we clean up nicely - we don't want to leave stale procmail
|
|
||||||
# lockfiles about if something bad happens to us. This is quite
|
|
||||||
# important, even though procmail will delete stale files after a while.
|
|
||||||
_stale = StaleFiles() # remember what we have to delete
|
|
||||||
atexit.register(clean_up) # delete stale files on exceptions/normal exit
|
|
||||||
signal.signal(signal.SIGHUP, clean_up_signal) # signal 1
|
|
||||||
# SIGINT (signal 2) is handled as a python exception
|
|
||||||
signal.signal(signal.SIGQUIT, clean_up_signal) # signal 3
|
|
||||||
signal.signal(signal.SIGTERM, clean_up_signal) # signal 15
|
|
||||||
|
|
||||||
for mailbox_path in args:
|
for mailbox_path in args:
|
||||||
archive(mailbox_path)
|
archive(mailbox_path)
|
||||||
|
@ -599,7 +617,7 @@ def guess_delivery_time(message):
|
||||||
if date:
|
if date:
|
||||||
try:
|
try:
|
||||||
time_message = time.mktime(date)
|
time_message = time.mktime(date)
|
||||||
assert(time_message, 'time.mktime() returned false')
|
assert(time_message)
|
||||||
vprint("using valid time found from '%s' header" % header)
|
vprint("using valid time found from '%s' header" % header)
|
||||||
return time_message
|
return time_message
|
||||||
except (ValueError, OverflowError): pass
|
except (ValueError, OverflowError): pass
|
||||||
|
@ -612,7 +630,7 @@ def guess_delivery_time(message):
|
||||||
if date:
|
if date:
|
||||||
try:
|
try:
|
||||||
time_message = time.mktime(date)
|
time_message = time.mktime(date)
|
||||||
assert(time_message, 'time.mktime() returned false')
|
assert(time_message)
|
||||||
vprint("using valid time found from unix 'From_' header")
|
vprint("using valid time found from unix 'From_' header")
|
||||||
return time_message
|
return time_message
|
||||||
except (ValueError, OverflowError): pass
|
except (ValueError, OverflowError): pass
|
||||||
|
@ -634,7 +652,7 @@ def guess_delivery_time(message):
|
||||||
return time_message
|
return time_message
|
||||||
|
|
||||||
|
|
||||||
def is_too_old(time_message):
|
def is_too_old(time_message, max_days):
|
||||||
"""Return true if a message is too old (and should be archived),
|
"""Return true if a message is too old (and should be archived),
|
||||||
false otherwise.
|
false otherwise.
|
||||||
|
|
||||||
|
@ -643,12 +661,14 @@ def is_too_old(time_message):
|
||||||
since the epoch
|
since the epoch
|
||||||
|
|
||||||
"""
|
"""
|
||||||
assert(time_message)
|
assert(time_message > 0)
|
||||||
|
assert(max_days >= 1)
|
||||||
|
|
||||||
time_now = time.time()
|
time_now = time.time()
|
||||||
if time_message > time_now:
|
if time_message > time_now:
|
||||||
vprint("warning: message has date in the future")
|
vprint("warning: message has date in the future")
|
||||||
return 0
|
return 0
|
||||||
secs_old_max = (_options.days_old_max * 24 * 60 * 60)
|
secs_old_max = (max_days * 24 * 60 * 60)
|
||||||
days_old = (time_now - time_message) / 24 / 60 / 60
|
days_old = (time_now - time_message) / 24 / 60 / 60
|
||||||
vprint("message is %.2f days old" % days_old)
|
vprint("message is %.2f days old" % days_old)
|
||||||
if ((time_message + secs_old_max) < time_now):
|
if ((time_message + secs_old_max) < time_now):
|
||||||
|
@ -670,6 +690,9 @@ def archive(mailbox_name):
|
||||||
"""
|
"""
|
||||||
assert(mailbox_name)
|
assert(mailbox_name)
|
||||||
|
|
||||||
|
set_signal_handlers()
|
||||||
|
os.umask(077) # saves setting permissions on mailboxes/tempfiles
|
||||||
|
|
||||||
final_archive_name = mailbox_name + _options.archive_suffix
|
final_archive_name = mailbox_name + _options.archive_suffix
|
||||||
if _options.output_dir:
|
if _options.output_dir:
|
||||||
final_archive_name = os.path.join(_options.output_dir,
|
final_archive_name = os.path.join(_options.output_dir,
|
||||||
|
@ -730,7 +753,7 @@ def _archive_mbox(mailbox_name, final_archive_name):
|
||||||
archive = None
|
archive = None
|
||||||
retain = None
|
retain = None
|
||||||
stats = Stats(mailbox_name, final_archive_name)
|
stats = Stats(mailbox_name, final_archive_name)
|
||||||
original = Mbox(mailbox_name)
|
original = Mbox(path=mailbox_name)
|
||||||
cache = IdentityCache(mailbox_name)
|
cache = IdentityCache(mailbox_name)
|
||||||
|
|
||||||
original.procmail_lock()
|
original.procmail_lock()
|
||||||
|
@ -742,7 +765,7 @@ def _archive_mbox(mailbox_name, final_archive_name):
|
||||||
if _options.warn_duplicates:
|
if _options.warn_duplicates:
|
||||||
cache.warn_if_dupe(msg)
|
cache.warn_if_dupe(msg)
|
||||||
time_message = guess_delivery_time(msg)
|
time_message = guess_delivery_time(msg)
|
||||||
if is_too_old(time_message):
|
if is_too_old(time_message, _options.days_old_max):
|
||||||
stats.another_archived()
|
stats.another_archived()
|
||||||
if _options.delete_old_mail:
|
if _options.delete_old_mail:
|
||||||
vprint("decision: delete message")
|
vprint("decision: delete message")
|
||||||
|
@ -762,6 +785,7 @@ def _archive_mbox(mailbox_name, final_archive_name):
|
||||||
vprint("finished reading messages")
|
vprint("finished reading messages")
|
||||||
original.exclusive_unlock()
|
original.exclusive_unlock()
|
||||||
original.close()
|
original.close()
|
||||||
|
original.reset_time_stamps()
|
||||||
if not _options.dry_run:
|
if not _options.dry_run:
|
||||||
if retain: retain.close()
|
if retain: retain.close()
|
||||||
if archive: archive.close()
|
if archive: archive.close()
|
||||||
|
@ -772,6 +796,7 @@ def _archive_mbox(mailbox_name, final_archive_name):
|
||||||
else:
|
else:
|
||||||
# nothing was retained - everything was deleted
|
# nothing was retained - everything was deleted
|
||||||
original.leave_empty()
|
original.leave_empty()
|
||||||
|
original.reset_time_stamps()
|
||||||
elif archive:
|
elif archive:
|
||||||
archive.finalise()
|
archive.finalise()
|
||||||
if retain:
|
if retain:
|
||||||
|
@ -779,6 +804,7 @@ def _archive_mbox(mailbox_name, final_archive_name):
|
||||||
else:
|
else:
|
||||||
# nothing was retained - everything was deleted
|
# nothing was retained - everything was deleted
|
||||||
original.leave_empty()
|
original.leave_empty()
|
||||||
|
original.reset_time_stamps()
|
||||||
else:
|
else:
|
||||||
# There was nothing to archive
|
# There was nothing to archive
|
||||||
if retain:
|
if retain:
|
||||||
|
@ -816,7 +842,7 @@ def _archive_dir(mailbox_name, final_archive_name, type):
|
||||||
if _options.warn_duplicates:
|
if _options.warn_duplicates:
|
||||||
cache.warn_if_dupe(msg)
|
cache.warn_if_dupe(msg)
|
||||||
time_message = guess_delivery_time(msg)
|
time_message = guess_delivery_time(msg)
|
||||||
if is_too_old(time_message):
|
if is_too_old(time_message, _options.days_old_max):
|
||||||
stats.another_archived()
|
stats.another_archived()
|
||||||
if _options.delete_old_mail:
|
if _options.delete_old_mail:
|
||||||
vprint("decision: delete message")
|
vprint("decision: delete message")
|
||||||
|
@ -868,6 +894,16 @@ def choose_temp_dir(mailbox_name):
|
||||||
return temp_dir
|
return temp_dir
|
||||||
|
|
||||||
|
|
||||||
|
def set_signal_handlers():
|
||||||
|
# Make sure we clean up nicely - we don't want to leave stale procmail
|
||||||
|
# lockfiles about if something bad happens to us. This is quite
|
||||||
|
# important, even though procmail will delete stale files after a while.
|
||||||
|
atexit.register(clean_up) # delete stale files on exceptions/normal exit
|
||||||
|
signal.signal(signal.SIGHUP, clean_up_signal) # signal 1
|
||||||
|
# SIGINT (signal 2) is handled as a python exception
|
||||||
|
signal.signal(signal.SIGQUIT, clean_up_signal) # signal 3
|
||||||
|
signal.signal(signal.SIGTERM, clean_up_signal) # signal 15
|
||||||
|
|
||||||
def clean_up():
|
def clean_up():
|
||||||
"""Delete stale files -- to be registered with atexit.register()"""
|
"""Delete stale files -- to be registered with atexit.register()"""
|
||||||
vprint("cleaning up ...")
|
vprint("cleaning up ...")
|
||||||
|
|
|
@ -1,41 +0,0 @@
|
||||||
#!/usr/bin/env python
|
|
||||||
|
|
||||||
import archivemail
|
|
||||||
import os
|
|
||||||
import tempfile
|
|
||||||
import unittest
|
|
||||||
|
|
||||||
class TempfileTestCase(unittest.TestCase):
|
|
||||||
def setUp(self):
|
|
||||||
self.output_dir = tempfile.mktemp()
|
|
||||||
os.mkdir(self.output_dir)
|
|
||||||
self.sub_dir = tempfile.mktemp()
|
|
||||||
os.mkdir(self.sub_dir)
|
|
||||||
|
|
||||||
def testCurrentDir(self):
|
|
||||||
archivemail._options.output_dir = None
|
|
||||||
dir = archivemail.choose_temp_dir("dummy")
|
|
||||||
self.assertEqual(dir, os.curdir)
|
|
||||||
|
|
||||||
def testSubDir(self):
|
|
||||||
archivemail._options.output_dir = None
|
|
||||||
dir = archivemail.choose_temp_dir(os.path.join(self.sub_dir, "dummy"))
|
|
||||||
self.assertEqual(dir, self.sub_dir)
|
|
||||||
|
|
||||||
def testOutputDir(self):
|
|
||||||
archivemail._options.output_dir = self.output_dir
|
|
||||||
dir = archivemail.choose_temp_dir("dummy")
|
|
||||||
self.assertEqual(dir, self.output_dir)
|
|
||||||
|
|
||||||
def testSubDirOutputDir(self):
|
|
||||||
archivemail._options.output_dir = self.output_dir
|
|
||||||
dir = archivemail.choose_temp_dir(os.path.join(self.sub_dir, "dummy"))
|
|
||||||
self.assertEqual(dir, self.output_dir)
|
|
||||||
|
|
||||||
def tearDown(self):
|
|
||||||
os.rmdir(self.output_dir)
|
|
||||||
os.rmdir(self.sub_dir)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main()
|
|
|
@ -0,0 +1,424 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
############################################################################
|
||||||
|
# Copyright (C) 2002 Paul Rodger <paul@paulrodger.com>
|
||||||
|
#
|
||||||
|
# This program is free software; you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation; either version 2 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program; if not, write to the Free Software
|
||||||
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||||
|
############################################################################
|
||||||
|
"""
|
||||||
|
Test archivemail works correctly using 'pyunit'.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import fcntl
|
||||||
|
import filecmp
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import stat
|
||||||
|
import tempfile
|
||||||
|
import time
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
import archivemail
|
||||||
|
|
||||||
|
__version__ = """$Id$"""
|
||||||
|
|
||||||
|
############ Mbox Class testing ##############
|
||||||
|
|
||||||
|
class TestMboxIsEmpty(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.empty_name = make_mbox(messages=0)
|
||||||
|
self.not_empty_name = make_mbox(messages=1)
|
||||||
|
|
||||||
|
def testEmpty(self):
|
||||||
|
mbox = archivemail.Mbox(self.empty_name)
|
||||||
|
assert(mbox.is_empty())
|
||||||
|
|
||||||
|
def testNotEmpty(self):
|
||||||
|
mbox = archivemail.Mbox(self.not_empty_name)
|
||||||
|
assert(not mbox.is_empty())
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
if os.path.exists(self.empty_name):
|
||||||
|
os.remove(self.empty_name)
|
||||||
|
if os.path.exists(self.not_empty_name):
|
||||||
|
os.remove(self.not_empty_name)
|
||||||
|
|
||||||
|
|
||||||
|
class TestMboxLeaveEmpty(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.mbox_name = make_mbox()
|
||||||
|
self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||||
|
self.mbox = archivemail.Mbox(self.mbox_name)
|
||||||
|
|
||||||
|
def testLeaveEmpty(self):
|
||||||
|
self.mbox.leave_empty()
|
||||||
|
assert(os.path.isfile(self.mbox_name))
|
||||||
|
self.assertEqual(os.path.getsize(self.mbox_name), 0)
|
||||||
|
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||||
|
self.assertEqual(new_mode, self.mbox_mode)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
if os.path.exists(self.mbox_name):
|
||||||
|
os.remove(self.mbox_name)
|
||||||
|
|
||||||
|
|
||||||
|
class TestMboxProcmailLock(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.mbox_name = make_mbox()
|
||||||
|
self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||||
|
self.mbox = archivemail.Mbox(self.mbox_name)
|
||||||
|
|
||||||
|
def testProcmailLock(self):
|
||||||
|
lock = self.mbox_name + ".lock"
|
||||||
|
self.mbox.procmail_lock()
|
||||||
|
assert(os.path.isfile(lock))
|
||||||
|
assert(is_world_readable(lock))
|
||||||
|
self.mbox.procmail_unlock()
|
||||||
|
assert(not os.path.isfile(lock))
|
||||||
|
|
||||||
|
# TODO: add a test where the lock already exists
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
if os.path.exists(self.mbox_name):
|
||||||
|
os.remove(self.mbox_name)
|
||||||
|
|
||||||
|
|
||||||
|
class TestMboxRemove(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.mbox_name = make_mbox()
|
||||||
|
self.mbox = archivemail.Mbox(self.mbox_name)
|
||||||
|
|
||||||
|
def testProcmailLock(self):
|
||||||
|
assert(os.path.exists(self.mbox_name))
|
||||||
|
self.mbox.remove()
|
||||||
|
assert(not os.path.exists(self.mbox_name))
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
if os.path.exists(self.mbox_name):
|
||||||
|
os.remove(self.mbox_name)
|
||||||
|
|
||||||
|
|
||||||
|
class TestMboxExclusiveLock(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.mbox_name = make_mbox()
|
||||||
|
self.mbox = archivemail.Mbox(self.mbox_name)
|
||||||
|
|
||||||
|
def testExclusiveLock(self):
|
||||||
|
self.mbox.exclusive_lock()
|
||||||
|
file = open(self.mbox_name, "r+")
|
||||||
|
lock_nb = fcntl.LOCK_EX | fcntl.LOCK_NB
|
||||||
|
self.assertRaises(IOError, fcntl.flock, file, lock_nb)
|
||||||
|
|
||||||
|
self.mbox.exclusive_unlock()
|
||||||
|
fcntl.flock(file, lock_nb)
|
||||||
|
fcntl.flock(file, fcntl.LOCK_UN)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
if os.path.exists(self.mbox_name):
|
||||||
|
os.remove(self.mbox_name)
|
||||||
|
|
||||||
|
|
||||||
|
class TestMboxNext(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.not_empty_name = make_mbox(messages=18)
|
||||||
|
self.empty_name = make_mbox(messages=0)
|
||||||
|
|
||||||
|
def testNextEmpty(self):
|
||||||
|
mbox = archivemail.Mbox(self.empty_name)
|
||||||
|
msg = mbox.next()
|
||||||
|
self.assertEqual(msg, None)
|
||||||
|
|
||||||
|
def testNextNotEmpty(self):
|
||||||
|
mbox = archivemail.Mbox(self.not_empty_name)
|
||||||
|
for count in range(18):
|
||||||
|
msg = mbox.next()
|
||||||
|
assert(msg)
|
||||||
|
msg = mbox.next()
|
||||||
|
self.assertEqual(msg, None)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
if os.path.exists(self.not_empty_name):
|
||||||
|
os.remove(self.not_empty_name)
|
||||||
|
if os.path.exists(self.empty_name):
|
||||||
|
os.remove(self.empty_name)
|
||||||
|
|
||||||
|
|
||||||
|
class TestMboxWrite(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.mbox_read = make_mbox(messages=3)
|
||||||
|
self.mbox_write = make_mbox(messages=0)
|
||||||
|
|
||||||
|
def testWrite(self):
|
||||||
|
read = archivemail.Mbox(self.mbox_read)
|
||||||
|
write = archivemail.Mbox(self.mbox_write, mode="w")
|
||||||
|
for count in range(3):
|
||||||
|
msg = read.next()
|
||||||
|
write.write(msg)
|
||||||
|
read.close()
|
||||||
|
write.close()
|
||||||
|
assert(filecmp.cmp(self.mbox_read, self.mbox_write))
|
||||||
|
|
||||||
|
def testWriteNone(self):
|
||||||
|
write = archivemail.Mbox(self.mbox_write, mode="w")
|
||||||
|
self.assertRaises(AssertionError, write.write, None)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
if os.path.exists(self.mbox_write):
|
||||||
|
os.remove(self.mbox_write)
|
||||||
|
if os.path.exists(self.mbox_read):
|
||||||
|
os.remove(self.mbox_read)
|
||||||
|
|
||||||
|
|
||||||
|
########## generic routine testing #################
|
||||||
|
|
||||||
|
|
||||||
|
class TestIsTooOld(unittest.TestCase):
|
||||||
|
def testOld(self):
|
||||||
|
time_msg = time.time() - (15 * 24 * 60 * 60) # 15 days old
|
||||||
|
assert(archivemail.is_too_old(time_message=time_msg, max_days=14))
|
||||||
|
|
||||||
|
def testJustOld(self):
|
||||||
|
time_msg = time.time() - (25 * 60 * 60) # 25 hours old
|
||||||
|
assert(archivemail.is_too_old(time_message=time_msg, max_days=1))
|
||||||
|
|
||||||
|
def testNotOld(self):
|
||||||
|
time_msg = time.time() - (8 * 24 * 60 * 60) # 8 days old
|
||||||
|
assert(not archivemail.is_too_old(time_message=time_msg, max_days=9))
|
||||||
|
|
||||||
|
def testJustNotOld(self):
|
||||||
|
time_msg = time.time() - (23 * 60 * 60) # 23 hours old
|
||||||
|
assert(not archivemail.is_too_old(time_message=time_msg, max_days=1))
|
||||||
|
|
||||||
|
def testFuture(self):
|
||||||
|
time_msg = time.time() + (1 * 24 * 60 * 60) # tomorrow
|
||||||
|
assert(not archivemail.is_too_old(time_message=time_msg, max_days=1))
|
||||||
|
|
||||||
|
|
||||||
|
class TestChooseTempDir(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.output_dir = tempfile.mktemp()
|
||||||
|
os.mkdir(self.output_dir)
|
||||||
|
self.sub_dir = tempfile.mktemp()
|
||||||
|
os.mkdir(self.sub_dir)
|
||||||
|
|
||||||
|
def testCurrentDir(self):
|
||||||
|
archivemail._options.output_dir = None
|
||||||
|
dir = archivemail.choose_temp_dir("dummy")
|
||||||
|
self.assertEqual(dir, os.curdir)
|
||||||
|
|
||||||
|
def testSubDir(self):
|
||||||
|
archivemail._options.output_dir = None
|
||||||
|
dir = archivemail.choose_temp_dir(os.path.join(self.sub_dir, "dummy"))
|
||||||
|
self.assertEqual(dir, self.sub_dir)
|
||||||
|
|
||||||
|
def testOutputDir(self):
|
||||||
|
archivemail._options.output_dir = self.output_dir
|
||||||
|
dir = archivemail.choose_temp_dir("dummy")
|
||||||
|
self.assertEqual(dir, self.output_dir)
|
||||||
|
|
||||||
|
def testSubDirOutputDir(self):
|
||||||
|
archivemail._options.output_dir = self.output_dir
|
||||||
|
dir = archivemail.choose_temp_dir(os.path.join(self.sub_dir, "dummy"))
|
||||||
|
self.assertEqual(dir, self.output_dir)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
os.rmdir(self.output_dir)
|
||||||
|
os.rmdir(self.sub_dir)
|
||||||
|
|
||||||
|
|
||||||
|
########## proper archival testing ###########
|
||||||
|
|
||||||
|
class TestArchiveMboxTimestampNew(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.mbox_name = make_mbox(messages=3, hours_old=(24 * 179))
|
||||||
|
self.mtime = os.path.getmtime(self.mbox_name) - 66
|
||||||
|
self.atime = os.path.getatime(self.mbox_name) - 88
|
||||||
|
os.utime(self.mbox_name, (self.atime, self.mtime))
|
||||||
|
archivemail._options.quiet = 1
|
||||||
|
|
||||||
|
def testTime(self):
|
||||||
|
archivemail._options.compressor = "gzip"
|
||||||
|
archivemail.archive(self.mbox_name)
|
||||||
|
assert(os.path.exists(self.mbox_name))
|
||||||
|
new_atime = os.path.getatime(self.mbox_name)
|
||||||
|
new_mtime = os.path.getmtime(self.mbox_name)
|
||||||
|
self.assertEqual(self.mtime, new_mtime)
|
||||||
|
self.assertEqual(self.atime, new_atime)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
if os.path.exists(self.mbox_name):
|
||||||
|
os.remove(self.mbox_name)
|
||||||
|
archivemail._options.quiet = 0
|
||||||
|
|
||||||
|
|
||||||
|
class TestArchiveMboxTimestampOld(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181))
|
||||||
|
self.mtime = os.path.getmtime(self.mbox_name) - 66
|
||||||
|
self.atime = os.path.getatime(self.mbox_name) - 88
|
||||||
|
os.utime(self.mbox_name, (self.atime, self.mtime))
|
||||||
|
archivemail._options.quiet = 1
|
||||||
|
|
||||||
|
def testTime(self):
|
||||||
|
archivemail._options.compressor = "gzip"
|
||||||
|
archive_name = self.mbox_name + "_archive.gz"
|
||||||
|
archivemail.archive(self.mbox_name)
|
||||||
|
assert(os.path.exists(self.mbox_name))
|
||||||
|
new_atime = os.path.getatime(self.mbox_name)
|
||||||
|
new_mtime = os.path.getmtime(self.mbox_name)
|
||||||
|
self.assertEqual(self.mtime, new_mtime)
|
||||||
|
self.assertEqual(self.atime, new_atime)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
if os.path.exists(self.mbox_name):
|
||||||
|
os.remove(self.mbox_name)
|
||||||
|
for ext in (".gz", ".bz2", ".Z"):
|
||||||
|
if os.path.exists(self.mbox_name + ext):
|
||||||
|
os.remove(self.mbox_name + ext)
|
||||||
|
archivemail._options.quiet = 0
|
||||||
|
|
||||||
|
|
||||||
|
class TestArchiveMboxOld(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181))
|
||||||
|
self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||||
|
self.copy_name = tempfile.mktemp()
|
||||||
|
shutil.copyfile(self.mbox_name, self.copy_name)
|
||||||
|
archivemail._options.quiet = 1
|
||||||
|
|
||||||
|
def testArchiveOldGzip(self):
|
||||||
|
archivemail._options.compressor = "gzip"
|
||||||
|
archivemail.archive(self.mbox_name)
|
||||||
|
assert(os.path.exists(self.mbox_name))
|
||||||
|
self.assertEqual(os.path.getsize(self.mbox_name), 0)
|
||||||
|
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||||
|
self.assertEqual(self.mbox_mode, new_mode)
|
||||||
|
|
||||||
|
archive_name = self.mbox_name + "_archive.gz"
|
||||||
|
assert(os.path.exists(archive_name))
|
||||||
|
os.system("gzip -d " + archive_name)
|
||||||
|
|
||||||
|
archive_name = self.mbox_name + "_archive"
|
||||||
|
assert(os.path.exists(archive_name))
|
||||||
|
assert(filecmp.cmp(archive_name, self.copy_name))
|
||||||
|
self.tearDown()
|
||||||
|
self.setUp()
|
||||||
|
|
||||||
|
def testArchiveOldBzip2(self):
|
||||||
|
archivemail._options.compressor = "bzip2"
|
||||||
|
archivemail.archive(self.mbox_name)
|
||||||
|
assert(os.path.exists(self.mbox_name))
|
||||||
|
self.assertEqual(os.path.getsize(self.mbox_name), 0)
|
||||||
|
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||||
|
self.assertEqual(self.mbox_mode, new_mode)
|
||||||
|
|
||||||
|
archive_name = self.mbox_name + "_archive.bz2"
|
||||||
|
assert(os.path.exists(archive_name))
|
||||||
|
os.system("bzip2 -d " + archive_name)
|
||||||
|
|
||||||
|
archive_name = self.mbox_name + "_archive"
|
||||||
|
assert(os.path.exists(archive_name))
|
||||||
|
assert(filecmp.cmp(archive_name, self.copy_name))
|
||||||
|
self.tearDown()
|
||||||
|
self.setUp()
|
||||||
|
|
||||||
|
def testArchiveOldCompress(self):
|
||||||
|
archivemail._options.compressor = "compress"
|
||||||
|
archivemail.archive(self.mbox_name)
|
||||||
|
assert(os.path.exists(self.mbox_name))
|
||||||
|
self.assertEqual(os.path.getsize(self.mbox_name), 0)
|
||||||
|
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||||
|
self.assertEqual(self.mbox_mode, new_mode)
|
||||||
|
|
||||||
|
archive_name = self.mbox_name + "_archive.Z"
|
||||||
|
assert(os.path.exists(archive_name))
|
||||||
|
os.system("compress -d " + archive_name)
|
||||||
|
|
||||||
|
archive_name = self.mbox_name + "_archive"
|
||||||
|
assert(os.path.exists(archive_name))
|
||||||
|
assert(filecmp.cmp(archive_name, self.copy_name))
|
||||||
|
self.tearDown()
|
||||||
|
self.setUp()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
if os.path.exists(self.mbox_name):
|
||||||
|
os.remove(self.mbox_name)
|
||||||
|
for ext in (".gz", ".bz2", ".Z"):
|
||||||
|
if os.path.exists(self.mbox_name + ext):
|
||||||
|
os.remove(self.mbox_name + ext)
|
||||||
|
if os.path.exists(self.copy_name):
|
||||||
|
os.remove(self.copy_name)
|
||||||
|
archivemail._options.quiet = 0
|
||||||
|
|
||||||
|
|
||||||
|
class TestArchiveMboxNew(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
archivemail._options.quiet = 1
|
||||||
|
self.mbox_name = make_mbox(messages=3, hours_old=(24 * 179))
|
||||||
|
self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||||
|
self.copy_name = tempfile.mktemp()
|
||||||
|
shutil.copyfile(self.mbox_name, self.copy_name)
|
||||||
|
|
||||||
|
def testArchiveNew(self):
|
||||||
|
archivemail.archive(self.mbox_name)
|
||||||
|
assert(os.path.exists(self.mbox_name))
|
||||||
|
assert(filecmp.cmp(self.mbox_name, self.copy_name))
|
||||||
|
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||||
|
self.assertEqual(self.mbox_mode, new_mode)
|
||||||
|
|
||||||
|
archive_name = self.mbox_name + "_archive.gz"
|
||||||
|
assert(not os.path.exists(archive_name))
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
archivemail._options.quiet = 0
|
||||||
|
if os.path.exists(self.mbox_name):
|
||||||
|
os.remove(self.mbox_name)
|
||||||
|
if os.path.exists(self.copy_name):
|
||||||
|
os.remove(self.copy_name)
|
||||||
|
|
||||||
|
|
||||||
|
########## helper routines ############
|
||||||
|
|
||||||
|
def make_message(hours_old=0):
|
||||||
|
time_message = time.time() - (60 * 60 * hours_old)
|
||||||
|
time_string = time.asctime(time.localtime(time_message))
|
||||||
|
|
||||||
|
return """From sender@domain %s
|
||||||
|
From: sender@domain
|
||||||
|
To: receipient@domain
|
||||||
|
Subject: This is a dummy message
|
||||||
|
Date: %s
|
||||||
|
|
||||||
|
This is the message body.
|
||||||
|
It's very exciting.
|
||||||
|
|
||||||
|
|
||||||
|
""" % (time_string, time_string)
|
||||||
|
|
||||||
|
def make_mbox(messages=1, hours_old=0):
|
||||||
|
name = tempfile.mktemp()
|
||||||
|
file = open(name, "w")
|
||||||
|
for count in range(messages):
|
||||||
|
file.write(make_message(hours_old=hours_old))
|
||||||
|
file.close()
|
||||||
|
return name
|
||||||
|
|
||||||
|
def is_world_readable(path):
|
||||||
|
"""Return true if the path is world-readable, false otherwise"""
|
||||||
|
assert(path)
|
||||||
|
return (os.stat(path)[stat.ST_MODE] & stat.S_IROTH)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
Loading…
Reference in New Issue