"""Read/write support for Maildir, mbox, MH, Babyl, and MMDF mailboxes."""
# Notes for authors of new mailbox subclasses:
# Remember to fsync() changes to disk before closing a modified file
# or returning from a flush() method. See functions _sync_flush() and
# _sync_close().
# Public names exported by a star-import of this module: the mailbox
# classes, the per-format message classes, and the exception types.
__all__ = ['Mailbox', 'Maildir', 'mbox', 'MH', 'Babyl', 'MMDF',
'Message', 'MaildirMessage', 'mboxMessage', 'MHMessage',
'BabylMessage', 'MMDFMessage', 'Error', 'NoSuchMailboxError',
'NotEmptyError', 'ExternalClashError', 'FormatError']
# The platform line separator as bytes; message data is converted between
# b'\n' and this value when it is written to or read from disk.
linesep = os.linesep.encode('ascii')
"""A group of messages in a particular place."""
def __init__(self, path, factory=None, create=True):
    """Initialize a Mailbox instance.

    path is the mailbox location; a leading '~' is expanded and the result
    is made absolute.  factory, if given, is a callable that receives a
    binary file object and returns the message representation handed back
    by __getitem__().  create is accepted for the benefit of subclasses;
    this base initializer does not use it.
    """
    self._path = os.path.abspath(os.path.expanduser(path))
    # __getitem__() reads self._factory, so it must always be set here.
    self._factory = factory
"""Add message and return assigned key."""
raise NotImplementedError('Method must be implemented by subclass')
"""Remove the keyed message; raise KeyError if it doesn't exist."""
raise NotImplementedError('Method must be implemented by subclass')
def __delitem__(self, key):
# NOTE(review): no statement follows the docstring in this view, so as shown
# ``del mailbox[key]`` would do nothing -- confirm the body in the full file.
"""If the keyed message exists, remove it."""
def __setitem__(self, key, message):
    """Store message under an existing key, replacing the old message.

    Abstract in the base class: it always raises NotImplementedError.
    Implementations are expected to raise KeyError for an unknown key.
    """
    reason = 'Method must be implemented by subclass'
    raise NotImplementedError(reason)
def get(self, key, default=None):
    """Return the keyed message, or default if it doesn't exist.

    The lookup is delegated to __getitem__(); a KeyError raised there is
    translated into returning default instead of propagating.
    """
    try:
        return self.__getitem__(key)
    except KeyError:
        return default
def __getitem__(self, key):
    """Return the keyed message; raise KeyError if it doesn't exist.

    Without a factory the result is whatever get_message() returns.  With
    a factory, the file object from get_file() is passed to the factory
    and closed again once the factory has returned.
    """
    if not self._factory:
        return self.get_message(key)
    # closing() releases the file even if the factory raises.
    with contextlib.closing(self.get_file(key)) as file:
        return self._factory(file)
def get_message(self, key):
    """Build and return a Message representation of the keyed message.

    Abstract in the base class: it always raises NotImplementedError.
    Implementations raise KeyError for an unknown key.
    """
    reason = 'Method must be implemented by subclass'
    raise NotImplementedError(reason)
def get_string(self, key):
    """Return the keyed message as a str, or raise a KeyError.

    The raw bytes from get_bytes() are parsed with the email package and
    serialized again, which yields a 7bit clean string representation.
    """
    raw = self.get_bytes(key)
    parsed = email.message_from_bytes(raw)
    return parsed.as_string()
def get_bytes(self, key):
    """Return the keyed message as a byte string.

    Abstract in the base class: it always raises NotImplementedError.
    Implementations raise KeyError for an unknown key.
    """
    reason = 'Method must be implemented by subclass'
    raise NotImplementedError(reason)
