From f90bd01a744ffabb011c1889dfc2e9270d9ed93e Mon Sep 17 00:00:00 2001 From: Paul Rodger Date: Tue, 2 Apr 2002 13:37:49 +0000 Subject: [PATCH] Added MH mailbox support. --- archivemail.py | 39 ++++++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/archivemail.py b/archivemail.py index c5c2d70..e96d0df 100755 --- a/archivemail.py +++ b/archivemail.py @@ -50,7 +50,7 @@ import tempfile import time # global administrivia -__version__ = "archivemail v0.1.0" +__version__ = "archivemail v0.2.0" __cvs_id__ = "$Id$" __copyright__ = """Copyright (C) 2002 Paul Rodger This is free software; see the source for copying conditions. There is NO @@ -451,11 +451,15 @@ manually, and try running me again.""" % final_name) class IdentityCache: seen_ids = {} mailbox_name = None + def __init__(self, mailbox_name): assert(mailbox_name) self.mailbox_name = mailbox_name + def warn_if_dupe(self, msg): + assert(msg) message_id = msg.get('Message-ID') + assert(message_id) if self.seen_ids.has_key(message_id): user_warning("duplicate message id: '%s' in mailbox '%s'" % (message_id, self.mailbox_name)) @@ -573,11 +577,13 @@ def make_mbox_from(message): def get_date_mtime(message): """Return the delivery date of an rfc822 message in a maildir mailbox""" + assert(message) vprint("using last-modification time of message file") return os.path.getmtime(message.fp.name) def get_date_headers(message): """Return the delivery date of an rfc822 message in a mbox mailbox""" + assert(message) date = message.getdate('Date') delivery_date = message.getdate('Delivery-date') use_date = None @@ -635,6 +641,8 @@ def archive(mailbox_name): already exists """ + assert(mailbox_name) + tempfile.tempdir = choose_temp_dir(mailbox_name) vprint("set tempfile directory to '%s'" % tempfile.tempdir) @@ -655,10 +663,10 @@ def archive(mailbox_name): new_path = os.path.join(mailbox_name, "new") if os.path.isdir(cur_path) and os.path.isdir(new_path): vprint("guessing mailbox is of type: maildir") - _archive_maildir(mailbox_name, final_archive_name) + _archive_dir(mailbox_name, final_archive_name, "maildir") else: vprint("guessing mailbox is of type: MH") - _archive_mh(mailbox_name, final_archive_name) + _archive_dir(mailbox_name, final_archive_name, "mh") else: user_error("'%s': no such file or directory" % mailbox_name) @@ -672,6 +680,9 @@ def _archive_mbox(mailbox_name, final_archive_name): old messages to - appending if the archive already exists """ + assert(mailbox_name) + assert(final_archive_name) + archive = None retain = None stats = Stats(mailbox_name, final_archive_name) @@ -734,13 +745,24 @@ def _archive_mbox(mailbox_name, final_archive_name): stats.display() -def _archive_maildir(mailbox_name, final_archive_name): - """Archive a 'maildir' style mailbox - used by archive_mailbox()""" +def _archive_dir(mailbox_name, final_archive_name, type): + """Archive a 'maildir' or 'MH' style mailbox - used by archive_mailbox()""" + assert(mailbox_name) + assert(final_archive_name) + assert(type) + original = None archive = None stats = Stats(mailbox_name, final_archive_name) delete_queue = [] - original = mailbox.Maildir(mailbox_name) + + if type == "maildir": + original = mailbox.Maildir(mailbox_name) + elif type == "mh": + original = mailbox.MHMailbox(mailbox_name) + else: + unexpected_error("unknown type: %s" % type) assert(original) + cache = IdentityCache(mailbox_name) msg = original.next() @@ -779,9 +801,6 @@ def _archive_maildir(mailbox_name, final_archive_name): if not _options.quiet: stats.display() -def _archive_mh(mailbox_name, final_archive_name): - """Archive a 'MH' style mailbox - see archive_mailbox()""" - unexpected_error("'MH' type mailbox support not yet implemented") ############### misc functions ############### @@ -812,6 +831,7 @@ def choose_temp_dir(mailbox_path): mailbox_path -- path name to the original mailbox """ + assert(mailbox_path) temp_dir = os.path.dirname(mailbox_path) if _options.output_dir: temp_dir = _options.output_dir @@ -821,6 +841,7 @@ def choose_temp_dir(mailbox_path): def system_or_die(command): """Run the command with os.system(), aborting on non-zero exit""" + assert(command) rv = os.system(command) if (rv != 0): status = os.WEXITSTATUS(rv)