from collections.abc import Sequence, Iterable
from functools import total_ordering
import fnmatch
import linecache
import os.path
import pickle

# Import types and functions implemented in C
from _tracemalloc import *
from _tracemalloc import _get_object_traceback, _get_traces
def _format_size(size, sign):
    """
    Format a byte count as a short human-readable string.

    The value is scaled through B, KiB, MiB, GiB and TiB (dividing by 1024
    at each step) until it fits in at most five digits.

    size: number of bytes (int or float, may be negative).
    sign: if true, always emit an explicit '+' or '-' sign.

    Return the formatted string, e.g. "10.0 KiB" or "+5120 KiB".
    """
    for unit in ('B', 'KiB', 'MiB', 'GiB', 'TiB'):
        if abs(size) < 100 and unit != 'B':
            # 3 digits (xx.x UNIT)
            if sign:
                return "%+.1f %s" % (size, unit)
            return "%.1f %s" % (size, unit)
        if abs(size) < 10 * 1024 or unit == 'TiB':
            # 4 or 5 digits (xxxx UNIT); TiB is the last unit, so it
            # accepts any magnitude.
            if sign:
                return "%+.0f %s" % (size, unit)
            return "%.0f %s" % (size, unit)
        # Too large for this unit: move on to the next one.
        size /= 1024
class Statistic:
    """
    Statistic on memory allocations grouped under one traceback.

    Attributes:
    traceback -- key the allocations are grouped by
    size -- total size of the memory blocks, in bytes
    count -- number of memory blocks
    """

    __slots__ = ('traceback', 'size', 'count')

    def __init__(self, traceback, size, count):
        self.traceback = traceback
        self.size = size
        self.count = count

    def __hash__(self):
        return hash((self.traceback, self.size, self.count))

    def __eq__(self, other):
        if not isinstance(other, Statistic):
            return NotImplemented
        return (self.traceback == other.traceback
                and self.size == other.size
                and self.count == other.count)

    def __str__(self):
        text = ("%s: size=%s, count=%i"
                % (self.traceback,
                   _format_size(self.size, False),
                   self.count))
        # Skip the average when there is no block: avoids dividing by zero.
        if self.count:
            average = self.size / self.count
            text += ", average=%s" % _format_size(average, False)
        return text

    def __repr__(self):
        return ('<Statistic traceback=%r size=%i count=%i>'
                % (self.traceback, self.size, self.count))

    # NOTE(review): the method name is not visible in this extract; it is
    # presumably the sort key used when ordering statistics -- confirm
    # against the caller.
    def _sort_key(self):
        return (self.size, self.count, self.traceback)
class StatisticDiff:
    """
    Statistic difference on memory allocations between an old and a new
    Snapshot instance.

    Attributes:
    traceback -- key the allocations are grouped by
    size -- total size of the memory blocks in the new snapshot, in bytes
    size_diff -- size difference between the new and the old snapshot
    count -- number of memory blocks in the new snapshot
    count_diff -- block count difference between the new and old snapshot
    """

    __slots__ = ('traceback', 'size', 'size_diff', 'count', 'count_diff')

    def __init__(self, traceback, size, size_diff, count, count_diff):
        self.traceback = traceback
        self.size = size
        self.size_diff = size_diff
        self.count = count
        self.count_diff = count_diff

    def __hash__(self):
        return hash((self.traceback, self.size, self.size_diff,
                     self.count, self.count_diff))

    def __eq__(self, other):
        if not isinstance(other, StatisticDiff):
            return NotImplemented
        return (self.traceback == other.traceback
                and self.size == other.size
                and self.size_diff == other.size_diff
                and self.count == other.count
                and self.count_diff == other.count_diff)

    def __str__(self):
        text = ("%s: size=%s (%s), count=%i (%+i)"
                % (self.traceback,
                   _format_size(self.size, False),
                   _format_size(self.size_diff, True),
                   self.count,
                   self.count_diff))
        # Skip the average when there is no block: avoids dividing by zero.
        if self.count:
            average = self.size / self.count
            text += ", average=%s" % _format_size(average, False)
        return text

    def __repr__(self):
        return ('<StatisticDiff traceback=%r size=%i (%+i) count=%i (%+i)>'
                % (self.traceback, self.size, self.size_diff,
                   self.count, self.count_diff))

    # NOTE(review): the method name is not visible in this extract; it is
    # presumably the sort key used when ordering differences -- confirm
    # against the caller.  Magnitudes of the differences come first so
    # that large shrinkages rank alongside large growths.
    def _sort_key(self):
        return (abs(self.size_diff), self.size,
                abs(self.count_diff), self.count,
                self.traceback)
