Why Gemfury? Push, build, and install: RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages

Repository URL to install this package:

Details    
supermeter / supermeter / managers / pymongo / pymongo_inst.py
Size: Mime:
from __future__ import absolute_import

try:
    from typing import Dict
except ImportError:
    pass

from supertenant.consts import INTEGRATION_MODULE_PYTHON_PYMONGO
from supertenant.supermeter.logger import log_instrumentation_skipped, log_instrumentation_success

try:
    import pymongo
    from bson import json_util
    from pymongo import monitoring

    from supertenant.supermeter.data import db_data
    from supertenant.supermeter.managers.db_manager import DBManager
    from supertenant.supermeter.scope_manager import Span

    # Maps a MongoDB command name to the key inside the command document that
    # holds that command's payload. MongoCommandTracer._collect_command_tags
    # looks the command name up here (exact, case-sensitive match) and
    # serializes the value found under the mapped key.
    cmd_doc_locations = {
        "insert": "documents",
        "update": "updates",
        "delete": "deletes",
        "aggregate": "pipeline",
    }

    # Version string of the imported pymongo package; passed to
    # log_instrumentation_success below after the listener is registered.
    _pymongo_version = getattr(pymongo, "__version__")

    class MongoCommandTracer(monitoring.CommandListener):
        """
        pymongo command listener that reports every started command to DBManager.

        For each command-started event a MongoData record is filled with the
        connection details, the command name and (where available) its filter
        and payload, and a span is opened through DBManager. Spans of commands
        that are still in flight are remembered by request id so that a
        later failure event can flag the span's finish data as an error.
        """

        def __init__(self):
            # type: () -> None
            # In-flight commands: event.request_id -> Span created in started().
            self.__active_commands = {}  # type: Dict[int, Span]

        def started(self, event):
            # type: (monitoring.CommandStartedEvent) -> None
            """
            Describe the command in a MongoData record and open a span for it.
            """
            start_data = db_data.MongoData(INTEGRATION_MODULE_PYTHON_PYMONGO)
            self._collect_connection_tags(start_data, event)
            self._collect_command_tags(start_data, event)

            # The value stored under the command's own name is reported as the
            # collection (stringified as-is, whatever its type).
            command = event.command
            name = event.command_name
            if name in command:
                start_data.set_mongodb_collection(str(command.get(name)))

            span_id, _, _ = DBManager.open_span(start_data)
            if span_id is not None:
                finish_data = db_data.MongoData(INTEGRATION_MODULE_PYTHON_PYMONGO)
                # NOTE(review): this `with` block is left before started()
                # returns, i.e. before pymongo reports the command's outcome.
                # What Span.__exit__ does is not visible from this file --
                # confirm that failed() marking finish_data afterwards still
                # takes effect.
                with Span(span_id, finish_data) as span:
                    self.__active_commands[event.request_id] = span

        def succeeded(self, event):
            # type: (monitoring.CommandSucceededEvent) -> None
            """
            Stop tracking the command; nothing else is recorded on success.
            """
            # Unknown request ids (no span was opened) are silently ignored.
            self.__active_commands.pop(event.request_id, None)

        def failed(self, event):
            # type: (monitoring.CommandFailedEvent) -> None
            """
            Stop tracking the command and flag its span's finish data as an error.
            """
            span = self.__active_commands.pop(event.request_id, None)
            if span is not None:
                span.finish_data.mark_error()

        def _collect_connection_tags(self, data, event):
            # type: (db_data.MongoData, monitoring.CommandStartedEvent) -> None
            """
            Copy host, port and database name from the event onto the data record
            """
            # connection_id is unpacked as a (host, port) pair.
            host, port = event.connection_id

            data.set_host(host)
            data.set_port(str(port))
            data.set_db(event.database_name)
            # The host doubles as the integration module's resource id.
            data.set_integration_module_resource_id(host)

        def _collect_command_tags(self, data, event):
            # type: (db_data.MongoData, monitoring.CommandStartedEvent) -> None
            """
            Record the command name plus its filter and payload, JSON-encoded
            """
            command = event.command
            name = event.command_name
            data.set_command(name)

            # Only one filter is recorded; "filter" wins over "query" when the
            # command document carries both.
            filter_key = next((key for key in ("filter", "query") if key in command), None)
            if filter_key is not None:
                data.set_mongodb_filter(json_util.dumps(command.get(filter_key)))

            if name in cmd_doc_locations:
                payload = command.get(cmd_doc_locations[name])
            elif name.lower() == "mapreduce":
                # mapReduce has no single payload key; report both functions.
                payload = {"map": command.get("map"), "reduce": command.get("reduce")}
            else:
                payload = None

            # A known command whose payload key is absent yields None and is skipped.
            if payload is not None:
                data.set_mongodb_command_json(json_util.dumps(payload))

    # Import-time side effect: hand one tracer instance to pymongo's monitoring
    # module, then log that the integration was installed for this pymongo version.
    monitoring.register(MongoCommandTracer())
    log_instrumentation_success(INTEGRATION_MODULE_PYTHON_PYMONGO, _pymongo_version)

except ImportError as exc:
    # Reached when any import in the try block above fails (pymongo, bson or
    # the supermeter helpers): the integration is logged as skipped, with an
    # empty version string, instead of the ImportError propagating.
    log_instrumentation_skipped(INTEGRATION_MODULE_PYTHON_PYMONGO, "", {"exc": exc})