#! /opt/imh-python/bin/python3
''' List sources of email sent by address and directory. '''
from collections import defaultdict
from argparse import ArgumentParser
__email__ = "danielk@inmotionhosting.com"
def email_lines(all_logs=False):
'''Return iterable over email log lines'''
# NOTE(review): parts of this function are not visible in this view (any
# use of all_logs, the try/except headers and the code that produces the
# lines), so the comments below describe only the statements shown.
# Candidate files: every path that extends the main log's name by at least
# one character ('?' needs one character, '*' allows any number more) -
# presumably the rotated logs.
log_list = glob.glob('/var/log/exim_mainlog?*')
for log_file in log_list:
if not os.path.exists(log_file):
print(f"Could not find log file: {log_file}")
# gzip.open() with mode 'r' reads in binary mode.
# NOTE(review): lines read from this handle are bytes unless they are
# decoded in code not shown here - confirm.
with gzip.open(log_file, 'r') as mail_log:
print(f"Error reading file '{log_file}': {error}")
# The main log itself is opened as plain UTF-8 text.
log_file = "/var/log/exim_mainlog"
if not os.path.exists(log_file):
print(f"Could not find log file: {log_file}")
with open(log_file, encoding='utf-8') as mail_log:
print(f"Error reading file {log_file}")
# Undecodable bytes in the main log are reported rather than left uncaught.
except UnicodeDecodeError as e:
print(f"Received decoding error for {log_file}:")
def get_domains(username=''):
'''Get domain regex for username

Reads /var/cpanel/users/<username>, collects the value of every line of
the form DNS=<value> or DNS<digits>=<value>, and returns the values
joined with '|'.
'''
# NOTE(review): parts of this function are not visible in this view (the
# initialisation of domain_list, the loop over the file, the try/except
# headers and the statement that consumes the message below), so the
# comments describe only the statements shown.
user_file = f"/var/cpanel/users/{username}"
if not os.path.exists(user_file):
# Message naming the user and the missing file; what is done with it is
# not visible here.
"Could not find domains for {}. "
"Invalid cPanel user? Cannot find {}".format(username, user_file)
# Captures everything after '=' on a DNS / DNS<digits> line.
dns_rx = re.compile(r"^DNS[0-9]*=(.*)$")
# Despite its name, mail_log is the handle of the cPanel user file.
with open(user_file, encoding='utf-8') as mail_log:
dns_match = dns_rx.search(line)
if dns_match is not None:
# groups(1) returns every capture (1 is only the default for groups that
# did not participate); [0] is the text captured after '='.
domain_list.append(dns_match.groups(1)[0])
print(f"Error reading file '{user_file}': {error}")
# NOTE(review): the values are joined without re.escape(), so a '.' in a
# domain matches any character when the result is used inside a pattern.
return '|'.join(domain_list)
def get_sources(all_logs=False, username='', time=''):
'''Returns tuple of dicts of email sources

The tuple is (email_logins, working_directories, spoofing). Each dict
maps a source to a count:
  email_logins: login -> number of whitespace-separated entries in
    the "for" part of the matching lines
  working_directories: cwd directory -> number of matching lines
  spoofing: "<login> as <sender>" -> number of whitespace-separated
    entries in the "for" part of the matching lines
'''
# NOTE(review): parts of this function are not visible in this view (the
# definitions of date, duration and login_rx, the closing parentheses of
# several calls and the guards around each match), so the comments below
# describe only the statements shown.
email_logins = defaultdict(int)
working_directories = defaultdict(int)
spoofing = defaultdict(int)
domains = get_domains(username)
# NOTE(review): assert statements are removed when Python runs with -O, so
# this check is not a dependable validation of the argument.
assert isinstance(time, int), "Time is not date or number"
target = datetime.datetime.now()
# Timestamp of the form "YYYY-MM-DD HH:MM:SS"; applied with match(), i.e.
# at the start of the line.
datetime_rx = re.compile(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}')
# Login pattern: courier/dovecot plain/login authenticator, the login name
# (optionally followed by @<one of the domains>) and the text after "for".
# NOTE(review): domains is interpolated without re.escape().
r'(courier|dovecot)_(plain|login):(?P<login>[^@ ]+(@(?P<domain>{}))?) '
r'.*for (?P<for>.*)$'.format(domains)
# Spoofing pattern: the sender address after "<= " plus a login that does
# not begin with that sender text (negative lookahead on the backreference).
spoofing_rx = re.compile(
r'<= (?P<sender>[^@]*@[^@ ]+)'
r'.*(courier|dovecot)_(plain|login):'
r'(?P<login>(?!(?P=sender))[^@ ]+(@(?P<sdom>{}))?)'
r'.*for (?P<for>.*)$'.format(domains)
# Working directories below /home/<username>.
# NOTE(review): username is interpolated into the pattern unescaped.
directory_rx = re.compile(fr'cwd=(?P<directory>/home/{username}[^ ]*)')
for line in email_lines(all_logs):
# Date filter: lines that do not start with the requested date are singled
# out here; the action taken for them is not visible in this view.
if date != '' and not line.startswith(date):
if not datetime_rx.match(line):
# If duration is set, skip any lines not within that duration
if duration > 0 and not (
- datetime.datetime.strptime(line[:19], "%Y-%m-%d %H:%M:%S")
# Login and spoofing patterns are applied to the lower-cased line.
rx_match = spoofing_rx.search(line.lower())
logged_in = "{} as {}".format(
rx_match.group('login'), rx_match.group('sender')
spoofing[logged_in] = spoofing[logged_in] + len(
rx_match.group('for').split()
rx_match = login_rx.search(line.lower())
address = rx_match.group('login')
email_logins[address] = email_logins[address] + len(
rx_match.group('for').split()
# The directory pattern is applied to the line with its case preserved.
rx_match = directory_rx.search(line)
directory = rx_match.group('directory')
# Directories containing cPanel's own install path are special-cased; the
# action taken is not visible in this view.
if '/usr/local/cpanel/' in directory:
working_directories[directory] = working_directories[directory] + 1
return (email_logins, working_directories, spoofing)
def print_sorted_dict(dictionary):
    '''Print the entries of a dictionary in ascending order of value.

    One line is printed per entry: the value right-aligned in a field of
    width 7, a tab, then the key. Entries with equal values keep the
    dictionary's own order.
    '''
    ordered = list(dictionary)
    ordered.sort(key=dictionary.get)
    # `value` holds a dictionary key here; the name is kept so that the
    # format string below stays exactly as it was.
    for value in ordered:
        print(f"{dictionary[value]:>7}\t{value}")
'''Parse command line arguments'''
# NOTE(review): the def line of this function, the add_argument(...) calls
# that own the help texts below and the code that derives all_logs,
# username and time are not visible in this view; the caller refers to
# this function as parse_args().
parser = ArgumentParser(description=__doc__)
help="Search all email logs, rather than only the recent log.",
help="Search for only email from a specific cPanel account",
# The time-related options are mutually exclusive.
time_group = parser.add_mutually_exclusive_group()
"Search for entries from a certain date. "
"Must be in the format of YYYY-MM-DD."
"Search entries which were made within the specified "
"number of seconds. Overrides --all."
"Search recent entries, from the last hour. "
"This is the same as -s 3600. Also overrides --all"
args = parser.parse_args()
if args.username is None:
# NOTE(review): match() only anchors at the start and the pattern has no
# end anchor, so text following a valid date is accepted as well.
date_rx = re.compile(r"\d{4}-\d{2}-\d{2}")
if not date_rx.match(args.date):
print(f"Date is not in the correct format: {args.date}")
return all_logs, username, time
'''Main function for script'''
# NOTE(review): the def line of this function, the remainder of the
# get_sources(...) call, the heading printed before the login counts and
# the else branch of the spoofing check are not visible in this view.
# Parse the options, collect the three count tables, then print each table
# sorted by count.
(all_logs, username, time) = parse_args()
(email_logins, working_directories, spoofing) = get_sources(
print_sorted_dict(email_logins)
print("\nSource directories:")
print_sorted_dict(working_directories)
print("\nPossibly spoofed emails:")
if not len(spoofing) == 0:
print_sorted_dict(spoofing)
print("\tNo obvious spoofs found")
# Script entry guard; the guarded statement is not visible in this view.
if __name__ == "__main__":