"""The stats engine is what collects the accumulated transactions metrics,
details of errors and slow transactions. There is one instance of the stats
engine per application. This will be cleared upon each successful harvest of
data whereby it is sent to the core application.
"""
import base64
import copy
import logging
import operator
import random
import zlib
import traceback
import time
import sys
import newrelic.packages.six as six
from newrelic.core.attribute_filter import DST_ERROR_COLLECTOR
from newrelic.core.attribute import create_user_attributes
from .attribute import process_user_attribute
from .database_utils import explain_plan
from .error_collector import TracedError
from .internal_metrics import (internal_trace, InternalTrace, internal_metric)
from .metric import TimeMetric
from .stack_trace import exception_stack
from ..api.settings import STRIP_EXCEPTION_MESSAGE
from ..common.encoding_utils import json_encode
_logger = logging.getLogger(__name__)
class ApdexStats(list):
"""Bucket for accumulating apdex metrics.
"""
# Is based on a list of length 6 because all metrics are sent to the
# core application in that form, and using list as the base class means
# the object encodes directly to JSON as required. Only the first 3
# entries are strictly used for the metric itself. The 4th and 5th
# entries are set to the apdex_t value in use at the time.
def __init__(self, satisfying=0, tolerating=0, frustrating=0, apdex_t=0.0):
super(ApdexStats, self).__init__([satisfying, tolerating,
frustrating, apdex_t, apdex_t, 0])
satisfying = property(operator.itemgetter(0))
tolerating = property(operator.itemgetter(1))
frustrating = property(operator.itemgetter(2))
def merge_stats(self, other):
"""Merge data from another instance of this object."""
self[0] += other[0]
self[1] += other[1]
self[2] += other[2]
self[3] = ((self[0] or self[1] or self[2]) and
min(self[3], other[3]) or other[3])
self[4] = max(self[4], other[3])
def merge_apdex_metric(self, metric):
"""Merge data from an apdex metric object."""
self[0] += metric.satisfying
self[1] += metric.tolerating
self[2] += metric.frustrating
self[3] = ((self[0] or self[1] or self[2]) and
min(self[3], metric.apdex_t) or metric.apdex_t)
self[4] = max(self[4], metric.apdex_t)
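# A minimal illustrative sketch (not part of the agent) showing how the
# list based representation accumulates apdex data and encodes straight
# to the JSON wire format. The function name and values are hypothetical.
def _apdex_stats_example():
    import json
    stats = ApdexStats(apdex_t=0.5)
    other = ApdexStats(satisfying=3, tolerating=1, frustrating=0, apdex_t=0.5)
    stats.merge_stats(other)
    # Encodes directly as a JSON array of 6 entries because the base
    # class is list.
    return json.dumps(stats)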
class TimeStats(list):
"""Bucket for accumulating time and value metrics.
"""
# Is based on a list of length 6 because all metrics are sent to the
# core application in that form, and using list as the base class means
# the object encodes directly to JSON as required.
def __init__(self, call_count=0, total_call_time=0.0,
total_exclusive_call_time=0.0, min_call_time=0.0,
max_call_time=0.0, sum_of_squares=0.0):
super(TimeStats, self).__init__([call_count, total_call_time,
total_exclusive_call_time, min_call_time,
max_call_time, sum_of_squares])
call_count = property(operator.itemgetter(0))
total_call_time = property(operator.itemgetter(1))
total_exclusive_call_time = property(operator.itemgetter(2))
min_call_time = property(operator.itemgetter(3))
max_call_time = property(operator.itemgetter(4))
sum_of_squares = property(operator.itemgetter(5))
def merge_stats(self, other):
"""Merge data from another instance of this object."""
self[1] += other[1]
self[2] += other[2]
self[3] = self[0] and min(self[3], other[3]) or other[3]
self[4] = max(self[4], other[4])
self[5] += other[5]
# Must update the call count last as update of the
# minimum call time is dependent on initial value.
self[0] += other[0]
def merge_raw_time_metric(self, duration, exclusive=None):
"""Merge time value."""
if exclusive is None:
exclusive = duration
self[1] += duration
self[2] += exclusive
self[3] = self[0] and min(self[3], duration) or duration
self[4] = max(self[4], duration)
self[5] += duration ** 2
# Must update the call count last as update of the
# minimum call time is dependent on initial value.
self[0] += 1
def merge_time_metric(self, metric):
"""Merge data from a time metric object."""
self.merge_raw_time_metric(metric.duration, metric.exclusive)
def merge_custom_metric(self, value):
"""Merge data value."""
self.merge_raw_time_metric(value)
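# A minimal illustrative sketch (not part of the agent) of how raw time
# values accumulate into the 6 entry list sent to the core application.
# The durations used here are hypothetical.
def _time_stats_example():
    stats = TimeStats()
    for duration in (0.1, 0.3, 0.2):
        stats.merge_raw_time_metric(duration)
    # stats is now roughly [3, 0.6, 0.6, 0.1, 0.3, 0.14]: call count,
    # total time, exclusive time, min, max and sum of squares.
    return stats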
class CustomMetrics(object):
"""Table for collection a set of value metrics.
"""
def __init__(self):
self.__stats_table = {}
def __contains__(self, key):
return key in self.__stats_table
def record_custom_metric(self, name, value):
"""Record a single value metric, merging the data with any data
from prior value metrics with the same name.
"""
stats = self.__stats_table.get(name)
if stats is None:
stats = TimeStats()
self.__stats_table[name] = stats
def c2t(count=0, total=0.0, min=0.0, max=0.0, sum_of_squares=0.0):
return (count, total, total, min, max, sum_of_squares)
try:
stats.merge_stats(TimeStats(*c2t(**value)))
except Exception:
stats.merge_custom_metric(value)
def metrics(self):
"""Returns an iterator over the set of value metrics. The items
returned are a tuple consisting of the metric name and accumulated
stats for the metric.
"""
return six.iteritems(self.__stats_table)
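# A minimal illustrative sketch (not part of the agent) of recording
# custom metrics. A value may be a plain number, or a dict of already
# aggregated counts which c2t() converts into a TimeStats list. The
# metric name and values here are hypothetical.
def _custom_metrics_example():
    table = CustomMetrics()
    table.record_custom_metric('Custom/Example', 0.25)
    table.record_custom_metric('Custom/Example',
            dict(count=2, total=0.5, min=0.1, max=0.4))
    return dict(table.metrics())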
class SlowSqlStats(list):
def __init__(self):
super(SlowSqlStats, self).__init__([0, 0, 0, 0, None])
call_count = property(operator.itemgetter(0))
total_call_time = property(operator.itemgetter(1))
min_call_time = property(operator.itemgetter(2))
max_call_time = property(operator.itemgetter(3))
slow_sql_node = property(operator.itemgetter(4))
def merge_stats(self, other):
"""Merge data from another instance of this object."""
self[1] += other[1]
self[2] = self[0] and min(self[2], other[2]) or other[2]
self[3] = max(self[3], other[3])
if self[3] == other[3]:
self[4] = other[4]
# Must update the call count last as update of the
# minimum call time is dependent on initial value.
self[0] += other[0]
def merge_slow_sql_node(self, node):
"""Merge data from a slow sql node object."""
duration = node.duration
self[1] += duration
self[2] = self[0] and min(self[2], duration) or duration
self[3] = max(self[3], duration)
if self[3] == duration:
self[4] = node
# Must update the call count last as update of the
# minimum call time is dependent on initial value.
self[0] += 1
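# A minimal illustrative sketch (not part of the agent) of how slow SQL
# stats track the single slowest node seen for a SQL key. The _FakeNode
# stand-in is hypothetical; the agent passes real slow SQL node objects.
def _slow_sql_stats_example():
    import collections
    _FakeNode = collections.namedtuple('_FakeNode', ['duration'])
    stats = SlowSqlStats()
    stats.merge_slow_sql_node(_FakeNode(duration=0.7))
    stats.merge_slow_sql_node(_FakeNode(duration=1.2))
    # Call count is 2, max call time is 1.2 and the retained node is
    # the one with the 1.2 second duration.
    return stats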
class SampledDataSet(object):
def __init__(self, capacity=100):
self.samples = []
self.capacity = capacity
self.num_seen = 0
@property
def num_samples(self):
return len(self.samples)
@property
def sampling_info(self):
return {
'reservoir_size' : self.capacity,
'events_seen' : self.num_seen
}
def reset(self):
self.samples = []
self.num_seen = 0
def add(self, sample):
if self.num_samples < self.capacity:
self.samples.append(sample)
else:
index = random.randint(0, self.num_seen)
if index < self.capacity:
self.samples[index] = sample
self.num_seen += 1
def merge(self, other_data_set):
for item in other_data_set.samples:
self.add(item)
# Make sure num_seen includes total items seen from merged set
self.num_seen += other_data_set.num_seen - other_data_set.num_samples
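# A minimal illustrative sketch (not part of the agent) of the reservoir
# sampling behaviour: once capacity is reached, new samples replace
# existing ones at random so every event seen has an equal chance of
# being retained. The sample values are hypothetical.
def _sampled_data_set_example():
    data_set = SampledDataSet(capacity=10)
    for i in range(100):
        data_set.add(i)
    # Always 10 samples retained out of the 100 events seen.
    return data_set.num_samples, data_set.num_seen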
class StatsEngine(object):
"""The stats engine object holds the accumulated transactions metrics,
details of errors and slow transactions. There should be one instance
of the stats engine per application. This will be cleared upon each
successful harvest of data whereby it is sent to the core application.
No data will however be accumulated while there is no associated
settings object indicating that application has been successfully
activated and server side settings received.
All of the accumulated apdex, time and value metrics are mapped to from
the same stats table. The key is comprised of a tuple (name, scope).
For an apdex metric the scope is None. Time metrics should always have
a string as the scope and it can be either empty or not. Value metrics
technically overlap in same namespace as time metrics as the scope is
always an empty string. There are however no checks against adding a
value metric which clashes with an existing time metric or vice versa.
If that is done then the results will simply be wrong. The name chose
for a time or value metric should thus be chosen wisely so as not to
clash.
Note that there is no locking performed within the stats engine itself.
It is assumed the holder and user of the instance performs adequate
external locking to ensure that multiple threads do not try and update
it at the same time.
"""
def __init__(self):
self.__settings = None
self.__stats_table = {}
self.__transaction_events = SampledDataSet()
self.__error_events = SampledDataSet()
self.__custom_events = SampledDataSet()
self.__sql_stats_table = {}
self.__slow_transaction = None
self.__slow_transaction_map = {}
self.__slow_transaction_old_duration = None
self.__slow_transaction_dry_harvests = 0
self.__transaction_errors = []
self.__metric_ids = {}
self.__synthetics_events = []
self.__synthetics_transactions = []
self.__xray_transactions = []
self.xray_sessions = {}
@property
def settings(self):
return self.__settings
@property
def stats_table(self):
return self.__stats_table
@property
def metric_ids(self):
"""Returns a reference to the dictionary containing the mappings
from metric (name, scope) to the integer identifier supplied
back from the core application. These integer identifiers are
used when sending data to the core application to cut down on
the size of data being sent.
"""
return self.__metric_ids
@property
def transaction_events(self):
return self.__transaction_events
@property
def custom_events(self):
return self.__custom_events
@property
def synthetics_events(self):
return self.__synthetics_events
@property
def synthetics_transactions(self):
return self.__synthetics_transactions
@property
def error_events(self):
return self.__error_events
def error_events_sampling_info(self):
sampling_info = {
'reservoir_size' : self.error_events.capacity,
'events_seen' : self.error_events.num_seen
}
return sampling_info
def update_metric_ids(self, metric_ids):
"""Updates the dictionary containing the mappings from metric
(name, scope) to the integer identifier supplied back from the
core application. The input should be an iterable returning a
list of pairs where first is a dictionary with name and scope
keys with corresponding values. The second should be the integer
identifier. The dictionary is converted to a (name, scope) tuple
for use as key into the internal dictionary containing the
mappings.
"""
for key, value in metric_ids:
key = (key['name'], key['scope'])
self.__metric_ids[key] = value
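# For example (the metric names and identifiers here are purely
# illustrative), the core application might supply:
#
#     [[{'name': 'WebTransaction', 'scope': ''}, 1001],
#      [{'name': 'Database/all', 'scope': 'WebTransaction/Uri/x'}, 1002]]
#
# which would be stored internally as:
#
#     {('WebTransaction', ''): 1001,
#      ('Database/all', 'WebTransaction/Uri/x'): 1002}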
def metrics_count(self):
"""Returns a count of the number of unique metrics currently
recorded for apdex, time and value metrics.
"""
return len(self.__stats_table)
def record_apdex_metric(self, metric):
"""Record a single apdex metric, merging the data with any data
from prior apdex metrics with the same name.
"""
if not self.__settings:
return
# Note that because we are using a scope here of an empty string
# we can potentially clash with an unscoped metric. Using None,
# although it may help to keep them separate in the agent, will
# not make a difference to the data collector, which treats None
# as an empty string anyway.
key = (metric.name, '')
stats = self.__stats_table.get(key)
if stats is None:
stats = ApdexStats(apdex_t=metric.apdex_t)
self.__stats_table[key] = stats
stats.merge_apdex_metric(metric)
return key
def record_apdex_metrics(self, metrics):
"""Record the apdex metrics supplied by the iterable for a
single transaction, merging the data with any data from prior
apdex metrics with the same name.
"""
if not self.__settings:
return
for metric in metrics:
self.record_apdex_metric(metric)
def record_time_metric(self, metric):
"""Record a single time metric, merging the data with any data
from prior time metrics with the same name and scope.
"""
if not self.__settings:
return
# Scope is forced to be empty string if None as
# scope of None is reserved for apdex metrics.
key = (metric.name, metric.scope or '')
stats = self.__stats_table.get(key)
if stats is None:
stats = TimeStats()
self.__stats_table[key] = stats
stats.merge_time_metric(metric)
return key
def record_time_metrics(self, metrics):
"""Record the time metrics supplied by the iterable for a single
transaction, merging the data with any data from prior time
metrics with the same name and scope.
"""
if not self.__settings:
return
for metric in metrics:
self.record_time_metric(metric)
def record_exception(self, exc=None, value=None, tb=None, params={},
ignore_errors=[]):
settings = self.__settings
if not settings:
return
error_collector = settings.error_collector
if not error_collector.enabled:
return
if not settings.collect_errors and not settings.collect_error_events:
return
# If no exception details provided, use current exception.
if exc is None and value is None and tb is None:
exc, value, tb = sys.exc_info()
# Has to be an error to be logged.
if exc is None or value is None or tb is None:
return
# Where ignore_errors is a callable it should return a
# tri-state value with the following behavior.
#
# True  - Ignore the error.
# False - Record the error.
# None  - Use the default ignore rules.
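#
# For example, a caller might supply something like:
#
#     def ignore_errors(exc, value, tb):
#         if isinstance(value, LookupError):
#             return True    # ignore this error
#         return None        # fall back to the default rules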
should_ignore = None
if callable(ignore_errors):
should_ignore = ignore_errors(exc, value, tb)
if should_ignore:
return
module = value.__class__.__module__
name = value.__class__.__name__
if should_ignore is None:
# We need to check for module.name and module:name.
# Originally we used module.class but that was
# inconsistent with everything else, which used
# module:name. So we changed to use ':' as the separator,
# but for backward compatibility we need to support '.' as
# a separator for the time being. Check the ':' form last
# as that is the name we will use as the exception type.
if module:
fullname = '%s.%s' % (module, name)
else:
fullname = name
if not callable(ignore_errors) and fullname in ignore_errors:
return
if fullname in error_collector.ignore_errors:
return
if module:
fullname = '%s:%s' % (module, name)
else:
fullname = name
if not callable(ignore_errors) and fullname in ignore_errors:
return
if fullname in error_collector.ignore_errors:
return
else:
if module:
fullname = '%s:%s' % (module, name)
else:
fullname = name
# Only add params if High Security Mode is off.
if settings.high_security:
if params:
_logger.debug('Cannot add custom parameters in '
'High Security Mode.')
attributes = []
else:
custom_params = {}
try:
for k, v in params.items():
name, val = process_user_attribute(k, v)
if name:
custom_params[name] = val
except Exception:
_logger.debug('Parameters failed to validate for unknown '
'reason. Dropping parameters for error: %r. Check '
'traceback for clues.', fullname, exc_info=True)
custom_params = {}
attributes = create_user_attributes(custom_params,
settings.attribute_filter)
# Check to see if we need to strip the message before recording it.
if (settings.strip_exception_messages.enabled and
fullname not in settings.strip_exception_messages.whitelist):
message = STRIP_EXCEPTION_MESSAGE
else:
try:
# Favor unicode in exception messages.
message = six.text_type(value)
except Exception:
try:
# If exception cannot be represented in unicode, this means
# that it is a byte string encoded with an encoding
# that is not compatible with the default system encoding.
# So, just pass this byte string along.
message = str(value)
except Exception:
message = '<unprintable %s object>' % type(value).__name__
# Record the exception details.
params = {}
params["stack_trace"] = exception_stack(tb)
# filter custom error specific params using attribute filter (user)
params['userAttributes'] = {}
for attr in attributes:
if attr.destinations & DST_ERROR_COLLECTOR:
params['userAttributes'][attr.name] = attr.value
error_details = TracedError(
start_time=time.time(),
path='Exception',
message=message,
type=fullname,
parameters=params)
# Save this error as a trace and an event.
if error_collector.capture_events and settings.collect_error_events:
event = self._error_event(error_details)
self.__error_events.add(event)
if settings.collect_errors and (len(self.__transaction_errors) <
settings.agent_limits.errors_per_harvest):
self.__transaction_errors.append(error_details)
# Regardless of whether we record the trace or the event we still
# want to increment the metric Errors/all
self.record_time_metric(TimeMetric(name='Errors/all', scope='',
duration=0.0, exclusive=None))
def _error_event(self, error):
# This method is for recording error events outside of transactions,
# don't let the poorly named 'type' attribute fool you.
intrinsics = {
'type' : 'TransactionError',
'error.class' : error.type,
'error.message' : error.message,
'timestamp' : error.start_time,
'transactionName' : None,
}
# Leave agent attributes field blank since not a transaction
error_event = [intrinsics, error.parameters['userAttributes'], {}]
return error_event
def record_custom_event(self, event):
settings = self.__settings
if not settings:
return
if (settings.collect_custom_events
and settings.custom_insights_events.enabled):
self.__custom_events.add(event)
def record_custom_metric(self, name, value):
"""Record a single value metric, merging the data with any data
from prior value metrics with the same name.
"""
if not self.__settings:
return
# Scope is forced to be empty string. This means
# that it can overlap with a time metric, but no
# validation is done to avoid clashes and mixing
# the two types of metrics will simply cause
# incorrect data.
key = (name, '')
stats = self.__stats_table.get(key)
if stats is None:
stats = TimeStats()
self.__stats_table[key] = stats
def c2t(count=0, total=0.0, min=0.0, max=0.0, sum_of_squares=0.0):
return (count, total, total, min, max, sum_of_squares)
try:
stats.merge_stats(TimeStats(*c2t(**value)))
except Exception:
stats.merge_custom_metric(value)
return key
def record_custom_metrics(self, metrics):
"""Record the value metrics supplied by the iterable, merging
the data with any data from prior value metrics with the same
name.
"""
if not self.__settings:
return
for name, value in metrics:
self.record_custom_metric(name, value)
def record_slow_sql_node(self, node):
"""Record a single sql metric, merging the data with any data
from prior sql metrics for the same sql key.
"""
if not self.__settings:
return
key = node.identifier
stats = self.__sql_stats_table.get(key)
if stats is None:
# Only record slow SQL if not already over the limit on
# how many can be collected in the harvest period.
settings = self.__settings
maximum = settings.agent_limits.slow_sql_data
if len(self.__sql_stats_table) < maximum:
stats = SlowSqlStats()
self.__sql_stats_table[key] = stats
if stats:
stats.merge_slow_sql_node(node)
return key
def _update_xray_transaction(self, transaction):
"""Check if transaction is an x-ray transaction and save it to the
__xray_transactions
"""
settings = self.__settings
# Nothing to do if we have reached the max limit of x-ray transactions
# to send per harvest.
maximum = settings.agent_limits.xray_transactions
if len(self.__xray_transactions) >= maximum:
return
# If current transaction qualifies as an xray_transaction, set the
# xray_id on the transaction object and save it in the
# xray_transactions list.
xray_session = self.xray_sessions.get(transaction.path)
if xray_session:
transaction.xray_id = xray_session.xray_id
self.__xray_transactions.append(transaction)
def _update_slow_transaction(self, transaction):
"""Check if transaction is the slowest transaction and update
accordingly.
"""
slowest = 0
name = transaction.path
if self.__slow_transaction:
slowest = self.__slow_transaction.duration
if name in self.__slow_transaction_map:
slowest = max(self.__slow_transaction_map[name], slowest)
if transaction.duration > slowest:
# We are going to replace the prior slow transaction.
# We need to be a bit tricky here. If we are overriding
# an existing slow transaction for a different name,
# then we need to restore in the transaction map what
# the previous slowest duration was for that, or remove
# it if there wasn't one. This is so we do not incorrectly
# suppress it given that it was never actually reported
# as the slowest transaction.
if self.__slow_transaction:
if self.__slow_transaction.path != name:
if self.__slow_transaction_old_duration:
self.__slow_transaction_map[
self.__slow_transaction.path] = (
self.__slow_transaction_old_duration)
else:
del self.__slow_transaction_map[
self.__slow_transaction.path]
if name in self.__slow_transaction_map:
self.__slow_transaction_old_duration = (
self.__slow_transaction_map[name])
else:
self.__slow_transaction_old_duration = None
self.__slow_transaction = transaction
self.__slow_transaction_map[name] = transaction.duration
def _update_synthetics_transaction(self, transaction):
"""Check if transaction is a synthetics trace and save it to
__synthetics_transactions.
"""
settings = self.__settings
if not transaction.synthetics_resource_id:
return
maximum = settings.agent_limits.synthetics_transactions
if len(self.__synthetics_transactions) < maximum:
self.__synthetics_transactions.append(transaction)
@internal_trace('Supportability/Python/StatsEngine/Calls/'
'record_transaction')
def record_transaction(self, transaction):
"""Record any apdex and time metrics for the transaction as
well as any errors which occurred for the transaction. If the
transaction qualifies to become the slow transaction remember
it for later.
"""
if not self.__settings:
return
settings = self.__settings
# Record the apdex, value and time metrics generated from the
# transaction. Whether time metrics are reported as distinct
# metrics or rolled up is controlled in part by settings for the
# minimum number of unique metrics to be reported, then by
# whether a metric is over a time threshold calculated as a
# percentage of the overall request time, up to a maximum number
# of unique metrics. This is intended to limit how many metrics
# are reported for each transaction and to cut down on an
# explosion of unique metric names. The limits and thresholds
# are applied after the metrics are reverse sorted based on the
# exclusive time for each metric. This ensures that the metrics
# with the greatest exclusive time are retained over those with
# lesser time. Such metrics get reported into the performance
# breakdown tab for specific web transactions.
with InternalTrace('Supportability/Python/TransactionNode/Calls/'
'apdex_metrics'):
self.record_apdex_metrics(transaction.apdex_metrics(self))
with InternalTrace('Supportability/Python/TransactionNode/Calls/'
'value_metrics'):
self.merge_custom_metrics(transaction.custom_metrics.metrics())
with InternalTrace('Supportability/Python/TransactionNode/Calls/'
'time_metrics'):
self.record_time_metrics(transaction.time_metrics(self))
# Capture any errors if error collection is enabled.
# Only retain maximum number allowed per harvest.
error_collector = settings.error_collector
if (error_collector.enabled and settings.collect_errors and
len(self.__transaction_errors) <
settings.agent_limits.errors_per_harvest):
with InternalTrace('Supportability/Python/TransactionNode/Calls/'
'error_details'):
self.__transaction_errors.extend(transaction.error_details())
self.__transaction_errors = self.__transaction_errors[:
settings.agent_limits.errors_per_harvest]
if (error_collector.capture_events and error_collector.enabled
and settings.collect_error_events):
events = transaction.error_events(self.__stats_table)
for event in events:
self.__error_events.add(event)
# Capture any sql traces if transaction tracer enabled.
if settings.slow_sql.enabled and settings.collect_traces:
with InternalTrace('Supportability/Python/TransactionNode/Calls/'
'slow_sql_nodes'):
for node in transaction.slow_sql_nodes(self):
self.record_slow_sql_node(node)
# Remember this as the slowest transaction if the transaction
# tracer is enabled, the transaction is over the threshold,
# it is slower than any existing transaction seen for this
# period or in the historical snapshot of slow transactions,
# and recording of the transaction trace for this transaction
# has not been suppressed.
transaction_tracer = settings.transaction_tracer
if (not transaction.suppress_transaction_trace and
transaction_tracer.enabled and settings.collect_traces):
# Transactions saved for x-ray session and Synthetics transactions
# do not depend on the transaction threshold.
self._update_xray_transaction(transaction)
self._update_synthetics_transaction(transaction)
threshold = transaction_tracer.transaction_threshold
if threshold is None:
threshold = transaction.apdex_t * 4
if transaction.duration >= threshold:
self._update_slow_transaction(transaction)
# Create the transaction event and add it to the
# appropriate "bucket." Synthetic requests are saved in one,
# while transactions from regular requests are saved in another.
if transaction.synthetics_resource_id:
if (len(self.__synthetics_events) <
settings.agent_limits.synthetics_events):
event = transaction.transaction_event(self.__stats_table)
self.__synthetics_events.append(event)
elif (settings.collect_analytics_events and
settings.transaction_events.enabled):
event = transaction.transaction_event(self.__stats_table)
self.__transaction_events.add(event)
# Merge in custom events
if (settings.collect_custom_events and
settings.custom_insights_events.enabled):
self.custom_events.merge(transaction.custom_events)
@internal_trace('Supportability/Python/StatsEngine/Calls/metric_data')
def metric_data(self, normalizer=None):
"""Returns a list containing the low level metric data for
sending to the core application pertaining to the reporting
period. This consists of tuple pairs where first is dictionary
with name and scope keys with corresponding values, or integer
identifier if metric had an entry in dictionary mapping metric
(name, scope) as supplied from core application. The second is
the list of accumulated metric data, the list always being of
length 6.
"""
if not self.__settings:
return []
result = []
normalized_stats = {}
# Metric renaming and re-aggregation. After applying the metric
# renaming rules, the metrics are re-aggregated to collapse
# metrics that end up with the same name after renaming.
if self.__settings.debug.log_raw_metric_data:
_logger.info('Raw metric data for harvest of %r is %r.',
self.__settings.app_name,
list(six.iteritems(self.__stats_table)))
if normalizer is not None:
for key, value in six.iteritems(self.__stats_table):
key = (normalizer(key[0])[0], key[1])
stats = normalized_stats.get(key)
if stats is None:
normalized_stats[key] = copy.copy(value)
else:
stats.merge_stats(value)
else:
normalized_stats = self.__stats_table
if self.__settings.debug.log_normalized_metric_data:
_logger.info('Normalized metric data for harvest of %r is %r.',
self.__settings.app_name,
list(six.iteritems(normalized_stats)))
for key, value in six.iteritems(normalized_stats):
if key not in self.__metric_ids:
key = dict(name=key[0], scope=key[1])
else:
key = self.__metric_ids[key]
result.append((key, value))
return result
def metric_data_count(self):
"""Returns a count of the number of unique metrics.
"""
if not self.__settings:
return 0
return len(self.__stats_table)
@internal_trace('Supportability/Python/StatsEngine/Calls/error_data')
def error_data(self):
"""Returns a to a list containing any errors collected during
the reporting period.
"""
if not self.__settings:
return []
return self.__transaction_errors
@internal_trace('Supportability/Python/StatsEngine/Calls/slow_sql_data')
def slow_sql_data(self, connections):
_logger.debug('Generating slow SQL data.')
if not self.__settings:
return []
if not self.__sql_stats_table:
return []
if not self.__settings.slow_sql.enabled:
return []
maximum = self.__settings.agent_limits.slow_sql_data
slow_sql_nodes = sorted(six.itervalues(self.__sql_stats_table),
key=lambda x: x.max_call_time)[-maximum:]
result = []
for stats_node in slow_sql_nodes:
params = {}
slow_sql_node = stats_node.slow_sql_node
if slow_sql_node.stack_trace:
params['backtrace'] = slow_sql_node.stack_trace
explain_plan_data = explain_plan(connections,
slow_sql_node.statement,
slow_sql_node.connect_params,
slow_sql_node.cursor_params,
slow_sql_node.sql_parameters,
slow_sql_node.execute_params,
slow_sql_node.sql_format)
if explain_plan_data:
params['explain_plan'] = explain_plan_data
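# The params are JSON encoded, zlib compressed and then base64
# encoded so they can be embedded as an opaque string field in the
# slow SQL payload sent to the core application.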
json_data = json_encode(params)
level = self.__settings.agent_limits.data_compression_level
level = level or zlib.Z_DEFAULT_COMPRESSION
params_data = base64.standard_b64encode(
zlib.compress(six.b(json_data), level))
if six.PY3:
params_data = params_data.decode('Latin-1')
# Limit the length of any SQL that is reported back.
limit = self.__settings.agent_limits.sql_query_length_maximum
sql = slow_sql_node.formatted[:limit]
data = [slow_sql_node.path,
slow_sql_node.request_uri,
slow_sql_node.identifier,
sql,
slow_sql_node.metric,
stats_node.call_count,
stats_node.total_call_time * 1000,
stats_node.min_call_time * 1000,
stats_node.max_call_time * 1000,
params_data]
result.append(data)
return result
@internal_trace('Supportability/Python/StatsEngine/Calls/'
'transaction_trace_data')
def transaction_trace_data(self, connections):
"""Returns a list of slow transaction data collected
during the reporting period.
"""
_logger.debug('Generating transaction trace data.')
if not self.__settings:
return []
# Create a set 'traces' that is a union of slow transaction,
# xray_transactions, and Synthetics transactions.
# This ensures we don't send duplicates of a transaction.
traces = set()
if self.__slow_transaction:
traces.add(self.__slow_transaction)
traces.update(self.__xray_transactions)
traces.update(self.__synthetics_transactions)
# Return an empty list if no transactions were captured.
if not traces:
return []
# We want to limit the number of explain plans we do across
# these. So work out which were the slowest and tag them.
# Later the explain plan will only be run on those which are
# tagged.
agent_limits = self.__settings.agent_limits
explain_plan_limit = agent_limits.sql_explain_plans_per_harvest
maximum_nodes = agent_limits.transaction_traces_nodes
database_nodes = []
if explain_plan_limit != 0:
for trace in traces:
for node in trace.slow_sql:
# Make sure we clear any flag for explain plans on
# the nodes in case a transaction trace was merged
# in from previous harvest period.
node.generate_explain_plan = False
# A node should be excluded if it is for an operation
# that we can't do an explain plan on. It should also
# be excluded if it would not be included in the
# transaction trace because the node limit was reached.
if (node.node_count < maximum_nodes and
node.connect_params and node.statement.operation in
node.statement.database.explain_stmts):
database_nodes.append(node)
database_nodes = sorted(database_nodes,
key=lambda x: x.duration)[-explain_plan_limit:]
for node in database_nodes:
node.generate_explain_plan = True
else:
for trace in traces:
for node in trace.slow_sql:
node.generate_explain_plan = True
database_nodes.append(node)
# Now generate the transaction traces. We need to cap the
# number of nodes captured to the specified limit.
trace_data = []
for trace in traces:
transaction_trace = trace.transaction_trace(
self, maximum_nodes, connections)
internal_metric('Supportability/Python/StatsEngine/Counts/'
'transaction_sample_data', trace.trace_node_count)
data = [transaction_trace,
list(trace.string_table.values())]
if self.__settings.debug.log_transaction_trace_payload:
_logger.debug('Encoding slow transaction data where '
'payload=%r.', data)
with InternalTrace('Supportability/Python/StatsEngine/JSON/'
'Encode/transaction_sample_data'):
json_data = json_encode(data)
internal_metric('Supportability/Python/StatsEngine/ZLIB/Bytes/'
'transaction_sample_data', len(json_data))
level = self.__settings.agent_limits.data_compression_level
level = level or zlib.Z_DEFAULT_COMPRESSION
with InternalTrace('Supportability/Python/StatsEngine/ZLIB/'
'Compress/transaction_sample_data'):
zlib_data = zlib.compress(six.b(json_data), level)
with InternalTrace('Supportability/Python/StatsEngine/BASE64/'
'Encode/transaction_sample_data'):
pack_data = base64.standard_b64encode(zlib_data)
if six.PY3:
pack_data = pack_data.decode('Latin-1')
root = transaction_trace.root
xray_id = getattr(trace, 'xray_id', None)
if (xray_id or trace.record_tt):
force_persist = True
else:
force_persist = False
trace_data.append([root.start_time,
root.end_time - root.start_time,
trace.path,
trace.request_uri,
pack_data,
trace.guid,
None,
force_persist,
xray_id,
trace.synthetics_resource_id,])
return trace_data
@internal_trace('Supportability/Python/StatsEngine/Calls/'
'slow_transaction_data')
def slow_transaction_data(self):
"""Returns a list containing any slow transaction data collected
during the reporting period.
NOTE Currently only the slowest transaction for the reporting
period is retained.
"""
# XXX This method no longer appears to be used. Being replaced
# by the transaction_trace_data() method.
if not self.__settings:
return []
if not self.__slow_transaction:
return []
maximum = self.__settings.agent_limits.transaction_traces_nodes
transaction_trace = self.__slow_transaction.transaction_trace(
self, maximum)
internal_metric('Supportability/Python/StatsEngine/Counts/'
'transaction_sample_data',
self.__slow_transaction.trace_node_count)
data = [transaction_trace,
list(self.__slow_transaction.string_table.values())]
if self.__settings.debug.log_transaction_trace_payload:
_logger.debug('Encoding slow transaction data where '
'payload=%r.', data)
with InternalTrace('Supportability/Python/StatsEngine/JSON/Encode/'
'transaction_sample_data'):
json_data = json_encode(data)
internal_metric('Supportability/Python/StatsEngine/ZLIB/Bytes/'
'transaction_sample_data', len(json_data))
level = self.__settings.agent_limits.data_compression_level
level = level or zlib.Z_DEFAULT_COMPRESSION
with InternalTrace('Supportability/Python/StatsEngine/ZLIB/Compress/'
'transaction_sample_data'):
zlib_data = zlib.compress(six.b(json_data), level)
with InternalTrace('Supportability/Python/StatsEngine/BASE64/Encode/'
'transaction_sample_data'):
pack_data = base64.standard_b64encode(zlib_data)
if six.PY3:
pack_data = pack_data.decode('Latin-1')
root = transaction_trace.root
trace_data = [[root.start_time,
root.end_time - root.start_time,
self.__slow_transaction.path,
self.__slow_transaction.request_uri,
pack_data]]
return trace_data
def reset_stats(self, settings):
"""Resets the accumulated statistics back to initial state and
associates the application settings object with the stats
engine. This should be called when application is first
activated and combined application settings incorporating server
side settings are available. Would also be called on any forced
restart of agent or a reconnection due to loss of connection.
"""
self.__settings = settings
self.__stats_table = {}
self.__sql_stats_table = {}
self.__slow_transaction = None
self.__slow_transaction_map = {}
self.__slow_transaction_old_duration = None
self.__transaction_errors = []
self.__metric_ids = {}
self.__synthetics_events = []
self.__synthetics_transactions = []
self.__xray_transactions = []
self.xray_sessions = {}
self.reset_transaction_events()
self.reset_error_events()
self.reset_custom_events()
def reset_metric_stats(self):
"""Resets the accumulated statistics back to initial state for
metric data.
"""
self.__stats_table = {}
def reset_transaction_events(self):
"""Resets the accumulated statistics back to initial state for
sample analytics data.
"""
if self.__settings is not None:
self.__transaction_events = SampledDataSet(
self.__settings.transaction_events.max_samples_stored)
else:
self.__transaction_events = SampledDataSet()
def reset_error_events(self):
if self.__settings is not None:
self.__error_events = SampledDataSet(
self.__settings.error_collector.max_event_samples_stored)
else:
self.__error_events = SampledDataSet()
def reset_custom_events(self):
if self.__settings is not None:
self.__custom_events = SampledDataSet(
self.__settings.custom_insights_events.max_samples_stored)
else:
self.__custom_events = SampledDataSet()
def reset_synthetics_events(self):
"""Resets the accumulated statistics back to initial state for
Synthetics events data.
"""
self.__synthetics_events = []
def harvest_snapshot(self):
"""Creates a snapshot of the accumulated statistics, error
details and slow transaction and returns it. This is a shallow
copy, only copying the top level objects. The originals are then
reset back to being empty, with the exception of the dictionary
mapping metric (name, scope) to the integer identifiers received
from the core application. The latter is retained as should
carry forward to subsequent runs. This method would be called
to snapshot the data when doing the harvest.
"""
stats = copy.copy(self)
# The slow transaction map is retained but we need to
# perform some housework on each harvest snapshot. What
# we do is add the slow transaction to the map of
# transactions and, if we reach the threshold for the maximum
# number, we clear the table. We also clear the table if we
# have had a number of harvests where no slow transaction was
# collected.
if self.__settings is None:
self.__slow_transaction_dry_harvests = 0
self.__slow_transaction_map = {}
self.__slow_transaction_old_duration = None
elif self.__slow_transaction is None:
self.__slow_transaction_dry_harvests += 1
agent_limits = self.__settings.agent_limits
dry_harvests = agent_limits.slow_transaction_dry_harvests
if self.__slow_transaction_dry_harvests >= dry_harvests:
self.__slow_transaction_dry_harvests = 0
self.__slow_transaction_map = {}
self.__slow_transaction_old_duration = None
else:
self.__slow_transaction_dry_harvests = 0
name = self.__slow_transaction.path
duration = self.__slow_transaction.duration
self.__slow_transaction_map[name] = duration
top_n = self.__settings.transaction_tracer.top_n
if len(self.__slow_transaction_map) >= top_n:
self.__slow_transaction_map = {}
self.__slow_transaction_old_duration = None
# We also retain the table of metric IDs. This should be
# okay for a continuing connection. If the connection is lost
# then reset_stats() above would be called and everything would
# be thrown away, so there is no chance of following through
# with incorrect mappings. Everything else is reset to
# initial values.
self.__stats_table = {}
self.__sql_stats_table = {}
self.__slow_transaction = None
self.__transaction_errors = []
self.__xray_transactions = []
self.__synthetics_events = []
self.__synthetics_transactions = []
self.reset_transaction_events()
self.reset_error_events()
self.reset_custom_events()
return stats
def create_workarea(self):
"""Creates and returns a new empty stats engine object. This would
be used to distill stats from a single web transaction before then
merging it back into the parent under a thread lock.
"""
stats = StatsEngine()
stats.__settings = self.__settings
stats.xray_sessions = self.xray_sessions
return stats
def merge(self, snapshot):
"""Merges data from a single transaction. Snapshot is an instance of
StatsEngine that contains stats for the single transaction.
"""
if not self.__settings:
return
self.merge_metric_stats(snapshot)
self._merge_transaction_events(snapshot)
self._merge_synthetics_events(snapshot)
self._merge_error_events(snapshot)
self._merge_error_traces(snapshot)
self._merge_custom_events(snapshot)
self._merge_sql(snapshot)
self._merge_traces(snapshot)
def rollback(self, snapshot):
"""Performs a "rollback" merge after a failed harvest. Snapshot is a
copy of the main StatsEngine data that we attempted to harvest, but
failed. Not all types of data get merged during a rollback.
"""
if not self.__settings:
return
_logger.debug('Performing rollback of data into '
'subsequent harvest period. Metric data and transaction events '
'will be preserved and rolled into next harvest.')
self.merge_metric_stats(snapshot)
self._merge_transaction_events(snapshot, rollback=True)
self._merge_synthetics_events(snapshot, rollback=True)
self._merge_error_events(snapshot)
self._merge_custom_events(snapshot, rollback=True)
def merge_metric_stats(self, snapshot):
"""Merges metric data from a snapshot. This is used both when merging
data from a single transaction into the main stats engine, and for
performing a rollback merge. In either case, the merge is done the exact
same way.
"""
if not self.__settings:
return
for key, other in six.iteritems(snapshot.__stats_table):
stats = self.__stats_table.get(key)
if not stats:
self.__stats_table[key] = copy.copy(other)
else:
stats.merge_stats(other)
def _merge_transaction_events(self, snapshot, rollback=False):
# Merge in transaction events. In the normal case snapshot is a
# StatsEngine from a single transaction, and should only have one event.
# Just to avoid issues, if there is more than one, don't merge.
# If this is a rollback, snapshot is a copy of a previous main
# StatsEngine, and self is still the current main StatsEngine. Then
# we are merging multiple events, but still using the reservoir sampling
# that gives equal probability for keeping all events
if rollback:
for sample in snapshot.__transaction_events.samples:
self.__transaction_events.add(sample)
else:
if snapshot.__transaction_events.num_samples == 1:
self.__transaction_events.add(
snapshot.__transaction_events.samples[0])
def _merge_synthetics_events(self, snapshot, rollback=False):
# Merge Synthetic analytic events, appending to the list
# that contains events from previous transactions. In the normal
# case snapshot is a StatsEngine from a single transaction, and should
# only have one event. Cap this list at a maximum, so that newer events
# over the limit will be thrown out.
# If this is a rollback, snapshot is a copy of a previous main
# StatsEngine, and self is still the current main StatsEngine.
# Thus, the events already existing in this object will be newer than
# those in snapshot, and we favor the newer events.
if rollback:
self.__synthetics_events.extend(snapshot.__synthetics_events)
else:
if len(snapshot.__synthetics_events) == 1:
self.__synthetics_events.append(
snapshot.__synthetics_events[0])
maximum = self.__settings.agent_limits.synthetics_events
self.__synthetics_events = self.__synthetics_events[:maximum]
def _merge_error_events(self, snapshot):
# Merge in error events. Since we are using reservoir sampling that
# gives equal probability to keeping each event, merge is the same as
# rollback. There may be multiple error events per transaction.
self.__error_events.merge(snapshot.error_events)
def _merge_custom_events(self, snapshot, rollback=False):
self.__custom_events.merge(snapshot.custom_events)
def _merge_error_traces(self, snapshot):
# Append the snapshot error details at the end to maintain time
# based order and then trim to the maximum to be kept. The
# snapshot will always have newer data.
maximum = self.__settings.agent_limits.errors_per_harvest
self.__transaction_errors.extend(snapshot.__transaction_errors)
self.__transaction_errors = self.__transaction_errors[:maximum]
def _merge_sql(self, snapshot):
# Add sql traces to the set of existing entries. If over
# the limit of how many to collect, only merge in if we have
# already seen the specific SQL.
for key, slow_sql_stats in six.iteritems(snapshot.__sql_stats_table):
stats = self.__sql_stats_table.get(key)
if not stats:
maximum = self.__settings.agent_limits.slow_sql_data
if len(self.__sql_stats_table) < maximum:
self.__sql_stats_table[key] = copy.copy(slow_sql_stats)
else:
stats.merge_stats(slow_sql_stats)
def _merge_traces(self, snapshot):
# Cap the number of x-ray traces at the configured limit.
# Spill-over traces beyond the limit have their x-ray ids cleared,
# which qualifies them to be considered as slow transactions.
maximum = self.__settings.agent_limits.xray_transactions
self.__xray_transactions.extend(snapshot.__xray_transactions)
for txn in self.__xray_transactions[maximum:]:
txn.xray_id = None
self.__xray_transactions = self.__xray_transactions[:maximum]
# Limit number of Synthetics transactions
maximum = self.__settings.agent_limits.synthetics_transactions
self.__synthetics_transactions.extend(
snapshot.__synthetics_transactions)
synthetics_slice = self.__synthetics_transactions[:maximum]
self.__synthetics_transactions = synthetics_slice
transaction = snapshot.__slow_transaction
# If the transaction has an xray_id then it does not qualify to
# be considered as a slow transaction. This is because in the Core
# app there is logic to NOT show TTs with x-ray ids in the
# WebTransactions tab. If a TT has an xray_id it is only shown on
# the x-ray page.
xray_id = getattr(transaction, 'xray_id', None)
if transaction and xray_id is None:
# Restore original slow transaction if slower than any newer slow
# transaction.
self._update_slow_transaction(transaction)
def merge_custom_metrics(self, metrics):
"""Merges in a set of custom metrics. The metrics should be
provided as an iterable where each item is a tuple of the metric
name and the accumulated stats for the metric.
"""
if not self.__settings:
return
for name, other in metrics:
key = (name, '')
stats = self.__stats_table.get(key)
if not stats:
self.__stats_table[key] = copy.copy(other)
else:
stats.merge_stats(other)
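# A minimal illustrative sketch (not part of the agent) of the intended
# harvest lifecycle described in the docstrings above. The 'settings',
# 'transaction' and 'send_to_core_application' names are hypothetical
# stand-ins for objects the agent supplies elsewhere.
def _harvest_cycle_sketch(engine, settings, transaction,
        send_to_core_application):
    # Associate application settings before any data is accumulated.
    engine.reset_stats(settings)
    # Per transaction, distill stats into a workarea and merge the
    # result back into the main engine under external locking.
    workarea = engine.create_workarea()
    workarea.record_transaction(transaction)
    engine.merge(workarea)
    # At harvest time take a snapshot, leaving the engine empty, and
    # roll the snapshot back in if the upload fails.
    snapshot = engine.harvest_snapshot()
    try:
        send_to_core_application(snapshot.metric_data())
    except Exception:
        engine.rollback(snapshot)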