def _compare_grouped_stats(old_group, new_group):
    """
    Build the list of StatisticDiff between two groups of statistics.

    old_group, new_group: dicts mapping a traceback to an object with
    'size' and 'count' attributes.

    Entries matched in old_group are popped from it, so old_group is
    modified in place; the caller must not rely on its content afterwards.

    Return a list of StatisticDiff: one per key of new_group, followed by
    one per key found only in old_group.
    """
    statistics = []
    for traceback, stat in new_group.items():
        previous = old_group.pop(traceback, None)
        if previous is not None:
            stat = StatisticDiff(traceback,
                                 stat.size, stat.size - previous.size,
                                 stat.count, stat.count - previous.count)
        else:
            # New key: the whole size and count are the difference.
            stat = StatisticDiff(traceback,
                                 stat.size, stat.size,
                                 stat.count, stat.count)
        statistics.append(stat)

    # Whatever is left in old_group has disappeared in the new group.
    for traceback, stat in old_group.items():
        stat = StatisticDiff(traceback, 0, -stat.size, 0, -stat.count)
        statistics.append(stat)
    return statistics
@total_ordering
class Frame:
    """
    Frame of a traceback: a (filename, lineno) pair.

    Frames compare, order and hash like their underlying tuple.
    """

    __slots__ = ("_frame",)

    def __init__(self, frame):
        # frame is a tuple: (filename: str, lineno: int)
        self._frame = frame

    @property
    def filename(self):
        return self._frame[0]

    @property
    def lineno(self):
        return self._frame[1]

    def __eq__(self, other):
        if not isinstance(other, Frame):
            return NotImplemented
        return (self._frame == other._frame)

    def __lt__(self, other):
        if not isinstance(other, Frame):
            return NotImplemented
        return (self._frame < other._frame)

    # Defining __eq__ disables the inherited hash; restore one that is
    # consistent with equality.
    def __hash__(self):
        return hash(self._frame)

    def __str__(self):
        return "%s:%s" % (self.filename, self.lineno)

    def __repr__(self):
        return "<Frame filename=%r lineno=%r>" % (self.filename, self.lineno)
@total_ordering
class Traceback(Sequence):
    """
    Sequence of Frame instances sorted from the oldest frame
    to the most recent frame.
    """

    __slots__ = ("_frames", '_total_nframe')

    def __init__(self, frames, total_nframe=None):
        # frames is a tuple of frame tuples: see Frame constructor for the
        # format of a frame tuple; it is reversed, because _tracemalloc
        # returns frames sorted from most recent to oldest, but the
        # Python API expects oldest to most recent
        self._frames = tuple(reversed(frames))
        self._total_nframe = total_nframe

    @property
    def total_nframe(self):
        # Value given to the constructor; None when it was not provided.
        return self._total_nframe

    def __len__(self):
        return len(self._frames)

    def __getitem__(self, index):
        # A slice yields a tuple of Frame objects, an index a single Frame.
        if isinstance(index, slice):
            return tuple(Frame(trace) for trace in self._frames[index])
        return Frame(self._frames[index])

    def __contains__(self, frame):
        # Compare raw tuples to avoid wrapping every stored frame.
        return frame._frame in self._frames

    def __hash__(self):
        return hash(self._frames)

    # Equality, ordering and hashing only look at the frames;
    # total_nframe is ignored.
    def __eq__(self, other):
        if not isinstance(other, Traceback):
            return NotImplemented
        return (self._frames == other._frames)

    def __lt__(self, other):
        if not isinstance(other, Traceback):
            return NotImplemented
        return (self._frames < other._frames)

    # NOTE(review): __str__ is not visible in this extract; rendering the
    # first stored frame is assumed -- confirm.  Raises IndexError on an
    # empty traceback.
    def __str__(self):
        return str(self[0])

    def __repr__(self):
        s = f"<Traceback {tuple(self)}"
        if self._total_nframe is None:
            s += ">"
        else:
            s += f" total_nframe={self.total_nframe}>"
        return s

    def format(self, limit=None, most_recent_first=False):
        """
        Format the traceback as a list of lines.

        limit: if None, format every frame. If positive, keep only the
        last 'limit' frames (the most recent ones). Otherwise keep
        self[:limit], i.e. drop the last abs(limit) frames (nothing is
        kept when limit is 0).
        most_recent_first: if true, emit the selected frames in reverse
        order.

        For each frame, one 'File ..., line ...' line is produced, followed
        by the stripped source line when linecache can read a non-blank one.
        """
        lines = []
        if limit is not None:
            if limit > 0:
                frame_slice = self[-limit:]
            else:
                frame_slice = self[:limit]
        else:
            frame_slice = self

        if most_recent_first:
            frame_slice = reversed(frame_slice)
        # NOTE(review): the leading whitespace of the two templates below
        # is kept exactly as found; the standard traceback layout uses two
        # and four spaces -- confirm the intended indentation.
        for frame in frame_slice:
            lines.append(' File "%s", line %s'
                         % (frame.filename, frame.lineno))
            line = linecache.getline(frame.filename, frame.lineno).strip()
            if line:
                lines.append(' %s' % line)
        return lines