"""Return a file-like representation or raise a KeyError."""
raise NotImplementedError('Method must be implemented by subclass')
"""Return an iterator over keys."""
raise NotImplementedError('Method must be implemented by subclass')
"""Return a list of keys."""
return list(self.iterkeys())
"""Return an iterator over all messages."""
for key in self.iterkeys():
"""Return a list of messages. Memory intensive."""
return list(self.itervalues())
"""Return an iterator over (key, message) tuples."""
for key in self.iterkeys():
"""Return a list of (key, message) tuples. Memory intensive."""
return list(self.iteritems())
def __contains__(self, key):
    """Report whether a message is stored under key (True or False).

    Abstract in the base class: it always raises NotImplementedError.
    """
    reason = 'Method must be implemented by subclass'
    raise NotImplementedError(reason)
"""Return a count of messages in the mailbox."""
raise NotImplementedError('Method must be implemented by subclass')
"""Delete all messages."""
def pop(self, key, default=None):
# NOTE(review): no statement follows the docstring in this view; the code that
# fetches, deletes and returns the message must be confirmed in the full file.
"""Delete the keyed message and return it, or default."""
"""Delete an arbitrary (key, message) pair and return it."""
for key in self.iterkeys():
return (key, self.pop(key)) # This is only run once.
raise KeyError('No messages in mailbox')
def update(self, arg=None):
"""Change the messages that correspond to certain keys."""
# arg is probed for an iteritems() method first, then for items().
# NOTE(review): the branch bodies that set `source`, and the statements
# inside the loop, are not visible in this view.
if hasattr(arg, 'iteritems'):
elif hasattr(arg, 'items'):
for key, message in source:
# Reports key(s) that could not be updated -- presumably raised only after
# the whole loop has run; confirm against the full file.
raise KeyError('No message with key(s)')
"""Write any pending changes to the disk."""
raise NotImplementedError('Method must be implemented by subclass')
raise NotImplementedError('Method must be implemented by subclass')
"""Unlock the mailbox if it is locked."""
raise NotImplementedError('Method must be implemented by subclass')
"""Flush and close the mailbox."""
raise NotImplementedError('Method must be implemented by subclass')
def _string_to_bytes(self, message):
    """Encode an ASCII-only str as bytes.

    Raises ValueError, with a hint about the accepted input types, when
    message contains non-ASCII characters.
    """
    # If a message is not 7bit clean, we refuse to handle it since it
    # likely came from reading invalid messages in text mode; guessing an
    # encoding here would risk corrupting the data.
    try:
        return message.encode('ascii')
    except UnicodeError as err:
        raise ValueError("String input must be ASCII-only; "
                         "use bytes or a Message instead") from err
