# Repository URL to install this package: <URL missing from this copy>
import contextlib
import os
import sys
import tracemalloc
import unittest
from unittest.mock import patch
from test.support.script_helper import (assert_python_ok, assert_python_failure,
interpreter_requires_environment)
from test import support
# _testcapi is optional: when it cannot be imported the name is bound to None
# instead of failing the import of this test module.
try:
    import _testcapi
except ImportError:
    _testcapi = None

# Size in bytes of an empty bytes object; allocate_bytes() subtracts it from
# the requested size to compute the payload length.
EMPTY_STRING_SIZE = sys.getsizeof(bytes())

# NOTE(review): presumably traceback limits that tracemalloc must reject;
# the code using this constant is outside the visible part of the file.
INVALID_NFRAME = (-1, 1 << 30)
def get_frames(nframe, lineno_delta):
    """Return up to *nframe* ``(filename, lineno)`` pairs of the caller's stack.

    The walk starts at the frame of the function calling get_frames() and
    follows ``f_back`` towards the outermost frame.  *lineno_delta* is added
    to the line number of the first (innermost) frame only, so a caller can
    point at a line located a fixed distance from the get_frames() call.
    The result is a tuple; it is shorter than *nframe* when the stack is
    shallower than requested.
    """
    collected = []
    # Depth 1 is the caller; keep this lookup directly in this function,
    # since any extra call level would shift the frame that is returned.
    current = sys._getframe(1)
    delta = lineno_delta
    for _ in range(nframe):
        if current is None:
            # Ran off the outermost frame before collecting nframe entries.
            break
        collected.append((current.f_code.co_filename, current.f_lineno + delta))
        # Only the innermost frame is shifted.
        delta = 0
        current = current.f_back
    return tuple(collected)
def allocate_bytes(size):
    """Allocate a bytes object and return it with its expected traceback.

    The payload length is *size* minus EMPTY_STRING_SIZE.  Returns a
    ``(data, traceback)`` tuple where *traceback* is a tracemalloc.Traceback
    built from the Python frames active at the allocation site, limited to
    the current tracemalloc traceback limit.
    """
    payload_len = size - EMPTY_STRING_SIZE
    limit = tracemalloc.get_traceback_limit()
    # Layout matters: the allocation must stay on the line immediately after
    # the get_frames() call, because the delta of 1 makes the innermost frame
    # point at that next line.
    expected_frames = get_frames(limit, 1)
    payload = b'x' * payload_len
    return payload, tracemalloc.Traceback(expected_frames)
def create_snapshots():
    """Return two hand-built tracemalloc.Snapshot fixtures.

    Both snapshots use a traceback limit of 2 and are created from literal
    raw traces, so no real memory tracing is involved.  The result is the
    tuple ``(snapshot, snapshot2)``.
    """
    limit = 2

    # Raw traces use the layout returned by _tracemalloc._get_traces():
    # a list of (domain, size, traceback_frames) tuples, where
    # traceback_frames is a tuple of (filename, line_number) tuples.
    a2_b4 = (('a.py', 2), ('b.py', 4))
    a5_b4 = (('a.py', 5), ('b.py', 4))

    # Both snapshots start with the same three 10-byte traces.
    first_raw = [(0, 10, a2_b4)] * 3
    first_raw += [
        (1, 2, a5_b4),
        (2, 66, (('b.py', 1),)),
        (3, 7, (('<unknown>', 0),)),
    ]

    second_raw = [(0, 10, a2_b4)] * 3
    second_raw += [
        (2, 2, a5_b4),
        (2, 5000, a5_b4),
        (4, 400, (('c.py', 578),)),
    ]

    return (
        tracemalloc.Snapshot(first_raw, limit),
        tracemalloc.Snapshot(second_raw, limit),
    )
def frame(filename, lineno):
    """Return a tracemalloc.Frame for the ``(filename, lineno)`` pair.

    Uses the documented public ``tracemalloc.Frame`` class, which is built
    from a single ``(filename, lineno)`` tuple.
    """
    return tracemalloc.Frame((filename, lineno))
def traceback(*frames):
    """Return a tracemalloc.Traceback built from the given frames.

    Each positional argument is one ``(filename, lineno)`` tuple; they are
    passed to tracemalloc.Traceback together as a single tuple.
    """
    frame_tuple = frames
    return tracemalloc.Traceback(frame_tuple)
def traceback_lineno(filename, lineno):
    """Return a one-frame traceback for *filename* at line *lineno*."""
    single_frame = (filename, lineno)
    return traceback(single_frame)
def traceback_filename(filename):
    """Return a one-frame traceback for *filename* with line number 0."""
    return traceback_lineno(filename, lineno=0)
