Edit File by line
/home/barbar84/www/wp-conte.../plugins/sujqvwi/AnonR/smanonr..../lib64/python3....
File: mailbox.py
if e.errno == errno.ENOENT:
[1000] Fix | Delete
raise KeyError('No message with key: %s' % key)
[1001] Fix | Delete
else:
[1002] Fix | Delete
raise
[1003] Fix | Delete
try:
[1004] Fix | Delete
if self._locked:
[1005] Fix | Delete
_lock_file(f)
[1006] Fix | Delete
try:
[1007] Fix | Delete
os.close(os.open(path, os.O_WRONLY | os.O_TRUNC))
[1008] Fix | Delete
self._dump_message(message, f)
[1009] Fix | Delete
if isinstance(message, MHMessage):
[1010] Fix | Delete
self._dump_sequences(message, key)
[1011] Fix | Delete
finally:
[1012] Fix | Delete
if self._locked:
[1013] Fix | Delete
_unlock_file(f)
[1014] Fix | Delete
finally:
[1015] Fix | Delete
_sync_close(f)
[1016] Fix | Delete
[1017] Fix | Delete
def get_message(self, key):
    """Build and return the MHMessage stored under ``key``.

    Raises KeyError when no file for ``key`` exists in the mailbox
    directory; any other OSError from opening the file propagates.
    """
    # A locked mailbox opens the message read/write, otherwise read-only.
    mode = 'rb+' if self._locked else 'rb'
    try:
        handle = open(os.path.join(self._path, str(key)), mode)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise
        raise KeyError('No message with key: %s' % key)
    with handle:
        if self._locked:
            _lock_file(handle)
        try:
            msg = MHMessage(handle)
        finally:
            # Release the per-file lock even if parsing failed.
            if self._locked:
                _unlock_file(handle)
    # Tag the message with every sequence that lists this key.
    sequences = self.get_sequences()
    for name in sequences:
        if key in sequences[name]:
            msg.add_sequence(name)
    return msg
[1041] Fix | Delete
[1042] Fix | Delete
def get_bytes(self, key):
    """Return the raw bytes of the message stored under ``key``.

    Platform line separators are converted to ``b'\\n'``.  Raises
    KeyError when no file for ``key`` exists.
    """
    # A locked mailbox opens the message read/write, otherwise read-only.
    mode = 'rb+' if self._locked else 'rb'
    try:
        handle = open(os.path.join(self._path, str(key)), mode)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise
        raise KeyError('No message with key: %s' % key)
    with handle:
        if self._locked:
            _lock_file(handle)
        try:
            raw = handle.read()
            return raw.replace(linesep, b'\n')
        finally:
            # Runs before the return value is handed back to the caller.
            if self._locked:
                _unlock_file(handle)
[1062] Fix | Delete
[1063] Fix | Delete
def get_file(self, key):
    """Open the message stored under ``key`` and return it wrapped in a
    _ProxyFile.

    Raises KeyError when no file for ``key`` exists.
    """
    message_path = os.path.join(self._path, str(key))
    try:
        handle = open(message_path, 'rb')
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise
        raise KeyError('No message with key: %s' % key)
    return _ProxyFile(handle)
[1073] Fix | Delete
[1074] Fix | Delete
def iterkeys(self):
    """Return an iterator over the message keys in ascending order.

    Keys are the all-digit entry names of the mailbox directory,
    converted to int; every other entry is ignored.
    """
    numeric = [int(name) for name in os.listdir(self._path)
               if name.isdigit()]
    numeric.sort()
    return iter(numeric)
[1078] Fix | Delete
[1079] Fix | Delete
def __contains__(self, key):
[1080] Fix | Delete
"""Return True if the keyed message exists, False otherwise."""
[1081] Fix | Delete
return os.path.exists(os.path.join(self._path, str(key)))
[1082] Fix | Delete
[1083] Fix | Delete
def __len__(self):
[1084] Fix | Delete
"""Return a count of messages in the mailbox."""
[1085] Fix | Delete
return len(list(self.iterkeys()))
[1086] Fix | Delete
[1087] Fix | Delete
def lock(self):
    """Lock the mailbox.

    Opens the mailbox's ``.mh_sequences`` file, locks it with
    ``_lock_file`` and keeps the handle in ``self._file`` until
    ``unlock()``.  Does nothing when the mailbox is already locked.
    Any exception from opening or locking the file propagates.
    """
    if not self._locked:
        self._file = open(os.path.join(self._path, '.mh_sequences'), 'rb+')
        try:
            _lock_file(self._file)
        except BaseException:
            # Locking failed: do not leak the open handle or leave a
            # stale self._file behind while the mailbox stays unlocked.
            self._file.close()
            del self._file
            raise
        self._locked = True