# Whether each message must end in a newline
def _dump_message(self, message, target, mangle_from_=False):
# This assumes the target file is open in binary mode.
"""Dump message contents to target file."""
# Three kinds of input are recognised, tested in this order:
#   1. email.message.Message objects,
#   2. str / bytes / io.StringIO values,
#   3. objects exposing a read() method.
# Anything else ends in the TypeError on the last line.
# NOTE(review): several statements (buffer setup, the writes to `target` and
# the read loop) are not visible in this view.
if isinstance(message, email.message.Message):
# The Message is serialised by a BytesGenerator, which receives mangle_from_.
gen = email.generator.BytesGenerator(buffer, mangle_from_, 0)
# Convert b'\n' line endings to the platform separator.
data = data.replace(b'\n', linesep)
if self._append_newline and not data.endswith(linesep):
# Make sure the message ends with a newline
elif isinstance(message, (str, bytes, io.StringIO)):
if isinstance(message, io.StringIO):
warnings.warn("Use of StringIO input is deprecated, "
"use BytesIO instead", DeprecationWarning, 3)
message = message.getvalue()
if isinstance(message, str):
# str input is converted to bytes by _string_to_bytes().
message = self._string_to_bytes(message)
# Lines starting with "From " (after a newline) get a '>' prefix.
message = message.replace(b'\nFrom ', b'\n>From ')
message = message.replace(b'\n', linesep)
if self._append_newline and not message.endswith(linesep):
# Make sure the message ends with a newline
elif hasattr(message, 'read'):
# A 'buffer' attribute is taken as the sign of a text-mode file, which
# triggers the deprecation warning below.
if hasattr(message, 'buffer'):
warnings.warn("Use of text mode files is deprecated, "
"use a binary mode file instead", DeprecationWarning, 3)
line = message.readline()
# Universal newline support.
if line.endswith(b'\r\n'):
elif line.endswith(b'\r'):
# When mangling is requested, a line starting with "From " gets a '>' prefix.
if mangle_from_ and line.startswith(b'From '):
line = b'>From ' + line[5:]
line = line.replace(b'\n', linesep)
if self._append_newline and lastline and not lastline.endswith(linesep):
# Make sure the message ends with a newline
raise TypeError('Invalid message type: %s' % type(message))
"""A qmail-style Maildir mailbox."""
def __init__(self, dirname, factory=None, create=True):
"""Initialize a Maildir instance."""
Mailbox.__init__(self, dirname, factory, create)
# Paths of the 'tmp', 'new' and 'cur' subdirectories, keyed by name.
'tmp': os.path.join(self._path, 'tmp'),
'new': os.path.join(self._path, 'new'),
'cur': os.path.join(self._path, 'cur'),
# A missing mailbox directory is either created with mode 0o700 or reported
# as NoSuchMailboxError.
# NOTE(review): the test that chooses between the two (presumably on
# `create`) and the body of the loop over self._paths are not visible here.
if not os.path.exists(self._path):
os.mkdir(self._path, 0o700)
for path in self._paths.values():
raise NoSuchMailboxError(self._path)
# Bookkeeping about reads of the 'cur' and 'new' directories.
self._toc_mtimes = {'cur': 0, 'new': 0}
self._last_read = 0 # Records last time we read cur/new
self._skewfactor = 0.1 # Adjust if os/fs clocks are skewing
"""Add message and return assigned key."""
tmp_file = self._create_tmp()
self._dump_message(message, tmp_file)
if isinstance(message, MaildirMessage):
subdir = message.get_subdir()
suffix = self.colon + message.get_info()
uniq = os.path.basename(tmp_file.name).split(self.colon)[0]
dest = os.path.join(self._path, subdir, uniq + suffix)
if isinstance(message, MaildirMessage):
(os.path.getatime(tmp_file.name), message.get_date()))
# No file modification should be done after the file is moved to its
# final position in order to prevent race conditions with changes
os.link(tmp_file.name, dest)
except (AttributeError, PermissionError):
os.rename(tmp_file.name, dest)
if e.errno == errno.EEXIST:
raise ExternalClashError('Name clash with existing message: %s'
"""Remove the keyed message; raise KeyError if it doesn't exist."""
os.remove(os.path.join(self._path, self._lookup(key)))
"""If the keyed message exists, remove it."""
# This overrides an inapplicable implementation in the superclass.
except (KeyError, FileNotFoundError):
def __setitem__(self, key, message):
"""Replace the keyed message; raise KeyError if it doesn't exist."""
old_subpath = self._lookup(key)
# The replacement is first stored as a new message through add(); its file
# is renamed to the final path at the end.
temp_key = self.add(message)
temp_subpath = self._lookup(temp_key)
# Choose which path supplies the subdirectory and the info suffix.
if isinstance(message, MaildirMessage):
# temp's subdir and suffix were specified by message.
dominant_subpath = temp_subpath
# temp's subdir and suffix were defaults from add().
dominant_subpath = old_subpath
subdir = os.path.dirname(dominant_subpath)
# The suffix is the colon plus the text after the last colon, when present.
if self.colon in dominant_subpath:
suffix = self.colon + dominant_subpath.split(self.colon)[-1]
tmp_path = os.path.join(self._path, temp_subpath)
# The final file name is built from the original key.
new_path = os.path.join(self._path, subdir, key + suffix)
# NOTE(review): the call receiving the (access time, message date) pair
# below is not visible in this view.
if isinstance(message, MaildirMessage):
(os.path.getatime(tmp_path), message.get_date()))
# No file modification should be done after the file is moved to its
# final position, to avoid race conditions with concurrent changes.
os.rename(tmp_path, new_path)
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
subpath = self._lookup(key)
# NOTE(review): the statement that builds `msg` from the open file is not
# visible in this view.
with open(os.path.join(self._path, subpath), 'rb') as f:
subdir, name = os.path.split(subpath)
# The text after the last colon in the file name is passed to set_info().
msg.set_info(name.split(self.colon)[-1])
# The file's modification time is passed to set_date().
msg.set_date(os.path.getmtime(os.path.join(self._path, subpath)))
def get_bytes(self, key):
    """Return the keyed message's file contents as bytes, or raise a KeyError.

    The file is read in binary mode and every platform line separator is
    replaced by b'\\n' in the returned value.
    """
    location = os.path.join(self._path, self._lookup(key))
    with open(location, 'rb') as stream:
        raw = stream.read()
    return raw.replace(linesep, b'\n')
