elif hasattr(message, "read"):
self._become_message(email.message_from_binary_file(message))
email.message.Message.__init__(self)
raise TypeError('Invalid message type: %s' % type(message))
def _become_message(self, message):
"""Assume the non-format-specific state of message."""
type_specific = getattr(message, '_type_specific_attributes', [])
for name in message.__dict__:
if name not in type_specific:
self.__dict__[name] = message.__dict__[name]
def _explain_to(self, message):
"""Copy format-specific state to message insofar as possible."""
if isinstance(message, Message):
return # There's nothing format-specific to explain.
raise TypeError('Cannot convert to specified type')
class MaildirMessage(Message):
"""Message with Maildir-specific properties."""
_type_specific_attributes = ['_subdir', '_info', '_date']
def __init__(self, message=None):
"""Initialize a MaildirMessage instance."""
Message.__init__(self, message)
"""Return 'new' or 'cur'."""
def set_subdir(self, subdir):
"""Set subdir to 'new' or 'cur'."""
if subdir == 'new' or subdir == 'cur':
raise ValueError("subdir must be 'new' or 'cur': %s" % subdir)
"""Return as a string the flags that are set."""
if self._info.startswith('2,'):
def set_flags(self, flags):
"""Set the given flags and unset all others."""
self._info = '2,' + ''.join(sorted(flags))
def add_flag(self, flag):
"""Set the given flag(s) without changing others."""
self.set_flags(''.join(set(self.get_flags()) | set(flag)))
def remove_flag(self, flag):
"""Unset the given string flag(s) without changing others."""
self.set_flags(''.join(set(self.get_flags()) - set(flag)))
"""Return delivery date of message, in seconds since the epoch."""
def set_date(self, date):
"""Set delivery date of message, in seconds since the epoch."""
raise TypeError("can't convert to float: %s" % date) from None
"""Get the message's "info" as a string."""
def set_info(self, info):
"""Set the message's "info" string."""
if isinstance(info, str):
raise TypeError('info must be a string: %s' % type(info))
def _explain_to(self, message):
"""Copy Maildir-specific state to message insofar as possible."""
if isinstance(message, MaildirMessage):
message.set_flags(self.get_flags())
message.set_subdir(self.get_subdir())
message.set_date(self.get_date())
elif isinstance(message, _mboxMMDFMessage):
flags = set(self.get_flags())
if self.get_subdir() == 'cur':
message.set_from('MAILER-DAEMON', time.gmtime(self.get_date()))
elif isinstance(message, MHMessage):
flags = set(self.get_flags())
message.add_sequence('unseen')
message.add_sequence('replied')
message.add_sequence('flagged')
elif isinstance(message, BabylMessage):
flags = set(self.get_flags())
message.add_label('unseen')
message.add_label('deleted')
message.add_label('answered')
message.add_label('forwarded')
elif isinstance(message, Message):
raise TypeError('Cannot convert to specified type: %s' %
class _mboxMMDFMessage(Message):
"""Message with mbox- or MMDF-specific properties."""
_type_specific_attributes = ['_from']
def __init__(self, message=None):
"""Initialize an mboxMMDFMessage instance."""
self.set_from('MAILER-DAEMON', True)
if isinstance(message, email.message.Message):
unixfrom = message.get_unixfrom()
if unixfrom is not None and unixfrom.startswith('From '):
self.set_from(unixfrom[5:])
Message.__init__(self, message)
"""Return contents of "From " line."""
def set_from(self, from_, time_=None):
"""Set "From " line, formatting and appending time_ if specified."""
from_ += ' ' + time.asctime(time_)
"""Return as a string the flags that are set."""
return self.get('Status', '') + self.get('X-Status', '')
def set_flags(self, flags):
"""Set the given flags and unset all others."""
status_flags, xstatus_flags = '', ''
for flag in ('D', 'F', 'A'):
xstatus_flags += ''.join(sorted(flags))
self.replace_header('Status', status_flags)
self.add_header('Status', status_flags)
self.replace_header('X-Status', xstatus_flags)
self.add_header('X-Status', xstatus_flags)
def add_flag(self, flag):
"""Set the given flag(s) without changing others."""
self.set_flags(''.join(set(self.get_flags()) | set(flag)))
def remove_flag(self, flag):
"""Unset the given string flag(s) without changing others."""
if 'Status' in self or 'X-Status' in self:
self.set_flags(''.join(set(self.get_flags()) - set(flag)))
def _explain_to(self, message):
"""Copy mbox- or MMDF-specific state to message insofar as possible."""
if isinstance(message, MaildirMessage):
flags = set(self.get_flags())
message.set_subdir('cur')
maybe_date = ' '.join(self.get_from().split()[-5:])
message.set_date(calendar.timegm(time.strptime(maybe_date,
'%a %b %d %H:%M:%S %Y')))
except (ValueError, OverflowError):
elif isinstance(message, _mboxMMDFMessage):
message.set_flags(self.get_flags())
message.set_from(self.get_from())
elif isinstance(message, MHMessage):
flags = set(self.get_flags())
message.add_sequence('unseen')
message.add_sequence('replied')
message.add_sequence('flagged')
elif isinstance(message, BabylMessage):
flags = set(self.get_flags())
message.add_label('unseen')
message.add_label('deleted')
message.add_label('answered')
elif isinstance(message, Message):
raise TypeError('Cannot convert to specified type: %s' %
class mboxMessage(_mboxMMDFMessage):
"""Message with mbox-specific properties."""
class MHMessage(Message):
"""Message with MH-specific properties."""
_type_specific_attributes = ['_sequences']
def __init__(self, message=None):
"""Initialize an MHMessage instance."""
Message.__init__(self, message)
"""Return a list of sequences that include the message."""
return self._sequences[:]
def set_sequences(self, sequences):
"""Set the list of sequences that include the message."""
self._sequences = list(sequences)
def add_sequence(self, sequence):
"""Add sequence to list of sequences including the message."""
if isinstance(sequence, str):
if not sequence in self._sequences:
self._sequences.append(sequence)
raise TypeError('sequence type must be str: %s' % type(sequence))
def remove_sequence(self, sequence):
"""Remove sequence from the list of sequences including the message."""
self._sequences.remove(sequence)
def _explain_to(self, message):
"""Copy MH-specific state to message insofar as possible."""
if isinstance(message, MaildirMessage):
sequences = set(self.get_sequences())
if 'unseen' in sequences:
message.set_subdir('cur')
message.set_subdir('cur')
if 'flagged' in sequences:
if 'replied' in sequences:
elif isinstance(message, _mboxMMDFMessage):
sequences = set(self.get_sequences())
if 'unseen' not in sequences:
if 'flagged' in sequences:
if 'replied' in sequences:
elif isinstance(message, MHMessage):
for sequence in self.get_sequences():
message.add_sequence(sequence)
elif isinstance(message, BabylMessage):
sequences = set(self.get_sequences())
if 'unseen' in sequences:
message.add_label('unseen')
if 'replied' in sequences:
message.add_label('answered')
elif isinstance(message, Message):
raise TypeError('Cannot convert to specified type: %s' %
class BabylMessage(Message):
"""Message with Babyl-specific properties."""
_type_specific_attributes = ['_labels', '_visible']
def __init__(self, message=None):
"""Initialize a BabylMessage instance."""
self._visible = Message()
Message.__init__(self, message)
"""Return a list of labels on the message."""
def set_labels(self, labels):
"""Set the list of labels on the message."""
self._labels = list(labels)
def add_label(self, label):
"""Add label to list of labels on the message."""
if isinstance(label, str):
if label not in self._labels:
self._labels.append(label)
raise TypeError('label must be a string: %s' % type(label))
def remove_label(self, label):
"""Remove label from the list of labels on the message."""
self._labels.remove(label)
"""Return a Message representation of visible headers."""
return Message(self._visible)
def set_visible(self, visible):
"""Set the Message representation of visible headers."""
self._visible = Message(visible)
def update_visible(self):
"""Update and/or sensibly generate a set of visible headers."""
for header in self._visible.keys():
self._visible.replace_header(header, self[header])
del self._visible[header]
for header in ('Date', 'From', 'Reply-To', 'To', 'CC', 'Subject'):
if header in self and header not in self._visible:
self._visible[header] = self[header]
def _explain_to(self, message):
"""Copy Babyl-specific state to message insofar as possible."""
if isinstance(message, MaildirMessage):
labels = set(self.get_labels())
message.set_subdir('cur')
message.set_subdir('cur')
if 'forwarded' in labels or 'resent' in labels:
elif isinstance(message, _mboxMMDFMessage):
labels = set(self.get_labels())
if 'unseen' not in labels:
elif isinstance(message, MHMessage):
labels = set(self.get_labels())
message.add_sequence('unseen')
message.add_sequence('replied')
elif isinstance(message, BabylMessage):
message.set_visible(self.get_visible())
for label in self.get_labels():
elif isinstance(message, Message):
raise TypeError('Cannot convert to specified type: %s' %
class MMDFMessage(_mboxMMDFMessage):
"""Message with MMDF-specific properties."""
"""A read-only wrapper of a file."""
def __init__(self, f, pos=None):
"""Initialize a _ProxyFile."""
def read(self, size=None):
return self._read(size, self._file.read)
def read1(self, size=None):
return self._read(size, self._file.read1)
def readline(self, size=None):
return self._read(size, self._file.readline)
def readlines(self, sizehint=None):
"""Read multiple lines."""
"""Iterate over lines."""
"""Return the position."""
def seek(self, offset, whence=0):
self._file.seek(self._pos)
self._file.seek(offset, whence)
self._pos = self._file.tell()
if hasattr(self, '_file'):
if hasattr(self._file, 'close'):
def _read(self, size, read_method):
"""Read size bytes using read_method."""
self._file.seek(self._pos)
result = read_method(size)
self._pos = self._file.tell()
"""Context management protocol support."""
def __exit__(self, *exc):
return self._file.readable()