Edit File by line
/home/barbar84/public_h.../wp-conte.../plugins/sujqvwi/AnonR/anonr.TX.../proc/self/root/opt/sharedra...
File: clean_exim.py
#!/opt/imh-python/bin/python3
[0] Fix | Delete
"""Python exim cleanup script to reduce the urge to run hacky oneliners"""
[1] Fix | Delete
[2] Fix | Delete
import argparse
[3] Fix | Delete
import re
[4] Fix | Delete
import subprocess
[5] Fix | Delete
import sys
[6] Fix | Delete
from collections import defaultdict
[7] Fix | Delete
from typing import Union
[8] Fix | Delete
[9] Fix | Delete
# Compiled once at import time and reused for every line of `exim -bp` output.
# Only the first line of each queue entry (age, size, message ID, sender)
# matches; other lines of the listing do not and are skipped by the caller.
EXIM_BP_RE = re.compile(
    # age and size, e.g. "5m 15K"; the size suffix is K, M, or absent
    r'\s*(?P<age>[0-9]+[mhd])\s+[0-9\.]+[KM]? '
    r'(?P<id>[a-zA-Z0-9\-]+)\s+'  # message ID, e.g. 1XEhho-0006AU-Tx
    r'<(?P<sender>.*)>'  # <emailuser@domain>; empty "<>" for bounces
)
[16] Fix | Delete
[17] Fix | Delete
def run(
    command: Union[list[str], str], shell=False
) -> subprocess.CompletedProcess:
    """Run a process and capture its output.

    Args:
        command: argument list, or a single string when ``shell`` is True.
        shell: run ``command`` through the shell instead of executing it
            directly.

    Returns:
        The CompletedProcess, with stdout and stderr captured as text.
        A non-zero exit status does not raise here; callers inspect
        ``returncode`` or call ``check_returncode()`` themselves.
    """
    return subprocess.run(
        command,
        shell=shell,
        capture_output=True,
        encoding="utf-8",
        # undecodable bytes are kept as surrogate escapes instead of
        # raising UnicodeDecodeError
        errors="surrogateescape",
        check=False,
    )
[37] Fix | Delete
[38] Fix | Delete
def get_queue(exclude_bounces=False, bounces_only=False, min_age_secs=3600):
    """Get the old messages in the exim queue as a list of dicts.

    Runs ``exim -bp`` and parses each line that matches EXIM_BP_RE.

    Args:
        exclude_bounces: skip messages whose sender is empty (``<>``).
        bounces_only: skip messages whose sender is not empty.
        min_age_secs: only messages strictly older than this many seconds
            are returned. Defaults to one hour.

    Returns:
        A list of dicts with keys 'age' (int, seconds), 'id' and 'sender'.
    """
    queue = []
    for line in run(["exim", "-bp"]).stdout.splitlines():
        match = EXIM_BP_RE.match(line)
        if match is None:
            continue
        msg = match.groupdict()
        is_bounce = msg['sender'] == ''
        if exclude_bounces and is_bounce:
            continue
        if bounces_only and not is_bounce:
            continue
        msg['age'] = exim_age_to_secs(msg['age'])
        if msg['age'] > min_age_secs:
            queue.append(msg)
    return queue
[58] Fix | Delete
[59] Fix | Delete
def exim_age_to_secs(age):
    """Convert an exim age string such as '5m', '2h' or '3d' to seconds.

    The last character selects the unit (minutes, hours or days); the
    remainder is parsed as an integer count.
    """
    conversions = {'m': 60, 'h': 3600, 'd': 86400}
    # the unit suffix picks the multiplier
    multiplier = conversions[age[-1]]
    # strip the unit, typecast the count to int, and scale to seconds
    return int(age.rstrip('mhd')) * multiplier
[67] Fix | Delete
[68] Fix | Delete
def is_boxtrapper(msg_id):
    """Determine if a message ID is a boxtrapper msg in queue.

    Reads the message headers with ``exim -Mvh`` and looks for the
    boxtrapper verification subject. Returns False when exim exits
    non-zero.
    """
    out = run(['exim', '-Mvh', msg_id])
    if out.returncode != 0:
        return False  # likely no longer in queue
    return 'Subject: Your email requires verification verify#' in out.stdout
[78] Fix | Delete
[79] Fix | Delete
def remove_msg(msg_id):
    """Remove one or more messages from the exim queue.

    Args:
        msg_id: a single message ID string, or a list of ID strings.

    Each ID is passed to ``exim -Mrm`` as its own argument. The exit
    status is deliberately ignored: the message may have already been
    removed from the queue or sent.
    """
    msg_ids = list(msg_id) if isinstance(msg_id, list) else [msg_id]
    if not msg_ids:
        return  # nothing to remove; do not call exim -Mrm without an ID
    # The command runs without a shell, so a space-joined string would
    # reach exim as one bogus ID; every ID must be a separate argv entry.
    run(['exim', '-Mrm', *msg_ids])
[88] Fix | Delete
[89] Fix | Delete
def print_removed(removed):
    """Given a dict of user:count mappings, print what was removed.

    Prints 'None to remove' when the mapping is empty, otherwise a
    'Removed:' header followed by one line per user with its count.
    """
    if not removed:
        print('None to remove')
        return
    print('Removed:')
    for user, count in removed.items():
        print(' %s: %d' % (user, count))
