mirror of
https://git.code.sf.net/p/archivemail/code
synced 2025-03-11 23:32:48 +00:00
Added an option '--no-compress' if you don't want gzipped archives.
Added an option '--preserve-unread' to always preserved (don't archive) unread messages.
This commit is contained in:
parent
21193abf5e
commit
ee9757de78
7 changed files with 427 additions and 56 deletions
|
@ -1,4 +1,10 @@
|
|||
Version 0.3.1 - 13 April 2002
|
||||
Version 0.4.0 - 17 April 2002
|
||||
* Added an option --no-compress to make archives but not compress them with
|
||||
gzip.
|
||||
* Added an option --preserve-unread to not archive unread messages.
|
||||
* Added a few more unittests.
|
||||
|
||||
Version 0.3.2 - 13 April 2002
|
||||
* Added a lot more information to the manpage, including examples and
|
||||
notes.
|
||||
* Fixed up the README file and archivemail usage message.
|
||||
|
|
7
Makefile
7
Makefile
|
@ -1,5 +1,5 @@
|
|||
|
||||
VERSION=0.3.2
|
||||
VERSION=0.4.0
|
||||
VERSION_TAG=v$(subst .,_,$(VERSION))
|
||||
|
||||
|
||||
|
@ -9,10 +9,13 @@ default:
|
|||
clean:
|
||||
rm -f *.pyc manpage.links manpage.refs manpage.log
|
||||
|
||||
test:
|
||||
python test_archivemail.py
|
||||
|
||||
clobber: clean
|
||||
rm -rf build dist
|
||||
|
||||
sdist: clobber
|
||||
sdist: clobber doc
|
||||
cp archivemail.py archivemail
|
||||
fakeroot python setup.py sdist
|
||||
rm archivemail
|
||||
|
|
14
TODO
14
TODO
|
@ -1,10 +1,16 @@
|
|||
|
||||
Goals for next major release (0.4.0):
|
||||
Goals for next minor release (0.4.1):
|
||||
-------------------------------------
|
||||
* Add a lot more tests (see top of test_archivemail.py)
|
||||
* Check the sizes of the original mailbox before reading & just before
|
||||
overwriting. They should not have changed - otherwise somebody else is
|
||||
writing to it.
|
||||
|
||||
Goals for next major release (0.5.0):
|
||||
-------------------------------------
|
||||
* Lock any original .gz files
|
||||
- is this necessary?
|
||||
* Check for symlink attacks for tempfiles (although we don't use /var/tmp)
|
||||
* Add a lot more unit tests. (see top of test_archivemail.py)
|
||||
|
||||
Longer Term goals:
|
||||
------------------
|
||||
|
@ -16,5 +22,5 @@ Longer Term goals:
|
|||
- is this a good idea?
|
||||
* Test for missing compression programs
|
||||
- is this a waste of time?
|
||||
* Add option - do not compress
|
||||
- is this useless?
|
||||
* IMAP support
|
||||
- is this outside our scope?
|
||||
|
|
141
archivemail.py
141
archivemail.py
|
@ -22,7 +22,7 @@ Website: http://archivemail.sourceforge.net/
|
|||
"""
|
||||
|
||||
# global administrivia
|
||||
__version__ = "archivemail v0.3.2"
|
||||
__version__ = "archivemail v0.4.0"
|
||||
__cvs_id__ = "$Id$"
|
||||
__copyright__ = """Copyright (C) 2002 Paul Rodger <paul@paulrodger.com>
|
||||
This is free software; see the source for copying conditions. There is NO
|
||||
|
@ -138,8 +138,12 @@ class Options:
|
|||
lockfile_attempts = 5
|
||||
lockfile_extension = ".lock"
|
||||
lockfile_sleep = 1
|
||||
no_compress = 0
|
||||
only_archive_read = 0
|
||||
output_dir = None
|
||||
preserve_unread = 0
|
||||
quiet = 0
|
||||
read_buffer_size = 8192
|
||||
script_name = os.path.basename(sys.argv[0])
|
||||
verbose = 0
|
||||
warn_duplicates = 0
|
||||
|
@ -156,16 +160,21 @@ class Options:
|
|||
|
||||
"""
|
||||
try:
|
||||
opts, args = getopt.getopt(args, '?Vd:hno:qs:v',
|
||||
["days=", "delete", "dry-run", "help",
|
||||
"output-dir=", "quiet", "suffix", "verbose",
|
||||
"version", "warn-duplicate"])
|
||||
opts, args = getopt.getopt(args, '?Vd:hno:qs:uv',
|
||||
["days=", "delete", "dry-run", "help",
|
||||
"preserve-unread", "no-compress", "output-dir=",
|
||||
"quiet", "suffix", "verbose", "version",
|
||||
"warn-duplicate"])
|
||||
except getopt.error, msg:
|
||||
user_error(msg)
|
||||
|
||||
for o, a in opts:
|
||||
if o == '--delete':
|
||||
self.delete_old_mail = 1
|
||||
if o in ('-u', '--preserve-unread'):
|
||||
self.preserve_unread = 1
|
||||
if o == '--no-compress':
|
||||
self.no_compress = 1
|
||||
if o == '--warn-duplicate':
|
||||
self.warn_duplicates = 1
|
||||
if o in ('-n', '--dry-run'):
|
||||
|
@ -210,10 +219,9 @@ class Mbox(mailbox.PortableUnixMailbox):
|
|||
"""Class that allows read/write access to a 'mbox' mailbox.
|
||||
Subclasses the mailbox.PortableUnixMailbox class.
|
||||
"""
|
||||
|
||||
mbox_file = None # file handle for the mbox file
|
||||
mbox_file_name = None # GzipFile has no .name variable
|
||||
mbox_file_closed = 0 # GzipFile has no .closed variable
|
||||
mbox_file_name = None # GzipFile class has no .name variable
|
||||
mbox_file_closed = 0 # GzipFile class has no .closed variable
|
||||
original_atime = None # last-accessed timestamp
|
||||
original_mtime = None # last-modified timestamp
|
||||
original_mode = None # file permissions to preserve
|
||||
|
@ -261,8 +269,9 @@ class Mbox(mailbox.PortableUnixMailbox):
|
|||
|
||||
# The following while loop is about twice as fast in
|
||||
# practice to 'self.mbox_file.writelines(msg.fp.readlines())'
|
||||
assert(options.read_buffer_size > 0)
|
||||
while 1:
|
||||
body = msg.fp.read(8192)
|
||||
body = msg.fp.read(options.read_buffer_size)
|
||||
if not body:
|
||||
break
|
||||
self.mbox_file.write(body)
|
||||
|
@ -395,7 +404,7 @@ class RetainMbox(Mbox):
|
|||
class ArchiveMbox(Mbox):
|
||||
"""Class for holding messages that will be archived from the original
|
||||
mailbox (ie. the messages that are considered 'old'). Extends the 'Mbox'
|
||||
class. This 'mbox' file starts off as a temporary file, extracted from any
|
||||
class. This 'mbox' file starts off as a temporary file, copied from any
|
||||
pre-existing archive. It will eventually overwrite the original archive
|
||||
mailbox if everything is OK.
|
||||
|
||||
|
@ -415,24 +424,45 @@ class ArchiveMbox(Mbox):
|
|||
|
||||
"""
|
||||
assert(final_name)
|
||||
if options.no_compress:
|
||||
self.__init_uncompressed(final_name)
|
||||
else:
|
||||
self.__init_compressed(final_name)
|
||||
self.__final_name = final_name
|
||||
|
||||
def __init_uncompressed(self, final_name):
|
||||
"""Used internally by __init__ when archives are uncompressed"""
|
||||
assert(final_name)
|
||||
compressed_archive = final_name + ".gz"
|
||||
if os.path.isfile(compressed_archive):
|
||||
unexpected_error("""There is already a file named '%s'!
|
||||
Have you been previously compressing this archive? You probably should
|
||||
uncompress it manually, and try running me again.""" % compressed_archive)
|
||||
temp_name = tempfile.mktemp("archivemail_archive")
|
||||
if os.path.isfile(final_name):
|
||||
vprint("file already exists that is named: %s" % final_name)
|
||||
shutil.copy2(final_name, temp_name)
|
||||
_stale.archive = temp_name
|
||||
self.mbox_file = open(temp_name, "a")
|
||||
self.mbox_file_name = temp_name
|
||||
|
||||
def __init_compressed(self, final_name):
|
||||
"""Used internally by __init__ when archives are compressed"""
|
||||
assert(final_name)
|
||||
compressed_filename = final_name + ".gz"
|
||||
|
||||
if os.path.isfile(final_name):
|
||||
unexpected_error("""There is already a file named '%s'!
|
||||
Have you been reading this archive? You probably should re-compress it
|
||||
manually, and try running me again.""" % final_name)
|
||||
|
||||
temp_name = tempfile.mktemp("archivemail_archive.gz")
|
||||
|
||||
if os.path.isfile(compressed_filename):
|
||||
vprint("file already exists that is named: %s" % \
|
||||
compressed_filename)
|
||||
shutil.copy2(compressed_filename, temp_name)
|
||||
|
||||
_stale.archive = temp_name
|
||||
self.mbox_file = gzip.GzipFile(temp_name, "a")
|
||||
self.mbox_file_name = temp_name
|
||||
self.__final_name = final_name
|
||||
|
||||
def finalise(self):
|
||||
"""Close the archive and rename this archive temporary file to the
|
||||
|
@ -442,22 +472,27 @@ manually, and try running me again.""" % final_name)
|
|||
"""
|
||||
assert(self.__final_name)
|
||||
self.close()
|
||||
compressed_final_name = self.__final_name + ".gz"
|
||||
final_name = self.__final_name
|
||||
if not options.no_compress:
|
||||
final_name = final_name + ".gz"
|
||||
vprint("renaming '%s' to '%s'" % (self.mbox_file_name,
|
||||
compressed_final_name))
|
||||
os.rename(self.mbox_file_name, compressed_final_name)
|
||||
final_name))
|
||||
os.rename(self.mbox_file_name, final_name)
|
||||
_stale.archive = None
|
||||
|
||||
|
||||
class IdentityCache:
|
||||
"""Class used to remember Message-IDs and warn if they are seen twice"""
|
||||
seen_ids = {}
|
||||
mailbox_name = None
|
||||
|
||||
def __init__(self, mailbox_name):
|
||||
"""Constructor: takes the mailbox name as an argument"""
|
||||
assert(mailbox_name)
|
||||
self.mailbox_name = mailbox_name
|
||||
|
||||
def warn_if_dupe(self, msg):
|
||||
"""Print a warning message if the message has already appeared"""
|
||||
assert(msg)
|
||||
message_id = msg.get('Message-ID')
|
||||
assert(message_id)
|
||||
|
@ -475,22 +510,24 @@ _stale = StaleFiles() # remember what we have to delete on abnormal exit
|
|||
def main(args = sys.argv[1:]):
|
||||
global _stale
|
||||
|
||||
# this usage message is longer than 24 lines -- bad idea?
|
||||
usage = """Usage: %s [options] mailbox [mailbox...]
|
||||
Moves old mail in mbox, MH or maildir-format mailboxes to an mbox-format
|
||||
mailbox compressed with gzip. This is useful for saving space and keeping your
|
||||
mailbox manageable.
|
||||
mailbox compressed with gzip.
|
||||
|
||||
Options are as follows:
|
||||
-d, --days=<days> archive messages older than <days> days (default: %d)
|
||||
-o, --output-dir=DIR directory where archive files go (default: current)
|
||||
-s, --suffix=NAME suffix for archive filename (default: '%s')
|
||||
-n, --dry-run don't write to anything - just show what would be done
|
||||
--delete delete rather than archive old mail (use with caution!)
|
||||
--warn-duplicate warn about duplicate Message-IDs in the same mailbox
|
||||
-v, --verbose report lots of extra debugging information
|
||||
-q, --quiet quiet mode - print no statistics (suitable for crontab)
|
||||
-V, --version display version information
|
||||
-h, --help display this message
|
||||
-d, --days=<days> archive messages older than <days> days (default: %d)
|
||||
-o, --output-dir=DIR directory to store archives (default: same as original)
|
||||
-s, --suffix=NAME suffix for archive filename (default: '%s')
|
||||
-n, --dry-run don't write to anything - just show what would be done
|
||||
-u, --preserve-unread never archive unread messages
|
||||
--delete delete rather than archive old mail (use with caution!)
|
||||
--no-compress do not compress archives with gzip
|
||||
--warn-duplicate warn about duplicate Message-IDs in the same mailbox
|
||||
-v, --verbose report lots of extra debugging information
|
||||
-q, --quiet quiet mode - print no statistics (suitable for crontab)
|
||||
-V, --version display version information
|
||||
-h, --help display this message
|
||||
|
||||
Example: %s linux-devel
|
||||
This will move all messages older than %s days to a 'mbox' mailbox called
|
||||
|
@ -610,7 +647,44 @@ def guess_delivery_time(message):
|
|||
vprint("using valid time found from '%s' last-modification time" % \
|
||||
file_name)
|
||||
return time_message
|
||||
|
||||
|
||||
|
||||
def is_unread(message):
|
||||
"""return true if the message is unread, false otherwise"""
|
||||
# MH and mbox mailboxes use the 'Status' header to indicate read status
|
||||
status = message.get('Status')
|
||||
if (status == 'RO') or (status == 'OR'):
|
||||
vprint("message has been read (status header='%s')" % status)
|
||||
return 0
|
||||
file_name = None
|
||||
try:
|
||||
file_name = message.fp.name
|
||||
except AttributeError:
|
||||
pass
|
||||
# maildir mailboxes use the filename suffix to indicate read status
|
||||
if file_name and re.search(":2,.*S.*$", file_name):
|
||||
vprint("message has been read (filename info has 'S')")
|
||||
return 0
|
||||
vprint("message is unread")
|
||||
return 1
|
||||
|
||||
|
||||
def should_archive(message):
|
||||
"""Return 1 if we should archive the message, 0 otherwise"""
|
||||
time_message = guess_delivery_time(message)
|
||||
old = is_too_old(time_message, options.days_old_max)
|
||||
# I could probably do this in one if statement, but then I wouldn't
|
||||
# understand it.
|
||||
if old:
|
||||
if options.preserve_unread:
|
||||
if is_unread(message):
|
||||
return 0
|
||||
else:
|
||||
return 1
|
||||
else:
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
def is_too_old(time_message, max_days):
|
||||
"""Return true if a message is too old (and should be archived),
|
||||
|
@ -724,8 +798,7 @@ def _archive_mbox(mailbox_name, final_archive_name):
|
|||
vprint("processing message '%s'" % msg.get('Message-ID'))
|
||||
if options.warn_duplicates:
|
||||
cache.warn_if_dupe(msg)
|
||||
time_message = guess_delivery_time(msg)
|
||||
if is_too_old(time_message, options.days_old_max):
|
||||
if should_archive(msg):
|
||||
stats.another_archived()
|
||||
if options.delete_old_mail:
|
||||
vprint("decision: delete message")
|
||||
|
@ -801,8 +874,7 @@ def _archive_dir(mailbox_name, final_archive_name, type):
|
|||
vprint("processing message '%s'" % msg.get('Message-ID'))
|
||||
if options.warn_duplicates:
|
||||
cache.warn_if_dupe(msg)
|
||||
time_message = guess_delivery_time(msg)
|
||||
if is_too_old(time_message, options.days_old_max):
|
||||
if should_archive(msg):
|
||||
stats.another_archived()
|
||||
if options.delete_old_mail:
|
||||
vprint("decision: delete message")
|
||||
|
@ -855,6 +927,7 @@ def choose_temp_dir(mailbox_name):
|
|||
|
||||
|
||||
def set_signal_handlers():
|
||||
"""set signal handlers to clean up temporary files on unexpected exit"""
|
||||
# Make sure we clean up nicely - we don't want to leave stale procmail
|
||||
# lockfiles about if something bad happens to us. This is quite
|
||||
# important, even though procmail will delete stale files after a while.
|
||||
|
|
|
@ -130,6 +130,23 @@ useful for testing to see how many messages would have been archived.
|
|||
</Para></ListItem>
|
||||
</VarListEntry>
|
||||
|
||||
<VarListEntry>
|
||||
<Term>
|
||||
<Option>-u, --preserve-unread</Option>
|
||||
</Term>
|
||||
<ListItem><Para>
|
||||
Do not archive any messages that have not yet been read. <command/archivemail/
|
||||
determines if a message in a <application/mbox/-format or
|
||||
<application/MH/-format mailbox has been read by looking at the
|
||||
<application/Status/ header (if it exists). If the status
|
||||
header is equal to 'RO' or 'OR' then <application/archivemail/ assumes the
|
||||
message has been read. <command/archivemail/ determines if a
|
||||
<application/maildir/ message has
|
||||
been read by looking at the filename. If the filename contains an 'S' after
|
||||
<filename>:2,</filename> then it assumes the message has been read.
|
||||
</Para></ListItem>
|
||||
</VarListEntry>
|
||||
|
||||
<VarListEntry>
|
||||
<Term>
|
||||
<Option>--delete</Option>
|
||||
|
@ -139,6 +156,15 @@ Delete rather than archive old mail. Use this option with caution!
|
|||
</Para></ListItem>
|
||||
</VarListEntry>
|
||||
|
||||
<VarListEntry>
|
||||
<Term>
|
||||
<Option>--no-compress</Option>
|
||||
</Term>
|
||||
<ListItem><Para>
|
||||
Do not compress any archives using &gzip;.
|
||||
</Para></ListItem>
|
||||
</VarListEntry>
|
||||
|
||||
<VarListEntry>
|
||||
<Term>
|
||||
<Option>--warn-duplicate</Option>
|
||||
|
@ -224,6 +250,12 @@ will use the last-modified file timestamp on <application/MH/ and
|
|||
<application/Maildir/ format mailboxes, or the date on the
|
||||
<application/From/ line on <application/mbox/-format mailboxes.
|
||||
</Para>
|
||||
|
||||
<Para>
|
||||
<Command/archivemail/ will refuse to operate on mailboxes that are symbolic
|
||||
links or create tempfiles or archives in world-writable directories.
|
||||
</Para>
|
||||
|
||||
</RefSect1>
|
||||
|
||||
<RefSect1>
|
||||
|
@ -246,6 +278,24 @@ are older than 30 days:
|
|||
</screen>
|
||||
</Para>
|
||||
|
||||
<Para>
|
||||
To archive all read messages in the mailbox <filename>incoming</filename> that
|
||||
are older than 180 days to a compressed mailbox called
|
||||
<filename>incoming_archive.gz</filename> in the current directory:
|
||||
<screen>
|
||||
<prompt>bash$ </prompt><userinput>archivemail --preserve-unread incoming</userinput>
|
||||
</screen>
|
||||
</Para>
|
||||
|
||||
<Para>
|
||||
To archive all messages in the mailbox <filename>received</filename> that
|
||||
are older than 180 days to an uncompressed mailbox called
|
||||
<filename>received_archive</filename> in the current directory:
|
||||
<screen>
|
||||
<prompt>bash$ </prompt><userinput>archivemail --no-compress received</userinput>
|
||||
</screen>
|
||||
</Para>
|
||||
|
||||
<Para>
|
||||
To archive all mailboxes in the directory <filename>$HOME/Mail</filename>
|
||||
that are older than 90 days to compressed mailboxes in the
|
||||
|
|
6
setup.py
6
setup.py
|
@ -14,13 +14,11 @@ def check_python_version():
|
|||
print too_old_error
|
||||
sys.exit(1)
|
||||
|
||||
check_python_version() # define & run this early because 'distutils.core' is new
|
||||
|
||||
|
||||
check_python_version() # define & run this early - 'distutils.core' is new
|
||||
from distutils.core import setup
|
||||
|
||||
setup(name="archivemail",
|
||||
version="0.3.2",
|
||||
version="0.4.0",
|
||||
description="archive and compress old email",
|
||||
platforms="POSIX",
|
||||
license="GNU GPL",
|
||||
|
|
|
@ -26,10 +26,26 @@ TODO: add tests for:
|
|||
* archiving maildir-format mailboxes
|
||||
* archiving MH-format mailboxes
|
||||
* appending to mbox archive files already existing
|
||||
* add tests where we run archivemail via os.system()
|
||||
|
||||
"""
|
||||
|
||||
import sys
|
||||
|
||||
def check_python_version():
|
||||
"""Abort if we are running on python < v2.0"""
|
||||
too_old_error = "This program requires python v2.0 or greater."
|
||||
try:
|
||||
version = sys.version_info # we might not even have this function! :)
|
||||
if (version[0] < 2):
|
||||
print too_old_error
|
||||
sys.exit(1)
|
||||
except AttributeError:
|
||||
print too_old_error
|
||||
sys.exit(1)
|
||||
|
||||
check_python_version() # define & run this early because 'unittest' is new
|
||||
|
||||
import fcntl
|
||||
import filecmp
|
||||
import os
|
||||
|
@ -222,6 +238,9 @@ class TestOptionDefaults(unittest.TestCase):
|
|||
"""we should not delete old mail by default"""
|
||||
self.assertEqual(archivemail.options.quiet, 0)
|
||||
|
||||
def testNoCompress(self):
|
||||
"""no-compression should be off by default"""
|
||||
self.assertEqual(archivemail.options.no_compress, 0)
|
||||
|
||||
########## archivemail.is_too_old() unit testing #################
|
||||
|
||||
|
@ -300,7 +319,7 @@ class TestChooseTempDir(unittest.TestCase):
|
|||
os.rmdir(self.sub_dir)
|
||||
|
||||
|
||||
########## proper archival testing ###########
|
||||
########## acceptance testing ###########
|
||||
|
||||
class TestArchiveMboxTimestampNew(unittest.TestCase):
|
||||
def setUp(self):
|
||||
|
@ -349,6 +368,7 @@ class TestArchiveMboxTimestampMixed(unittest.TestCase):
|
|||
os.remove(name)
|
||||
archivemail.options.quiet = 0
|
||||
|
||||
|
||||
class TestArchiveMboxTimestampOld(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181))
|
||||
|
@ -373,6 +393,43 @@ class TestArchiveMboxTimestampOld(unittest.TestCase):
|
|||
os.remove(name)
|
||||
archivemail.options.quiet = 0
|
||||
|
||||
class TestArchiveMboxExistingArchive(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181))
|
||||
self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||
self.copy_name = tempfile.mktemp()
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
shutil.copyfile(self.mbox_name, self.copy_name)
|
||||
shutil.copyfile(self.mbox_name, archive_name) # archive has 3 msgs
|
||||
append_file(self.mbox_name, self.copy_name) # copy now has 6 msgs
|
||||
self.assertEqual(os.system("gzip %s" % archive_name), 0)
|
||||
assert(os.path.exists(archive_name + ".gz"))
|
||||
assert(not os.path.exists(archive_name))
|
||||
archivemail.options.quiet = 1
|
||||
|
||||
def testArchiveOldGzip(self):
|
||||
"""archiving an old mailbox with gzip should create a valid archive"""
|
||||
archivemail.archive(self.mbox_name)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
self.assertEqual(os.path.getsize(self.mbox_name), 0)
|
||||
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||
self.assertEqual(self.mbox_mode, new_mode)
|
||||
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
|
||||
def tearDown(self):
|
||||
archive = self.mbox_name + "_archive"
|
||||
for name in (self.mbox_name, self.copy_name, archive, archive + ".gz"):
|
||||
if os.path.exists(name):
|
||||
os.remove(name)
|
||||
archivemail.options.quiet = 0
|
||||
|
||||
|
||||
class TestArchiveMboxOld(unittest.TestCase):
|
||||
def setUp(self):
|
||||
|
@ -392,7 +449,7 @@ class TestArchiveMboxOld(unittest.TestCase):
|
|||
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
os.system("gzip -d " + archive_name)
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
|
@ -422,7 +479,7 @@ class TestArchiveMboxMixed(unittest.TestCase):
|
|||
assert(filecmp.cmp(self.new_mbox, self.mixed_mbox, shallow=0))
|
||||
archive_name = self.mixed_mbox + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
os.system("gzip -d " + archive_name)
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
archive_name = self.mixed_mbox + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.old_mbox, shallow=0))
|
||||
|
@ -461,9 +518,180 @@ class TestArchiveMboxNew(unittest.TestCase):
|
|||
if os.path.exists(name):
|
||||
os.remove(name)
|
||||
|
||||
#
|
||||
|
||||
##########################################################################
|
||||
# make sure the --preserve-unread option works
|
||||
##########################################################################
|
||||
|
||||
class TestArchiveMboxPreserveStatus(unittest.TestCase):
|
||||
def setUp(self):
|
||||
archivemail.options.quiet = 1
|
||||
archivemail.options.preserve_unread = 1
|
||||
|
||||
def testOldRead(self):
|
||||
"""archiving an old read mailbox should create an archive"""
|
||||
self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181), \
|
||||
status="RO")
|
||||
self.copy_name = tempfile.mktemp()
|
||||
shutil.copyfile(self.mbox_name, self.copy_name)
|
||||
|
||||
archivemail.archive(self.mbox_name)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
self.assertEqual(os.path.getsize(self.mbox_name), 0)
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(os.path.exists(archive_name))
|
||||
self.assertEqual(os.system("gzip -d %s" % archive_name), 0)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
|
||||
def testOldUnread(self):
|
||||
"""archiving an unread mailbox should not create an archive"""
|
||||
self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181))
|
||||
self.copy_name = tempfile.mktemp()
|
||||
shutil.copyfile(self.mbox_name, self.copy_name)
|
||||
|
||||
archivemail.archive(self.mbox_name)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
|
||||
archive_name = self.mbox_name + "_archive.gz"
|
||||
assert(not os.path.exists(archive_name))
|
||||
|
||||
def tearDown(self):
|
||||
archive = self.mbox_name + "_archive"
|
||||
for name in (self.mbox_name, self.copy_name, archive, archive + ".gz"):
|
||||
if os.path.exists(name):
|
||||
os.remove(name)
|
||||
archivemail.options.quiet = 0
|
||||
archivemail.options.preserve_unread = 0
|
||||
|
||||
|
||||
##########################################################################
|
||||
# make sure that the --no-compress option works
|
||||
##########################################################################
|
||||
|
||||
class TestArchiveMboxUncompressedOld(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181))
|
||||
self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||
self.copy_name = tempfile.mktemp()
|
||||
shutil.copyfile(self.mbox_name, self.copy_name)
|
||||
archivemail.options.quiet = 1
|
||||
archivemail.options.no_compress = 1
|
||||
|
||||
def testArchiveUncompressedOld(self):
|
||||
"""archiving an old mailbox uncompressed should create an ok archive"""
|
||||
archivemail.archive(self.mbox_name)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
self.assertEqual(os.path.getsize(self.mbox_name), 0)
|
||||
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||
self.assertEqual(self.mbox_mode, new_mode)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
assert(not os.path.exists(archive_name + ".gz"))
|
||||
|
||||
def tearDown(self):
|
||||
archive = self.mbox_name + "_archive"
|
||||
for name in (self.mbox_name, self.copy_name, archive):
|
||||
if os.path.exists(name):
|
||||
os.remove(name)
|
||||
archivemail.options.quiet = 0
|
||||
archivemail.options.no_compress = 0
|
||||
|
||||
|
||||
class TestArchiveMboxUncompressedNew(unittest.TestCase):
|
||||
def setUp(self):
|
||||
archivemail.options.quiet = 1
|
||||
archivemail.options.no_compress = 1
|
||||
self.mbox_name = make_mbox(messages=3, hours_old=(24 * 179))
|
||||
self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||
self.copy_name = tempfile.mktemp()
|
||||
shutil.copyfile(self.mbox_name, self.copy_name)
|
||||
|
||||
def testArchiveNew(self):
|
||||
"""archiving a new mailbox uncompressed should not create an archive"""
|
||||
archivemail.archive(self.mbox_name)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
assert(filecmp.cmp(self.mbox_name, self.copy_name, shallow=0))
|
||||
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||
self.assertEqual(self.mbox_mode, new_mode)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(not os.path.exists(archive_name))
|
||||
assert(not os.path.exists(archive_name + ".gz"))
|
||||
|
||||
def tearDown(self):
|
||||
archivemail.options.no_compress = 0
|
||||
archivemail.options.quiet = 0
|
||||
for name in (self.mbox_name, self.copy_name):
|
||||
if os.path.exists(name):
|
||||
os.remove(name)
|
||||
|
||||
|
||||
class TestArchiveMboxUncompressedMixed(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.new_mbox = make_mbox(messages=3, hours_old=(24 * 179))
|
||||
self.old_mbox = make_mbox(messages=3, hours_old=(24 * 181))
|
||||
self.mixed_mbox = tempfile.mktemp()
|
||||
shutil.copyfile(self.new_mbox, self.mixed_mbox)
|
||||
append_file(self.old_mbox, self.mixed_mbox)
|
||||
archivemail.options.quiet = 1
|
||||
archivemail.options.no_compress = 1
|
||||
|
||||
def testArchiveMixed(self):
|
||||
"""archiving a mixed mailbox should make an archive"""
|
||||
archivemail.archive(self.mixed_mbox)
|
||||
assert(os.path.exists(self.mixed_mbox))
|
||||
assert(filecmp.cmp(self.new_mbox, self.mixed_mbox, shallow=0))
|
||||
archive_name = self.mixed_mbox + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.old_mbox, shallow=0))
|
||||
assert(not os.path.exists(archive_name + ".gz"))
|
||||
|
||||
def tearDown(self):
|
||||
archive = self.mixed_mbox + "_archive"
|
||||
for name in (self.mixed_mbox, self.old_mbox, self.new_mbox, archive):
|
||||
if os.path.exists(name):
|
||||
os.remove(name)
|
||||
archivemail.options.quiet = 0
|
||||
|
||||
class TestArchiveMboxOldExistingUncompressed(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.mbox_name = make_mbox(messages=3, hours_old=(24 * 181))
|
||||
self.mbox_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||
self.copy_name = tempfile.mktemp()
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
shutil.copyfile(self.mbox_name, self.copy_name)
|
||||
shutil.copyfile(self.mbox_name, archive_name) # archive has 3 msgs
|
||||
append_file(self.mbox_name, self.copy_name) # copy now has 6 msgs
|
||||
archivemail.options.quiet = 1
|
||||
archivemail.options.no_compress = 1
|
||||
|
||||
def testArchiveOldGzip(self):
|
||||
"""archiving an old mailbox without compressing with an existing archive"""
|
||||
archivemail.archive(self.mbox_name)
|
||||
assert(os.path.exists(self.mbox_name))
|
||||
self.assertEqual(os.path.getsize(self.mbox_name), 0)
|
||||
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
|
||||
self.assertEqual(self.mbox_mode, new_mode)
|
||||
archive_name = self.mbox_name + "_archive"
|
||||
assert(os.path.exists(archive_name))
|
||||
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
|
||||
assert(not os.path.exists(archive_name + ".gz"))
|
||||
|
||||
def tearDown(self):
|
||||
archive = self.mbox_name + "_archive"
|
||||
for name in (self.mbox_name, self.copy_name, archive):
|
||||
if os.path.exists(name):
|
||||
os.remove(name)
|
||||
archivemail.options.quiet = 0
|
||||
archivemail.options.no_compress = 0
|
||||
|
||||
|
||||
##########################################################################
|
||||
# Test the file mode (permissions) of the original mailbox after archival
|
||||
#
|
||||
##########################################################################
|
||||
|
||||
class TestArchiveMboxMode(unittest.TestCase):
|
||||
def setUp(self):
|
||||
archivemail.options.quiet = 1
|
||||
|
@ -504,21 +732,29 @@ class TestArchiveMboxMode(unittest.TestCase):
|
|||
|
||||
########## helper routines ############
|
||||
|
||||
def make_message(hours_old=0):
|
||||
def make_message(hours_old=0, status=None):
|
||||
time_message = time.time() - (60 * 60 * hours_old)
|
||||
time_string = time.asctime(time.localtime(time_message))
|
||||
|
||||
return """From sender@domain %s
|
||||
msg = """From sender@domain %s
|
||||
From: sender@domain
|
||||
To: receipient@domain
|
||||
Subject: This is a dummy message
|
||||
Date: %s
|
||||
""" % (time_string, time_string)
|
||||
|
||||
if status:
|
||||
msg = msg + ("Status: %s\n" % status)
|
||||
|
||||
msg = msg + """
|
||||
|
||||
This is the message body.
|
||||
It's very exciting.
|
||||
|
||||
|
||||
""" % (time_string, time_string)
|
||||
"""
|
||||
return msg
|
||||
|
||||
|
||||
def append_file(source, dest):
|
||||
"""appends the file named 'source' to the file named 'dest'"""
|
||||
|
@ -530,11 +766,11 @@ def append_file(source, dest):
|
|||
read.close()
|
||||
write.close()
|
||||
|
||||
def make_mbox(messages=1, hours_old=0):
|
||||
def make_mbox(messages=1, hours_old=0, status=None):
|
||||
name = tempfile.mktemp()
|
||||
file = open(name, "w")
|
||||
for count in range(messages):
|
||||
file.write(make_message(hours_old=hours_old))
|
||||
file.write(make_message(hours_old=hours_old, status=status))
|
||||
file.close()
|
||||
return name
|
||||
|
||||
|
@ -543,6 +779,5 @@ def is_world_readable(path):
|
|||
assert(path)
|
||||
return (os.stat(path)[stat.ST_MODE] & stat.S_IROTH)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
Loading…
Add table
Reference in a new issue