Repository URL to install this package:
|
Version:
0.2.1 ▾
|
import collections
import itertools
import operator
import sys
try:
import tracemalloc
except ImportError:
tracemalloc = None
from ddtrace.vendor import six
from ddtrace.profile import _line2def
from ddtrace.profile import exporter
from ddtrace.vendor import attr
from ddtrace.profile.collector import exceptions
from ddtrace.profile.collector import memory
from ddtrace.profile.collector import stack
from ddtrace.profile.collector import threading
from ddtrace.profile.exporter import pprof_pb2
_ITEMGETTER_ZERO = operator.itemgetter(0)
_ITEMGETTER_ONE = operator.itemgetter(1)
_ATTRGETTER_ID = operator.attrgetter("id")
@attr.s
class _Sequence(object):
start_at = attr.ib(default=1)
next_id = attr.ib(init=False, default=None)
def __attrs_post_init__(self):
self.next_id = self.start_at
def generate(self):
"""Generate a new unique id and return it."""
generated_id = self.next_id
self.next_id += 1
return generated_id
@attr.s
class _StringTable(object):
_strings = attr.ib(init=False, factory=lambda: {"": 0})
_seq_id = attr.ib(init=False, factory=_Sequence)
def to_id(self, string):
try:
return self._strings[string]
except KeyError:
generated_id = self._strings[string] = self._seq_id.generate()
return generated_id
def __iter__(self):
for string, _ in sorted(self._strings.items(), key=_ITEMGETTER_ONE):
yield string
def __len__(self):
return len(self._strings)
@attr.s
class _PprofConverter(object):
"""Convert stacks generated by a Profiler to pprof format."""
# Those attributes will be serialize in a `pprof_pb2.Profile`
_functions = attr.ib(init=False, factory=dict)
_locations = attr.ib(init=False, factory=dict)
_string_table = attr.ib(init=False, factory=_StringTable)
_last_location_id = attr.ib(init=False, factory=_Sequence)
_last_func_id = attr.ib(init=False, factory=_Sequence)
# A dict where key is a (Location, [Labels]) and value is a a dict.
# This dict has sample-type (e.g. "cpu-time") as key and the numeric value.
_location_values = attr.ib(factory=lambda: collections.defaultdict(dict), init=False, repr=False)
def _to_Function(self, filename, funcname):
try:
return self._functions[(filename, funcname)]
except KeyError:
func = pprof_pb2.Function(
id=self._last_func_id.generate(), name=self._str(funcname), filename=self._str(filename),
)
self._functions[(filename, funcname)] = func
return func
def _to_Location(self, filename, lineno, funcname=None):
try:
return self._locations[(filename, lineno, funcname)]
except KeyError:
if funcname is None:
real_funcname = _line2def.filename_and_lineno_to_def(filename, lineno)
else:
real_funcname = funcname
location = pprof_pb2.Location(
id=self._last_location_id.generate(),
line=[pprof_pb2.Line(function_id=self._to_Function(filename, real_funcname).id, line=lineno,),],
)
self._locations[(filename, lineno, funcname)] = location
return location
def _str(self, string):
"""Convert a string to an id from the string table."""
return self._string_table.to_id(str(string))
def _to_locations(self, frames, nframes):
locations = [self._to_Location(filename, lineno, funcname).id for filename, lineno, funcname in frames]
omitted = nframes - len(frames)
if omitted:
locations.append(
self._to_Location("", 0, "<%d frame%s omitted>" % (omitted, ("s" if omitted > 1 else ""))).id
)
return tuple(locations)
def convert_uncaught_exception_event(self, thread_id, thread_name, frames, nframes, exc_type_name, events):
location_key = (
self._to_locations(frames, nframes),
(("thread id", str(thread_id)), ("thread name", thread_name), ("exception type", exc_type_name)),
)
self._location_values[location_key]["uncaught-exceptions"] = len(events)
def convert_stack_event(self, thread_id, thread_name, frames, nframes, samples):
location_key = (
self._to_locations(frames, nframes),
(("thread id", str(thread_id)), ("thread name", thread_name),),
)
self._location_values[location_key]["cpu-samples"] = len(samples)
self._location_values[location_key]["cpu-time"] = sum(s.cpu_time_ns for s in samples)
self._location_values[location_key]["wall-time"] = sum(s.wall_time_ns for s in samples)
def convert_lock_acquire_event(self, lock_name, thread_id, thread_name, frames, nframes, events, sampling_ratio):
location_key = (
self._to_locations(frames, nframes),
(("thread id", str(thread_id)), ("thread name", thread_name), ("lock name", lock_name)),
)
self._location_values[location_key]["lock-acquire"] = len(events)
self._location_values[location_key]["lock-acquire-wait"] = int(
sum(e.wait_time_ns for e in events) / sampling_ratio
)
def convert_lock_release_event(self, lock_name, thread_id, thread_name, frames, nframes, events, sampling_ratio):
location_key = (
self._to_locations(frames, nframes),
(("thread id", str(thread_id)), ("thread name", thread_name), ("lock name", lock_name)),
)
self._location_values[location_key]["lock-release"] = len(events)
self._location_values[location_key]["lock-release-hold"] = int(
sum(e.locked_for_ns for e in events) / sampling_ratio
)
def convert_stack_exception_event(self, thread_id, thread_name, frames, nframes, exc_type_name, events):
location_key = (
self._to_locations(frames, nframes),
(("thread id", str(thread_id)), ("thread name", thread_name), ("exception type", exc_type_name),),
)
self._location_values[location_key]["exception-samples"] = len(events)
def convert_memory_event(self, stats, sampling_ratio):
location = tuple(self._to_Location(frame.filename, frame.lineno).id for frame in reversed(stats.traceback))
location_key = (location, tuple())
self._location_values[location_key]["alloc-samples"] = int(stats.count / sampling_ratio)
self._location_values[location_key]["alloc-space"] = int(stats.size / sampling_ratio)
def _build_profile(self, start_time_ns, duration_ns, period, sample_types, program_name):
pprof_sample_type = [
pprof_pb2.ValueType(type=self._str(type_), unit=self._str(unit)) for type_, unit in sample_types
]
sample = [
pprof_pb2.Sample(
location_id=locations,
value=[values.get(sample_type_name, 0) for sample_type_name, unit in sample_types],
label=[pprof_pb2.Label(key=self._str(key), str=self._str(s)) for key, s in labels],
)
for (locations, labels), values in sorted(six.iteritems(self._location_values), key=_ITEMGETTER_ZERO)
]
period_type = pprof_pb2.ValueType(type=self._str("time"), unit=self._str("nanoseconds"))
# WARNING: no code should use _str() here as once the _string_table is serialized below,
# it won't be updated if you call _str later in the code here
return pprof_pb2.Profile(
sample_type=pprof_sample_type,
sample=sample,
mapping=[pprof_pb2.Mapping(id=1, filename=self._str(program_name),),],
# Sort location and function by id so the output is reproducible
location=sorted(self._locations.values(), key=_ATTRGETTER_ID),
function=sorted(self._functions.values(), key=_ATTRGETTER_ID),
string_table=list(self._string_table),
time_nanos=start_time_ns,
duration_nanos=duration_ns,
period=period,
period_type=period_type,
)
class PprofExporter(exporter.Exporter):
"""Export recorder events to pprof format."""
@staticmethod
def _get_program_name(default="-"):
try:
import __main__
program_name = __main__.__file__
except (ImportError, AttributeError):
try:
program_name = sys.argv[0]
except IndexError:
program_name = None
if program_name is None:
return default
return program_name
@staticmethod
def _stack_event_group_key(event):
return (event.thread_id, str(event.thread_name), tuple(event.frames), event.nframes)
def _group_stack_events(self, events):
return itertools.groupby(sorted(events, key=self._stack_event_group_key), key=self._stack_event_group_key,)
@staticmethod
def _lock_event_group_key(event):
return (event.lock_name, event.thread_id, str(event.thread_name), tuple(event.frames), event.nframes)
def _group_lock_events(self, events):
return itertools.groupby(sorted(events, key=self._lock_event_group_key), key=self._lock_event_group_key,)
@staticmethod
def _exception_group_key(event):
exc_type = event.exc_type
exc_type_name = exc_type.__module__ + "." + exc_type.__name__
return (event.thread_id, str(event.thread_name), tuple(event.frames), event.nframes, exc_type_name)
def _group_exception_events(self, events):
return itertools.groupby(sorted(events, key=self._exception_group_key), key=self._exception_group_key,)
@staticmethod
def min_none(a, b):
"""A min() version that discards None values."""
if a is None:
return b
if b is None:
return a
return min(a, b)
@staticmethod
def max_none(a, b):
"""A max() version that discards None values."""
if a is None:
return b
if b is None:
return a
return max(a, b)
def export(self, events):
"""Convert events to pprof format.
:param events: The event dictionary from a `ddtrace.profile.recorder.Recorder`.
:return: A dict where key is the type of profiling and value is the profile objects in protobuf format.
"""
program_name = self._get_program_name()
start_time_ns = None
stop_time_ns = None
sum_period = 0
nb_event = 0
converter = _PprofConverter()
# Handle StackSampleEvent
stack_events = []
for event in events.get(stack.StackSampleEvent, []):
stack_events.append(event)
start_time_ns = self.min_none(event.timestamp, start_time_ns)
stop_time_ns = self.max_none(event.timestamp, stop_time_ns)
sum_period += event.sampling_period
nb_event += 1
for (thread_id, thread_name, frames, nframes), stack_events in self._group_stack_events(stack_events):
converter.convert_stack_event(thread_id, thread_name, frames, nframes, list(stack_events))
# Handle Lock events
for event_class, convert_fn in (
(threading.LockAcquireEvent, converter.convert_lock_acquire_event),
(threading.LockReleaseEvent, converter.convert_lock_release_event),
):
lock_events = events.get(event_class, [])
sampling_sum_pct = sum(event.sampling_pct for event in lock_events)
if lock_events:
sampling_ratio_avg = sampling_sum_pct / (len(lock_events) * 100.0)
for (lock_name, thread_id, thread_name, frames, nframes), l_events in self._group_lock_events(
lock_events
):
convert_fn(lock_name, thread_id, thread_name, frames, nframes, list(l_events), sampling_ratio_avg)
# Handle UncaughtExceptionEvent
for ((thread_id, thread_name, frames, nframes, exc_type_name), ue_events,) in self._group_exception_events(
events.get(exceptions.UncaughtExceptionEvent, [])
):
converter.convert_uncaught_exception_event(
thread_id, thread_name, frames, nframes, exc_type_name, list(ue_events)
)
sample_types = (
("cpu-samples", "count"),
("cpu-time", "nanoseconds"),
("wall-time", "nanoseconds"),
("uncaught-exceptions", "count"),
("lock-acquire", "count"),
("lock-acquire-wait", "nanoseconds"),
("lock-release", "count"),
("lock-release-hold", "nanoseconds"),
)
# Handle StackExceptionSampleEvent
if stack.FEATURES["stack-exceptions"]:
sample_types += (("exception-samples", "count"),)
for (thread_id, thread_name, frames, nframes, exc_type_name), se_events in self._group_exception_events(
events.get(stack.StackExceptionSampleEvent, [])
):
converter.convert_stack_exception_event(
thread_id, thread_name, frames, nframes, exc_type_name, list(se_events)
)
if tracemalloc:
sample_types += (
("alloc-samples", "count"),
("alloc-space", "bytes"),
)
# Handle MemorySampleEvent
# Merge all the memory snapshots
traces = []
traceback_limit = None
sampling_pct_sum = 0
nb_events = 0
for event in events.get(memory.MemorySampleEvent, []):
sampling_pct_sum += event.sampling_pct
nb_events += 1
traces.extend(event.snapshot.traces._traces)
# Assume they are all the same
traceback_limit = event.snapshot.traceback_limit
start_time_ns = self.min_none(event.timestamp, start_time_ns)
stop_time_ns = self.max_none(event.timestamp, stop_time_ns)
# Ignore period for memory events are it's not a time-based sampling
if nb_events:
sampling_ratio_avg = sampling_pct_sum / (nb_events * 100.0) # convert percentage to ratio
for stats in tracemalloc.Snapshot(traces, traceback_limit).statistics("traceback"):
converter.convert_memory_event(stats, sampling_ratio_avg)
# Compute some metadata
if nb_event:
period = int(sum_period / nb_event)
else:
period = None
if stop_time_ns is not None and start_time_ns is not None:
duration_ns = stop_time_ns - start_time_ns
else:
duration_ns = None
return converter._build_profile(
start_time_ns=start_time_ns,
duration_ns=duration_ns,
period=period,
sample_types=sample_types,
program_name=program_name,
)