[1093] Fix | Delete
[1094] Fix | Delete
def unlock(self):
    """Release the mailbox lock; a no-op when the mailbox is not locked.

    Unlocks and closes the handle kept in ``self._file``, removes that
    attribute and clears the locked flag.
    """
    if not self._locked:
        return
    held = self._file
    _unlock_file(held)
    _sync_close(held)
    del self._file
    self._locked = False
[1101] Fix | Delete
[1102] Fix | Delete
def flush(self):
    """Write any pending changes to the disk (nothing to do here)."""
    return None
[1105] Fix | Delete
[1106] Fix | Delete
def close(self):
    """Close the mailbox, releasing the lock if one is held."""
    if not self._locked:
        return
    self.unlock()
[1110] Fix | Delete
[1111] Fix | Delete
def list_folders(self):
    """Return the names of the sub-directories of the mailbox directory,
    in the order ``os.listdir`` reports them."""
    return [name for name in os.listdir(self._path)
            if os.path.isdir(os.path.join(self._path, name))]
[1118] Fix | Delete
[1119] Fix | Delete
def get_folder(self, folder):
    """Return an MH instance for the sub-folder ``folder``.

    The instance is built with ``create=False`` and this mailbox's
    message factory.
    """
    folder_path = os.path.join(self._path, folder)
    return MH(folder_path, factory=self._factory, create=False)
[1123] Fix | Delete
[1124] Fix | Delete
def add_folder(self, folder):
    """Return an MH instance for the sub-folder ``folder``, built with
    MH's default ``create`` setting and this mailbox's message factory."""
    folder_path = os.path.join(self._path, folder)
    return MH(folder_path, factory=self._factory)
[1128] Fix | Delete
[1129] Fix | Delete
def remove_folder(self, folder):
    """Delete the named folder, which must be empty.

    A folder counts as empty when it holds nothing, or nothing but its
    ``.mh_sequences`` file (which is removed first).

    Raises NotEmptyError, naming the offending folder's path, when the
    folder holds anything else.
    """
    path = os.path.join(self._path, folder)
    entries = os.listdir(path)
    if entries == ['.mh_sequences']:
        os.remove(os.path.join(path, '.mh_sequences'))
    elif entries:
        # Report the folder that is not empty, not the parent mailbox.
        raise NotEmptyError('Folder not empty: %s' % path)
    os.rmdir(path)
[1140] Fix | Delete
[1141] Fix | Delete
def get_sequences(self):
    """Parse ``.mh_sequences`` into a name-to-key-list dictionary.

    Each line has the form ``name: spec spec ...`` where a spec is either
    a single number or an inclusive ``start-stop`` range.  Keys that are
    not in ``self.keys()`` are dropped, and a sequence left with no keys
    is omitted.  Raises FormatError for a line that cannot be parsed.
    """
    results = {}
    seq_path = os.path.join(self._path, '.mh_sequences')
    with open(seq_path, 'r', encoding='ASCII') as f:
        all_keys = set(self.keys())
        for line in f:
            try:
                name, contents = line.split(':')
                keys = set()
                for spec in contents.split():
                    if not spec.isdigit():
                        # Not a bare number: must be "start-stop".
                        start, stop = (int(x) for x in spec.split('-'))
                        keys.update(range(start, stop + 1))
                    else:
                        keys.add(int(spec))
                present = [key for key in sorted(keys) if key in all_keys]
                if present:
                    results[name] = present
                else:
                    # An empty sequence also clears any earlier entry
                    # recorded under the same name.
                    results.pop(name, None)
            except ValueError:
                raise FormatError('Invalid sequence specification: %s' %
                                  line.rstrip())
    return results
[1164] Fix | Delete
[1165] Fix | Delete
def set_sequences(self, sequences):
    """Rewrite ``.mh_sequences`` from a name-to-key-list dictionary.

    Each non-empty sequence becomes one ``name: ...`` line; keys are
    de-duplicated, sorted, and runs of consecutive keys are written as
    ``first-last``.  Sequences with no keys are skipped.
    """
    f = open(os.path.join(self._path, '.mh_sequences'), 'r+', encoding='ASCII')
    try:
        # Empty the file through a separate descriptor before writing.
        os.close(os.open(f.name, os.O_WRONLY | os.O_TRUNC))
        for name, keys in sequences.items():
            if len(keys) == 0:
                continue
            f.write(name + ':')
            last = None
            in_run = False
            for key in sorted(set(keys)):
                if key - 1 != last:
                    if in_run:
                        # The run ended at ``last``; close it, then
                        # start over with the current key.
                        in_run = False
                        f.write('%s %s' % (last, key))
                    else:
                        f.write(' %s' % key)
                elif not in_run:
                    # Second key of a consecutive run: emit the dash once.
                    in_run = True
                    f.write('-')
                last = key
            # A run still open at the end needs its final key written.
            f.write(str(last) + '\n' if in_run else '\n')
    finally:
        _sync_close(f)
