#!/opt/imh-python/bin/python3
"""Parses MySQL general query logs"""
from datetime import datetime, timedelta
from typing import IO, Union
from pymysql.optionfile import Parser as PyMySQLParser
# Command-line interface definition.
# NOTE(review): the option tuples below read as the arguments of
# add_argument(...) calls whose opening/closing lines are not visible in
# this view -- TODO confirm against the full file.
parser = argparse.ArgumentParser(description=__doc__)
# -q/--quiet: store_false, so passing the flag sets `verbose` to False.
"-q", "--quiet", action="store_false", dest="verbose",
help="Suppress non-error output",
# -o/--output: name of the file the output is written to.
"-o", "--output", metavar="FILE",
help="Write output to FILE (default: stdout)",
# -r/--regex: argparse runs the raw string through re.compile, so a supplied
# value arrives as a compiled pattern.
# NOTE(review): an invalid pattern raises re.error, which argparse's type
# conversion does not appear to turn into a usage error -- confirm.
"-r", "--regex", type=re.compile, metavar="REGEX",
help="Tally arbitrary REGEX string (slow)",
# Group whose members cannot be combined on one command line; presumably
# --user and --sort are the options added to it -- TODO confirm.
display = parser.add_mutually_exclusive_group()
"-u", "--user", metavar="USER",
help="Output USER's queries instead of summary",
# --sort: restricted to the listed choices, 'total' when not given.
'-s', '--sort', default='total',
choices=['select', 'insert', 'update', 'replace', 'regex', 'total'],
help='Sort summary by a type of query',
# Presumably the help text of the input-file argument that other code reads
# as args.filename -- TODO confirm.
help='file to read from. optional - defaults to try stdin',
# Parse sys.argv into the `args` namespace.
args = parser.parse_args()
# NOTE(review): the `class` header this docstring belongs to is not visible
# in this view; other code instantiates `MySQLUser()`, so presumably this is
# that class's docstring -- TODO confirm.
"""Holds a user name and tracks numbers of queries"""
# Total query count for this user. The body is not visible here; other code
# updates counters named num_select / num_update / num_insert / num_replace /
# num_regex, so presumably this combines some or all of them -- TODO confirm.
def num_total(self) -> int:
# Prints the column-heading row of the summary table to `file`.
# Takes no `self`; the visible caller invokes it on the class itself.
# qps / reg: not read in the lines visible here -- presumably they switch the
# "<col>/s" rate headings and a regex column on or off (TODO confirm).
def header(qps: bool, reg: bool, file=sys.stdout):
# Short heading labels for the select, update, insert and replace counters.
cols = ['Sel', 'Upd', 'Ins', 'Repl']
# User-name heading, right-justified to 16 characters; end='' keeps the
# whole heading on one output line.
print('User'.rjust(16), end='', file=file)
# NOTE(review): `col` is not bound in the visible lines -- presumably these
# two prints sit inside loops over `cols`.
# "Num<col>" heading, right-justified to 8; the leading '' argument makes
# print() emit a separating space first.
print('', f"Num{col}".rjust(8), end='', file=file)
# "<col>/s" rate heading in the same layout.
print('', f"{col}/s".rjust(8), end='', file=file)
# Prints this user's counters as table cells to `file`. The user name itself
# is not printed here; the visible caller prints it just before calling this.
# total_secs: elapsed seconds used as the divisor for the per-second rate.
# reg: not read in the lines visible here -- presumably toggles a regex
# column (TODO confirm).
def show(self, total_secs: float, reg: bool, file=sys.stdout):
# Counter name suffixes; each is read from the attribute num_<name>.
cols = ['select', 'update', 'insert', 'replace']
# NOTE(review): `col` is not bound in the visible lines -- presumably a loop
# over `cols`.
val: int = getattr(self, f'num_{col}')
# Raw count, right-justified to 8 with a leading separator space.
print('', str(val).rjust(8), end='', file=file)
# Rate cell: count / total_secs truncated to an int, suffixed with "qps".
# NOTE(review): this divides by zero when total_secs is 0; presumably guarded
# by a condition that is not visible here -- confirm.
f"{int(val / total_secs)}qps".rjust(8),
# Timestamp-tracking state.
# NOTE(review): the enclosing `def __init__` and class header are not visible
# in this view.
# Raw timestamp string of the first dated log line seen; None until one is.
self.first_date: Union[str, None] = None
# Raw timestamp string of the most recent dated log line; None until one is.
self.last_date: Union[str, None] = None
# Accumulated elapsed time, grown by add_to_total().
self.total_time = timedelta()
def add_to_total(self) -> None:
    """Add the span between the first and last timestamps to ``total_time``.

    Reads ``self.first_datetime`` and ``self.last_datetime`` as attributes.
    When either one is ``None`` (no timestamp recorded) the running total is
    left untouched.
    """
    first = self.first_datetime
    last = self.last_datetime
    # Explicit None checks: skip only when a timestamp is really absent.
    if first is not None and last is not None:
        self.total_time += last - first
