datadog-agent 1:7.26.0-1: /opt/datadog-agent/embedded/lib/python3.8/site-packages/datadog_checks/elastic/elastic.py
# (C) Datadog, Inc. 2018-present
# All rights reserved
# Licensed under Simplified BSD License (see LICENSE)
import time
from collections import defaultdict
import requests
from six import iteritems, itervalues
from six.moves.urllib.parse import urljoin, urlparse
from datadog_checks.base import AgentCheck, to_string
from .config import from_instance
from .metrics import (
CLUSTER_PENDING_TASKS,
health_stats_for_version,
index_stats_for_version,
node_system_stats_for_version,
pshard_stats_for_version,
slm_stats_for_version,
stats_for_version,
)
class AuthenticationError(requests.exceptions.HTTPError):
"""Authentication Error, unable to reach server"""
class ESCheck(AgentCheck):
HTTP_CONFIG_REMAPPER = {
'aws_service': {'name': 'aws_service', 'default': 'es'},
'ssl_verify': {'name': 'tls_verify'},
'ssl_cert': {'name': 'tls_cert'},
'ssl_key': {'name': 'tls_private_key'},
}
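# Illustrative note (not part of the shipped check): the remapper above lets legacy
# instance options keep their old names while the base HTTP client receives the new
# ones, e.g. a hypothetical instance with `ssl_verify: false` and `ssl_cert: /path/to/cert.pem`
# is forwarded as `tls_verify: false` and `tls_cert: /path/to/cert.pem`.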
SERVICE_CHECK_CONNECT_NAME = 'elasticsearch.can_connect'
SERVICE_CHECK_CLUSTER_STATUS = 'elasticsearch.cluster_health'
SOURCE_TYPE_NAME = 'elasticsearch'
def __init__(self, name, init_config, instances):
super(ESCheck, self).__init__(name, init_config, instances)
# Cluster status (keyed by instance URL) needs to persist across check runs
self.cluster_status = {}
if self.instance.get('auth_type') == 'aws' and self.instance.get('url'):
self.HTTP_CONFIG_REMAPPER = self.HTTP_CONFIG_REMAPPER.copy()
self.HTTP_CONFIG_REMAPPER['aws_host'] = {
'name': 'aws_host',
'default': urlparse(self.instance['url']).hostname,
}
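# Illustrative note: with auth_type 'aws', the signing host defaults to the hostname of
# the configured URL, e.g. a hypothetical url
# 'https://search-mydomain.us-east-1.es.amazonaws.com' yields the aws_host default
# 'search-mydomain.us-east-1.es.amazonaws.com' unless the instance overrides it.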
self.config = from_instance(self.instance)
def check(self, _):
admin_forwarder = self.config.admin_forwarder
jvm_rate = self.instance.get('gc_collectors_as_rate', False)
base_tags = list(self.config.tags)
service_check_tags = list(self.config.service_check_tags)
# Check ES version for this instance and define parameters
# (URLs and metrics) accordingly
try:
version = self._get_es_version()
except AuthenticationError:
self.log.exception("The ElasticSearch credentials are incorrect")
raise
health_url, stats_url, pshard_stats_url, pending_tasks_url, slm_url = self._get_urls(version)
stats_metrics = stats_for_version(version, jvm_rate)
if self.config.cluster_stats:
# Include Node System metrics
stats_metrics.update(node_system_stats_for_version(version))
pshard_stats_metrics = pshard_stats_for_version(version)
# Load stats data.
# This must happen before other URL processing as the cluster name
# is retrieved here, and added to the tag list.
stats_url = self._join_url(stats_url, admin_forwarder)
stats_data = self._get_data(stats_url)
if stats_data.get('cluster_name'):
# retrieve the cluster name from the data, and append it to the
# master tag list.
cluster_name_tag = "cluster_name:{}".format(stats_data['cluster_name'])
base_tags.append(cluster_name_tag)
service_check_tags.append(cluster_name_tag)
self._process_stats_data(stats_data, stats_metrics, base_tags)
# Load cluster-wide data.
# Note: this is a cluster-wide query, so it might time out.
if self.config.pshard_stats:
send_sc = bubble_ex = not self.config.pshard_graceful_to
pshard_stats_url = self._join_url(pshard_stats_url, admin_forwarder)
try:
pshard_stats_data = self._get_data(pshard_stats_url, send_sc=send_sc)
self._process_pshard_stats_data(pshard_stats_data, pshard_stats_metrics, base_tags)
except requests.ReadTimeout as e:
if bubble_ex:
raise
self.log.warning("Timed out reading pshard-stats from servers (%s) - stats will be missing", e)
# Get Snapshot Lifecycle Management (SLM) policies
if slm_url is not None:
slm_url = self._join_url(slm_url, admin_forwarder)
policy_data = self._get_data(slm_url)
self._process_policy_data(policy_data, version, base_tags)
# Load the health data.
health_url = self._join_url(health_url, admin_forwarder)
health_data = self._get_data(health_url)
self._process_health_data(health_data, version, base_tags, service_check_tags)
if self.config.pending_task_stats:
# Load the pending_tasks data.
pending_tasks_url = self._join_url(pending_tasks_url, admin_forwarder)
pending_tasks_data = self._get_data(pending_tasks_url)
self._process_pending_tasks_data(pending_tasks_data, base_tags)
if self.config.index_stats and version >= [1, 0, 0]:
try:
self._get_index_metrics(admin_forwarder, version, base_tags)
except requests.ReadTimeout as e:
self.log.warning("Timed out reading index stats from servers (%s) - stats will be missing", e)
# If we reach this point, there were no Elasticsearch connection issues
self.service_check(self.SERVICE_CHECK_CONNECT_NAME, AgentCheck.OK, tags=self.config.service_check_tags)
def _get_es_version(self):
"""
Get the running version of Elasticsearch.
"""
try:
data = self._get_data(self.config.url, send_sc=False)
raw_version = data['version']['number']
self.set_metadata('version', raw_version)
# pre-release versions of Elasticsearch are suffixed with -rcX, etc.
# peel that off so that the version parsing below doesn't error out
raw_version = raw_version.split('-')[0]
version = [int(p) for p in raw_version.split('.')[0:3]]
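# For example (illustrative values): a reported "7.10.0-rc1" is trimmed to "7.10.0"
# above and parsed here as [7, 10, 0].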
except AuthenticationError:
raise
except Exception as e:
self.warning("Error while trying to get Elasticsearch version from %s %s", self.config.url, e)
version = [1, 0, 0]
self.log.debug("Elasticsearch version is %s", version)
return version
def _join_url(self, url, admin_forwarder=False):
"""
overrides `urlparse.urljoin` since it removes base url path
https://docs.python.org/2/library/urlparse.html#urlparse.urljoin
"""
if admin_forwarder:
return self.config.url + url
else:
return urljoin(self.config.url, url)
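# Illustrative example with a hypothetical proxied base URL:
#   config.url = "http://localhost:8080/es-proxy"
#   urljoin(config.url, "/_cluster/health")  -> "http://localhost:8080/_cluster/health"
#   config.url + "/_cluster/health"          -> "http://localhost:8080/es-proxy/_cluster/health"
# which is why admin_forwarder setups use plain concatenation.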
def _get_index_metrics(self, admin_forwarder, version, base_tags):
cat_url = '/_cat/indices?format=json&bytes=b'
index_url = self._join_url(cat_url, admin_forwarder)
index_resp = self._get_data(index_url)
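# Illustrative shape of one element of the /_cat/indices?format=json&bytes=b response
# (column names per the Elasticsearch cat API; the values below are made up):
#   {"health": "green", "status": "open", "index": "my-index", "pri": "1", "rep": "1",
#    "docs.count": "1500", "docs.deleted": "3", "store.size": "262144", "pri.store.size": "131072"}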
index_stats_metrics = index_stats_for_version(version)
health_stat = {'green': 0, 'yellow': 1, 'red': 2}
reversed_health_stat = {'red': 0, 'yellow': 1, 'green': 2}
for idx in index_resp:
tags = base_tags + ['index_name:' + idx['index']]
# we need to remap the metric names because the ones from Elasticsearch
# contain dots, which would confuse `_process_metric()`
index_data = {
'docs_count': idx.get('docs.count'),
'docs_deleted': idx.get('docs.deleted'),
'primary_shards': idx.get('pri'),
'replica_shards': idx.get('rep'),
'primary_store_size': idx.get('pri.store.size'),
'store_size': idx.get('store.size'),
'health': idx.get('health'),
}
# Convert the health status value
if index_data['health'] is not None:
status = index_data['health'].lower()
index_data['health'] = health_stat[status]
index_data['health_reverse'] = reversed_health_stat[status]
# Ensure that index_data does not contain None values
for key, value in list(iteritems(index_data)):
if value is None:
del index_data[key]
self.log.warning("The index %s has no metric data for %s", idx['index'], key)
for metric in index_stats_metrics:
# metric description
desc = index_stats_metrics[metric]
self._process_metric(index_data, metric, *desc, tags=tags)
def _get_urls(self, version):
"""
Compute the URLs we need to hit depending on the running ES version
"""
pshard_stats_url = "/_stats"
health_url = "/_cluster/health"
slm_url = None
if version >= [0, 90, 10]:
pending_tasks_url = "/_cluster/pending_tasks"
stats_url = "/_nodes/stats" if self.config.cluster_stats else "/_nodes/_local/stats"
if version < [5, 0, 0]:
# version 5 errors out if the `all` parameter is set
stats_url += "?all=true"
if version >= [7, 4, 0] and self.config.slm_stats:
slm_url = "/_slm/policy"
else:
# legacy
pending_tasks_url = None
stats_url = (
"/_cluster/nodes/stats?all=true"
if self.config.cluster_stats
else "/_cluster/nodes/_local/stats?all=true"
)
return health_url, stats_url, pshard_stats_url, pending_tasks_url, slm_url
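# Illustrative example, assuming a 7.x cluster with cluster_stats and slm_stats disabled:
# _get_urls([7, 10, 0]) returns
#   ("/_cluster/health", "/_nodes/_local/stats", "/_stats", "/_cluster/pending_tasks", None)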
def _get_data(self, url, send_sc=True):
"""
Hit a given URL and return the parsed json
"""
resp = None
try:
resp = self.http.get(url)
resp.raise_for_status()
except Exception as e:
# a 400 response means the configured credentials are broken; note that a
# requests.Response is falsy for error status codes, so compare against None explicitly
if resp is not None and resp.status_code == 400:
raise AuthenticationError("The ElasticSearch credentials are incorrect")
if send_sc:
self.service_check(
self.SERVICE_CHECK_CONNECT_NAME,
AgentCheck.CRITICAL,
message="Error {} when hitting {}".format(e, url),
tags=self.config.service_check_tags,
)
raise
self.log.debug("request to url %s returned: %s", url, resp)
return resp.json()
def _process_pending_tasks_data(self, data, base_tags):
p_tasks = defaultdict(int)
average_time_in_queue = 0
for task in data.get('tasks', []):
p_tasks[task.get('priority')] += 1
average_time_in_queue += task.get('time_in_queue_millis', 0)
total = sum(itervalues(p_tasks))
node_data = {
'pending_task_total': total,
'pending_tasks_priority_high': p_tasks['high'],
'pending_tasks_priority_urgent': p_tasks['urgent'],
# if total is 0, divide by 1 to avoid a ZeroDivisionError
'pending_tasks_time_in_queue': average_time_in_queue // (total or 1),
}
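# Illustrative example with a made-up payload: two pending tasks such as
#   {"tasks": [{"priority": "urgent", "time_in_queue_millis": 10},
#              {"priority": "high", "time_in_queue_millis": 30}]}
# yield pending_task_total=2, pending_tasks_priority_high=1,
# pending_tasks_priority_urgent=1 and pending_tasks_time_in_queue=20.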
for metric in CLUSTER_PENDING_TASKS:
# metric description
desc = CLUSTER_PENDING_TASKS[metric]
self._process_metric(node_data, metric, *desc, tags=base_tags)
def _process_stats_data(self, data, stats_metrics, base_tags):
for node_data in itervalues(data.get('nodes', {})):
metric_hostname = None
metrics_tags = list(base_tags)
# Resolve the node's name
node_name = node_data.get('name')
if node_name:
metrics_tags.append('node_name:{}'.format(node_name))
# Resolve the node's hostname
if self.config.node_name_as_host:
if node_name:
metric_hostname = node_name
elif self.config.cluster_stats:
for k in ['hostname', 'host']:
if k in node_data:
metric_hostname = node_data[k]
break
for metric, desc in iteritems(stats_metrics):
self._process_metric(node_data, metric, *desc, tags=metrics_tags, hostname=metric_hostname)
def _process_pshard_stats_data(self, data, pshard_stats_metrics, base_tags):
for metric, desc in iteritems(pshard_stats_metrics):
self._process_metric(data, metric, *desc, tags=base_tags)
def _process_metric(self, data, metric, xtype, path, xform=None, tags=None, hostname=None):
"""
data: dictionary containing all the stats
metric: datadog metric
path: corresponding path in data, flattened, e.g. thread_pool.bulk.queue
xform: a lambda to apply to the numerical value
"""
value = data
# Traverse the nested dictionaries
for key in path.split('.'):
if value is not None:
value = value.get(key)
else:
break
if value is not None:
if xform:
value = xform(value)
if xtype == "gauge":
self.gauge(metric, value, tags=tags, hostname=hostname)
else:
self.rate(metric, value, tags=tags, hostname=hostname)
else:
self.log.debug("Metric not found: %s -> %s", path, metric)
def _process_health_data(self, data, version, base_tags, service_check_tags):
cluster_status = data.get('status')
if not self.cluster_status.get(self.config.url):
self.cluster_status[self.config.url] = cluster_status
if cluster_status in ["yellow", "red"]:
event = self._create_event(cluster_status, tags=base_tags)
self.event(event)
if cluster_status != self.cluster_status.get(self.config.url):
self.cluster_status[self.config.url] = cluster_status
event = self._create_event(cluster_status, tags=base_tags)
self.event(event)
cluster_health_metrics = health_stats_for_version(version)
for metric, desc in iteritems(cluster_health_metrics):
self._process_metric(data, metric, *desc, tags=base_tags)
# Process the service check
if cluster_status == 'green':
status = AgentCheck.OK
data['tag'] = "OK"
elif cluster_status == 'yellow':
status = AgentCheck.WARNING
data['tag'] = "WARN"
else:
status = AgentCheck.CRITICAL
data['tag'] = "ALERT"
msg = (
"{tag} on cluster \"{cluster_name}\" "
"| active_shards={active_shards} "
"| initializing_shards={initializing_shards} "
"| relocating_shards={relocating_shards} "
"| unassigned_shards={unassigned_shards} "
"| timed_out={timed_out}".format(
tag=data.get('tag'),
cluster_name=data.get('cluster_name'),
active_shards=data.get('active_shards'),
initializing_shards=data.get('initializing_shards'),
relocating_shards=data.get('relocating_shards'),
unassigned_shards=data.get('unassigned_shards'),
timed_out=data.get('timed_out'),
)
)
self.service_check(self.SERVICE_CHECK_CLUSTER_STATUS, status, message=msg, tags=service_check_tags)
def _process_policy_data(self, data, version, base_tags):
for policy, policy_data in iteritems(data):
repo = policy_data.get('policy', {}).get('repository', 'unknown')
tags = base_tags + ['policy:{}'.format(policy), 'repository:{}'.format(repo)]
slm_stats = slm_stats_for_version(version)
for metric, desc in iteritems(slm_stats):
self._process_metric(policy_data, metric, *desc, tags=tags)
def _create_event(self, status, tags=None):
hostname = to_string(self.hostname)
if status == "red":
alert_type = "error"
msg_title = "{} is {}".format(hostname, status)
elif status == "yellow":
alert_type = "warning"
msg_title = "{} is {}".format(hostname, status)
else:
# then it should be green
alert_type = "success"
msg_title = "{} recovered as {}".format(hostname, status)
msg = "ElasticSearch: {} just reported as {}".format(hostname, status)
return {
'timestamp': int(time.time()),
'event_type': 'elasticsearch',
'host': hostname,
'msg_text': msg,
'msg_title': msg_title,
'alert_type': alert_type,
'source_type_name': "elasticsearch",
'event_object': hostname,
'tags': tags,
}