Stopped calling gzip externally and started using the 'gzip' module

directly. Removed bzip2 and compress support since they were complicating
things and you don't really need them much anyway.
This commit is contained in:
Paul Rodger 2002-04-12 05:31:59 +00:00
parent bd7e3c7225
commit 9e534977ef
5 changed files with 64 additions and 181 deletions

View File

@ -1,3 +1,11 @@
Version 0.3.1 - ????
* Stopped calling 'gzip' externally and started using the gzip library
so that we can append to a copy of the gzip archive directly.
* Removed 'bzip2' and 'compress' options since they are increasing
complexity without adding much, and needed to be called externally.
Maybe when we get a bzip2 library I will add back an option to
compression archives using bzip2.
Version 0.3.0 - 11 April 2002 Version 0.3.0 - 11 April 2002
* We now preserve the last-accessed and last-modified timestamps correctly * We now preserve the last-accessed and last-modified timestamps correctly
* We now preserve the correct permissions on the original mailbox instead * We now preserve the correct permissions on the original mailbox instead

5
TODO
View File

@ -1,5 +1,5 @@
Goals for next minor release (0.3.1): Goals for next minor release (0.3.2):
------------------------------------- -------------------------------------
* Finish docbook sgml documentation & man page * Finish docbook sgml documentation & man page
@ -8,10 +8,10 @@ Goals for next major release (0.4.0):
* Lock any original .gz files * Lock any original .gz files
- is this necessary? - is this necessary?
* Check for symlink attacks for tempfiles (although we don't use /var/tmp) * Check for symlink attacks for tempfiles (although we don't use /var/tmp)
* Add a lot more unit test. (see top of test_archivemail.py)
Longer Term goals: Longer Term goals:
------------------ ------------------
* Use zlib instead of calling gzip directly
* Add MMDF mailbox support * Add MMDF mailbox support
* Add Babyl mailbox support * Add Babyl mailbox support
* Add option to archive depending on mailbox size threshold * Add option to archive depending on mailbox size threshold
@ -22,4 +22,3 @@ Longer Term goals:
- is this a waste of time? - is this a waste of time?
* Add option - do not compress * Add option - do not compress
- is this useless? - is this useless?
* Use a Makefile/python installation process

View File

