#!/opt/imh-python/bin/python3
"""Python exim cleanup script to reduce the urge to run hacky oneliners"""
from collections import defaultdict
# compile regex once and recycle them throughout the script for efficiency
r'\s*(?P<age>[0-9]+[mhd])\s+[0-9\.]+[M|K]? ' # 5m 15K
r'(?P<id>[a-zA-Z0-9\-]+)\s+' # 1XEhho-0006AU-Tx
r'\<(?P<sender>.*)\>' # <emailuser@domain>
command: Union[list[str], str], shell=False
) -> subprocess.CompletedProcess:
Run a process with arguments.
Optionally, as a shell command
returns CompletedProcess.
errors="surrogateescape",
def get_queue(exclude_bounces=False, bounces_only=False):
"""Get current exim queue as a list of dicts, each dict
containing a single message, with keys 'age', 'id', and 'sender'"""
out = run(["exim", "-bp"])
queue_lines = out.stdout.splitlines()
match = EXIM_BP_RE.match(line)
groupdict = match.groupdict()
if exclude_bounces and groupdict['sender'] == '':
if bounces_only and groupdict['sender'] != '':
groupdict['age'] = exim_age_to_secs(groupdict['age'])
if groupdict['age'] > 3600:
def exim_age_to_secs(age):
"""convert exim age to seconds"""
conversions = {'m': 60, 'h': 3600, 'd': 86400}
# find the multiplier based on above conversions
multiplier = conversions[age[-1]]
# typecast to int and use multiplier above
return int(age.rstrip('mhd')) * multiplier
def is_boxtrapper(msg_id):
"""Determine if a message ID is a boxtrapper msg in queue"""
out = run(['exim', '-Mvh', msg_id])
head = out.stdout.strip()
except subprocess.CalledProcessError:
return False # likely no longer in queue
return 'Subject: Your email requires verification verify#' in head
"""Here, msg_id may be a list or just one string"""
if isinstance(msg_id, list):
msg_id = ' '.join(msg_id)
run(['exim', '-Mrm', msg_id]).check_returncode()
except subprocess.CalledProcessError:
pass # may have already been removed from queue or sent
def print_removed(removed):
"""Given a dict of user:count mappings, print removed"""
if len(list(removed.keys())) == 0:
for user, count in removed.items():
print(' %s: %d' % (user, count))
"""Remove old boxtrapper messages from queue. The theory is that if
still stuck in queue after min_age_secs, they were likely sent from
an illigitimate email address and will never clear from queue normally"""
print('Removing old boxtrapper messages...')
removed = defaultdict(int)
queue = get_queue(exclude_bounces=True)
queue = [x for x in queue if is_boxtrapper(x['id'])]
removed[msg['sender']] += 1
def get_full_quota_user(msg_id):
"""Get the user for a message which is at full quota, or None"""
out = run(['exim', '-Mvb', msg_id])
body = out.stdout.strip()
except subprocess.CalledProcessError:
if not ' Mailbox quota exceeded' in body:
for line_num, line in enumerate(body):
if ' Mailbox quota exceeded' in line:
return body[addr_line].strip()
def clean_full_inbox_bounces():
"""Remove "mailbox quota exceeded" bounces from queue"""
print('Removing old full inbox bounces...')
removed = defaultdict(int)
queue = get_queue(bounces_only=True)
user = get_full_quota_user(msg['id'])
def clean_autoresponders():
"""Remove old auto-responder emails from queue which are
likely responses to spam if they have been stuck in queue"""
print('Removing old auto-responses...')
removed = defaultdict(int)
queue = get_queue(exclude_bounces=True)
out = run(['exim', '-Mvh', msg['id']])
headers = out.stdout.strip()
except subprocess.CalledProcessError:
if 'X-Autorespond:' in headers and 'auto_reply' in headers:
removed[msg['sender']] += 1
"""Parse commandline arguments"""
parser = argparse.ArgumentParser(
description="Exim cleanup tool",
epilog="One and only one option is allowed",
'-a', '--all', action='store_true', help='Do all cleanup procedures'
help='Remove old boxtrapper messages',
help='Remove old auto-responder messages',
help='Remove old bounces for full inbox',
args = parser.parse_args()
chosen = [key for key, value in vars(args).items() if value is True]
if args.boxtrapper or args.all:
if args.full or args.all:
clean_full_inbox_bounces()
if args.autorespond or args.all:
if __name__ == '__main__':