#! /usr/libexec/platform-python
"""
Utility for converting *_clat_hist* files generated by fio into latency statistics.

Example usage:
$ fiologparser_hist.py *_clat_hist*
end-time, samples, min, avg, median, 90%, 95%, 99%, max
1000, 15, 192, 1678.107, 1788.859, 1856.076, 1880.040, 1899.208, 1888.000
2000, 43, 152, 1642.368, 1714.099, 1816.659, 1845.552, 1888.131, 1888.000
4000, 39, 1152, 1546.962, 1545.785, 1627.192, 1640.019, 1691.204, 1744
@author Karl Cronburg <karl.cronburg@gmail.com>
"""

import sys
import re
import pandas
import numpy as np

err = sys.stderr.write

""" Class to read a hist file line by line, buffering
a value array for the latest line, and allowing a preview
of the next timestamp in next line
Note: this does not follow a generator pattern, but must explicitly
def __init__(self, file):
self.fp = open(file, 'r')
self.data = self.nextData()
line = self.fp.readline()
self.data = [int(x) for x in line.replace(' ', '').rstrip().split(',')]
def weighted_percentile(percs, vs, ws):
""" Use linear interpolation to calculate the weighted percentile.
Value and weight arrays are first sorted by value. The cumulative
distribution function (cdf) is then computed, after which np.interp
finds the two values closest to our desired weighted percentile(s)
and linearly interpolates them.
percs :: List of percentiles we want to calculate
vs :: Array of values we are computing the percentile of
ws :: Array of weights for our corresponding values
return :: Array of percentiles
"""
idx = np.argsort(vs)
vs, ws = vs[idx], ws[idx] # weights and values sorted by value
cdf = 100 * (ws.cumsum() - ws / 2.0) / ws.sum()
return np.interp(percs, cdf, vs) # linear interpolation
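
# Illustrative usage sketch (not part of the original tool): weighted 50th and
# 90th percentiles of three hypothetical latency values, where the middle value
# carries most of the weight.
def _weighted_percentile_example():
    vs = np.array([100.0, 200.0, 400.0])  # hypothetical latency values
    ws = np.array([1.0, 8.0, 1.0])        # hypothetical weights
    # The cdf evaluates to [5, 50, 95], so the 50th percentile is exactly 200
    # and the 90th interpolates between 200 and 400 (~377.8).
    return weighted_percentile(np.array([50.0, 90.0]), vs, ws)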
def weights(start_ts, end_ts, start, end):
""" Calculate weights based on fraction of sample falling in the
given interval [start,end]. Weights computed using vector / array
computation instead of for-loops.
Note that samples with zero time length are effectively ignored
(we set their weight to zero).
start_ts :: Array of start times for a set of samples
end_ts :: Array of end times for a set of samples
return :: Array of weights
"""
sbounds = np.maximum(start_ts, start).astype(float)
ebounds = np.minimum(end_ts, end).astype(float)
ws = (ebounds - sbounds) / (end_ts - start_ts)
err("WARNING: zero-length sample(s) detected. Log file corrupt"
" / bad time values? Ignoring these samples.\n")
ws[np.where(np.isnan(ws))] = 0.0;
def weighted_average(vs, ws):
return np.sum(vs * ws) / np.sum(ws)
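
# Illustrative usage sketch (not part of the original tool): a sample covering
# 500..1500 ms overlaps half of the interval [1000, 2000), so it gets half the
# weight of a sample that lies entirely inside the interval.
def _weights_example():
    start_ts = np.array([500.0, 1000.0])   # hypothetical sample start times (ms)
    end_ts   = np.array([1500.0, 2000.0])  # hypothetical sample end times (ms)
    vs       = np.array([100.0, 300.0])    # hypothetical latency values
    ws = weights(start_ts, end_ts, 1000, 2000)   # -> [0.5, 1.0]
    return weighted_average(vs, ws)              # -> (50 + 300) / 1.5 ~= 233.3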
def gen_output_columns(ctx):
global percs, columns
strpercs = re.split('[,:]', ctx.percentiles)
percs = [50.0] # always print 50% in 'median' column
percs.extend(list(map(float,strpercs)))
columns = ["end-time", "dir", "samples", "min", "avg", "median"]
columns = ["end-time", "samples", "min", "avg", "median"]
columns.extend(list(map(lambda x: x+'%', strpercs)))
def fmt_float_list(ctx, num=1):
""" Return a comma separated list of float formatters to the required number
of decimal places. For instance:
fmt_float_list(ctx, num=3) == "%.4f, %.4f, %.4f"  (with ctx.decimals == 4)
"""
return ', '.join(["%%.%df" % ctx.decimals] * num)
# Default values - see beginning of main() for how we detect the number of
# columns in the input file:
__HIST_COLUMNS = 1216
__NON_HIST_COLUMNS = 3
__TOTAL_COLUMNS = __HIST_COLUMNS + __NON_HIST_COLUMNS
""" Read the next chunk of size sz from the given reader. """
""" StopIteration occurs when the pandas reader is empty, and AttributeError
occurs if rdr is None due to the file being empty. """
new_arr = rdr.read().values
except (StopIteration, AttributeError):
# Let's leave the array as is, and let later code ignore the block size
#""" Extract array of the times, directions wo times, and histograms matrix without times column. """
#times, rws, szs = new_arr[:,0], new_arr[:,1], new_arr[:,2]
#hists = new_arr[:,__NON_HIST_COLUMNS:]
#times = times.reshape((len(times),1))
#dirs = rws.reshape((len(rws),1))
#arr = np.append(times, hists, axis=1)
""" Find the file with the current first row with the smallest start time """
return min([fp for fp in fps if not arrs[fp] is None], key=lambda fp: arrs.get(fp)[0][0])
def histogram_generator(ctx, fps, sz):
    # Create a chunked pandas reader for each of the files:
    rdrs = {}
    for fp in fps:
        try:
            rdrs[fp] = pandas.read_csv(fp, dtype=int, header=None, chunksize=sz)
        except ValueError as e:
            if str(e) != 'No columns to parse from file': raise  # not the empty-file case
            if ctx.warn: sys.stderr.write("WARNING: Empty input file encountered.\n")
            rdrs[fp] = None
    # Initial histograms from disk:
    arrs = {fp: read_chunk(rdr, sz) for fp,rdr in rdrs.items()}
    while True:
        try:
            """ ValueError occurs when nothing more to read """
            fp = get_min(fps, arrs)
        except ValueError:
            return
        arri = np.insert(arrs[fp][0], 1, fps.index(fp))
        yield arri
        arrs[fp] = arrs[fp][1:]
        if arrs[fp].shape[0] == 0:
            arrs[fp] = read_chunk(rdrs[fp], sz)
def _plat_idx_to_val(idx, edge=0.5, FIO_IO_U_PLAT_BITS=6, FIO_IO_U_PLAT_VAL=64):
""" Taken from fio's stat.c for calculating the latency value of a bin
idx : the value of the index into the histogram bins
edge : fractional value in the range [0,1]** indicating how far into
the bin we wish to compute the latency value of.
** edge = 0.0 and 1.0 computes the lower and upper latency bounds
respectively of the given bin index. """
# MSB <= (FIO_IO_U_PLAT_BITS-1), cannot be rounded off. Use
# all bits of the sample as index
if (idx < (FIO_IO_U_PLAT_VAL << 1)):
# Find the group and compute the minimum value of that group
error_bits = (idx >> FIO_IO_U_PLAT_BITS) - 1
base = 1 << (error_bits + FIO_IO_U_PLAT_BITS)
# Find its bucket number of the group
k = idx % FIO_IO_U_PLAT_VAL
# Return the mean (if edge=0.5) of the range of the bucket
return base + ((k + edge) * (1 << error_bits))
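
# Illustrative usage sketch (not part of the original tool): with the default
# FIO_IO_U_PLAT_BITS=6 / FIO_IO_U_PLAT_VAL=64, indices below 128 map straight to
# their own value, while higher indices fall into progressively wider buckets.
def _plat_idx_to_val_example():
    assert _plat_idx_to_val(100) == 100            # idx < 128: used verbatim
    assert _plat_idx_to_val(200, edge=0.0) == 288  # lower bound of bin 200
    assert _plat_idx_to_val(200, edge=1.0) == 292  # upper bound of bin 200
    assert _plat_idx_to_val(200) == 290            # bucket midpoint (edge=0.5)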
def plat_idx_to_val_coarse(idx, coarseness, edge=0.5):
""" Converts the given *coarse* index into a non-coarse index as used by fio
in stat.h:plat_idx_to_val(), subsequently computing the appropriate
latency value for that bin.
"""
# Multiply the index by the power-of-2 coarseness to get the non-coarse
# bin index, with a max of 1536 bins (FIO_IO_U_PLAT_GROUP_NR = 24 in stat.h)
stride = 1 << coarseness
idx = idx * stride
lower = _plat_idx_to_val(idx, edge=0.0)
upper = _plat_idx_to_val(idx + stride, edge=1.0)
return lower + (upper - lower) * edge
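
# Sketch of how the module-level bin value arrays can be derived from this
# helper, mirroring what main() does; 'hist_cols' and 'coarseness' are assumed
# inputs here rather than names defined by the original script.
def _bin_vals_sketch(hist_cols, coarseness=0):
    mid   = np.array([plat_idx_to_val_coarse(i, coarseness)      for i in range(hist_cols)])
    lower = np.array([plat_idx_to_val_coarse(i, coarseness, 0.0) for i in range(hist_cols)])
    upper = np.array([plat_idx_to_val_coarse(i, coarseness, 1.0) for i in range(hist_cols)])
    return lower, mid, upper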
def print_all_stats(ctx, end, mn, ss_cnt, vs, ws, mx, dir=dir):
ps = weighted_percentile(percs, vs, ws)
avg = weighted_average(vs, ws)
values = [mn, avg] + list(ps) + [mx]
row, fmt = ([end, dir], "%d, %s, ") if ctx.directions else ([end], "%d, ")
row = row + [float(x) / ctx.divisor for x in values]
if ctx.divisor > 1:
    fmt = fmt + fmt_float_list(ctx, len(percs)+3)
else:
    # max and min are decimal values if no divisor
    fmt = fmt + "%d, " + fmt_float_list(ctx, len(percs)+1) + ", %d"
print(fmt % tuple(row))
def update_extreme(val, fncn, new_val):
""" Calculate min / max in the presence of None values """
if val is None: return new_val
else: return fncn(val, new_val)
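
# Illustrative usage sketch (not part of the original tool): update_extreme lets
# running min / max values start out as None and only become numeric once the
# first sample is seen.
def _update_extreme_example():
    assert update_extreme(None, min, 7) == 7  # no previous value yet
    assert update_extreme(5, min, 7) == 5     # keep the smaller of the two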
# See beginning of main() for how bin_vals are computed
bin_vals = [] # latency value (midpoint) of each bin
lower_bin_vals = [] # lower edge of each bin
upper_bin_vals = [] # upper edge of each bin
def process_interval(ctx, iHist, iEnd, dir):
""" print estimated percentages for the given merged sample
ss_cnt = 0 # number of samples affecting this interval
mn_bin_val, mx_bin_val = None, None
# Update total number of samples affecting current interval histogram:
ss_cnt += np.sum(iHist)

# Update min and max bin values
idxs = np.nonzero(iHist != 0)[0]
if idxs.size > 0:
    mn_bin_val = bin_vals[idxs[0]]
    mx_bin_val = bin_vals[idxs[-1]]
if ss_cnt > 0: print_all_stats(ctx, iEnd, mn_bin_val, ss_cnt, bin_vals, iHist, mx_bin_val, dir=dir)
dir_map = ['r', 'w', 't'] # map of directional value in log to textual representation
def process_weighted_interval(ctx, samples, iStart, iEnd, printdirs):
""" Construct the weighted histogram for the given interval by scanning
through all the histograms and figuring out which of their bins have
samples with latencies which overlap with the given interval
[iStart, iEnd].
"""
times, files, dirs, sizes, hists = samples[:,0], samples[:,1], samples[:,2], samples[:,3], samples[:,4:]
iHist={}; ss_cnt = {}; mn_bin_val={}; mx_bin_val={}
for dir in printdirs:
    iHist[dir] = np.zeros(__HIST_COLUMNS, dtype=float)
    ss_cnt[dir] = 0 # number of samples affecting this interval
    mn_bin_val[dir], mx_bin_val[dir] = None, None
for end_time,file,dir,hist in zip(times,files,dirs,hists):
# Only look at bins of the current histogram sample which
# started before the end of the current time interval [start,end]
start_times = (end_time - 0.5 * ctx.interval) - bin_vals / ctx.time_divisor
idx = np.where(start_times < iEnd)
s_ts, l_bvs, u_bvs, hs = start_times[idx], lower_bin_vals[idx], upper_bin_vals[idx], hist[idx]
# Increment the current interval histogram by the weighted values of the
# overlapping histogram sample, updating the total sample count and the
# min and max bin values as necessary.
textdir = dir_map[dir]
ws = hs * weights(s_ts, end_time, iStart, iEnd)
mmidx = np.where(hs != 0)[0]
if 'm' in printdirs:
    iHist['m'][idx] += ws
    ss_cnt['m'] += np.sum(hs)
    mn_bin_val['m'] = update_extreme(mn_bin_val['m'], min, l_bvs[max(0, mmidx[0] - 1)])
    mx_bin_val['m'] = update_extreme(mx_bin_val['m'], max, u_bvs[min(len(hs) - 1, mmidx[-1] + 1)])
if textdir in printdirs:
    iHist[textdir][idx] += ws
    ss_cnt[textdir] += np.sum(hs) # update total number of samples affecting current interval histogram
    mn_bin_val[textdir] = update_extreme(mn_bin_val[textdir], min, l_bvs[max(0, mmidx[0] - 1)])
    mx_bin_val[textdir] = update_extreme(mx_bin_val[textdir], max, u_bvs[min(len(hs) - 1, mmidx[-1] + 1)])
for textdir in sorted(printdirs):
if ss_cnt[textdir] > 0: print_all_stats(ctx, iEnd, mn_bin_val[textdir], ss_cnt[textdir], bin_vals, iHist[textdir], mx_bin_val[textdir], dir=textdir)
def guess_max_from_bins(ctx, hist_cols):
""" Try to guess the GROUP_NR from given # of histogram
columns seen in an input file """
if ctx.group_nr < 19 or ctx.group_nr > 26:
bins = [ctx.group_nr * (1 << 6)]
bins = [1216,1280,1344,1408,1472,1536,1600,1664]
coarses = range(max_coarse + 1)
fncn = lambda z: list(map(lambda x: z/2**x if z % 2**x == 0 else -10, coarses))
arr = np.transpose(list(map(fncn, bins)))
idx = np.where(arr == hist_cols)
if len(idx[1]) == 0:
    table = repr(arr.astype(int)).replace('-10', 'N/A').replace('array',' ')
errmsg = ("Unable to determine bin values from input clat_hist files. Namely \n"
"the first line of file '%s' " % ctx.FILE[0] + "has %d \n" % (__TOTAL_COLUMNS,) +
"columns of which we assume %d " % (hist_cols,) + "correspond to histogram bins. \n"
"This number needs to be equal to one of the following numbers:\n\n"
"Possible reasons and corresponding solutions:\n"
" - Input file(s) does not contain histograms.\n"
" - You recompiled fio with a different GROUP_NR. If so please specify this\n"
" new GROUP_NR on the command line with --group_nr\n")
raise RuntimeError(errmsg)
return bins[idx[1][0]]
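
# Illustrative usage sketch (assumed values, not part of the original tool):
# a log whose rows carry 1216 histogram columns maps to GROUP_NR=19 at
# coarseness 0, while 608 columns maps to the same 1216 bins halved once
# (coarseness 1). The context object here is hypothetical.
def _guess_max_example():
    class Ctx:                     # hypothetical stand-in for the argparse ctx
        group_nr = 19
        FILE = ['example_clat_hist.1.log']
    assert guess_max_from_bins(Ctx, 1216) == 1216
    assert guess_max_from_bins(Ctx, 608) == 1216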
def output_weighted_interval_data(ctx,printdirs):
fps = [open(f, 'r') for f in ctx.FILE]
gen = histogram_generator(ctx, fps, ctx.buff_size)
print(', '.join(columns))
start, end = 0, ctx.interval
arr = np.empty(shape=(0,__TOTAL_COLUMNS + 1), dtype=int)
more_data = True
while more_data or len(arr) > 0:
# Read up to ctx.max_latency (default 20 seconds) of data from end of current interval.
while len(arr) == 0 or arr[-1][0] < ctx.max_latency * 1000 + end:
    try:
        new_arr = next(gen)
    except StopIteration:
        more_data = False
        break
    nashape = new_arr.reshape((1,__TOTAL_COLUMNS + 1))
    arr = np.append(arr, nashape, axis=0)
# Jump immediately to the start of the input, rounding
# down to the nearest multiple of the interval (useful when --log_unix_epoch
# was used to create these histograms):
if start == 0 and arr[0][0] - ctx.max_latency > end:
start = arr[0][0] - ctx.max_latency
start = start - (start % ctx.interval)
end = start + ctx.interval
process_weighted_interval(ctx, arr, start, end, printdirs)
# Update arr to throw away samples we no longer need - samples which
# end before the start of the next interval, i.e. the end of the
# current interval:
idx = np.where(arr[:,0] > end)
arr = arr[idx]
start += ctx.interval
end = start + ctx.interval
def output_interval_data(ctx,directions):
fps = [HistFileRdr(f) for f in ctx.FILE]
print(', '.join(columns))
# add bins from all files in target intervals
if ts and ts+10 < end: # shift sample time when very close to an end time
if arr is None: # first sample in this interval: create per-direction histograms
    arr = {d: np.zeros(shape=(__HIST_COLUMNS), dtype=int) for d in directions}
if 'm' in arr: # 'm' accumulates the mixed (all-direction) histogram
    arr['m'] = np.add(arr['m'], fp.curBins)
if 'r' in arr and curdirect == 0:
arr['r'] = np.add(arr['r'], fp.curBins)
if 'w' in arr and curdirect == 1:
arr['w'] = np.add(arr['w'], fp.curBins)
if 't' in arr and curdirect == 2:
arr['t'] = np.add(arr['t'], fp.curBins)
# reached end of all files
# or gone through all files without finding sample in interval
if not more_data or not foundSamples:
    break
#print("{} size({}) samples({}) nonzero({}):".format(end, arr.size, numSamples, np.count_nonzero(arr)), str(arr), )
for d in sorted(arr.keys()):
process_interval(ctx, arr[d], end, d)
start += ctx.interval
end = start + ctx.interval
try:
    from configparser import SafeConfigParser, NoOptionError   # Python 3
except ImportError:
    from ConfigParser import SafeConfigParser, NoOptionError   # Python 2
cp = SafeConfigParser(allow_no_value=True)
with open(ctx.job_file, 'r') as fp:
    cp.readfp(fp)
# Auto detect --interval value
for s in cp.sections():
    try:
        hist_msec = cp.get(s, 'log_hist_msec')
        if hist_msec is not None:
            ctx.interval = int(hist_msec)
    except NoOptionError:
        pass
if not hasattr(ctx, 'percentiles'):
ctx.percentiles = "90,95,99"
if ctx.directions: ctx.directions = ctx.directions.lower()