@ -22,7 +22,7 @@ Website: http://archivemail.sourceforge.net/
""" """
# global administrivia # global administrivia
__version__ = "archivemail v0.3.0" __version__ = "archivemail v0.3.1"
__cvs_id__ = "$Id$" __cvs_id__ = "$Id$"
__copyright__ = """Copyright (C) 2002 Paul Rodger <paul@paulrodger.com> __copyright__ = """Copyright (C) 2002 Paul Rodger <paul@paulrodger.com>
This is free software; see the source for copying conditions. There is NO This is free software; see the source for copying conditions. There is NO
@ -48,10 +48,12 @@ check_python_version() # define & run this early because 'atexit' is new
import atexit import atexit
import fcntl import fcntl
import getopt import getopt
import gzip
import mailbox import mailbox
import os import os
import re import re
import rfc822 import rfc822
import shutil
import signal import signal
import stat import stat
import string import string
@ -81,8 +83,7 @@ class Stats:
assert(final_archive_name) assert(final_archive_name)
self.__start_time = time.time() self.__start_time = time.time()
self.__mailbox_name = mailbox_name self.__mailbox_name = mailbox_name
self.__archive_name = final_archive_name + \ self.__archive_name = final_archive_name + ".gz"
options.compressor_extension()
def another_message(self): def another_message(self):
"""Add one to the internal count of total messages processed""" """Add one to the internal count of total messages processed"""
@ -109,7 +110,6 @@ class Stats:
class StaleFiles: class StaleFiles:
"""Class to keep track of files to be deleted on abnormal exit""" """Class to keep track of files to be deleted on abnormal exit"""
archive = None # tempfile for messages to be archived archive = None # tempfile for messages to be archived
compressed_archive = None # compressed version of the above
procmail_lock = None # original_mailbox.lock procmail_lock = None # original_mailbox.lock
retain = None # tempfile for messages to be retained retain = None # tempfile for messages to be retained
@ -127,17 +127,11 @@ class StaleFiles:
vprint("removing stale archive file '%s'" % self.archive) vprint("removing stale archive file '%s'" % self.archive)
try: os.remove(self.archive) try: os.remove(self.archive)
except (IOError, OSError): pass except (IOError, OSError): pass
if self.compressed_archive:
vprint("removing stale compressed archive file '%s'" %
self.compressed_archive)
try: os.remove(self.compressed_archive)
except (IOError, OSError): pass
class Options: class Options:
"""Class to store runtime options, including defaults""" """Class to store runtime options, including defaults"""
archive_suffix = "_archive" archive_suffix = "_archive"
compressor = "gzip"
days_old_max = 180 days_old_max = 180
delete_old_mail = 0 delete_old_mail = 0
dry_run = 0 dry_run = 0
@ -162,15 +156,13 @@ class Options:
""" """
try: try:
opts, args = getopt.getopt(args, '?IVZd:hno:qs:vz', opts, args = getopt.getopt(args, '?Vd:hno:qs:v',
["bzip2", "compress", "days=", "delete", ["days=", "delete", "dry-run", "help",
"dry-run", "gzip", "help", "output-dir=", "output-dir=", "quiet", "suffix", "verbose",
"quiet", "suffix", "verbose", "version", "version", "warn-duplicate"])
"warn-duplicate"])
except getopt.error, msg: except getopt.error, msg:
user_error(msg) user_error(msg)
chosen_compressor = None
for o, a in opts: for o, a in opts:
if o == '--delete': if o == '--delete':
self.delete_old_mail = 1 self.delete_old_mail = 1
@ -194,23 +186,6 @@ class Options:
if o in ('-V', '--version'): if o in ('-V', '--version'):
print __version__ + "\n\n" + __copyright__ print __version__ + "\n\n" + __copyright__
sys.exit(0) sys.exit(0)
if o in ('-z', '--gzip'):
if (chosen_compressor):
user_error("conflicting compression options")
self.compressor = "gzip"
chosen_compressor = 1
if o in ('-Z', '--compress'):
if (chosen_compressor):
user_error("conflicting compression options")
self.compressor = "compress"
chosen_compressor = 1
if o in ('-I', '--bzip2'):
if (chosen_compressor):
user_error("conflicting compression options")
self.compressor = "bzip2"
chosen_compressor = 1
if not self.compressor:
self.compressor = "gzip"
return args return args
def sanity_check(self): def sanity_check(self):
@ -230,15 +205,6 @@ class Options:
if (self.days_old_max >= 10000): if (self.days_old_max >= 10000):
user_error("argument to -d must be less than 10000") user_error("argument to -d must be less than 10000")
def compressor_extension(self):
extensions = {
"compress" : ".Z",
"gzip" : ".gz",
"bzip2" : ".bz2",
}
return extensions[self.compressor]
class Mbox(mailbox.PortableUnixMailbox): class Mbox(mailbox.PortableUnixMailbox):
"""Class that allows read/write access to a 'mbox' mailbox. """Class that allows read/write access to a 'mbox' mailbox.
@ -246,6 +212,8 @@ class Mbox(mailbox.PortableUnixMailbox):
""" """
mbox_file = None # file handle for the mbox file mbox_file = None # file handle for the mbox file
mbox_file_name = None # GzipFile has no .name variable
mbox_file_closed = 0 # GzipFile has no .closed variable
original_atime = None # last-accessed timestamp original_atime = None # last-accessed timestamp
original_mtime = None # last-modified timestamp original_mtime = None # last-modified timestamp
original_mode = None # file permissions to preserve original_mode = None # file permissions to preserve
@ -267,6 +235,7 @@ class Mbox(mailbox.PortableUnixMailbox):
self.mbox_file = open(path, mode) self.mbox_file = open(path, mode)
except IOError, msg: except IOError, msg:
unexpected_error(msg) unexpected_error(msg)
self.mbox_file_name = path
mailbox.PortableUnixMailbox.__init__(self, self.mbox_file) mailbox.PortableUnixMailbox.__init__(self, self.mbox_file)
def write(self, msg): def write(self, msg):
@ -281,7 +250,7 @@ class Mbox(mailbox.PortableUnixMailbox):
assert(msg) assert(msg)
assert(self.mbox_file) assert(self.mbox_file)
vprint("saving message to file '%s'" % self.mbox_file.name) vprint("saving message to file '%s'" % self.mbox_file_name)
unix_from = msg.unixfrom unix_from = msg.unixfrom
if not unix_from: if not unix_from:
unix_from = make_mbox_from(msg) unix_from = make_mbox_from(msg)
@ -300,44 +269,45 @@ class Mbox(mailbox.PortableUnixMailbox):
def remove(self): def remove(self):
"""Close and delete the 'mbox' mailbox file""" """Close and delete the 'mbox' mailbox file"""
file_name = self.mbox_file.name file_name = self.mbox_file_name
self.close() self.close()
vprint("removing file '%s'" % self.mbox_file.name) vprint("removing file '%s'" % self.mbox_file_name)
os.remove(file_name) os.remove(file_name)
def is_empty(self): def is_empty(self):
"""Return true if the 'mbox' file is empty, false otherwise""" """Return true if the 'mbox' file is empty, false otherwise"""
return (os.path.getsize(self.mbox_file.name) == 0) return (os.path.getsize(self.mbox_file_name) == 0)
def close(self): def close(self):
"""Close the mbox file""" """Close the mbox file"""
if not self.mbox_file.closed: if not self.mbox_file_closed:
vprint("closing file '%s'" % self.mbox_file.name) vprint("closing file '%s'" % self.mbox_file_name)
self.mbox_file.close() self.mbox_file.close()
self.mbox_file_closed = 1
def reset_stat(self): def reset_stat(self):
"""Set the file timestamps and mode to the original value""" """Set the file timestamps and mode to the original value"""
assert(self.original_atime) assert(self.original_atime)
assert(self.original_mtime) assert(self.original_mtime)
assert(self.mbox_file.name) assert(self.mbox_file_name)
assert(self.original_mode) # I doubt this will be 000? assert(self.original_mode) # I doubt this will be 000?
os.utime(self.mbox_file.name, (self.original_atime, \ os.utime(self.mbox_file_name, (self.original_atime, \
self.original_mtime)) self.original_mtime))
os.chmod(self.mbox_file.name, self.original_mode) os.chmod(self.mbox_file_name, self.original_mode)
def exclusive_lock(self): def exclusive_lock(self):
"""Set an advisory lock on the 'mbox' mailbox""" """Set an advisory lock on the 'mbox' mailbox"""
vprint("obtaining exclusive lock on file '%s'" % self.mbox_file.name) vprint("obtaining exclusive lock on file '%s'" % self.mbox_file_name)
fcntl.flock(self.mbox_file, fcntl.LOCK_EX) fcntl.flock(self.mbox_file, fcntl.LOCK_EX)
def exclusive_unlock(self): def exclusive_unlock(self):
"""Unset any advisory lock on the 'mbox' mailbox""" """Unset any advisory lock on the 'mbox' mailbox"""
vprint("dropping exclusive lock on file '%s'" % self.mbox_file.name) vprint("dropping exclusive lock on file '%s'" % self.mbox_file_name)
fcntl.flock(self.mbox_file, fcntl.LOCK_UN) fcntl.flock(self.mbox_file, fcntl.LOCK_UN)
def procmail_lock(self): def procmail_lock(self):
"""Create a procmail lockfile on the 'mbox' mailbox""" """Create a procmail lockfile on the 'mbox' mailbox"""
lock_name = self.mbox_file.name + options.lockfile_extension lock_name = self.mbox_file_name + options.lockfile_extension
attempt = 0 attempt = 0
while os.path.isfile(lock_name): while os.path.isfile(lock_name):
vprint("lockfile '%s' exists - sleeping..." % lock_name) vprint("lockfile '%s' exists - sleeping..." % lock_name)
@ -355,8 +325,8 @@ class Mbox(mailbox.PortableUnixMailbox):
def procmail_unlock(self): def procmail_unlock(self):
"""Delete the procmail lockfile on the 'mbox' mailbox""" """Delete the procmail lockfile on the 'mbox' mailbox"""
assert(self.mbox_file.name) assert(self.mbox_file_name)
lock_name = self.mbox_file.name + options.lockfile_extension lock_name = self.mbox_file_name + options.lockfile_extension
vprint("removing lockfile '%s'" % lock_name) vprint("removing lockfile '%s'" % lock_name)
os.remove(lock_name) os.remove(lock_name)
_stale.procmail_lock = None _stale.procmail_lock = None
@ -367,10 +337,10 @@ class Mbox(mailbox.PortableUnixMailbox):
This will leave a zero-length mailbox file so that mail This will leave a zero-length mailbox file so that mail
reading programs don't get upset that the mailbox has been reading programs don't get upset that the mailbox has been
completely deleted.""" completely deleted."""
assert(os.path.isfile(self.mbox_file.name)) assert(os.path.isfile(self.mbox_file_name))
vprint("turning '%s' into a zero-length file" % self.mbox_file.name) vprint("turning '%s' into a zero-length file" % self.mbox_file_name)
mtime = os.path.getmtime(self.mbox_file.name) mtime = os.path.getmtime(self.mbox_file_name)
blank_file = open(self.mbox_file.name, "w") blank_file = open(self.mbox_file_name, "w")
blank_file.close() blank_file.close()
@ -394,8 +364,9 @@ class RetainMbox(Mbox):
assert(final_name) assert(final_name)
temp_name = tempfile.mktemp("archivemail_retain") temp_name = tempfile.mktemp("archivemail_retain")
self.mbox_file = open(temp_name, "w") self.mbox_file = open(temp_name, "w")
self.mbox_file_name = temp_name
_stale.retain = temp_name _stale.retain = temp_name
vprint("opened temporary retain file '%s'" % self.mbox_file.name) vprint("opened temporary retain file '%s'" % self.mbox_file_name)
self.__final_name = final_name self.__final_name = final_name
def finalise(self): def finalise(self):
@ -408,10 +379,10 @@ class RetainMbox(Mbox):
atime = os.path.getatime(self.__final_name) atime = os.path.getatime(self.__final_name)
mtime = os.path.getmtime(self.__final_name) mtime = os.path.getmtime(self.__final_name)
mode = os.stat(self.__final_name)[stat.ST_MODE] mode = os.stat(self.__final_name)[stat.ST_MODE]
os.chmod(self.mbox_file.name, mode) os.chmod(self.mbox_file_name, mode)
vprint("renaming '%s' to '%s'" % (self.mbox_file.name, self.__final_name)) vprint("renaming '%s' to '%s'" % (self.mbox_file_name, self.__final_name))
os.rename(self.mbox_file.name, self.__final_name) os.rename(self.mbox_file_name, self.__final_name)
os.utime(self.__final_name, (atime, mtime)) # reset to original timestamps os.utime(self.__final_name, (atime, mtime)) # reset to original timestamps
_stale.retain = None _stale.retain = None
@ -432,62 +403,50 @@ class ArchiveMbox(Mbox):
__final_name = None __final_name = None
def __init__(self, final_name): def __init__(self, final_name):
"""Constructor -- extract any pre-existing compressed archive to a """Constructor -- copy any pre-existing compressed archive to a
temporary file which we use as the new 'mbox' archive for this temporary file which we use as the new 'mbox' archive for this
mailbox. mailbox.
Arguments: Arguments:
final_name -- the final name for this archive mailbox. This function final_name -- the final name for this archive mailbox. This function
will check to see if the filename already exists, and will check to see if the filename already exists, and
extract it to a temporary file if it does. It will also copy it to a temporary file if it does. It will also
rename itself to this name when we call finalise() rename itself to this name when we call finalise()
""" """
assert(final_name) assert(final_name)
compressor = options.compressor compressed_filename = final_name + ".gz"
compressedfilename = final_name + options.compressor_extension()
if os.path.isfile(final_name): if os.path.isfile(final_name):
unexpected_error("""There is already a file named '%s'! unexpected_error("""There is already a file named '%s'!
Have you been reading this archive? You probably should re-compress it Have you been reading this archive? You probably should re-compress it
manually, and try running me again.""" % final_name) manually, and try running me again.""" % final_name)
temp_name = tempfile.mktemp("archivemail_archive") temp_name = tempfile.mktemp("archivemail_archive.gz")
if os.path.isfile(compressedfilename): if os.path.isfile(compressed_filename):
vprint("file already exists that is named: %s" % compressedfilename) vprint("file already exists that is named: %s" % \
uncompress = "%s -d -c %s > %s" % (compressor, compressed_filename)
compressedfilename, temp_name) shutil.copy2(compressed_filename, temp_name)
vprint("running uncompressor: %s" % uncompress)
_stale.archive = temp_name
system_or_die(uncompress)
_stale.archive = temp_name _stale.archive = temp_name
self.mbox_file = open(temp_name, "a") self.mbox_file = gzip.GzipFile(temp_name, "a")
self.mbox_file_name = temp_name
self.__final_name = final_name self.__final_name = final_name
def finalise(self): def finalise(self):
"""Compress the archive and rename this archive temporary file to the """Close the archive and rename this archive temporary file to the
final archive filename, overwriting any pre-existing archive if it final archive filename, overwriting any pre-existing archive if it
exists. exists.
""" """
assert(self.__final_name) assert(self.__final_name)
self.close() self.close()
compressor = options.compressor compressed_final_name = self.__final_name + ".gz"
compressed_archive_name = self.mbox_file.name + \ vprint("renaming '%s' to '%s'" % (self.mbox_file_name,
options.compressor_extension()
compress = compressor + " " + self.mbox_file.name
vprint("running compressor: '%s'" % compress)
_stale.compressed_archive = compressed_archive_name
system_or_die(compress)
_stale.archive = None
compressed_final_name = self.__final_name + \
options.compressor_extension()
vprint("renaming '%s' to '%s'" % (compressed_archive_name,
compressed_final_name)) compressed_final_name))
os.rename(compressed_archive_name, compressed_final_name) os.rename(self.mbox_file_name, compressed_final_name)
_stale.compressed_archive = None _stale.archive = None
class IdentityCache: class IdentityCache:
@ -517,7 +476,7 @@ def main(args = sys.argv[1:]):
global _stale global _stale
usage = """Usage: %s [options] mailbox [mailbox...] usage = """Usage: %s [options] mailbox [mailbox...]
Moves old mail messages in mbox or maildir-format mailboxes to compressed Moves old mail messages in mbox or maildir-format mailboxes to gzipped
'mbox' mailbox archives. This is useful for saving space and keeping your 'mbox' mailbox archives. This is useful for saving space and keeping your
mailbox manageable. mailbox manageable.
@ -526,9 +485,6 @@ Options are as follows:
-o, --output-dir=DIR directory where archive files go (default: current) -o, --output-dir=DIR directory where archive files go (default: current)
-s, --suffix=NAME suffix for archive filename (default: '%s') -s, --suffix=NAME suffix for archive filename (default: '%s')
-n, --dry-run don't write to anything - just show what would be done -n, --dry-run don't write to anything - just show what would be done
-z, --gzip compress the archive(s) using gzip (default)
-I, --bzip2 compress the archive(s) using bzip2
-Z, --compress compress the archive(s) using compress
--delete delete rather than archive old mail (use with caution!) --delete delete rather than archive old mail (use with caution!)
--warn-duplicate warn about duplicate Message-IDs in the same mailbox --warn-duplicate warn about duplicate Message-IDs in the same mailbox
-v, --verbose report lots of extra debugging information -v, --verbose report lots of extra debugging information
@ -934,15 +890,6 @@ def is_world_writable(path):
return (os.stat(path)[stat.ST_MODE] & stat.S_IWOTH) return (os.stat(path)[stat.ST_MODE] & stat.S_IWOTH)
def system_or_die(command):
"""Run the command with os.system(), aborting on non-zero exit"""
assert(command)
rv = os.system(command)
if (rv != 0):
status = os.WEXITSTATUS(rv)
unexpected_error("command '%s' returned status %d" % (command, status))
# this is where it all happens, folks # this is where it all happens, folks
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -20,7 +20,7 @@ check_python_version() # define & run this early because 'distutils.core' is ne
from distutils.core import setup from distutils.core import setup
setup(name="archivemail", setup(name="archivemail",
version="0.3.0", version="0.3.1",
description="archive and compress old email", description="archive and compress old email",
platforms="POSIX", platforms="POSIX",
license="GNU GPL", license="GNU GPL",

View File

@ -20,9 +20,6 @@
""" """
Unit-test archivemail using 'PyUnit'. Unit-test archivemail using 'PyUnit'.
You will need all three of 'gzip', 'bzip2' and 'compress' in your path to
(hopefully) pass all tests.
TODO: add tests for: TODO: add tests for:
* procmail locks already existing * procmail locks already existing
* messages with corrupted date headers * messages with corrupted date headers
@ -208,10 +205,6 @@ class TestMboxWrite(unittest.TestCase):
class TestOptionDefaults(unittest.TestCase): class TestOptionDefaults(unittest.TestCase):
def testCompressor(self):
"""gzip should be default compressor"""
self.assertEqual(archivemail.options.compressor, "gzip")
def testVerbose(self): def testVerbose(self):
"""verbose should be off by default""" """verbose should be off by default"""
self.assertEqual(archivemail.options.verbose, 0) self.assertEqual(archivemail.options.verbose, 0)
@ -318,7 +311,6 @@ class TestArchiveMboxTimestampNew(unittest.TestCase):
def testTime(self): def testTime(self):
"""mbox timestamps should not change after no archival""" """mbox timestamps should not change after no archival"""
archivemail.options.compressor = "gzip"
archivemail.archive(self.mbox_name) archivemail.archive(self.mbox_name)
assert(os.path.exists(self.mbox_name)) assert(os.path.exists(self.mbox_name))
new_atime = os.path.getatime(self.mbox_name) new_atime = os.path.getatime(self.mbox_name)
@ -342,7 +334,6 @@ class TestArchiveMboxTimestampMixed(unittest.TestCase):
def testTime(self): def testTime(self):
"""mbox timestamps should not change after semi-archival""" """mbox timestamps should not change after semi-archival"""
archivemail.options.compressor = "gzip"
archive_name = self.mbox_name + "_archive.gz" archive_name = self.mbox_name + "_archive.gz"
archivemail.archive(self.mbox_name) archivemail.archive(self.mbox_name)
assert(os.path.exists(self.mbox_name)) assert(os.path.exists(self.mbox_name))
@ -367,7 +358,6 @@ class TestArchiveMboxTimestampOld(unittest.TestCase):
def testTime(self): def testTime(self):
"""mbox timestamps should not change after archival""" """mbox timestamps should not change after archival"""
archivemail.options.compressor = "gzip"
archive_name = self.mbox_name + "_archive.gz" archive_name = self.mbox_name + "_archive.gz"
archivemail.archive(self.mbox_name) archivemail.archive(self.mbox_name)
assert(os.path.exists(self.mbox_name)) assert(os.path.exists(self.mbox_name))
@ -393,7 +383,6 @@ class TestArchiveMboxOld(unittest.TestCase):
def testArchiveOldGzip(self): def testArchiveOldGzip(self):
"""archiving an old mailbox with gzip should create a valid archive""" """archiving an old mailbox with gzip should create a valid archive"""
archivemail.options.compressor = "gzip"
archivemail.archive(self.mbox_name) archivemail.archive(self.mbox_name)
assert(os.path.exists(self.mbox_name)) assert(os.path.exists(self.mbox_name))
self.assertEqual(os.path.getsize(self.mbox_name), 0) self.assertEqual(os.path.getsize(self.mbox_name), 0)
@ -408,42 +397,9 @@ class TestArchiveMboxOld(unittest.TestCase):
assert(os.path.exists(archive_name)) assert(os.path.exists(archive_name))
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0)) assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
def testArchiveOldBzip2(self):
"""archiving an old mailbox with bzip2 should create a valid archive"""
archivemail.options.compressor = "bzip2"
archivemail.archive(self.mbox_name)
assert(os.path.exists(self.mbox_name))
self.assertEqual(os.path.getsize(self.mbox_name), 0)
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
self.assertEqual(self.mbox_mode, new_mode)
archive_name = self.mbox_name + "_archive.bz2"
assert(os.path.exists(archive_name))
os.system("bzip2 -d " + archive_name)
archive_name = self.mbox_name + "_archive"
assert(os.path.exists(archive_name))
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
def testArchiveOldCompress(self):
"""archiving a mixed mailbox with compress should make an archive"""
archivemail.options.compressor = "compress"
archivemail.archive(self.mbox_name)
assert(os.path.exists(self.mbox_name))
self.assertEqual(os.path.getsize(self.mbox_name), 0)
new_mode = os.stat(self.mbox_name)[stat.ST_MODE]
self.assertEqual(self.mbox_mode, new_mode)
archive_name = self.mbox_name + "_archive.Z"
assert(os.path.exists(archive_name))
os.system("compress -d " + archive_name)
archive_name = self.mbox_name + "_archive"
assert(os.path.exists(archive_name))
assert(filecmp.cmp(archive_name, self.copy_name, shallow=0))
def tearDown(self): def tearDown(self):
archive = self.mbox_name + "_archive" archive = self.mbox_name + "_archive"
for name in (self.mbox_name, self.copy_name, archive, \ for name in (self.mbox_name, self.copy_name, archive, archive + ".gz"):
archive + ".gz", archive + ".bz2", archive + ".Z"):
if os.path.exists(name): if os.path.exists(name):
os.remove(name) os.remove(name)
archivemail.options.quiet = 0 archivemail.options.quiet = 0
@ -460,7 +416,6 @@ class TestArchiveMboxMixed(unittest.TestCase):
def testArchiveMixedGzip(self): def testArchiveMixedGzip(self):
"""archiving a mixed mailbox with gzip should make an archive""" """archiving a mixed mailbox with gzip should make an archive"""
archivemail.options.compressor = "gzip"
archivemail.archive(self.mixed_mbox) archivemail.archive(self.mixed_mbox)
assert(os.path.exists(self.mixed_mbox)) assert(os.path.exists(self.mixed_mbox))
assert(filecmp.cmp(self.new_mbox, self.mixed_mbox, shallow=0)) assert(filecmp.cmp(self.new_mbox, self.mixed_mbox, shallow=0))
@ -471,36 +426,10 @@ class TestArchiveMboxMixed(unittest.TestCase):
assert(os.path.exists(archive_name)) assert(os.path.exists(archive_name))
assert(filecmp.cmp(archive_name, self.old_mbox, shallow=0)) assert(filecmp.cmp(archive_name, self.old_mbox, shallow=0))
def testArchiveMixedBzip2(self):
"""archiving a mixed mailbox with bzip2 should make an archive"""
archivemail.options.compressor = "bzip2"
archivemail.archive(self.mixed_mbox)
assert(os.path.exists(self.mixed_mbox))
assert(filecmp.cmp(self.new_mbox, self.mixed_mbox, shallow=0))
archive_name = self.mixed_mbox + "_archive.bz2"
assert(os.path.exists(archive_name))
os.system("bzip2 -d " + archive_name)
archive_name = self.mixed_mbox + "_archive"
assert(os.path.exists(archive_name))
assert(filecmp.cmp(archive_name, self.old_mbox, shallow=0))
def testArchiveMixedCompress(self):
"""archiving a mixed mailbox with compress should make an archive"""
archivemail.options.compressor = "compress"
archivemail.archive(self.mixed_mbox)
assert(os.path.exists(self.mixed_mbox))
assert(filecmp.cmp(self.new_mbox, self.mixed_mbox, shallow=0))
archive_name = self.mixed_mbox + "_archive.Z"
assert(os.path.exists(archive_name))
os.system("compress -d " + archive_name)
archive_name = self.mixed_mbox + "_archive"
assert(os.path.exists(archive_name))
assert(filecmp.cmp(archive_name, self.old_mbox, shallow=0))
def tearDown(self): def tearDown(self):
archive = self.mixed_mbox + "_archive" archive = self.mixed_mbox + "_archive"
for name in (self.mixed_mbox, self.old_mbox, self.new_mbox, archive, \ for name in (self.mixed_mbox, self.old_mbox, self.new_mbox, archive, \
archive + ".gz", archive + ".bz2", archive + ".Z"): archive + ".gz"):
if os.path.exists(name): if os.path.exists(name):
os.remove(name) os.remove(name)
archivemail.options.quiet = 0 archivemail.options.quiet = 0