def get_object_traceback(obj):
    """
    Get the traceback where the Python object *obj* was allocated.
    Return a Traceback instance.

    Return None if the tracemalloc module is not tracing memory allocations or
    did not trace the allocation of the object.
    """
    frames = _get_object_traceback(obj)
    if frames is None:
        return None
    return Traceback(frames)
class Trace:
    """
    Trace of a memory block.

    Wraps a raw trace tuple and exposes its fields as read-only
    properties. Traces compare and hash like the underlying tuple.
    """

    __slots__ = ("_trace",)

    def __init__(self, trace):
        # trace is a tuple: (domain: int, size: int, traceback: tuple),
        # optionally followed by the total number of frames.
        # See Traceback constructor for the format of the traceback tuple.
        self._trace = trace

    @property
    def domain(self):
        return self._trace[0]

    @property
    def size(self):
        return self._trace[1]

    @property
    def traceback(self):
        # Everything after the size is forwarded to Traceback, so the
        # optional frame total is passed along when present.
        return Traceback(*self._trace[2:])

    def __eq__(self, other):
        if not isinstance(other, Trace):
            return NotImplemented
        return (self._trace == other._trace)

    # Defining __eq__ disables the inherited hash; restore one that is
    # consistent with equality.
    def __hash__(self):
        return hash(self._trace)

    def __str__(self):
        return "%s: %s" % (self.traceback, _format_size(self.size, False))

    def __repr__(self):
        return ("<Trace domain=%s size=%s, traceback=%r>"
                % (self.domain, _format_size(self.size, False), self.traceback))
class _Traces(Sequence):
    """
    Read-only sequence view over raw trace tuples.

    Items are wrapped into Trace objects on access; the raw container is
    kept as-is in the '_traces' attribute.
    """

    def __init__(self, traces):
        # traces is a tuple of trace tuples: see Trace constructor
        self._traces = traces

    def __len__(self):
        return len(self._traces)

    def __getitem__(self, index):
        # A slice yields a tuple of Trace objects, an index a single Trace.
        if isinstance(index, slice):
            return tuple(Trace(trace) for trace in self._traces[index])
        return Trace(self._traces[index])

    def __contains__(self, trace):
        # Compare raw tuples to avoid wrapping every stored trace.
        return trace._trace in self._traces

    def __eq__(self, other):
        if not isinstance(other, _Traces):
            return NotImplemented
        return (self._traces == other._traces)

    def __repr__(self):
        return "<Traces len=%s>" % len(self)
def _normalize_filename(filename):
    """
    Normalize a filename (or filename pattern) for matching.

    The case is normalized with os.path.normcase() and a '.pyc' suffix is
    replaced with '.py', so that a compiled file compares equal to its
    source file.
    """
    filename = os.path.normcase(filename)
    if filename.endswith('.pyc'):
        # Drop the trailing 'c': 'module.pyc' -> 'module.py'.
        filename = filename[:-1]
    return filename
class BaseFilter:
    """
    Base class of trace filters.

    inclusive: flag stored on the instance; subclasses use it to decide
    whether a matching trace is kept or rejected.
    """

    def __init__(self, inclusive):
        self.inclusive = inclusive

    def _match(self, trace):
        # Subclasses must return true when the raw trace tuple passes
        # the filter.
        raise NotImplementedError
