diff --git a/archivemail.py b/archivemail.py index a575929..b98a552 100755 --- a/archivemail.py +++ b/archivemail.py @@ -103,8 +103,8 @@ class Stats: compression extension (eg .gz) """ - assert(mailbox_name) - assert(final_archive_name) + assert mailbox_name + assert final_archive_name self.__start_time = time.time() self.__mailbox_name = mailbox_name self.__archive_name = final_archive_name + ".gz" @@ -326,7 +326,7 @@ class LockableMboxMixin: def lock(self): """Lock this mbox with both a dotlock and a posix lock.""" - assert(not self._locked) + assert not self._locked attempt = 1 while True: try: @@ -347,7 +347,7 @@ class LockableMboxMixin: def unlock(self): """Unlock this mbox.""" - assert(self._locked) + assert self._locked self._dotlock_unlock() self._posix_unlock() self._locked = False @@ -411,7 +411,7 @@ class LockableMboxMixin: def _dotlock_unlock(self): """Delete the dotlock file for the 'mbox' mailbox.""" - assert(self.mbox_file_name) + assert self.mbox_file_name lock_name = self.mbox_file_name + options.lockfile_extension vprint("removing lockfile '%s'" % lock_name) os.remove(lock_name) @@ -425,7 +425,7 @@ class LockableMboxMixin: def close(self): """Close the mbox file""" vprint("closing file '%s'" % self.mbox_file_name) - assert(not self._locked) + assert not self._locked self.mbox_file.close() @@ -440,7 +440,7 @@ class Mbox(mailbox.UnixMailbox, LockableMboxMixin): Named Arguments: path -- file name of the 'mbox' file to be opened """ - assert(path) + assert path fd = safe_open_existing(path) st = os.fstat(fd) self.original_atime = st.st_atime @@ -453,9 +453,9 @@ class Mbox(mailbox.UnixMailbox, LockableMboxMixin): def reset_timestamps(self): """Set the file timestamps to the original values""" - assert(self.original_atime) - assert(self.original_mtime) - assert(self.mbox_file_name) + assert self.original_atime + assert self.original_mtime + assert self.mbox_file_name os.utime(self.mbox_file_name, (self.original_atime, \ self.original_mtime)) @@ -481,7 +481,7 @@ class ArchiveMbox(LockableMboxMixin): def append(self, filename): """Append the content of the given file to the mbox.""" - assert(self._locked) + assert self._locked fin = open(filename, "r") oldsize = os.fstat(self.mbox_file.fileno()).st_size try: @@ -516,8 +516,8 @@ class TempMbox: msg -- rfc822 message object to be written """ - assert(msg) - assert(self.mbox_file) + assert msg + assert self.mbox_file self.empty = False vprint("saving message to file '%s'" % self.mbox_file_name) @@ -528,13 +528,13 @@ class TempMbox: msg_has_mbox_format = False unix_from = make_mbox_from(msg) self.mbox_file.write(unix_from) - assert(msg.headers) + assert msg.headers self.mbox_file.writelines(msg.headers) self.mbox_file.write(os.linesep) # The following while loop is about twice as fast in # practice to 'self.mbox_file.writelines(msg.fp.readlines())' - assert(options.read_buffer_size > 0) + assert options.read_buffer_size > 0 linebuf = "" while 1: body = msg.fp.read(options.read_buffer_size) @@ -606,14 +606,14 @@ class IdentityCache: def __init__(self, mailbox_name): """Constructor: takes the mailbox name as an argument""" - assert(mailbox_name) + assert mailbox_name self.mailbox_name = mailbox_name def warn_if_dupe(self, msg): """Print a warning message if the message has already appeared""" - assert(msg) + assert msg message_id = msg.get('Message-ID') - assert(message_id) + assert message_id if self.seen_ids.has_key(message_id): user_warning("duplicate message id: '%s' in mailbox '%s'" % (message_id, self.mailbox_name)) @@ -724,11 +724,11 @@ def make_mbox_from(message): message -- the rfc822 message object """ - assert(message) + assert message address = guess_return_path(message) time_message = guess_delivery_time(message) date = time.localtime(time_message) - assert(date) + assert date date_string = time.asctime(date) mbox_from = "From %s %s\n" % (address, date_string) return mbox_from @@ -736,7 +736,7 @@ def make_mbox_from(message): def guess_return_path(message): """Return a guess at the Return Path address of an rfc822 message""" - assert(message) + assert message for header in ('Return-path', 'From'): address_header = message.get(header) @@ -747,13 +747,13 @@ def guess_return_path(message): # argh, we can't find any valid 'Return-path' guesses - just # just use the current unix username like mutt does login = pwd.getpwuid(os.getuid())[0] - assert(login) + assert login return login def guess_delivery_time(message): """Return a guess at the delivery date of an rfc822 message""" - assert(message) + assert message # try to guess the delivery date from various headers # get more desparate as we go through the array for header in 'Delivery-date', 'Received', 'Resent-Date', 'Date': @@ -928,7 +928,7 @@ def is_unread(message): def sizeof_message(message): """Return size of message in bytes (octets).""" - assert(message) + assert message file_name = None message_size = None try: @@ -955,8 +955,8 @@ def sizeof_message(message): def is_smaller(message, size): """Return true if the message is smaller than size bytes, false otherwise""" - assert(message) - assert(size > 0) + assert message + assert size > 0 message_size = sizeof_message(message) if message_size < size: vprint("message is too small (%d bytes), minimum bytes : %d" % \ @@ -1064,13 +1064,13 @@ def archive(mailbox_name): Arguments: mailbox_name -- the filename/dirname/url of the mailbox to be archived """ - assert(mailbox_name) + assert mailbox_name # strip any trailing slash (we could be archiving a maildir or MH format # mailbox and somebody was pressing in bash) - we don't want to use # the trailing slash in the archive name mailbox_name = mailbox_name.rstrip("/") - assert(mailbox_name) + assert mailbox_name set_signal_handlers() os.umask(077) # saves setting permissions on mailboxes/tempfiles @@ -1099,7 +1099,7 @@ def archive(mailbox_name): # create a temporary directory for us to work in securely tempfile.tempdir = None new_temp_dir = tempfile.mkdtemp('archivemail') - assert(new_temp_dir) + assert new_temp_dir _stale.temp_dir = new_temp_dir tempfile.tempdir = new_temp_dir vprint("set tempfile directory to '%s'" % new_temp_dir) @@ -1139,8 +1139,8 @@ def _archive_mbox(mailbox_name, final_archive_name): old messages to - appending if the archive already exists """ - assert(mailbox_name) - assert(final_archive_name) + assert mailbox_name + assert final_archive_name stats = Stats(mailbox_name, final_archive_name) cache = IdentityCache(mailbox_name) original = Mbox(path=mailbox_name) @@ -1215,9 +1215,9 @@ def _archive_mbox(mailbox_name, final_archive_name): def _archive_dir(mailbox_name, final_archive_name, type): """Archive a 'maildir' or 'MH' style mailbox - used by archive_mailbox()""" - assert(mailbox_name) - assert(final_archive_name) - assert(type) + assert mailbox_name + assert final_archive_name + assert type stats = Stats(mailbox_name, final_archive_name) delete_queue = [] @@ -1267,8 +1267,8 @@ def _archive_dir(mailbox_name, final_archive_name, type): def _archive_imap(mailbox_name, final_archive_name): """Archive an imap mailbox - used by archive_mailbox()""" - assert(mailbox_name) - assert(final_archive_name) + assert mailbox_name + assert final_archive_name import imaplib import cStringIO import getpass @@ -1456,7 +1456,7 @@ def imap_getdelim(imap_server): def imap_get_namespace(srv): """Return the IMAP namespace prefixes and hierarchy delimiters.""" - assert('NAMESPACE' in srv.capabilities) + assert 'NAMESPACE' in srv.capabilities result, response = srv.namespace() if result != 'OK': unexpected_error("Cannot retrieve IMAP namespace; server says: '%s'" @@ -1637,7 +1637,7 @@ def make_archive_name(mailbox_name): def check_sane_destdir(dir): """Do a very primitive check if the given directory looks like a reasonable destination directory and bail out if it doesn't.""" - assert(dir) + assert dir if not os.path.isdir(dir): user_error("output directory does not exist: '%s'" % dir) if not os.access(dir, os.W_OK): @@ -1685,7 +1685,7 @@ def get_filename(msg): # (msg from mailbox.Maildir, Python >= 2.5) # File object is msg.fp._file, we do want that. if msg.fp.__class__ == mailbox._ProxyFile: - assert(hasattr(mailbox, "_PartialFile")) + assert hasattr(mailbox, "_PartialFile") return msg.fp._file.name raise diff --git a/test_archivemail.py b/test_archivemail.py index 738a5c2..329bd33 100755 --- a/test_archivemail.py +++ b/test_archivemail.py @@ -92,12 +92,12 @@ class TestCaseInTempdir(unittest.TestCase): def setUp(self): if not self.temproot: - assert(not tempfile.tempdir) + assert not tempfile.tempdir self.temproot = tempfile.tempdir = \ tempfile.mkdtemp(prefix="test-archivemail") def tearDown(self): - assert(tempfile.tempdir == self.temproot) + assert tempfile.tempdir == self.temproot if self.temproot: shutil.rmtree(self.temproot) tempfile.tempdir = self.temproot = None @@ -116,9 +116,9 @@ class TestMboxDotlock(TestCaseInTempdir): """dotlock_lock/unlock should create/delete a lockfile""" lock = self.mbox_name + ".lock" self.mbox._dotlock_lock() - assert(os.path.isfile(lock)) + assert os.path.isfile(lock) self.mbox._dotlock_unlock() - assert(not os.path.isfile(lock)) + assert not os.path.isfile(lock) def testDotlockingSucceedsUponEACCES(self): """A dotlock should silently be omitted upon EACCES.""" @@ -192,7 +192,7 @@ class TestMboxNext(TestCaseInTempdir): mbox = archivemail.Mbox(self.not_empty_name) for count in range(18): msg = mbox.next() - assert(msg) + assert msg msg = mbox.next() self.assertEqual(msg, None) @@ -214,7 +214,7 @@ class TestTempMboxWrite(TestCaseInTempdir): mbox_write.write(msg) mbox_read.close() mbox_write.close() - assert(filecmp.cmp(read_file, write_file, shallow=0)) + assert filecmp.cmp(read_file, write_file, shallow=0) def testWriteNone(self): """calling mbox.write() with no message should raise AssertionError""" @@ -229,9 +229,9 @@ class TestTempMboxRemove(TestCaseInTempdir): def testMboxRemove(self): """remove() should delete a mbox mailbox""" - assert(os.path.exists(self.mbox_name)) + assert os.path.exists(self.mbox_name) self.mbox.remove() - assert(not os.path.exists(self.mbox_name)) + assert not os.path.exists(self.mbox_name) @@ -289,27 +289,27 @@ class TestOptionParser(unittest.TestCase): def testOptionPreserveUnread(self): """--preserve-unread option is parsed correctly""" archivemail.options.parse_args(["--preserve-unread"], "") - assert(archivemail.options.preserve_unread) + assert archivemail.options.preserve_unread archivemail.options.preserve_unread = 0 archivemail.options.parse_args(["-u"], "") - assert(archivemail.options.preserve_unread) + assert archivemail.options.preserve_unread def testOptionSuffix(self): """--suffix and -s options are parsed correctly""" for suffix in ("_static_", "_%B_%Y", "-%Y-%m-%d"): archivemail.options.parse_args(["--suffix="+suffix], "") - assert(archivemail.options.archive_suffix == suffix) + assert archivemail.options.archive_suffix == suffix archivemail.options.suffix = None archivemail.options.parse_args(["-s", suffix], "") - assert(archivemail.options.archive_suffix == suffix) + assert archivemail.options.archive_suffix == suffix def testOptionDryrun(self): """--dry-run option is parsed correctly""" archivemail.options.parse_args(["--dry-run"], "") - assert(archivemail.options.dry_run) + assert archivemail.options.dry_run archivemail.options.preserve_unread = 0 archivemail.options.parse_args(["-n"], "") - assert(archivemail.options.dry_run) + assert archivemail.options.dry_run def testOptionDays(self): """--days and -d options are parsed correctly""" @@ -322,12 +322,12 @@ class TestOptionParser(unittest.TestCase): def testOptionDelete(self): """--delete option is parsed correctly""" archivemail.options.parse_args(["--delete"], "") - assert(archivemail.options.delete_old_mail) + assert archivemail.options.delete_old_mail def testOptionCopy(self): """--copy option is parsed correctly""" archivemail.options.parse_args(["--copy"], "") - assert(archivemail.options.copy_old_mail) + assert archivemail.options.copy_old_mail def testOptionOutputdir(self): """--output-dir and -o options are parsed correctly""" @@ -341,7 +341,7 @@ class TestOptionParser(unittest.TestCase): def testOptionNocompress(self): """--no-compress option is parsed correctly""" archivemail.options.parse_args(["--no-compress"], "") - assert(archivemail.options.no_compress) + assert archivemail.options.no_compress def testOptionSize(self): """--size and -S options are parsed correctly""" @@ -361,43 +361,43 @@ class TestIsTooOld(unittest.TestCase): """with max_days=360, should be true for these dates > 1 year""" for years in range(1, 10): time_msg = time.time() - (years * 365 * 24 * 60 * 60) - assert(archivemail.is_older_than_days(time_message=time_msg, - max_days=360)) + assert archivemail.is_older_than_days(time_message=time_msg, + max_days=360) def testOld(self): """with max_days=14, should be true for these dates > 14 days""" for days in range(14, 360): time_msg = time.time() - (days * 24 * 60 * 60) - assert(archivemail.is_older_than_days(time_message=time_msg, - max_days=14)) + assert archivemail.is_older_than_days(time_message=time_msg, + max_days=14) def testJustOld(self): """with max_days=1, should be true for these dates >= 1 day""" for minutes in range(0, 61): time_msg = time.time() - (25 * 60 * 60) + (minutes * 60) - assert(archivemail.is_older_than_days(time_message=time_msg, - max_days=1)) + assert archivemail.is_older_than_days(time_message=time_msg, + max_days=1) def testNotOld(self): """with max_days=9, should be false for these dates < 9 days""" for days in range(0, 9): time_msg = time.time() - (days * 24 * 60 * 60) - assert(not archivemail.is_older_than_days(time_message=time_msg, - max_days=9)) + assert not archivemail.is_older_than_days(time_message=time_msg, + max_days=9) def testJustNotOld(self): """with max_days=1, should be false for these hours <= 1 day""" for minutes in range(0, 60): time_msg = time.time() - (23 * 60 * 60) - (minutes * 60) - assert(not archivemail.is_older_than_days(time_message=time_msg, - max_days=1)) + assert not archivemail.is_older_than_days(time_message=time_msg, + max_days=1) def testFuture(self): """with max_days=1, should be false for times in the future""" for minutes in range(0, 60): time_msg = time.time() + (minutes * 60) - assert(not archivemail.is_older_than_days(time_message=time_msg, - max_days=1)) + assert not archivemail.is_older_than_days(time_message=time_msg, + max_days=1) ########## archivemail.parse_imap_url() unit testing ################# @@ -469,7 +469,7 @@ class TestParseIMAPUrl(unittest.TestCase): self.assertEqual(result, mbstr[2]) for mbstr in self.urls_onlywithpass: url = mbstr[0][mbstr[0].find('://')+3:] - self.assertRaises(archivemail.UnexpectedError, + self.assertRaises(archivemail.UnexpectedError, archivemail.parse_imap_url, url) def tearDown(self): @@ -485,7 +485,7 @@ class TestArchive(TestCaseInTempdir): good_archive = good_mbox = None def verify(self): - assert(os.path.exists(self.mbox)) + assert os.path.exists(self.mbox) if self.good_mbox is not None: assertEqualContent(self.mbox, self.good_mbox) else: @@ -495,12 +495,12 @@ class TestArchive(TestCaseInTempdir): archive_name += ".gz" iszipped = True else: - assert(not os.path.exists(archive_name + ".gz")) + assert not os.path.exists(archive_name + ".gz") iszipped = False if self.good_archive is not None: assertEqualContent(archive_name, self.good_archive, iszipped) else: - assert(not os.path.exists(archive_name)) + assert not os.path.exists(archive_name) def make_old_mbox(self, body=None, headers=None, messages=1, old_archive=False): """Make an old mbox, optionally an existing archive, and make a reference @@ -601,10 +601,10 @@ This is after the ^From line""" msg = make_message(default_headers=headers, wantobj=True) date = time.strptime("2000-07-29", "%Y-%m-%d") archivemail.options.date_old_max = time.mktime(date) - assert(archivemail.should_archive(msg)) + assert archivemail.should_archive(msg) date = time.strptime("2000-07-27", "%Y-%m-%d") archivemail.options.date_old_max = time.mktime(date) - assert(not archivemail.should_archive(msg)) + assert not archivemail.should_archive(msg) def testMixed(self): """archiving a mixed mailbox""" @@ -704,7 +704,7 @@ class TestArchiveMboxTimestamp(TestCaseInTempdir): self.verify() def verify(self): - assert(os.path.exists(self.mbox_name)) + assert os.path.exists(self.mbox_name) new_atime = os.path.getatime(self.mbox_name) new_mtime = os.path.getmtime(self.mbox_name) self.assertAlmostEqual(self.mtime, new_mtime, utimes_precision) @@ -725,12 +725,12 @@ class TestArchiveMboxAll(unittest.TestCase): def testNew(self): """new messages should be archived with --all""" self.msg = make_message(hours_old=24*179, wantobj=True) - assert(archivemail.should_archive(self.msg)) + assert archivemail.should_archive(self.msg) def testOld(self): """old messages should be archived with --all""" self.msg = make_message(hours_old=24*181, wantobj=True) - assert(archivemail.should_archive(self.msg)) + assert archivemail.should_archive(self.msg) def tearDown(self): archivemail.options.quiet = 0 @@ -746,12 +746,12 @@ class TestArchiveMboxPreserveUnread(unittest.TestCase): def testOldRead(self): """old read messages should be archived with --preserve-unread""" self.msg["Status"] = "RO" - assert(archivemail.should_archive(self.msg)) + assert archivemail.should_archive(self.msg) def testOldUnread(self): """old unread messages should not be archived with --preserve-unread""" self.msg["Status"] = "O" - assert(not archivemail.should_archive(self.msg)) + assert not archivemail.should_archive(self.msg) def tearDown(self): archivemail.options.quiet = 0 @@ -873,20 +873,20 @@ class TestArchiveMboxFlagged(unittest.TestCase): """by default, old flagged messages should not be archived""" msg = make_message(default_headers={"X-Status": "F"}, hours_old=24*181, wantobj=True) - assert(not archivemail.should_archive(msg)) + assert not archivemail.should_archive(msg) def testIncludeFlaggedNew(self): """new flagged messages should not be archived with include_flagged""" msg = make_message(default_headers={"X-Status": "F"}, hours_old=24*179, wantobj=True) - assert(not archivemail.should_archive(msg)) + assert not archivemail.should_archive(msg) def testIncludeFlaggedOld(self): """old flagged messages should be archived with include_flagged""" archivemail.options.include_flagged = 1 msg = make_message(default_headers={"X-Status": "F"}, hours_old=24*181, wantobj=True) - assert(archivemail.should_archive(msg)) + assert archivemail.should_archive(msg) def tearDown(self): archivemail.options.include_flagged = 0 @@ -964,12 +964,12 @@ class TestArchiveSize(unittest.TestCase): def testSmaller(self): """giving a size argument smaller than the message""" archivemail.options.min_size = self.msg_size - 1 - assert(archivemail.should_archive(self.msg)) + assert archivemail.should_archive(self.msg) def testBigger(self): """giving a size argument bigger than the message""" archivemail.options.min_size = self.msg_size + 1 - assert(not archivemail.should_archive(self.msg)) + assert not archivemail.should_archive(self.msg) def tearDown(self): archivemail.options.quiet = 0 @@ -989,8 +989,8 @@ class TestArchiveMboxMode(TestCaseInTempdir): os.chmod(self.mbox_name, mode) archivemail.archive(self.mbox_name) archive_name = self.mbox_name + "_archive.gz" - assert(os.path.exists(self.mbox_name)) - assert(os.path.exists(archive_name)) + assert os.path.exists(self.mbox_name) + assert os.path.exists(archive_name) new_mode = os.stat(self.mbox_name)[stat.ST_MODE] self.assertEqual(mode, stat.S_IMODE(new_mode)) archive_mode = os.stat(archive_name)[stat.ST_MODE] @@ -1003,8 +1003,8 @@ class TestArchiveMboxMode(TestCaseInTempdir): os.chmod(self.mbox_name, mode) archivemail.archive(self.mbox_name) archive_name = self.mbox_name + "_archive.gz" - assert(not os.path.exists(archive_name)) - assert(os.path.exists(self.mbox_name)) + assert not os.path.exists(archive_name) + assert os.path.exists(self.mbox_name) new_mode = os.stat(self.mbox_name)[stat.ST_MODE] self.assertEqual(mode, stat.S_IMODE(new_mode)) os.remove(self.mbox_name) @@ -1049,8 +1049,8 @@ def make_message(body=None, default_headers={}, hours_old=None, wantobj=False): def append_file(source, dest): """appends the file named 'source' to the file named 'dest'""" - assert(os.path.isfile(source)) - assert(os.path.isfile(dest)) + assert os.path.isfile(source) + assert os.path.isfile(dest) read = open(source, "r") write = open(dest, "a+") shutil.copyfileobj(read,write) @@ -1059,7 +1059,7 @@ def append_file(source, dest): def make_mbox(body=None, headers=None, hours_old=0, messages=1): - assert(tempfile.tempdir) + assert tempfile.tempdir fd, name = tempfile.mkstemp() file = os.fdopen(fd, "w") for count in range(messages): @@ -1097,18 +1097,18 @@ def make_old_archive(mailbox_name): def assertEqualContent(firstfile, secondfile, zipfirst=False): """Verify that the two files exist and have identical content. If zipfirst is True, assume that firstfile is gzip-compressed.""" - assert(os.path.exists(firstfile)) - assert(os.path.exists(secondfile)) + assert os.path.exists(firstfile) + assert os.path.exists(secondfile) if zipfirst: try: fp1 = gzip.GzipFile(firstfile, "r") fp2 = open(secondfile, "r") - assert(cmp_fileobj(fp1, fp2)) + assert cmp_fileobj(fp1, fp2) finally: fp1.close() fp2.close() else: - assert(filecmp.cmp(firstfile, secondfile, shallow=0)) + assert filecmp.cmp(firstfile, secondfile, shallow=0) def cmp_fileobj(fp1, fp2): """Return if reading the fileobjects yields identical content."""