@property
def first_datetime(self) -> Union[datetime, None]:
    """Return ``first_date`` parsed into a ``datetime``, or ``None`` if unset.

    Exposed as a property because callers read it as a plain attribute
    (``self.first_datetime``) rather than calling it.
    """
    # first_date stays None until a dated log line has been seen; strptime
    # would raise TypeError on None, so honour the Optional return type.
    if not self.first_date:
        return None
    return self.stamp_to_datetime(self.first_date)
@property
def last_datetime(self) -> Union[datetime, None]:
    """Return ``last_date`` parsed into a ``datetime``, or ``None`` if unset.

    Exposed as a property because callers read it as a plain attribute
    (``self.last_datetime``) rather than calling it.
    """
    # last_date is None before any dated line is seen and after a reset;
    # strptime would raise TypeError on None, so return None instead.
    if not self.last_date:
        return None
    return self.stamp_to_datetime(self.last_date)
@staticmethod
def stamp_to_datetime(mysql_stamp: str) -> datetime:
    """Convert a MySQL general-log timestamp to a ``datetime``.

    Declared static because it takes no ``self`` yet is invoked through an
    instance (``self.stamp_to_datetime(...)``).

    Args:
        mysql_stamp: timestamp text in ``YYMMDD HH:MM:SS`` form.

    Returns:
        The naive ``datetime`` parsed with ``'%y%m%d %H:%M:%S'``.

    Raises:
        ValueError: if ``mysql_stamp`` does not match that format.
    """
    return datetime.strptime(mysql_stamp, '%y%m%d %H:%M:%S')