[98] Fix | Delete
[99] Fix | Delete
def clean_boxtrapper():
    """Remove old boxtrapper messages from queue.

    The theory is that if a boxtrapper message is still stuck in the
    queue past the age cutoff applied by get_queue(), it was likely sent
    to an illegitimate email address and will never clear from the queue
    normally. Prints a per-sender count of what was removed.
    """
    print('Removing old boxtrapper messages...')
    removed = defaultdict(int)
    # identify every boxtrapper message first, then remove them
    boxtrapper_msgs = [
        msg
        for msg in get_queue(exclude_bounces=True)
        if is_boxtrapper(msg['id'])
    ]
    for msg in boxtrapper_msgs:
        remove_msg(msg['id'])
        removed[msg['sender']] += 1
    print_removed(removed)
[112] Fix | Delete
[113] Fix | Delete
def get_full_quota_user(msg_id):
    """Get the user for a message which is at full quota, or None.

    Reads the message body with ``exim -Mvb`` and finds the first line
    containing ' Mailbox quota exceeded'. The stripped text of the line
    just before it is returned as the user.

    Returns None when exim exits non-zero, when the phrase is absent,
    or when the phrase is on the first line (no preceding line).
    """
    out = run(['exim', '-Mvb', msg_id])
    if out.returncode != 0:
        return None
    lines = out.stdout.strip().splitlines()
    for line_num, line in enumerate(lines):
        if ' Mailbox quota exceeded' not in line:
            continue
        if line_num == 0:
            return None  # no preceding line to read the address from
        return lines[line_num - 1].strip()
    return None
[133] Fix | Delete
[134] Fix | Delete
def clean_full_inbox_bounces():
    """Remove "mailbox quota exceeded" bounces from queue.

    Only bounce messages returned by get_queue() are inspected. A bounce
    is removed when get_full_quota_user() finds a user for it. Prints a
    per-user count of what was removed.
    """
    print('Removing old full inbox bounces...')
    removed = defaultdict(int)
    for msg in get_queue(bounces_only=True):
        user = get_full_quota_user(msg['id'])
        if user is None:
            continue  # not a full-quota bounce, or no longer readable
        remove_msg(msg['id'])
        removed[user] += 1
    print_removed(removed)
[147] Fix | Delete
[148] Fix | Delete
def clean_autoresponders():
    """Remove old auto-responder emails from queue.

    Auto-responses that have been stuck in the queue are likely replies
    to spam. A message counts as an auto-response when its headers
    (read with ``exim -Mvh``) contain both 'X-Autorespond:' and
    'auto_reply'. Prints a per-sender count of what was removed.
    """
    print('Removing old auto-responses...')
    removed = defaultdict(int)
    for msg in get_queue(exclude_bounces=True):
        out = run(['exim', '-Mvh', msg['id']])
        if out.returncode != 0:
            continue  # headers could not be read; skip this message
        headers = out.stdout
        if 'X-Autorespond:' in headers and 'auto_reply' in headers:
            remove_msg(msg['id'])
            removed[msg['sender']] += 1
    print_removed(removed)
[166] Fix | Delete
[167] Fix | Delete
def _parse_args():
    """Parse commandline arguments.

    Exactly one of the cleanup options must be given. Otherwise the help
    text is printed and the script exits with status 1.

    Returns:
        The argparse namespace with boolean attributes ``all``,
        ``boxtrapper``, ``autorespond`` and ``full``.
    """
    parser = argparse.ArgumentParser(
        description="Exim cleanup tool",
        epilog="One and only one option is allowed",
    )
    parser.add_argument(
        '-a', '--all', action='store_true', help='Do all cleanup procedures'
    )
    parser.add_argument(
        '-b',
        '--boxtrapper',
        action='store_true',
        help='Remove old boxtrapper messages',
    )
    parser.add_argument(
        '-r',
        '--autorespond',
        action='store_true',
        help='Remove old auto-responder messages',
    )
    parser.add_argument(
        '-f',
        '--full',
        action='store_true',
        help='Remove old bounces for full inbox',
    )
    args = parser.parse_args()
    # every option is a store_true flag, so count how many were set
    chosen = [key for key, value in vars(args).items() if value is True]
    if len(chosen) != 1:
        parser.print_help()
        sys.exit(1)
    return args
[201] Fix | Delete
[202] Fix | Delete
def main():
    """Main logic: run the cleanup procedure selected on the command line.

    ``--all`` runs every procedure in order: boxtrapper, full inbox
    bounces, then auto-responders.
    """
    args = _parse_args()
    if args.boxtrapper or args.all:
        clean_boxtrapper()
    if args.full or args.all:
        clean_full_inbox_bounces()
    if args.autorespond or args.all:
        clean_autoresponders()
[212] Fix | Delete
[213] Fix | Delete
# Only run the cleanup when executed as a script, not when imported.
if __name__ == '__main__':
    main()
[216] Fix | Delete
It is recommended that you edit in text format; this type of fix handles quite a lot in one request.
Function