Repository URL to install this package:
|
Version:
0.8.1 ▾
|
from __future__ import absolute_import
try:
from typing import Dict
except ImportError:
pass
from supertenant.consts import INTEGRATION_MODULE_PYTHON_PYMONGO
from supertenant.supermeter.logger import log_instrumentation_skipped, log_instrumentation_success
try:
import pymongo
from bson import json_util
from pymongo import monitoring
from supertenant.supermeter.data import db_data
from supertenant.supermeter.managers.db_manager import DBManager
from supertenant.supermeter.scope_manager import Span
cmd_doc_locations = {
"insert": "documents",
"update": "updates",
"delete": "deletes",
"aggregate": "pipeline",
}
_pymongo_version = getattr(pymongo, "__version__")
class MongoCommandTracer(monitoring.CommandListener):
def __init__(self):
# type: () -> None
self.__active_commands = {} # type: Dict[int, Span]
def started(self, event):
# type: (monitoring.CommandStartedEvent) -> None
before_data = db_data.MongoData(INTEGRATION_MODULE_PYTHON_PYMONGO)
self._collect_connection_tags(before_data, event)
self._collect_command_tags(before_data, event)
if event.command_name in event.command:
before_data.set_mongodb_collection(str(event.command.get(event.command_name)))
span_id, _, _ = DBManager.open_span(before_data)
if span_id is None:
return
with Span(span_id, db_data.MongoData(INTEGRATION_MODULE_PYTHON_PYMONGO)) as span:
self.__active_commands[event.request_id] = span
def succeeded(self, event):
# type: (monitoring.CommandSucceededEvent) -> None
self.__active_commands.pop(event.request_id, None)
def failed(self, event):
# type: (monitoring.CommandFailedEvent) -> None
span = self.__active_commands.pop(event.request_id, None)
if span is None:
return
span.finish_data.mark_error()
def _collect_connection_tags(self, data, event):
# type: (db_data.MongoData, monitoring.CommandStartedEvent) -> None
(host, port) = event.connection_id
data.set_host(host)
data.set_port(str(port))
data.set_db(event.database_name)
data.set_integration_module_resource_id(host)
def _collect_command_tags(self, data, event):
# type: (db_data.MongoData, monitoring.CommandStartedEvent) -> None
"""
Extract MongoDB command name and arguments and attach it to the span
"""
cmd = event.command_name
data.set_command(cmd)
for key in ["filter", "query"]:
if key in event.command:
data.set_mongodb_filter(json_util.dumps(event.command.get(key)))
break
cmd_doc = None
if cmd in cmd_doc_locations:
cmd_doc = event.command.get(cmd_doc_locations[cmd])
elif cmd.lower() == "mapreduce":
cmd_doc = {"map": event.command.get("map"), "reduce": event.command.get("reduce")}
if cmd_doc is not None:
data.set_mongodb_command_json(json_util.dumps(cmd_doc))
monitoring.register(MongoCommandTracer())
log_instrumentation_success(INTEGRATION_MODULE_PYTHON_PYMONGO, _pymongo_version)
except ImportError as exc:
log_instrumentation_skipped(INTEGRATION_MODULE_PYTHON_PYMONGO, "", {"exc": exc})