#! /opt/imh-python/bin/python3
''' Apache Log Parser - Parse Apache domain access logs '''
import logging
import os
import re
from argparse import ArgumentParser
from collections import defaultdict
from platform import node as hostname
from time import time

from dns import resolver, reversename, exception

from rads import setup_logging, color
__maintainer__ = "Daniel K"
__email__ = "danielk@inmotionhosting.com"
# Location of Apache domain logs for users.
# The bit at the end is for str.format() to allow users to be added there
# NOTE(review): `envinfo` is not imported in the visible import block —
# presumably a project-local module; confirm against the full file.
USER_DOMLOG_DIR = envinfo.get_data()['apache_domlogs'] + "/{0!s}/"
# Maximum number of log files on shared servers
# NOTE(review): the constant this comment describes (MAX_LOGS_SHARED,
# referenced near the end of the file) is not visible in this view.
LOGGER = logging.getLogger(__name__)
"""Return PTR for IP address"""
myresolver = resolver.Resolver()
myresolver.lifetime = 1.0
question_name = reversename.from_address(ip_addr)
answers = myresolver.query(question_name, "PTR")
except resolver.NXDOMAIN:
except exception.Timeout:
LOGGER.debug("Query Timed out looking for %s", ip_addr)
except resolver.NoNameservers:
LOGGER.debug("No nameservers found for %s", ip_addr)
return "No nameservers found"
except resolver.NoAnswer:
LOGGER.debug("No answer for %s", ip_addr)
def domlog_lines(source):
# Iterate over log lines from `source` — apparently either STDIN or a
# file path (see the two log messages below and the caller at
# parse_domlogs, which does `for line in domlog_lines(source)`).
# NOTE(review): truncated — the STDIN branch, the assignment of
# `filename`, the yield/read logic inside the `with`, and the except
# clause paired with the error log below are not visible here.
LOGGER.info("Processing from STDIN.")
LOGGER.info("Process file %s", source)
if os.path.exists(filename):
with open(filename, encoding='utf-8') as file_handle:
LOGGER.error("Error reading file %s", filename)
def trim_dict(dictionary, entries):
    '''Trim dictionary to top entries ordered by value.

    Args:
        dictionary: Mapping of item -> numeric count.
        entries: Maximum number of items to keep.

    Returns:
        A new dict containing at most `entries` items, inserted in
        descending order of their counts (so iteration yields the
        top items first).
    '''
    # Defects fixed: `trimmed_dict` was never initialized, the
    # `entries` cap was never applied, and nothing was returned
    # (callers assign the result: results['requests'] = trim_dict(...)).
    trimmed_dict = {}
    for item in sorted(dictionary, key=lambda x: dictionary[x], reverse=True):
        if len(trimmed_dict) >= entries:
            break
        trimmed_dict[item] = dictionary[item]
    return trimmed_dict