# Reports how long ago the first recorded timestamp was, relative to now.
# NOTE(review): the enclosing def, the call that wraps the f-strings below
# and the branch keyword before the final print are not visible in this view.
if first := self.first_datetime:
# Age of the first timestamp measured against the local clock; both values
# are naive datetimes.
time_delta = datetime.now() - first
total_seconds = time_delta.total_seconds()
f"First timestamp at {self.first_date}",
# Hours are not wrapped at 24, so multi-day ages show as large hour counts.
f"({int(total_seconds / 3600)} hours,",
# Minutes and seconds are the remainders within the hour / minute.
f"{int(total_seconds / 60 % 60)} minutes,",
f"{int(total_seconds % 60)} seconds ago)",
# Presumably the fallback when first_datetime is None -- TODO confirm.
print("No timestamps found in log file")
# Sets up the state carried from one log line to the next while parsing.
# verbose: not read in the lines visible here -- presumably stored on self
# (TODO confirm).
def __init__(self, verbose: bool):
# User the current line is attributed to; this placeholder is also used for
# lines whose connection id is unknown.
self.username = "NO_SUCH_USER"
# connection id (regex group 2, kept as a string) -> user name
self.id_table: dict[str, str] = {}
# user name -> per-user query counters
self.user_table: dict[str, MySQLUser] = {}
# first/last timestamps seen plus the accumulated elapsed time
self.times = TimeTracker()
# Updates timestamps, the connection-id -> user table and the per-user
# counters from one log line that matched the main search regex.
#   line:  the raw log line (only inspected for login-failure text below)
#   match: regex match with group(1)=timestamp or None, group(2)=connection
#          id, group(3)=command, group(4)=user name or query type
# NOTE(review): indentation and several control-flow lines (try/except,
# return, possibly nested if/else) are not visible in this view; comments
# marked "presumably" are assumptions to confirm against the full file.
def handle_match(self, line: str, match: re.Match) -> None:
if parsed_date := match.group(1): # if it's got a date group
if not self.times.first_date: # and we've never set a date before
self.times.first_date = parsed_date # set our first date
self.times.last_date = parsed_date # set our last date
if match.group(3) == "Connect": # if it's a connection
self.query_id = match.group(2) # get the query id
# A Connect for an id that is already in the table means ids are being
# handed out again.
if self.query_id in self.id_table:
# We have hit a SERIOUS problem. This likely means that
# mysql restarted. We're dumping the time and query_id
# NOTE(review): what happens for failed-login lines is not visible here --
# presumably they are skipped; TODO confirm.
if 'Access denied for user' in line or ' as on' in line:
# Bank the elapsed span measured so far before the timestamps are reset.
self.times.add_to_total()
# don't have to do the user table because that data in
# theory is still good (qps = total queries / total time)
self.times.last_date = None
self.times.first_date = None
self.username = match.group(4) # set user_name
# create the entry with user name as the value and the id as
self.id_table[self.query_id] = self.username
# if the user name is new (could be, could already exist)
if self.username not in self.user_table:
# create a new counter class for it using the user name
self.user_table[self.username] = MySQLUser()
elif match.group(3) in ("Query", "Execute"):
self.query_id = match.group(2) # get the id
# get the user name from our lookup table
# (the user who started it)
# NOTE(review): as shown, the placeholder assignment on the following line
# overwrites this lookup unconditionally; presumably it sits in an
# `except KeyError:` branch that is not visible -- TODO confirm.
self.username = self.id_table[self.query_id]
self.username = "NO_SUCH_USER"
if self.username not in self.user_table:
self.user_table[self.username] = MySQLUser()
# get the type of query (select, insert, update, etc.)
query_type = match.group(4).lower()
# Only these four statement types are counted; any other first word leaves
# the counters unchanged in the lines visible here.
if query_type == "select":
self.user_table[self.username].num_select += 1
elif query_type == "update":
self.user_table[self.username].num_update += 1
elif query_type == "insert":
self.user_table[self.username].num_insert += 1
elif query_type == "replace":
self.user_table[self.username].num_replace += 1
# NOTE(review): "Execute" is already matched by the elif above, so if this
# else pairs with the command chain it only sees Init DB / Prepare -- confirm.
else: # must be init db, prepare, or execute
query_id = match.group(2) # get the id
# get the user name from our lookup table
# (the user who started it)
# NOTE(review): same lookup/placeholder pair as above -- presumably a
# try/except KeyError whose keywords are not visible.
self.username = self.id_table[query_id]
self.username = "NO_SUCH_USER"
if self.username not in self.user_table:
self.user_table[self.username] = MySQLUser()
def handle_user_match(self, match: re.Match) -> None:
    """Credit one user-regex hit to the user owning the current connection.

    Args:
        match: result of the main log-line regex for this line, or ``None``
            when only the user-supplied regex matched. Group 2 is the
            connection id.

    Side effects:
        Updates ``self.query_id`` (when ``match`` is given) and
        ``self.username``, creates a counter entry for a new user name, and
        increments that user's ``num_regex``.
    """
    # Only a line that matched the main regex carries a connection id.
    # Otherwise reuse the id left over from the last matching line, which
    # makes the user name likely to be the same as well.
    if match is not None:
        self.query_id = match.group(2)
    conn_id = getattr(self, "query_id", None)  # unset before any match
    # Unknown ids are pooled under a placeholder user instead of raising
    # KeyError; a successful lookup is no longer overwritten.
    self.username = self.id_table.get(conn_id, "NO_SUCH_USER")
    if self.username not in self.user_table:
        self.user_table[self.username] = MySQLUser()
    self.user_table[self.username].num_regex += 1
