if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
self._dump_message(message, f)
if isinstance(message, MHMessage):
self._dump_sequences(message, key)
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
f = open(os.path.join(self._path, str(key)), 'rb+')
f = open(os.path.join(self._path, str(key)), 'rb')
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
for name, key_list in self.get_sequences().items():
def get_bytes(self, key):
"""Return a bytes representation or raise a KeyError."""
f = open(os.path.join(self._path, str(key)), 'rb+')
f = open(os.path.join(self._path, str(key)), 'rb')
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
return f.read().replace(linesep, b'\n')
"""Return a file-like representation or raise a KeyError."""
f = open(os.path.join(self._path, str(key)), 'rb')
if e.errno == errno.ENOENT:
raise KeyError('No message with key: %s' % key)
"""Return an iterator over keys."""
return iter(sorted(int(entry) for entry in os.listdir(self._path)
def __contains__(self, key):
"""Return True if the keyed message exists, False otherwise."""
return os.path.exists(os.path.join(self._path, str(key)))
"""Return a count of messages in the mailbox."""
return len(list(self.iterkeys()))
self._file = open(os.path.join(self._path, '.mh_sequences'), 'rb+')
"""Unlock the mailbox if it is locked."""
"""Write any pending changes to the disk."""
"""Flush and close the mailbox."""
"""Return a list of folder names."""
for entry in os.listdir(self._path):
if os.path.isdir(os.path.join(self._path, entry)):
def get_folder(self, folder):
"""Return an MH instance for the named folder."""
return MH(os.path.join(self._path, folder),
factory=self._factory, create=False)
def add_folder(self, folder):
"""Create a folder and return an MH instance representing it."""
return MH(os.path.join(self._path, folder),
def remove_folder(self, folder):
"""Delete the named folder, which must be empty."""
path = os.path.join(self._path, folder)
entries = os.listdir(path)
if entries == ['.mh_sequences']:
os.remove(os.path.join(path, '.mh_sequences'))
raise NotEmptyError('Folder not empty: %s' % self._path)
"""Return a name-to-key-list dictionary to define each sequence."""
with open(os.path.join(self._path, '.mh_sequences'), 'r', encoding='ASCII') as f:
all_keys = set(self.keys())
name, contents = line.split(':')
for spec in contents.split():
start, stop = (int(x) for x in spec.split('-'))
keys.update(range(start, stop + 1))
results[name] = [key for key in sorted(keys) \
if len(results[name]) == 0:
raise FormatError('Invalid sequence specification: %s' %
def set_sequences(self, sequences):
"""Set sequences using the given name-to-key-list dictionary."""
f = open(os.path.join(self._path, '.mh_sequences'), 'r+', encoding='ASCII')
os.close(os.open(f.name, os.O_WRONLY | os.O_TRUNC))
for name, keys in sequences.items():
for key in sorted(set(keys)):
f.write('%s %s' % (prev, key))
f.write(str(prev) + '\n')
"""Re-name messages to eliminate numbering gaps. Invalidates keys."""
sequences = self.get_sequences()
for key in self.iterkeys():
changes.append((key, prev + 1))
os.link(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
except (AttributeError, PermissionError):
os.rename(os.path.join(self._path, str(key)),
os.path.join(self._path, str(prev + 1)))
os.unlink(os.path.join(self._path, str(key)))
self._next_key = prev + 1
for name, key_list in sequences.items():
key_list[key_list.index(old)] = new
self.set_sequences(sequences)
def _dump_sequences(self, message, key):
"""Inspect a new MHMessage and update sequences appropriately."""
pending_sequences = message.get_sequences()
all_sequences = self.get_sequences()
for name, key_list in all_sequences.items():
if name in pending_sequences:
del key_list[key_list.index(key)]
for sequence in pending_sequences:
if sequence not in all_sequences:
all_sequences[sequence] = [key]
self.set_sequences(all_sequences)
class Babyl(_singlefileMailbox):
"""An Rmail-style Babyl mailbox."""
_special_labels = frozenset({'unseen', 'deleted', 'filed', 'answered',
'forwarded', 'edited', 'resent'})
def __init__(self, path, factory=None, create=True):
"""Initialize a Babyl mailbox."""
_singlefileMailbox.__init__(self, path, factory, create)
"""Add message and return assigned key."""
key = _singlefileMailbox.add(self, message)
if isinstance(message, BabylMessage):
self._labels[key] = message.get_labels()
"""Remove the keyed message; raise KeyError if it doesn't exist."""
_singlefileMailbox.remove(self, key)
def __setitem__(self, key, message):
"""Replace the keyed message; raise KeyError if it doesn't exist."""
_singlefileMailbox.__setitem__(self, key, message)
if isinstance(message, BabylMessage):
self._labels[key] = message.get_labels()
def get_message(self, key):
"""Return a Message representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.readline() # Skip b'1,' line specifying labels.
original_headers = io.BytesIO()
line = self._file.readline()
if line == b'*** EOOH ***' + linesep or not line:
original_headers.write(line.replace(linesep, b'\n'))
visible_headers = io.BytesIO()
line = self._file.readline()
if line == linesep or not line:
visible_headers.write(line.replace(linesep, b'\n'))
# Read up to the stop, or to the end
n = stop - self._file.tell()
body = self._file.read(n)
body = body.replace(linesep, b'\n')
msg = BabylMessage(original_headers.getvalue() + body)
msg.set_visible(visible_headers.getvalue())
msg.set_labels(self._labels[key])
def get_bytes(self, key):
"""Return a string representation or raise a KeyError."""
start, stop = self._lookup(key)
self._file.readline() # Skip b'1,' line specifying labels.
original_headers = io.BytesIO()
line = self._file.readline()
if line == b'*** EOOH ***' + linesep or not line:
original_headers.write(line.replace(linesep, b'\n'))
line = self._file.readline()
if line == linesep or not line:
headers = original_headers.getvalue()
n = stop - self._file.tell()
data = self._file.read(n)
data = data.replace(linesep, b'\n')
"""Return a file-like representation or raise a KeyError."""
return io.BytesIO(self.get_bytes(key).replace(b'\n', linesep))
"""Return a list of user-defined labels in the mailbox."""
for label_list in self._labels.values():
labels.update(label_list)
labels.difference_update(self._special_labels)
"""Generate key-to-(start, stop) table of contents."""
line = self._file.readline()
next_pos = self._file.tell()
if line == b'\037\014' + linesep:
if len(stops) < len(starts):
stops.append(line_pos - len(linesep))
labels = [label.strip() for label
in self._file.readline()[1:].split(b',')
label_lists.append(labels)
elif line == b'\037' or line == b'\037' + linesep:
if len(stops) < len(starts):
stops.append(line_pos - len(linesep))
stops.append(line_pos - len(linesep))
self._toc = dict(enumerate(zip(starts, stops)))
self._labels = dict(enumerate(label_lists))
self._next_key = len(self._toc)
self._file_length = self._file.tell()
def _pre_mailbox_hook(self, f):
"""Called before writing the mailbox to file f."""
babyl = b'BABYL OPTIONS:' + linesep
babyl += b'Version: 5' + linesep
labels = self.get_labels()
labels = (label.encode() for label in labels)
babyl += b'Labels:' + b','.join(labels) + linesep
def _pre_message_hook(self, f):
"""Called before writing each message to file f."""
f.write(b'\014' + linesep)
def _post_message_hook(self, f):
"""Called after writing each message to file f."""
f.write(linesep + b'\037')
def _install_message(self, message):
"""Write message contents and return (start, stop)."""
start = self._file.tell()
if isinstance(message, BabylMessage):
for label in message.get_labels():
if label in self._special_labels:
special_labels.append(label)
for label in special_labels:
self._file.write(b', ' + label.encode())
self._file.write(b' ' + label.encode() + b',')
self._file.write(linesep)
self._file.write(b'1,,' + linesep)
if isinstance(message, email.message.Message):
orig_buffer = io.BytesIO()
orig_generator = email.generator.BytesGenerator(orig_buffer, False, 0)
orig_generator.flatten(message)
line = orig_buffer.readline()
self._file.write(line.replace(b'\n', linesep))
if line == b'\n' or not line:
self._file.write(b'*** EOOH ***' + linesep)
if isinstance(message, BabylMessage):
vis_buffer = io.BytesIO()
vis_generator = email.generator.BytesGenerator(vis_buffer, False, 0)
vis_generator.flatten(message.get_visible())
line = vis_buffer.readline()
self._file.write(line.replace(b'\n', linesep))
if line == b'\n' or not line:
line = orig_buffer.readline()
self._file.write(line.replace(b'\n', linesep))
if line == b'\n' or not line:
buffer = orig_buffer.read(4096) # Buffer size is arbitrary.
self._file.write(buffer.replace(b'\n', linesep))
elif isinstance(message, (bytes, str, io.StringIO)):
if isinstance(message, io.StringIO):
warnings.warn("Use of StringIO input is deprecated, "
"use BytesIO instead", DeprecationWarning, 3)
message = message.getvalue()
if isinstance(message, str):
message = self._string_to_bytes(message)
body_start = message.find(b'\n\n') + 2
self._file.write(message[:body_start].replace(b'\n', linesep))
self._file.write(b'*** EOOH ***' + linesep)
self._file.write(message[:body_start].replace(b'\n', linesep))
self._file.write(message[body_start:].replace(b'\n', linesep))
self._file.write(b'*** EOOH ***' + linesep + linesep)
self._file.write(message.replace(b'\n', linesep))
elif hasattr(message, 'readline'):
if hasattr(message, 'buffer'):
warnings.warn("Use of text mode files is deprecated, "
"use a binary mode file instead", DeprecationWarning, 3)
original_pos = message.tell()
line = message.readline()
# Universal newline support.
if line.endswith(b'\r\n'):
elif line.endswith(b'\r'):
self._file.write(line.replace(b'\n', linesep))
if line == b'\n' or not line:
self._file.write(b'*** EOOH ***' + linesep)
message.seek(original_pos)
line = message.readline()
# Universal newline support.
if line.endswith(b'\r\n'):
line = line[:-2] + linesep
elif line.endswith(b'\r'):
line = line[:-1] + linesep
elif line.endswith(b'\n'):
line = line[:-1] + linesep
raise TypeError('Invalid message type: %s' % type(message))
class Message(email.message.Message):
"""Message with mailbox-format-specific properties."""
def __init__(self, message=None):
"""Initialize a Message instance."""
if isinstance(message, email.message.Message):
self._become_message(copy.deepcopy(message))
if isinstance(message, Message):
message._explain_to(self)
elif isinstance(message, bytes):
self._become_message(email.message_from_bytes(message))
elif isinstance(message, str):
self._become_message(email.message_from_string(message))
elif isinstance(message, io.TextIOWrapper):
self._become_message(email.message_from_file(message))