[1193] Fix | Delete
[1194] Fix | Delete
def pack(self):
    """Renumber messages as 1..N with no gaps.  Invalidates keys.

    Every message whose key is not the next expected number is moved to
    that number (hard link plus unlink, falling back to rename), and the
    sequences are rewritten with the new keys if anything moved.
    """
    sequences = self.get_sequences()
    prev = 0
    changes = []
    for key in self.iterkeys():
        target = prev + 1
        if key != target:
            changes.append((key, target))
            src = os.path.join(self._path, str(key))
            dst = os.path.join(self._path, str(target))
            try:
                os.link(src, dst)
            except (AttributeError, PermissionError):
                # Hard links unavailable or not permitted: rename instead.
                os.rename(src, dst)
            else:
                os.unlink(src)
        prev = target
    self._next_key = prev + 1
    if not changes:
        return
    for key_list in sequences.values():
        for old, new in changes:
            if old in key_list:
                key_list[key_list.index(old)] = new
    self.set_sequences(sequences)
[1219] Fix | Delete
[1220] Fix | Delete
def _dump_sequences(self, message, key):
[1221] Fix | Delete
"""Inspect a new MHMessage and update sequences appropriately."""
[1222] Fix | Delete
pending_sequences = message.get_sequences()
[1223] Fix | Delete
all_sequences = self.get_sequences()
[1224] Fix | Delete
for name, key_list in all_sequences.items():
[1225] Fix | Delete
if name in pending_sequences:
[1226] Fix | Delete
key_list.append(key)
[1227] Fix | Delete
elif key in key_list:
[1228] Fix | Delete
del key_list[key_list.index(key)]
[1229] Fix | Delete
for sequence in pending_sequences:
[1230] Fix | Delete
if sequence not in all_sequences:
[1231] Fix | Delete
all_sequences[sequence] = [key]
[1232] Fix | Delete
self.set_sequences(all_sequences)
[1233] Fix | Delete
[1234] Fix | Delete
[1235] Fix | Delete
class Babyl(_singlefileMailbox):
    """An Rmail-style Babyl mailbox.

    On top of the single-file mailbox machinery this class keeps a
    key-to-label-list mapping in ``self._labels``.
    """

    # Labels treated as special; get_labels() excludes them and
    # _install_message() writes them in the first label group.
    _special_labels = frozenset({'unseen', 'deleted', 'filed', 'answered',
                                 'forwarded', 'edited', 'resent'})

    def __init__(self, path, factory=None, create=True):
        """Initialize a Babyl mailbox."""
        _singlefileMailbox.__init__(self, path, factory, create)
        self._labels = {}

    def add(self, message):
        """Add message and return assigned key."""
        key = _singlefileMailbox.add(self, message)
        if isinstance(message, BabylMessage):
            self._labels[key] = message.get_labels()
        return key

    def remove(self, key):
        """Remove the keyed message; raise KeyError if it doesn't exist."""
        _singlefileMailbox.remove(self, key)
        if key in self._labels:
            del self._labels[key]

    def __setitem__(self, key, message):
        """Replace the keyed message; raise KeyError if it doesn't exist."""
        _singlefileMailbox.__setitem__(self, key, message)
        if isinstance(message, BabylMessage):
            self._labels[key] = message.get_labels()

    def get_message(self, key):
        """Return a Message representation or raise a KeyError."""
        start, stop = self._lookup(key)
        self._file.seek(start)
        self._file.readline()   # Skip b'1,' line specifying labels.
        # Original headers run up to the '*** EOOH ***' marker line.
        original_headers = io.BytesIO()
        while True:
            line = self._file.readline()
            if line == b'*** EOOH ***' + linesep or not line:
                break
            original_headers.write(line.replace(linesep, b'\n'))
        # Visible headers run up to the next empty line.
        visible_headers = io.BytesIO()
        while True:
            line = self._file.readline()
            if line == linesep or not line:
                break
            visible_headers.write(line.replace(linesep, b'\n'))
        # Read up to the stop, or to the end
        n = stop - self._file.tell()
        assert n >= 0
        body = self._file.read(n)
        body = body.replace(linesep, b'\n')
        msg = BabylMessage(original_headers.getvalue() + body)
        msg.set_visible(visible_headers.getvalue())
        if key in self._labels:
            msg.set_labels(self._labels[key])
        return msg

    def get_bytes(self, key):
        """Return a bytes representation or raise a KeyError.

        The result is the original headers followed by the body; the
        visible-headers section is skipped.
        """
        start, stop = self._lookup(key)
        self._file.seek(start)
        self._file.readline()   # Skip b'1,' line specifying labels.
        original_headers = io.BytesIO()
        while True:
            line = self._file.readline()
            if line == b'*** EOOH ***' + linesep or not line:
                break
            original_headers.write(line.replace(linesep, b'\n'))
        # Skip over the visible headers without keeping them.
        while True:
            line = self._file.readline()
            if line == linesep or not line:
                break
        headers = original_headers.getvalue()
        n = stop - self._file.tell()
        assert n >= 0
        data = self._file.read(n)
        data = data.replace(linesep, b'\n')
        return headers + data

    def get_file(self, key):
        """Return a file-like representation or raise a KeyError."""
        return io.BytesIO(self.get_bytes(key).replace(b'\n', linesep))

    def get_labels(self):
        """Return a list of user-defined labels in the mailbox."""
        self._lookup()
        labels = set()
        for label_list in self._labels.values():
            labels.update(label_list)
        labels.difference_update(self._special_labels)
        return list(labels)

    def _generate_toc(self):
        """Generate key-to-(start, stop) table of contents."""
        starts, stops = [], []
        self._file.seek(0)
        next_pos = 0
        label_lists = []
        while True:
            line_pos = next_pos
            line = self._file.readline()
            next_pos = self._file.tell()
            if line == b'\037\014' + linesep:
                # Start-of-message marker; it also ends any open message.
                if len(stops) < len(starts):
                    stops.append(line_pos - len(linesep))
                starts.append(next_pos)
                # The line after the marker carries the message's labels.
                labels = [label.strip() for label
                                        in self._file.readline()[1:].split(b',')
                                        if label.strip()]
                label_lists.append(labels)
            elif line == b'\037' or line == b'\037' + linesep:
                if len(stops) < len(starts):
                    stops.append(line_pos - len(linesep))
            elif not line:
                # End of file.
                stops.append(line_pos - len(linesep))
                break
        self._toc = dict(enumerate(zip(starts, stops)))
        self._labels = dict(enumerate(label_lists))
        self._next_key = len(self._toc)
        self._file.seek(0, 2)
        self._file_length = self._file.tell()

    def _pre_mailbox_hook(self, f):
        """Called before writing the mailbox to file f."""
        babyl = b'BABYL OPTIONS:' + linesep
        babyl += b'Version: 5' + linesep
        labels = self.get_labels()
        labels = (label.encode() for label in labels)
        babyl += b'Labels:' + b','.join(labels) + linesep
        babyl += b'\037'
        f.write(babyl)

    def _pre_message_hook(self, f):
        """Called before writing each message to file f."""
        f.write(b'\014' + linesep)

    def _post_message_hook(self, f):
        """Called after writing each message to file f."""
        f.write(linesep + b'\037')

    def _install_message(self, message):
        """Write message contents and return (start, stop).

        ``message`` may be an email Message (BabylMessage included),
        bytes, str, an io.StringIO, or an object with ``readline``.
        Raises TypeError for anything else.
        """
        start = self._file.tell()
        # First the label line: special labels, ',,', then other labels.
        if isinstance(message, BabylMessage):
            special_labels = []
            labels = []
            for label in message.get_labels():
                if label in self._special_labels:
                    special_labels.append(label)
                else:
                    labels.append(label)
            self._file.write(b'1')
            for label in special_labels:
                self._file.write(b', ' + label.encode())
            self._file.write(b',,')
            for label in labels:
                self._file.write(b' ' + label.encode() + b',')
            self._file.write(linesep)
        else:
            self._file.write(b'1,,' + linesep)
        if isinstance(message, email.message.Message):
            orig_buffer = io.BytesIO()
            orig_generator = email.generator.BytesGenerator(orig_buffer, False, 0)
            orig_generator.flatten(message)
            orig_buffer.seek(0)
            # Original headers, up to and including the blank line.
            while True:
                line = orig_buffer.readline()
                self._file.write(line.replace(b'\n', linesep))
                if line == b'\n' or not line:
                    break
            self._file.write(b'*** EOOH ***' + linesep)
            if isinstance(message, BabylMessage):
                vis_buffer = io.BytesIO()
                vis_generator = email.generator.BytesGenerator(vis_buffer, False, 0)
                vis_generator.flatten(message.get_visible())
                # Rewind before reading back: flatten() leaves the
                # position at the end of the buffer, so without this the
                # visible headers would never be written.
                vis_buffer.seek(0)
                while True:
                    line = vis_buffer.readline()
                    self._file.write(line.replace(b'\n', linesep))
                    if line == b'\n' or not line:
                        break
            else:
                # No separate visible headers: repeat the original ones.
                orig_buffer.seek(0)
                while True:
                    line = orig_buffer.readline()
                    self._file.write(line.replace(b'\n', linesep))
                    if line == b'\n' or not line:
                        break
            # Whatever remains in orig_buffer is the body.
            while True:
                buffer = orig_buffer.read(4096) # Buffer size is arbitrary.
                if not buffer:
                    break
                self._file.write(buffer.replace(b'\n', linesep))
        elif isinstance(message, (bytes, str, io.StringIO)):
            if isinstance(message, io.StringIO):
                warnings.warn("Use of StringIO input is deprecated, "
                    "use BytesIO instead", DeprecationWarning, 3)
                message = message.getvalue()
            if isinstance(message, str):
                message = self._string_to_bytes(message)
            body_start = message.find(b'\n\n') + 2
            if body_start - 2 != -1:
                # Headers are written twice: once as original headers,
                # once after the EOOH marker as visible headers.
                self._file.write(message[:body_start].replace(b'\n', linesep))
                self._file.write(b'*** EOOH ***' + linesep)
                self._file.write(message[:body_start].replace(b'\n', linesep))
                self._file.write(message[body_start:].replace(b'\n', linesep))
            else:
                # No blank line found: write everything after the marker.
                self._file.write(b'*** EOOH ***' + linesep + linesep)
                self._file.write(message.replace(b'\n', linesep))
        elif hasattr(message, 'readline'):
            if hasattr(message, 'buffer'):
                warnings.warn("Use of text mode files is deprecated, "
                    "use a binary mode file instead", DeprecationWarning, 3)
                message = message.buffer
            original_pos = message.tell()
            first_pass = True
            # Two passes over the headers: the first writes them before
            # the EOOH marker, the second (after seeking back) after it.
            while True:
                line = message.readline()
                # Universal newline support.
                if line.endswith(b'\r\n'):
                    line = line[:-2] + b'\n'
                elif line.endswith(b'\r'):
                    line = line[:-1] + b'\n'
                self._file.write(line.replace(b'\n', linesep))
                if line == b'\n' or not line:
                    if first_pass:
                        first_pass = False
                        self._file.write(b'*** EOOH ***' + linesep)
                        message.seek(original_pos)
                    else:
                        break
            # The rest of the input is the body.
            while True:
                line = message.readline()
                if not line:
                    break
                # Universal newline support.
                if line.endswith(b'\r\n'):
                    line = line[:-2] + linesep
                elif line.endswith(b'\r'):
                    line = line[:-1] + linesep
                elif line.endswith(b'\n'):
                    line = line[:-1] + linesep
                self._file.write(line)
        else:
            raise TypeError('Invalid message type: %s' % type(message))
        stop = self._file.tell()
        return (start, stop)
[1482] Fix | Delete
[1483] Fix | Delete
[1484] Fix | Delete
class Message(email.message.Message):
[1485] Fix | Delete
"""Message with mailbox-format-specific properties."""
[1486] Fix | Delete
[1487] Fix | Delete
def __init__(self, message=None):
[1488] Fix | Delete
"""Initialize a Message instance."""
[1489] Fix | Delete
if isinstance(message, email.message.Message):
[1490] Fix | Delete
self._become_message(copy.deepcopy(message))
[1491] Fix | Delete
if isinstance(message, Message):
[1492] Fix | Delete
message._explain_to(self)
[1493] Fix | Delete
elif isinstance(message, bytes):
[1494] Fix | Delete
self._become_message(email.message_from_bytes(message))
[1495] Fix | Delete
elif isinstance(message, str):
[1496] Fix | Delete
self._become_message(email.message_from_string(message))
[1497] Fix | Delete
elif isinstance(message, io.TextIOWrapper):
[1498] Fix | Delete
self._become_message(email.message_from_file(message))
[1499] Fix | Delete
It is recommended that you edit in text format; this type of fix handles quite a lot in one request.
Function