class Filter(BaseFilter):
    """
    Filter traces on the filename (and optionally line number and domain)
    of their frames.

    inclusive: if true, a trace passes when it matches; if false, a trace
    passes when it does not match.
    filename_pattern: fnmatch-style pattern, normalized with
    _normalize_filename().
    lineno: line number to match, or None to match any line.
    all_frames: if true, look at every frame of the traceback; otherwise
    only at the first frame of the raw traceback tuple.
    domain: domain to match, or None to ignore the domain.
    """

    def __init__(self, inclusive, filename_pattern,
                 lineno=None, all_frames=False, domain=None):
        # BaseFilter stores 'inclusive'.
        super().__init__(inclusive)
        self._filename_pattern = _normalize_filename(filename_pattern)
        self.lineno = lineno
        self.all_frames = all_frames
        self.domain = domain

    @property
    def filename_pattern(self):
        return self._filename_pattern

    def _match_frame_impl(self, filename, lineno):
        # True when the frame matches the pattern (and line number, if
        # set), regardless of 'inclusive'.
        filename = _normalize_filename(filename)
        if not fnmatch.fnmatch(filename, self._filename_pattern):
            return False
        if self.lineno is None:
            return True
        return (lineno == self.lineno)

    def _match_frame(self, filename, lineno):
        # XOR flips the raw match for an exclusive filter.
        return self._match_frame_impl(filename, lineno) ^ (not self.inclusive)

    def _match_traceback(self, traceback):
        if self.all_frames:
            if any(self._match_frame_impl(filename, lineno)
                   for filename, lineno in traceback):
                return self.inclusive
            return (not self.inclusive)
        filename, lineno = traceback[0]
        return self._match_frame(filename, lineno)

    def _match(self, trace):
        # trace is (domain, size, traceback, total_nframe).
        domain, size, traceback, total_nframe = trace
        res = self._match_traceback(traceback)
        if self.domain is not None:
            if self.inclusive:
                # Keep only traces that also belong to the domain.
                return res and (domain == self.domain)
            # Exclusive: traces of other domains always pass.
            return res or (domain != self.domain)
        return res
class DomainFilter(BaseFilter):
    """
    Filter traces on their domain.

    inclusive: if true, a trace passes when its domain equals 'domain';
    if false, a trace passes when its domain differs.
    domain: domain value compared with the first item of each trace.
    """

    def __init__(self, inclusive, domain):
        super().__init__(inclusive)
        self._domain = domain

    @property
    def domain(self):
        return self._domain

    def _match(self, trace):
        # trace is (domain, size, traceback, total_nframe).
        domain, size, traceback, total_nframe = trace
        # XOR flips the raw match for an exclusive filter.
        return (domain == self.domain) ^ (not self.inclusive)
class Snapshot:
    """
    Snapshot of traces of memory blocks allocated by Python.
    """

    def __init__(self, traces, traceback_limit):
        # traces is a sequence of trace tuples: see _Traces constructor
        self.traces = _Traces(traces)
        self.traceback_limit = traceback_limit

    def dump(self, filename):
        """
        Write the snapshot into a file.
        """
        with open(filename, "wb") as fp:
            pickle.dump(self, fp, pickle.HIGHEST_PROTOCOL)

    @staticmethod
    def load(filename):
        """
        Load a snapshot from a file.
        """
        # SECURITY: unpickling can execute arbitrary code; only load
        # snapshot files that come from a trusted source.
        with open(filename, "rb") as fp:
            return pickle.load(fp)

    def _filter_trace(self, include_filters, exclude_filters, trace):
        # A trace is kept when at least one inclusive filter accepts it
        # (if there are any) and no exclusive filter rejects it.
        if include_filters:
            if not any(trace_filter._match(trace)
                       for trace_filter in include_filters):
                return False
        if exclude_filters:
            if any(not trace_filter._match(trace)
                   for trace_filter in exclude_filters):
                return False
        return True

    def filter_traces(self, filters):
        """
        Create a new Snapshot instance with a filtered traces sequence, filters
        is a list of Filter or DomainFilter instances.  If filters is an empty
        list, return a new Snapshot instance with a copy of the traces.

        Raise TypeError if filters is not iterable.
        """
        if not isinstance(filters, Iterable):
            raise TypeError("filters must be a list of filters, not %s"
                            % type(filters).__name__)
        if filters:
            include_filters = []
            exclude_filters = []
            for trace_filter in filters:
                if trace_filter.inclusive:
                    include_filters.append(trace_filter)
                else:
                    exclude_filters.append(trace_filter)
            new_traces = [trace for trace in self.traces._traces
                          if self._filter_trace(include_filters,
                                                exclude_filters,
                                                trace)]
        else:
            # list() copies a list or a tuple alike; tuples have no copy().
            new_traces = list(self.traces._traces)
        return Snapshot(new_traces, self.traceback_limit)
def _group_by(self, key_type, cumulative):
if key_type not in ('traceback', 'filename', 'lineno'):
raise ValueError("unknown key_type: %r" % (key_type,))
if cumulative and key_type not in ('lineno', 'filename'):
raise ValueError("cumulative mode cannot by used "
"with key type %r" % key_type)
for trace in self.traces._traces:
domain, size, trace_traceback, total_nframe = trace
traceback = tracebacks[trace_traceback]
if key_type == 'traceback':
elif key_type == 'lineno':
frames = trace_traceback[:1]
else: # key_type == 'filename':
frames = ((trace_traceback[0][0], 0),)
traceback = Traceback(frames)
tracebacks[trace_traceback] = traceback