Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
Size: Mime:
from __future__ import with_statement

import hashlib
import threading
import time
import weakref

from pyramid.threadlocal import get_current_request

from pyramid_debugtoolbar.compat import json
from pyramid_debugtoolbar.compat import bytes_
from pyramid_debugtoolbar.compat import url_quote
from pyramid_debugtoolbar.compat import PY3
from pyramid_debugtoolbar.panels import DebugPanel
from pyramid_debugtoolbar.utils import format_sql
from pyramid_debugtoolbar.utils import STATIC_PATH
from pyramid_debugtoolbar.utils import ROOT_ROUTE_NAME

# Module-wide lock held by _after_cursor_execute while it records a finished
# statement on the current request and registers the engine on the registry.
lock = threading.Lock()

def text(s):
    """Convert *s* to the text type: ``str`` if ``PY3`` is true, else ``unicode``."""
    # The conditional expression is lazy, so the ``unicode`` name is only
    # looked up when PY3 is false.
    to_text = str if PY3 else unicode
    return to_text(s)

# Register the timing listeners only when SQLAlchemy can be imported;
# ``has_sqla`` records whether that import succeeded.
try:
    from sqlalchemy import event
    from sqlalchemy.engine.base import Engine

    @event.listens_for(Engine, "before_cursor_execute")
    def _before_cursor_execute(conn, cursor, stmt, params, context, execmany):
        """Stamp the connection with the time at which the statement starts."""
        conn.pdtb_start_timer = time.time()

    @event.listens_for(Engine, "after_cursor_execute")
    def _after_cursor_execute(conn, cursor, stmt, params, context, execmany):
        """Log the finished statement on the current request, if it tracks queries.

        The start stamp left on the connection by ``_before_cursor_execute``
        is removed afterwards, whether or not the statement was logged.
        """
        finished_at = time.time()
        request = get_current_request()
        tracked = request is not None and hasattr(request, 'pdtb_sqla_queries')
        if tracked:
            with lock:
                known_engines = request.registry.pdtb_sqla_engines
                engine = conn.engine
                engine_key = id(engine)
                # Weak reference, so this map does not keep the engine alive.
                known_engines[engine_key] = weakref.ref(engine)
                query_log = request.pdtb_sqla_queries
                # time.time() is in seconds; the panel reports milliseconds.
                elapsed_ms = (finished_at - conn.pdtb_start_timer) * 1000
                record = {
                    'engine_id': engine_key,
                    'duration': elapsed_ms,
                    'statement': stmt,
                    'parameters': params,
                    'context': context,
                }
                query_log.append(record)
        del conn.pdtb_start_timer
except ImportError:
    has_sqla = False
else:
    has_sqla = True

_ = lambda x: x


class SQLADebugPanel(DebugPanel):
    """
    Panel that displays the SQL generated by SQLAlchemy plus the time each
    SQL statement took in milliseconds.

    The panel creates a list on ``request.pdtb_sqla_queries``; this module's
    ``after_cursor_execute`` listener appends one dict per executed statement
    to that list while the request is current.
    """
    name = 'sqlalchemy'
    template = 'pyramid_debugtoolbar.panels:templates/sqlalchemy.dbtmako'
    title = _('SQLAlchemy Queries')
    nav_title = _('SQLAlchemy')

    def __init__(self, request):
        """Attach a fresh query list to *request* and bind registry state.

        :param request: the current request; must expose ``registry`` (with
            a ``pdtb_token`` attribute) and ``pdtb_id``.
        """
        # The panel and the request share the same list object, so entries
        # appended through the request are visible via ``self.queries``.
        self.queries = request.pdtb_sqla_queries = []
        registry = request.registry
        # The engine map lives on the registry; create it on first use.
        if not hasattr(registry, 'pdtb_sqla_engines'):
            registry.pdtb_sqla_engines = {}
        self.engines = registry.pdtb_sqla_engines
        self.token = registry.pdtb_token
        self.pdtb_id = request.pdtb_id

    @property
    def has_content(self):
        """``True`` when at least one query was recorded, else ``False``."""
        return bool(self.queries)

    @property
    def nav_subtitle(self):
        """The number of recorded queries as a string, or ``None`` if none."""
        if self.queries:
            return "%d" % len(self.queries)
        return None

    def process_response(self, response):
        """Build ``self.data`` (the template variables) from recorded queries.

        :param response: unused; the panel only reads ``self.queries``.
        """
        data = []
        for index, query in enumerate(self.queries):
            stmt = query['statement']

            is_select = stmt.strip().lower().startswith('select')
            try:
                params = url_quote(json.dumps(query['parameters']))
            except (TypeError, UnicodeDecodeError):
                # TypeError: object not JSON serializable.
                # UnicodeDecodeError: parameters contain non-utf8 (probably
                # binary) data.
                # In both cases fall back to an empty encoded-parameter
                # string rather than failing the whole panel.
                params = ''

            data.append({
                'engine_id': query['engine_id'],
                'duration': query['duration'],
                'sql': format_sql(stmt),
                'raw_sql': stmt,
                'parameters': query['parameters'],
                'params': params,
                'is_select': is_select,
                'context': query['context'],
                'query_index': index,
            })

        self.data = {
            'queries': data,
            'text': text,
        }

    def render_content(self, request):
        """Render the panel body, or a short notice if nothing was recorded."""
        if not self.queries:
            return 'No queries executed in request.'
        return super(SQLADebugPanel, self).render_content(request)

    def render_vars(self, request):
        """Return the request-derived variables used when rendering.

        :param request: the request used to build URLs.
        :returns: dict with ``pdtb_id``, ``route_url``, ``static_path`` and
            ``root_path``.
        """
        return {
            'pdtb_id': self.pdtb_id,
            'route_url': request.route_url,
            'static_path': request.static_url(STATIC_PATH),
            'root_path': request.route_url(ROOT_ROUTE_NAME),
        }