#!/opt/imh-python/bin/python3
from pymysql.optionfile import Parser as PyMySQLParser
# Captures the account name from a "# User@Host:" header line. Only lowercase
# letters and digits are matched, so the capture stops at the first other
# character.
USER_RE = re.compile(r'^# User@Host:\s+([a-z0-9]+)')
# Pattern text for the "# Query_time: ..." statistics line. The four capture
# groups are, in order: query time, lock time, rows sent and rows examined.
# NOTE(review): the assignment wrapping these two literals is not visible in
# this view; later code refers to a STATS_RE pattern - confirm it is this one.
r'^# Query_time:\s+([0-9\.]+)\s+Lock_time:\s+([0-9\.]+)\s+'
r'Rows_sent:\s+(\d+)\s+Rows_examined:\s+(\d+)'
# Command-line interface: five optional switches plus one optional positional
# log path. The parsed namespace is handed back to the caller.
# NOTE(review): the calls that wrap each option's arguments are not visible in
# this view; the per-option notes below describe the arguments as written.
parser = argparse.ArgumentParser()
# -q/--quiet: boolean flag with dest "quiet"
"-q", "--quiet", dest="quiet", action='store_true',
help="Suppress stderr output",
# -H/--no-header: boolean flag with dest "no_header"
'-H', '--no-header', dest='no_header', action='store_true',
help='Suppress column headers',
# -o/--output: takes a value, shown as FILE in the usage text
"-o", "--output", metavar="FILE",
help="Write output to FILE (default: stdout)",
# -u/--user: takes a value, shown as USER; None unless given
"-u", "--user", metavar="USER", default=None,
help="Output USER's queries instead of tallys",
# -a/--average: boolean flag
"-a", "--average", action="store_true",
help="Print averages per query instead of totals",
# nargs='?' makes the positional log path optional
parser.add_argument('logpath', nargs='?', help='Path to slow query log')
return parser.parse_args()
"""Holds a user name and tracks numbers of queries"""
# Constructor taking the user's name.
# NOTE(review): the body is not visible in this view. Other methods read and
# update num_queries, query_time, lock_time, rows_sent and rows_examined, so
# presumably those counters are initialised here - confirm.
def __init__(self, username: str):
def add_query(self, stats_match: re.Match):
    """Fold one query's statistics into this user's running totals.

    Args:
        stats_match: match object whose four groups are, in order, the
            query time, lock time, rows sent and rows examined, as text.

    Raises:
        ValueError: if a captured group cannot be converted to a number.
    """
    query_time, lock_time, rows_sent, rows_examined = stats_match.groups()
    # Count the query itself: the tally is what gets reported and is the
    # divisor for the per-query averages, so it must grow with every call.
    self.num_queries += 1
    self.query_time += float(query_time)
    self.lock_time += float(lock_time)
    self.rows_sent += int(rows_sent)
    self.rows_examined += int(rows_examined)
# Column titles for the totals report, written to `file`.
# NOTE(review): the call receiving this list is not visible in this view;
# presumably it is forwarded to header() - confirm.
def row_header(cls, file=sys.stdout):
# NOTE(review): the last title reads ROWSRECVD while row_print's last column
# is rows_examined - confirm the label is intended.
['QUERIES', 'TIME', 'LOCKTIME', 'ROWSSENT', 'ROWSRECVD'], file=file
# Column titles for the per-query averages report; the "/Q" suffix marks
# values that are divided by the query count.
# NOTE(review): the call receiving this list, and its closing arguments, are
# not visible in this view; presumably it is forwarded to header() - confirm.
def avg_header(cls, file=sys.stdout):
['QUERIES', 'TIME/Q', 'LOCKTIME/Q', 'ROWSSENT/Q', 'ROWSRECV/Q'],
def header(cols: list[str], file=sys.stdout):
    """Write one header line: a 16-wide ``USER`` label, then each title.

    Args:
        cols: column titles; each is right-justified to 10 characters and
            the titles are separated by single spaces.
        file: text stream the line is written to.
    """
    user_label = 'USER'.rjust(16)
    # The label is followed by one space instead of a newline so the
    # titles continue on the same line.
    print(user_label, end=' ', file=file)
    padded_titles = [title.rjust(10) for title in cols]
    print(' '.join(padded_titles), file=file)
# Totals row for this user: five values, each right-justified to 10 columns.
# NOTE(review): the print call wrapping these values is not visible in this
# view.
def row_print(self, file=sys.stdout):
str(self.num_queries).rjust(10),
# int() truncates the accumulated float times to whole numbers
str(int(self.query_time)).rjust(10),
str(int(self.lock_time)).rjust(10),
str(self.rows_sent).rjust(10),
str(self.rows_examined).rjust(10),
# Per-query averages row for this user: the query count followed by each
# total divided by that count, each right-justified to 10 columns.
# NOTE(review): the print call wrapping these values is not visible in this
# view.
def avg_print(self, file=sys.stdout):
str(self.num_queries).rjust(10),
# Each average is truncated to a whole number by int(). The divisions raise
# ZeroDivisionError if num_queries is 0.
str(int(self.query_time / self.num_queries)).rjust(10),
str(int(self.lock_time / self.num_queries)).rjust(10),
str(int(self.rows_sent / self.num_queries)).rjust(10),
str(int(self.rows_examined / self.num_queries)).rjust(10),
# Look up the slow query log location in the MySQL server configuration.
# NOTE(review): the enclosing def, the try: line and the return/fallback
# statements are not visible in this view.
parser = PyMySQLParser(strict=False)
# A falsy result from read() is taken to mean /etc/my.cnf could not be read.
if not parser.read('/etc/my.cnf'):
# resolve() follows symlinks, so a log path symlinked to /dev/null is caught
# by the comparison below.
path = Path(parser.get('mysqld', 'slow_query_log_file')).resolve()
if path == Path('/dev/null'):
print("MySQL log points to /dev/null currently", file=sys.stderr)
# Presumably raised by the lookups above when the section or option is
# missing - confirm against the try: block.
except configparser.Error:
# Choose the input stream. Three cases are handled in order: no path with an
# interactive stdin, no path (or "-") otherwise, and an explicit path.
# NOTE(review): the enclosing def and the calls that emit the message strings
# below are not visible in this view.
# Case 1: nothing piped to stdin and no path supplied
if not args.logpath and sys.stdin.isatty():
query_log = default_log_path()
"Failed to get slow query log path from /etc/my.cnf",
f"Reading from the default log file, `{query_log}'",
# errors='replace' substitutes undecodable bytes instead of raising
return open(query_log, encoding='utf-8', errors='replace')
# Case 2: something piped to stdin with no path supplied, or "-" given
# explicitly
if not args.logpath or args.logpath == '-':
"MySQL slow query log parser reading from stdin/pipe...",
# Case 3: an explicit path was given on the command line
return open(args.logpath, encoding='utf-8', errors='replace')
# Read the chosen log one line at a time. The loop ends when readline()
# returns an empty string, i.e. at end of input; the stream is closed when
# the with block exits.
# NOTE(review): the enclosing def and the loop body are not visible in this
# view.
with open_log(args) as query_log:
while line := query_log.readline():
# Tally the log per user, or echo one user's entries when -u/--user is given,
# then write the report.
# NOTE(review): the enclosing def, the else branches and the body of the final
# loop are not visible in this view.
# NOTE(review): no close() of out_file is visible here - confirm the handle is
# closed before exit.
if args.output: # if we've specified an output file
out_file = open(args.output, "w", encoding='utf-8')
# Per-user accumulators, keyed by the name captured by USER_RE
user_table: dict[str, MySQLUser] = {}
# Placeholder owner for any statistics line seen before the first User@Host
# line
this_user = "NO_SUCH_USER"
for line in iter_log(args):
# A User@Host line switches the current owner for the lines that follow
if user_match := USER_RE.match(line):
this_user = user_match.group(1)
if args.user and this_user == args.user:
# NOTE(review): this and the statistics branch print with end=' ' while the
# final branch uses end=''. If `line` still carries its newline, the extra
# space lands at the start of the next printed line - confirm intended.
print(line, end=' ', file=out_file)
# A statistics line is added to the current owner's totals; the owner's entry
# is created on first sight
elif stats_match := STATS_RE.match(line):
if this_user not in user_table:
user_table[this_user] = MySQLUser(this_user)
user_table[this_user].add_query(stats_match)
if args.user and this_user == args.user:
print(line, end=' ', file=out_file)
# Any other line is echoed only while the current owner is the requested user
elif args.user and this_user == args.user:
print(line, end='', file=out_file)
# NOTE(review): the conditions selecting between the two headers are not
# visible in this view.
MySQLUser.avg_header(out_file)
MySQLUser.row_header(out_file)
# Users are visited in ascending order of query count
for data in sorted(user_table.values(), key=lambda x: x.num_queries):
if __name__ == '__main__':