class TestTracemallocEnabled(unittest.TestCase):
    # Tests of the tracemalloc API while tracing is active.  setUp() starts
    # tracing with a traceback limit of one frame and tearDown() stops it;
    # tests needing another limit restart tracemalloc themselves.
    #
    # Several tests compare against line numbers computed by get_frames(),
    # so the relative position of some statements is significant; those
    # places are marked with a comment.

    def setUp(self):
        # The tests own the tracing state: do not interfere with a tracing
        # session that was started elsewhere.
        if tracemalloc.is_tracing():
            self.skipTest("tracemalloc must be stopped before the test")

        tracemalloc.start(1)

    def tearDown(self):
        tracemalloc.stop()

    def test_get_tracemalloc_memory(self):
        # Keep the allocated objects alive in 'data' while the memory used
        # by tracemalloc itself is measured.
        data = [allocate_bytes(123) for count in range(1000)]
        size = tracemalloc.get_tracemalloc_memory()
        self.assertGreaterEqual(size, 0)

        # Clearing the traces must not increase tracemalloc's own usage.
        tracemalloc.clear_traces()
        size2 = tracemalloc.get_tracemalloc_memory()
        self.assertGreaterEqual(size2, 0)
        self.assertLessEqual(size2, size)

    def test_get_object_traceback(self):
        tracemalloc.clear_traces()
        obj_size = 12345
        obj, obj_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertEqual(traceback, obj_traceback)

    def test_new_reference(self):
        tracemalloc.clear_traces()
        # gc.collect() indirectly calls PyList_ClearFreeList()
        support.gc_collect()

        # Create a list and "destroy it": put it in the PyListObject free list
        obj = []
        obj = None

        # Create a list which should reuse the previously created empty list.
        # Layout matters: this assignment must stay exactly three lines above
        # the get_frames() call below, which uses lineno_delta=-3 to point
        # back at it.
        obj = []

        nframe = tracemalloc.get_traceback_limit()
        frames = get_frames(nframe, -3)
        obj_traceback = tracemalloc.Traceback(frames)

        traceback = tracemalloc.get_object_traceback(obj)
        self.assertIsNotNone(traceback)
        self.assertEqual(traceback, obj_traceback)

    def test_set_traceback_limit(self):
        obj_size = 10

        # A negative limit is rejected.
        tracemalloc.stop()
        self.assertRaises(ValueError, tracemalloc.start, -1)

        # With a limit of 10 frames, tracebacks hold 10 frames.
        tracemalloc.stop()
        tracemalloc.start(10)
        obj2, obj2_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj2)
        self.assertEqual(len(traceback), 10)
        self.assertEqual(traceback, obj2_traceback)

        # With a limit of 1 frame, tracebacks hold a single frame.
        tracemalloc.stop()
        tracemalloc.start(1)
        obj, obj_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertEqual(len(traceback), 1)
        self.assertEqual(traceback, obj_traceback)

    def find_trace(self, traces, traceback):
        # Return the first raw trace whose frames (third item) equal the
        # frames of 'traceback'; fail the running test when none matches.
        for trace in traces:
            if trace[2] == traceback._frames:
                return trace

        self.fail("trace not found")

    def test_get_traces(self):
        tracemalloc.clear_traces()
        obj_size = 12345
        obj, obj_traceback = allocate_bytes(obj_size)

        traces = tracemalloc._get_traces()
        trace = self.find_trace(traces, obj_traceback)

        self.assertIsInstance(trace, tuple)
        domain, size, traceback = trace
        self.assertEqual(size, obj_size)
        self.assertEqual(traceback, obj_traceback._frames)

        # No trace is reported once tracing is stopped.
        tracemalloc.stop()
        self.assertEqual(tracemalloc._get_traces(), [])

    def test_get_traces_intern_traceback(self):
        # dummy wrappers to get more useful and identical frames in the traceback
        def allocate_bytes2(size):
            return allocate_bytes(size)

        def allocate_bytes3(size):
            return allocate_bytes2(size)

        def allocate_bytes4(size):
            return allocate_bytes3(size)

        # Ensure that two identical tracebacks are not duplicated
        tracemalloc.stop()
        tracemalloc.start(4)
        obj_size = 123
        obj1, obj1_traceback = allocate_bytes4(obj_size)
        obj2, obj2_traceback = allocate_bytes4(obj_size)

        traces = tracemalloc._get_traces()

        # Reverse the frames of the expected tracebacks before looking them
        # up among the raw traces.
        obj1_traceback._frames = tuple(reversed(obj1_traceback._frames))
        obj2_traceback._frames = tuple(reversed(obj2_traceback._frames))

        trace1 = self.find_trace(traces, obj1_traceback)
        trace2 = self.find_trace(traces, obj2_traceback)
        domain1, size1, traceback1 = trace1
        domain2, size2, traceback2 = trace2
        # Same object, not merely an equal one.
        self.assertIs(traceback2, traceback1)

    def test_get_traced_memory(self):
        # Python allocates some internals objects, so the test must tolerate
        # a small difference between the expected size and the real usage
        max_error = 2048

        # allocate one object
        obj_size = 1024 * 1024
        tracemalloc.clear_traces()
        obj, obj_traceback = allocate_bytes(obj_size)
        size, peak_size = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(size, obj_size)
        self.assertGreaterEqual(peak_size, size)

        self.assertLessEqual(size - obj_size, max_error)
        self.assertLessEqual(peak_size - size, max_error)

        # destroy the object
        obj = None
        size2, peak_size2 = tracemalloc.get_traced_memory()
        self.assertLess(size2, size)
        self.assertGreaterEqual(size - size2, obj_size - max_error)
        self.assertGreaterEqual(peak_size2, peak_size)

        # clear_traces() must reset traced memory counters
        tracemalloc.clear_traces()
        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))

        # allocate another object
        obj, obj_traceback = allocate_bytes(obj_size)
        size, peak_size = tracemalloc.get_traced_memory()
        self.assertGreaterEqual(size, obj_size)

        # stop() also resets traced memory counters
        tracemalloc.stop()
        self.assertEqual(tracemalloc.get_traced_memory(), (0, 0))

    def test_clear_traces(self):
        obj, obj_traceback = allocate_bytes(123)
        traceback = tracemalloc.get_object_traceback(obj)
        self.assertIsNotNone(traceback)

        # After clear_traces() the object is still alive but its traceback
        # is no longer known.
        tracemalloc.clear_traces()
        traceback2 = tracemalloc.get_object_traceback(obj)
        self.assertIsNone(traceback2)

    def test_is_tracing(self):
        tracemalloc.stop()
        self.assertFalse(tracemalloc.is_tracing())

        tracemalloc.start()
        self.assertTrue(tracemalloc.is_tracing())

    def test_snapshot(self):
        obj, source = allocate_bytes(123)

        # take a snapshot
        snapshot = tracemalloc.take_snapshot()

        # write on disk; the cleanup is registered first so that the file is
        # removed even if dump() fails after creating it
        self.addCleanup(support.unlink, support.TESTFN)
        snapshot.dump(support.TESTFN)

        # load from disk
        snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
        self.assertEqual(snapshot2.traces, snapshot.traces)

        # tracemalloc must be tracing memory allocations to take a snapshot
        tracemalloc.stop()
        with self.assertRaises(RuntimeError) as cm:
            tracemalloc.take_snapshot()
        self.assertEqual(str(cm.exception),
                         "the tracemalloc module must be tracing memory "
                         "allocations to take a snapshot")

    def test_snapshot_save_attr(self):
        # take a snapshot with a new attribute
        snapshot = tracemalloc.take_snapshot()
        snapshot.test_attr = "new"
        # register the cleanup first so that the file is removed even if
        # dump() fails after creating it
        self.addCleanup(support.unlink, support.TESTFN)
        snapshot.dump(support.TESTFN)

        # load() should recreate the attribute
        snapshot2 = tracemalloc.Snapshot.load(support.TESTFN)
        self.assertEqual(snapshot2.test_attr, "new")

    def fork_child(self):
        # Runs in the forked child; the return value is used as the exit
        # code of the child process:
        #   0: tracing still works, 2: tracing is off, 3: no traceback.
        if not tracemalloc.is_tracing():
            return 2

        obj_size = 12345
        obj, obj_traceback = allocate_bytes(obj_size)
        traceback = tracemalloc.get_object_traceback(obj)
        if traceback is None:
            return 3

        # everything is fine
        return 0

    @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
    def test_fork(self):
        # check that tracemalloc is still working after fork
        pid = os.fork()
        if not pid:
            # child: always leave through os._exit() so that the child never
            # returns into the test runner; 1 is reported if fork_child()
            # raises
            exitcode = 1
            try:
                exitcode = self.fork_child()
            finally:
                os._exit(exitcode)
        else:
            # parent: wait for the child and check its exit code
            pid2, status = os.waitpid(pid, 0)
            self.assertTrue(os.WIFEXITED(status))
            exitcode = os.WEXITSTATUS(status)
            self.assertEqual(exitcode, 0)
class TestSnapshot(unittest.TestCase):
maxDiff = 4000
def test_create_snapshot(self):
raw_traces = [(0, 5, (('a.py', 2),))]
with contextlib.ExitStack() as stack:
stack.enter_context(patch.object(tracemalloc, 'is_tracing',
return_value=True))
stack.enter_context(patch.object(tracemalloc, 'get_traceback_limit',
return_value=5))
stack.enter_context(patch.object(tracemalloc, '_get_traces',
return_value=raw_traces))
snapshot = tracemalloc.take_snapshot()
self.assertEqual(snapshot.traceback_limit, 5)
self.assertEqual(len(snapshot.traces), 1)
trace = snapshot.traces[0]
self.assertEqual(trace.size, 5)
self.assertEqual(len(trace.traceback), 1)
self.assertEqual(trace.traceback[0].filename, 'a.py')
self.assertEqual(trace.traceback[0].lineno, 2)
def test_filter_traces(self):
snapshot, snapshot2 = create_snapshots()
filter1 = tracemalloc.Filter(False, "b.py")
# Loading ... (web-viewer placeholder: the remainder of this file is missing from this copy)