Repository URL to install this package:
|
Version:
0.2.1 ▾
|
from __future__ import absolute_import
import os.path
import sys
import threading
from ddtrace.vendor.six.moves import _thread
from ddtrace.vendor import wrapt
from ddtrace import compat
from ddtrace.profile import _attr
from ddtrace.profile import collector
from ddtrace.profile import event
from ddtrace.vendor import attr
from ddtrace.profile.collector import _traceback
@event.event_class
class LockEventBase(event.Event):
"""Base Lock event."""
lock_name = attr.ib(default=None)
frames = attr.ib(default=None)
nframes = attr.ib(default=None)
thread_id = attr.ib(default=None)
thread_name = attr.ib(default=None)
sampling_pct = attr.ib(default=None)
@event.event_class
class LockAcquireEvent(LockEventBase):
"""A lock has been acquired."""
wait_time_ns = attr.ib(default=None)
@event.event_class
class LockReleaseEvent(LockEventBase):
"""A lock has been released."""
locked_for_ns = attr.ib(default=None)
def _current_thread():
# This is a custom version of `threading.current_thread`
# that does not try # to create a `DummyThread` on `KeyError`.
ident = _thread.get_ident()
try:
thread = threading._active[ident]
except KeyError:
name = None
else:
name = thread.name
return ident, name
# We need to know if wrapt is compiled in C or not. If it's not using the C module, then the wrappers function will
# appear in the stack trace and we need to hide it.
if os.environ.get("WRAPT_DISABLE_EXTENSIONS"):
WRAPT_C_EXT = False
else:
try:
import ddtrace.vendor.wrapt._wrappers as _w # noqa: F401
except ImportError:
WRAPT_C_EXT = False
else:
WRAPT_C_EXT = True
del _w
class _ProfiledLock(wrapt.ObjectProxy):
def __init__(self, wrapped, recorder, max_nframes, capture_sampler):
wrapt.ObjectProxy.__init__(self, wrapped)
self._self_recorder = recorder
self._self_max_nframes = max_nframes
self._self_capture_sampler = capture_sampler
frame = sys._getframe(2 if WRAPT_C_EXT else 3)
code = frame.f_code
self._self_name = "%s:%d" % (os.path.basename(code.co_filename), frame.f_lineno)
def acquire(self, *args, **kwargs):
if not self._self_capture_sampler.capture():
return self.__wrapped__.acquire(*args, **kwargs)
start = compat.monotonic_ns()
try:
return self.__wrapped__.acquire(*args, **kwargs)
finally:
try:
end = self._self_acquired_at = compat.monotonic_ns()
thread_id, thread_name = _current_thread()
frames, nframes = _traceback.pyframe_to_frames(sys._getframe(1), self._self_max_nframes)
self._self_recorder.push_event(
LockAcquireEvent(
lock_name=self._self_name,
frames=frames,
nframes=nframes,
thread_id=thread_id,
thread_name=thread_name,
wait_time_ns=end - start,
sampling_pct=self._self_capture_sampler.capture_pct,
)
)
except Exception:
pass
def release(self, *args, **kwargs):
try:
return self.__wrapped__.release(*args, **kwargs)
finally:
try:
if hasattr(self, "_self_acquired_at"):
try:
end = compat.monotonic_ns()
frames, nframes = _traceback.pyframe_to_frames(sys._getframe(1), self._self_max_nframes)
thread_id, thread_name = _current_thread()
self._self_recorder.push_event(
LockReleaseEvent(
lock_name=self._self_name,
frames=frames,
nframes=nframes,
thread_id=thread_id,
thread_name=thread_name,
locked_for_ns=end - self._self_acquired_at,
sampling_pct=self._self_capture_sampler.capture_pct,
)
)
finally:
del self._self_acquired_at
except Exception:
pass
acquire_lock = acquire
@attr.s
class LockCollector(collector.CaptureSamplerCollector):
"""Record lock usage."""
nframes = attr.ib(factory=_attr.from_env("DD_PROFILING_MAX_FRAMES", 64, int))
def start(self):
"""Start collecting `threading.Lock` usage."""
super(LockCollector, self).start()
self.patch()
def stop(self):
"""Stop collecting `threading.Lock` usage."""
self.unpatch()
super(LockCollector, self).stop()
def patch(self):
"""Patch the threading module for tracking lock allocation."""
# We only patch the lock from the `threading` module.
# Nobody should use locks from `_thread`; if they do so, then it's deliberate and we don't profile.
self.original = threading.Lock
@wrapt.function_wrapper
def _allocate_lock(wrapped, instance, args, kwargs):
lock = wrapped(*args, **kwargs)
return _ProfiledLock(lock, self.recorder, self.nframes, self._capture_sampler)
threading.Lock = _allocate_lock(self.original)
def unpatch(self):
"""Unpatch the threading module for tracking lock allocation."""
threading.Lock = self.original