"""Return a file-like representation or raise a KeyError."""
f = open(os.path.join(self._path, self._lookup(key)), 'rb')
"""Return an iterator over keys."""
def __contains__(self, key):
"""Return True if the keyed message exists, False otherwise."""
"""Return a count of messages in the mailbox."""
"""Write any pending changes to disk."""
# Maildir changes are always written immediately, so there's nothing
"""Unlock the mailbox if it is locked."""
"""Flush and close the mailbox."""
"""Return a list of folder names."""
for entry in os.listdir(self._path):
if len(entry) > 1 and entry[0] == '.' and \
os.path.isdir(os.path.join(self._path, entry)):
def get_folder(self, folder):
"""Return a Maildir instance for the named folder."""
# The folder's directory is '.' + folder inside this mailbox.
# NOTE(review): the remaining arguments of this call are not visible in this
# view.
return Maildir(os.path.join(self._path, '.' + folder),
def add_folder(self, folder):
"""Create a folder and return a Maildir instance representing it."""
path = os.path.join(self._path, '.' + folder)
# The new Maildir is given this mailbox's message factory.
result = Maildir(path, factory=self._factory)
# Make sure a file named 'maildirfolder' exists inside the folder; when it is
# missing it is created by opening it and closing it again at once.
# NOTE(review): the end of the os.open() call and the return statement are
# not visible in this view.
maildirfolder_path = os.path.join(path, 'maildirfolder')
if not os.path.exists(maildirfolder_path):
os.close(os.open(maildirfolder_path, os.O_CREAT | os.O_WRONLY,
def remove_folder(self, folder):
"""Delete the named folder, which must be empty."""
path = os.path.join(self._path, '.' + folder)
# Any entry in new/ or cur/ whose name does not start with '.' counts as a
# message and blocks the removal.
for entry in os.listdir(os.path.join(path, 'new')) + \
os.listdir(os.path.join(path, 'cur')):
if len(entry) < 1 or entry[0] != '.':
raise NotEmptyError('Folder contains message(s): %s' % folder)
# A directory other than new/, cur/ and tmp/ also blocks the removal.
for entry in os.listdir(path):
if entry != 'new' and entry != 'cur' and entry != 'tmp' and \
os.path.isdir(os.path.join(path, entry)):
raise NotEmptyError("Folder contains subdirectory '%s': %s" %
# Bottom-up walk (topdown=False), so contents are visited before the
# directory that holds them.
# NOTE(review): the inner loops over `files` and `dirs` are not visible here.
for root, dirs, files in os.walk(path, topdown=False):
os.remove(os.path.join(root, entry))
os.rmdir(os.path.join(root, entry))
"""Delete old files in "tmp"."""
for entry in os.listdir(os.path.join(self._path, 'tmp')):
path = os.path.join(self._path, 'tmp', entry)
if now - os.path.getatime(path) > 129600: # 60 * 60 * 36
_count = 1 # This is used to generate unique file names.
"""Create a file in the tmp subdirectory and open and return it."""
hostname = socket.gethostname()
hostname = hostname.replace('/', r'\057')
hostname = hostname.replace(':', r'\072')
uniq = "%s.M%sP%sQ%s.%s" % (int(now), int(now % 1 * 1e6), os.getpid(),
Maildir._count, hostname)
path = os.path.join(self._path, 'tmp', uniq)
except FileNotFoundError: