# psn -- Linux Process Snapper by Tanel Poder [https://0x.tools]
# Copyright 2019-2021 Tanel Poder
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
# SPDX-License-Identifier: GPL-2.0-or-later
from itertools import groupby
from datetime import datetime
def flatten(li):
    """Return one list holding every item of every sublist of *li*, in order.

    Only a single level of nesting is removed: items that are themselves
    lists are copied into the result unchanged.

    li -- an iterable of iterables (e.g. a list of lists).
    """
    return [item for sublist in li for item in sublist]
### ASCII table output ###
# Render a report's result set as a fixed-width ASCII table on stdout.
#
# report  -- report object; this block reads report.full_projection() (tuples of
#            (source, cols, expr, token)) and report.name.
# dataset -- result rows; row[col_idx] is read for every row when a column is sized.
#
# NOTE(review): the block is incomplete as shown -- the if/elif arms below have no
# bodies, and col_type, col_idx, max_field_width and total_field_width are used
# without a visible assignment. Confirm against the full file before editing.
def output_table_report(report, dataset):
# printf-style format fragments, one per column: header line / data rows
header_fmts, field_fmts = [], []
# running sum of column widths that leaves out the 'kstack' column
total_field_width_without_kstack = 0
for source, cols, expr, token in report.full_projection():
# NOTE(review): these three arms presumably choose the column type for tokens
# that have no schema entry; their bodies are not visible here -- TODO confirm
if token in ('pid', 'task', 'samples'):
elif token == 'event_time':
elif token == 'avg_threads':
# descriptor of the first referenced column, looked up by name in the source;
# raises IndexError when no entry of source.available_columns matches cols[0]
col = [c for c in source.available_columns if c[0] == cols[0]][0]
# NOTE(review): `int` is listed twice in this tuple; the duplicate is redundant
if col_type in (str, int, int):
# widest str() rendering of this column over all rows
max_field_length = max([len(str(row[col_idx])) for row in dataset])
# width of the integer part plus 3 extra characters (presumably the decimal
# point and the two fraction digits of the '%.2f' format used below)
max_field_length = max([len(str(int(row[col_idx]))) for row in dataset]) + 3 # arbitrary!
# at least as wide as the header token, but never wider than max_field_width
field_width = min(max_field_width, max(len(token), max_field_length))
# left-align strings both in header and data
# '%-N.Ns': left-aligned header text, padded and truncated to N characters
header_fmts.append('%%-%s.%ss' % (field_width, field_width))
# '%N.Ns': right-aligned header text, padded and truncated to N characters
header_fmts.append('%%%s.%ss' % (field_width, field_width))
# '%-N.Ns': left-aligned string data, padded and truncated to N characters
field_fmts.append('%%-%s.%ss' % (field_width, field_width))
# NOTE(review): `int` is listed twice in this tuple as well
elif col_type in (int, int):
# '%Nd': right-aligned integer data
field_fmts.append('%%%sd' % field_width)
# '%N.2f': right-aligned data with two fraction digits
field_fmts.append('%%%s.%sf' % (field_width, 2)) # arbitrary
total_field_width += field_width
total_field_width_without_kstack += field_width if token != 'kstack' else 0
# all column widths + 3 characters per ' | ' separator + the leading and
# trailing space that header_fmt / field_fmt add below
report_width = total_field_width + (3 * (len(header_fmts) -1)) + 2
title_pad = report_width - len(report.name) - 2
#title = '=== ' + report.name + ' ' + '=' * (title_pad - 29) + ' [' + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + '] ==='
# title line: '=== <name> ' followed by (title_pad - 3) '=' characters
title = '=== ' + report.name + ' ' + '=' * (title_pad - 3)
header_fmt = ' ' + ' | '.join(header_fmts) + ' '
field_fmt = ' ' + ' | '.join(field_fmts) + ' '
# header row: the token (4th tuple element) of every projected column
print(header_fmt % tuple([c[3] for c in report.full_projection()]))
# NOTE(review): presumably the branch taken for an empty dataset; the
# condition guarding it is not visible here -- TODO confirm
print('query returned no rows')
def __init__(self, name, projection, dimensions=None, where=None, order=None, output_fn=output_table_report):
    """Build a report definition from column tokens and filter keywords.

    Every token is turned into a 4-tuple
    ``(source, [source column names], SQL expression, token)``.

    name       -- report title; stored as ``self.name``.
    projection -- column tokens to select. Falsy entries are ignored.
    dimensions -- column tokens to group by (default: none).
    where      -- filter keywords; only 'active' and 'idle' are accepted
                  (default: none).
    order      -- column tokens to sort by (default: none).
    output_fn  -- callable invoked as ``output_fn(report, dataset)`` to
                  render the result.

    Raises:
        Exception  -- a token matches no known column, or a filter keyword
                      other than 'active' / 'idle' is given.
        IndexError -- no column of the report names a concrete source, so
                      'pid' / 'task' / 'event_time' cannot be bound to one.
    """
    def reify_column_token(col_token):
        # aggregates have no source; avg_threads keeps a %(num_sample_events)s
        # placeholder that is filled in when the SQL text is rendered
        if col_token == 'samples':
            return (None, [], 'COUNT(1)', col_token)
        elif col_token == 'avg_threads':
            return (None, [], 'CAST(COUNT(1) AS REAL) / %(num_sample_events)s', col_token)
        elif col_token in ('pid', 'task', 'event_time'):
            # bound to a real source further down
            return ('first_source', [col_token], col_token, col_token)

        # case-insensitive match against the schema of every known source
        for t in proc.all_sources:
            for c in t.schema_columns:
                if col_token.lower() == c[0].lower():
                    return (t, [c[0]], c[0], c[0])

        raise Exception('projection/dimension column %s not found.\nUse psn --list to see all available columns' % col_token)

    def process_filter_sql(filter_sql):
        idle_filter = "stat.state_id IN ('S', 'Z', 'I')"

        if filter_sql == 'active':
            return (proc.stat, ['state_id'], 'not(%s)' % idle_filter, filter_sql)
        elif filter_sql == 'idle':
            return (proc.stat, ['state_id'], idle_filter, filter_sql)

        raise Exception('arbitrary filtering not implemented')

    # the table renderer reads report.name, so the title has to be kept
    self.name = name
    self.projection = [reify_column_token(t) for t in projection if t]
    self.dimensions = [reify_column_token(t) for t in (dimensions or []) if t]
    self.order = [reify_column_token(t) for t in (order or []) if t]
    self.where = [process_filter_sql(t) for t in (where or []) if t]
    self.output_fn = output_fn

    # columns without a specific source are assigned the first source
    all_columns = self.projection + self.dimensions + self.order + self.where
    concrete_sources = [c[0] for c in all_columns if c[0] and c[0] != 'first_source']
    if not concrete_sources:
        raise IndexError('report %s does not reference any column with a concrete source' % name)
    first_source = concrete_sources[0]

    def bind_first_source(columns):
        return [(first_source if c[0] == 'first_source' else c[0], c[1], c[2], c[3]) for c in columns]

    self.projection = bind_first_source(self.projection)
    self.dimensions = bind_first_source(self.dimensions)
    self.order = bind_first_source(self.order)
    self.where = bind_first_source(self.where)

    self.sources = {} # source -> [cols]
    for columns in (self.projection, self.dimensions, self.order, self.where):
        for source, column_names, _expr, _token in columns:
            # aggregates carry no source; a None key would break the SQL
            # builder, which reads .name on every key of self.sources
            if source is None:
                continue
            # every source contributes the join-key columns plus the ones used
            self.sources.setdefault(source, ['pid', 'task', 'event_time']).extend(column_names)
def full_projection(self):
    """Return the projected columns followed by every dimension column
    that is not already part of the projection.

    A new list is returned; self.projection and self.dimensions are left
    untouched.
    """
    projected = self.projection
    extra_dimensions = []
    for dim in self.dimensions:
        if dim in projected:
            continue
        extra_dimensions.append(dim)
    return projected + extra_dimensions
def query(self):
    """Return the SQL text of this report.

    The statement selects self.full_projection() from every source in
    self.sources. Sources after the first are joined to the first one on
    pid, task and event_time. WHERE, GROUP BY and ORDER BY are only
    emitted when there is something to put in them, and every ORDER BY
    column is sorted descending.

    Raises:
        IndexError -- the report has no sources.
    """
    def render_col(c):
        # qualify with the source name; source-less aggregates are used as-is
        return '%s.%s' % (c[0].name, c[2]) if c[0] else c[2]

    # a None key stands for "no source" and has no .name to render
    sources = [s for s in self.sources if s is not None]
    first_source_name = sources[0].name

    # one equality per join key for every source after the first
    join_where = ['%s.%s = %s.%s' % (s.name, c, first_source_name, c)
                  for s in sources[1:]
                  for c in ('pid', 'task', 'event_time')]

    attr = {
        'projection': '\t' + ',\n\t'.join([render_col(c) for c in self.full_projection()]),
        'tables': '\t' + ',\n\t'.join([s.name for s in sources]),
        'where': '\t' + ' AND\n\t'.join([c[2] for c in self.where] + join_where),
        'dimensions': '\t' + ',\n\t'.join([render_col(c) for c in self.dimensions]),
        'order': '\t' + ',\n\t'.join([render_col(c) + ' DESC' for c in self.order]),
        'num_sample_events': '(SELECT COUNT(DISTINCT(event_time)) FROM %s)' % first_source_name
    }

    logging.debug('attr where=%s#end' % attr['where'])

    sql = 'SELECT\n%(projection)s\nFROM\n%(tables)s' % attr
    # tanel changed from self.where to attr['where']
    # TODO think through the logic of using self.where vs attr.where (in the context of allowing pid/tid to be not part of group by)
    if attr['where'].strip():
        sql += '\nWHERE\n%(where)s' % attr
    # an empty GROUP BY / ORDER BY clause is not valid SQL, so skip them
    if self.dimensions:
        sql += '\nGROUP BY\n%(dimensions)s' % attr
    if self.order:
        sql += '\nORDER BY\n%(order)s' % attr

    # final substitution allows things like avg_threads to work: column
    # expressions may still carry a %(num_sample_events)s placeholder
    return sql % attr
def dataset(self, conn):
    """Run this report's query on *conn* and return every result row.

    conn -- object offering ``execute(sql).fetchall()``, such as a DB-API
            connection.

    Returns whatever ``fetchall()`` yields for the executed statement.
    """
    # build the SQL once and reuse it for both the debug log and the execution
    sql = self.query()
    logging.debug(sql)
    rows = conn.execute(sql).fetchall()
    return rows
def output_report(self, conn):
    """Fetch this report's rows through *conn* and hand them to the
    configured output function, called as ``output_fn(report, rows)``.
    """
    render = self.output_fn
    rows = self.dataset(conn)
    render(self, rows)