def gen_log_path() -> Union[str, None]:
"""Reads mysqld.general_log_file from my.cnf"""
# NOTE(review): the `try:` opener, the return statements and the except body
# are not visible in this view; per the annotation the result is a path
# string or None.
parser = PyMySQLParser(strict=False)
# A falsy result from read() is treated as "config file could not be read".
if not parser.read('/etc/my.cnf'):
# resolve() makes the configured path absolute and follows symlinks, so a
# log file linked to /dev/null is caught by the comparison below.
path = Path(parser.get('mysqld', 'general_log_file')).resolve()
if path == Path('/dev/null'):
print("MySQL log points to /dev/null currently", file=sys.stderr)
# Presumably covers a missing [mysqld] section or general_log_file option
# -- TODO confirm.
except configparser.Error:
def open_log(args) -> IO:
"""Finds/Opens query log"""
# NOTE(review): the try/except keywords, the calls wrapping the two message
# strings and the stdin return path are not visible in this view.
# No filename given and stdin is a terminal (nothing piped in): fall back to
# the log path taken from the MySQL configuration.
if not args.filename and sys.stdin.isatty():
args.filename = gen_log_path()
if args.filename is None:
sys.exit("Could not get default log file from /etc/my.cnf")
f"Reading from the default log file, `{args.filename}'",
# errors='replace' substitutes undecodable bytes instead of raising.
return open(args.filename, encoding='utf-8', errors='replace')
# `exc` is not bound in the visible lines -- presumably this sits in an
# `except OSError as exc:` handler (TODO confirm).
sys.exit(f"Failed to open log file `{args.filename}': {exc}")
# Presumably shown when input comes from stdin rather than a file -- confirm.
"MySQL general query log parser reading from stdin/pipe...",
# Main parsing loop: feeds each log line to the state tracker and, in
# per-user mode, echoes the lines belonging to the requested user.
# NOTE(review): the def line, the remaining parameters and the re.compile(
# call around the pattern are not visible; the visible call site passes
# (query_log, args.regex, args.user, state, out_file).
user_regex: Union[re.Pattern, None],
# Search entry v2, group(1)=(None|Timestamp), group(2)=(ConnectionID),
# group(3)=(Connect|Query), group(4)=(UserName|QueryType)
# The timestamp group is optional; whitespace before the connection id is
# required either way.
r"([0-9]{6}[\s]+[0-9:]+)*[\s]+([0-9]+)\s"
r"(Connect|Query|Init DB|Prepare|Execute)[\s]+([a-zA-Z0-9]+)"
# readline() returns '' at end of input, which ends the loop.
while line := query_log.readline():
# .match() anchors at the start of the line; the user regex may hit anywhere.
match = search_re.match(line)
user_match = user_regex.search(line) if user_regex else None
# Neither pattern hit: nothing to record for this line (presumably followed
# by `continue`).
if not match and not user_match:
# NOTE(review): presumably each call below is guarded by `if match:` /
# `if user_match:` lines that are not visible -- TODO confirm.
state.handle_match(line=line, match=match)
# Receives `match` (which may be None here), not `user_match`.
state.handle_user_match(match=match)
# --user was supplied and matches this line
if user and state.username == user:
print(line, end='', file=out_file)
# Prints the per-user summary table once parsing has finished.
# NOTE(review): the def line, the other parameters, the early return for
# per-user mode and the sorted( call opener are not visible in this view.
user_regex: Union[re.Pattern, None],
if user: # we were in per-user mode. Skip summary page
if not state.times.first_date: # no timestamps found at all
sys.exit("Not enough data to parse, please try a longer log file.")
# Elapsed seconds accumulated over the log; 0 means rates cannot be shown.
total_secs = state.times.total_time.total_seconds()
# The regex column is shown only when a --regex pattern was supplied.
show_reg = user_regex is not None
# Presumably printed only when total_secs is 0 -- TODO confirm.
print('Not enough timestamps logged to display QPS', file=out_file)
state.user_table.items(),
# Sort key: the counter attribute named by sort_by on each user's entry.
key=lambda x: getattr(x[1], sort_by),
MySQLUser.header(qps=total_secs != 0, reg=show_reg, file=out_file)
# One row per user: right-justified name, then that user's counters.
for username, counts in sorted_entries:
print(username.rjust(16), end='', file=out_file)
counts.show(total_secs=total_secs, reg=show_reg, file=out_file)
# Script entry: choose the output destination, parse the log, then close out
# the timing totals.
# NOTE(review): the def line, the stdout fallback and the summary call are
# not visible in this view.
# determine where to write output
# NOTE(review): opened without a `with` block and no close() is visible in
# these lines -- confirm the handle is closed.
out_file = open(args.output, "w", encoding='utf-8')
with open_log(args) as query_log:
state = StateTracker(args.verbose)
parse_log(query_log, args.regex, args.user, state, out_file)
# Fold the span between the first and last timestamps into total_time.
state.times.add_to_total()
if __name__ == '__main__':