def parse_domlogs(source, numlines=10, add_ptr=False):
# Tally an Apache domain log into per-category counters.
# NOTE(review): this function appears truncated — the docstring, the
# opening of the `results` dict literal (including a 'linecount' entry
# used below), the re.compile() call wrapping the pattern, several
# closing parentheses, and the final `return results` are not visible
# here; confirm against the full file.
'status_codes': defaultdict(int),
'daily_hourly': defaultdict(lambda: defaultdict(int)),
'requests': defaultdict(int),
'user_agents': defaultdict(int),
'top_ips': defaultdict(int),
# Single regex to match all log lines.
# It stores each entry in named groups, even though not all groups
# are used by this script. You can see the names listed below
r'^(?P<ips>(?P<ip>[0-9.]+|[a-fA-F0-9:]+)' # Could handle multiple IPs
r'(,\s*[0-9.]+|[a-fA-F0-9:]+)*)\s+'
r'(?P<logname>\S+)\s+(?P<user>\S+)\s+' # Could find logged in users
r'\[(?P<date>[0-9]+/[a-zA-Z]+/[0-9]+):'
r'(?P<time>(?P<hour>[0-9]+):[0-9]+:[0-9]+ [0-9-+]+)\]\s+'
r'"(?P<request>(?P<type>[A-Z]+)\s+(?P<uri>\S+)) [^"]*"\s+'
r'(?P<status>[0-9]+|-)\s+(?P<size>[0-9]+|-)\s+'
r'"(?P<referrer>[^"]*)"\s+'
# NOTE(review): the trailing pattern fragment that captures the
# 'useragent' group (read below) is not visible here.
# Count every line; matched lines feed the per-category tallies.
for line in domlog_lines(source):
results['linecount'] = results['linecount'] + 1
match_logline = rx_logline.search(line)
if match_logline is not None:
results['status_codes'][match_logline.group('status')] = (
results['status_codes'][match_logline.group('status')] + 1
# Requests are keyed as "<status> <request>" for the report.
request = "{: <4} {}".format(
match_logline.group('status'), match_logline.group('request')
results['requests'][request] = results['requests'][request] + 1
results['top_ips'][match_logline.group('ip')] = (
results['top_ips'][match_logline.group('ip')] + 1
results['user_agents'][match_logline.group('useragent')] = (
results['user_agents'][match_logline.group('useragent')] + 1
# Bucket hits per day and per hour-of-day.
date = match_logline.group('date')
hour = match_logline.group('hour')
results['daily_hourly'][date][hour] = (
results['daily_hourly'][date][hour] + 1
LOGGER.warning("Missed log line: %s", line)
# Keep only the top `numlines` entries in each ranking.
results['requests'] = trim_dict(results['requests'], numlines)
results['user_agents'] = trim_dict(results['user_agents'], numlines)
results['top_ips'] = trim_dict(results['top_ips'], numlines)
# Annotate top IPs with their PTR records.
# NOTE(review): presumably guarded by `if add_ptr:` — the guard line
# is not visible here; confirm against the full file.
ip_ptr = defaultdict(int)
for ip_addr in results['top_ips']:
ptr_record = ptr_lookup(ip_addr)
ip_with_ptr = f"{ip_addr: <15} {ptr_record}"
ip_ptr[ip_with_ptr] = results['top_ips'][ip_addr]
results['top_ips_with_ptr'] = ip_ptr
def logs_for_user(cpuser):
'''Array of domlogs for cpuser. If cpuser is None, return all domlogs.'''
# NOTE(review): truncated — the initialization of `logfile_list`, the
# branch selecting between the two log messages below, and the return
# statement are not visible here; confirm against the full file.
LOGGER.info("Choosing domlog for all users")
LOGGER.info("Choosing domlog for %s", cpuser)
for filename in os.listdir(USER_DOMLOG_DIR.format(cpuser)):
# Filter out non-access-log entries ("_log"/"-ssl" naming, FTP
# transfer logs).  NOTE(review): the action taken when these
# conditions match (likely `continue`) is not visible here.
if ("_log" not in filename) and ("-ssl" not in filename):
if "ftpxferlog" in filename:
# Only keep regular files (skips directories and dead symlinks).
logfile = os.path.join(USER_DOMLOG_DIR.format(cpuser), filename)
if os.path.isfile(logfile):
logfile_list.append(logfile)
def choose_logfile(cpuser):
# NOTE(review): the docstring quotes around the following summary
# lines (and likely a sentence completion after "preferring size for")
# are not visible here — this span appears truncated.
Determine log file to use for a cPanel user.
This is done by first using any unique file, then using any
unique recently updated file, and then preferring size for
If cpuser is None, search for all logs.
logfile_list = logs_for_user(cpuser)
# No candidate logs at all — warn (the return here is not visible).
if len(logfile_list) == 0:
LOGGER.warning("Could not find valid log file for %s", cpuser)
# Exactly one candidate — use it (the return here is not visible).
if len(logfile_list) == 1:
LOGGER.debug("Only one log file for %s: %s", cpuser, logfile_list[0])
# Collect logs modified within the last 24 hours.
# NOTE(review): the initialization of `recentlog_list` is not visible.
for logfile in logfile_list:
if os.path.getmtime(logfile) > (time() - 86400):
# File is newer than 24 hours
recentlog_list.append(logfile)
if len(recentlog_list) == 1:
"Only one recent log file for %s: %s", cpuser, recentlog_list[0]
if len(recentlog_list) == 0:
# If there are no recent files, choose from all files.
LOGGER.debug("No recent logs for %s", cpuser)
# NOTE(review): this assignment presumably sits in an `else` branch
# (narrowing the search to recent logs) — the branch structure is
# not visible here; confirm against the full file.
logfile_list = recentlog_list
# Fall back to the largest candidate file.
# NOTE(review): the initialization of `largest`, the tracking of the
# chosen file, and the final return are not visible here.
for logfile in logfile_list:
if os.path.getsize(logfile) > largest:
largest = os.path.getsize(logfile)
def print_title(title, width):
'''Print pretty header'''
# Template: "~~ <title> ~~" followed by a bar of '~' filling `width`.
header_format = "~~ {0!s} ~~{1}"
# NOTE(review): `base_header_size` is read below but its definition is
# not visible here — presumably the fixed character overhead of the
# template; confirm against the full file.
# If there is not enough room for the title, truncate it
title = title[: width - base_header_size]
head_length = len(title) + base_header_size
long_bar = "".join("~" for i in range(width - head_length))
# NOTE(review): the print() of header_format.format(title, long_bar)
# is not visible here.
def print_tall(title, array, numlines, width):
    '''Print pretty data in a tall format, with one entry per line.

    Entries are ordered by descending count; each line shows the count
    right-aligned in 6 characters followed by the item, truncated to
    `width` characters.  At most `numlines` entries are printed.

    Args:
        title: Section header, rendered via print_title().
        array: Mapping of item -> count.
        numlines: Maximum number of entries to print.
        width: Maximum output line width in characters.
    '''
    print_title(title, width)
    # Defects fixed: `line_count` was read before assignment, and the
    # `numlines` check had no effect without a `break`.
    line_count = 0
    for item in sorted(array, key=lambda x: array[x], reverse=True):
        line_count = line_count + 1
        print(f"{array[item]: 6} {item}"[:width])
        if line_count == numlines:
            break
def print_wide(title, array, numlines, width):
'''Print pretty data in a wide format, with many entries per line'''
print_title(title, width)
# NOTE(review): truncated — the loop header over `array` (presumably
# ordered by value, by analogy with print_tall) and the initialization
# of `line_count` / `current_width` are not visible here.
next_item = f"{item}: {array[item]} "
# Wrap once this entry would push the current line past `width`.
# NOTE(review): the wrap action (print a newline / reset
# `current_width`) is not visible here.
if current_width + len(next_item) >= width:
line_count = line_count + 1
# Stop after the configured number of output lines.
# NOTE(review): the action here (likely `break`/`return`) is not
# visible.
if line_count == numlines:
current_width = current_width + len(next_item)
print(next_item, end=' ')
# NOTE(review): this span appears to be the interior of an
# argument-parsing function whose `def` line and docstring quotes are
# not visible; many add_argument() calls below are missing their
# option-flag and keyword lines.
Parse command line arguments
parser = ArgumentParser(description=__doc__)
# All-users switch — overrides explicit usernames/paths.
"Search all users. Do not limit search to single user. "
"Overrides any usernames or paths given."
help="Return results for all log files, rather than just one.",
# PTR on/off flags are mutually exclusive.
ptr_group = parser.add_mutually_exclusive_group()
help="Get PTR records for IPs. This is the default.",
help="Do not resolve PTRs for IPs. Overrides -p.",
help="Print version information and exit.",
output_group = parser.add_argument_group("Output options")
output_group.add_argument(
"Number of lines to display in each section. " "The default is 10."
output_group.add_argument(
help="Width of output in characters. The default is 110.",
output_group.add_argument(
"-j", "--json", action='store_true', help="Output data as JSON instead."
# Logging verbosity flags are mutually exclusive within their group.
logging_parser_group = parser.add_argument_group("Error logging options")
logging_group = logging_parser_group.add_mutually_exclusive_group()
logging_group.add_argument(
help="Use verbose logging.",
logging_group.add_argument(
help='Log only critical errors',
logging_group.add_argument(
choices=['error', 'info', 'debug', 'warning', 'critical'],
"Specify the verbosity of logging output. "
"The default is 'warning'."
logging_parser_group.add_argument(
help="Output logging to the specified file.",
# Positional sources: a cPanel user, a log path, or '-' for STDIN.
"Either a cPanel user or an Apache domain log file. "
"'-' will be handled as STDIN. "
"If none are given, then the script will attempt to gather "
args = parser.parse_args()
# NOTE(review): __version__ / __date__ are referenced here but not
# defined in the visible portion of the file; presumably printed under
# a --version guard that is not visible.
print(f"Apache Log Parser version {__version__}")
print(f"Last modified on {__date__}.")
# Map the --loglevel choice onto the logging module's level constants;
# default to WARNING when not given.
if args.loglevel is None:
logging_level = logging.WARNING
logging_level = getattr(logging, args.loglevel.upper())
# NOTE(review): this dangling keyword argument suggests an elided
# setup_logging(...) call with a default log path.
path='/var/log/messages',
setup_logging(path=args.output, loglevel=logging_level, print_out=False)
if len(args.sources) == 0:
LOGGER.info("No sources. Using STDIN.")
def print_results(results, numlines, width):
'''Print out results to terminal'''
# NOTE(review): truncated — the `continue` after the empty-source
# message, the print_wide()/print_tall() call wrappers around the
# bare argument lines below, and the else-branch pairing the two
# top-IP sections are not visible here; confirm against the full file.
for (source, result) in results:
if result['linecount'] < 1:
print(f"{source} is empty.")
# NOTE(review): possible cosmetic bug — the yellow header already
# ends with ':' and another ':' is appended, yielding "...::".
print(color.yellow(f"Results for {source}:") + ":")
# Per-day hourly hit counts.
for day in result['daily_hourly']:
result['daily_hourly'][day],
"HTTP response codes", result['status_codes'], numlines, width
print_tall("Top Requests", result['requests'], numlines, width)
print_tall("Top user agents", result['user_agents'], numlines, width)
# Prefer the PTR-annotated listing when PTR lookups were performed.
if result['top_ips_with_ptr'] is not None:
"Top IPs with PTRs", result['top_ips_with_ptr'], numlines, width
print_tall("Top IPs", result['top_ips'], numlines, width)
'''Main function for script'''
# On shared servers, limit the number of log files searched
if any(shared_type in hostname() for shared_type in ["biz", "hub", "res"]):
log_limit = MAX_LOGS_SHARED