Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / asyncpg   python

Repository URL to install this package:

/ pool.py

# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0


import asyncio
import functools
import inspect
import logging
import time
import warnings

from . import connection
from . import connect_utils
from . import exceptions


logger = logging.getLogger(__name__)


class PoolConnectionProxyMeta(type):
    """Metaclass that copies the public Connection API onto a proxy class.

    When a class is created with ``wrap=True``, each public plain function
    found on ``connection.Connection`` that the class body does not define
    itself is added to the class as a forwarding wrapper, and the class
    docstring defaults to the one of ``connection.Connection``.
    """

    def __new__(mcls, name, bases, dct, *, wrap=False):
        if not wrap:
            # Nothing to generate: behave like a plain ``type``.
            return super().__new__(mcls, name, bases, dct)

        target = connection.Connection
        for attr in dir(target):
            # Private names and names the class body already provides
            # are left alone.
            if attr.startswith('_') or attr in dct:
                continue

            candidate = getattr(target, attr)
            if inspect.isfunction(candidate):
                forwarder = mcls._wrap_connection_method(attr)
                # Carry over name, docstring, etc. of the real method.
                dct[attr] = functools.update_wrapper(forwarder, candidate)

        dct.setdefault('__doc__', target.__doc__)

        return super().__new__(mcls, name, bases, dct)

    def __init__(cls, name, bases, dct, *, wrap=False):
        # Python 3.5 requires __init__ to accept the `wrap` class keyword
        # argument as well; it is consumed here and not forwarded.
        super().__init__(name, bases, dct)

    @staticmethod
    def _wrap_connection_method(meth_name):
        """Return a function forwarding *meth_name* calls to ``self._con``.

        The returned function raises ``InterfaceError`` when ``self._con``
        is ``None``.
        """
        def call_con_method(self, *args, **kwargs):
            # Runs as a method of the PoolConnectionProxy class.
            con = self._con
            if con is None:
                raise exceptions.InterfaceError(
                    'cannot call Connection.{}(): '
                    'connection has been released back to the pool'.format(
                        meth_name))

            # Look the method up on the connection's class and call it
            # unbound, passing the connection explicitly.
            target_meth = getattr(con.__class__, meth_name)
            return target_meth(con, *args, **kwargs)

        return call_con_method


class PoolConnectionProxy(connection._ConnectionProxy,
                          metaclass=PoolConnectionProxyMeta,
                          wrap=True):
    # Stand-in object wrapping a Connection on behalf of a
    # PoolConnectionHolder.  Public Connection methods are generated by
    # the metaclass (wrap=True); once the proxy is detached, `_con` is
    # None and those generated methods raise InterfaceError.
    #
    # NOTE: this is deliberately a comment and not a docstring: the
    # metaclass only installs Connection.__doc__ when the class body
    # does not define `__doc__` itself.

    __slots__ = ('_con', '_holder')

    def __init__(self, holder: 'PoolConnectionHolder',
                 con: connection.Connection):
        """Bind the proxy to *holder* and *con* and register it on *con*."""
        self._holder = holder
        self._con = con
        con._set_proxy(self)

    def __getattr__(self, attr):
        """Resolve attributes not found on the proxy via the connection."""
        wrapped = self._con
        return getattr(wrapped, attr)

    def _detach(self) -> connection.Connection:
        """Unlink the proxy from its connection.

        Returns the connection that was wrapped, or ``None`` when the
        proxy has already been detached.
        """
        wrapped = self._con
        if wrapped is None:
            return None

        self._con = None
        wrapped._set_proxy(None)
        return wrapped

    def __repr__(self):
        cls_name = self.__class__.__name__
        if self._con is not None:
            return '<{classname} {con!r} {id:#x}>'.format(
                classname=cls_name, con=self._con, id=id(self))

        return '<{classname} [released] {id:#x}>'.format(
            classname=cls_name, id=id(self))


class PoolConnectionHolder:
    """Bookkeeping slot that owns at most one pooled connection.

    A holder opens its connection through the owning pool, hands it out
    wrapped in a :class:`PoolConnectionProxy`, and puts itself back on
    the pool queue once the connection is released or closed.
    """

    __slots__ = ('_con', '_pool', '_loop', '_proxy',
                 '_max_queries', '_setup',
                 '_max_inactive_time', '_in_use',
                 '_inactive_callback', '_timeout',
                 '_generation')

    def __init__(self, pool, *, max_queries, setup, max_inactive_time):
        """Create an empty holder bound to *pool*.

        :param pool: the owning pool; its ``_loop``, ``_queue``,
            ``_generation`` and ``_get_new_connection()`` are used.
        :param max_queries: query count at which ``release()`` closes
            the connection instead of recycling it.
        :param setup: optional coroutine function awaited with the proxy
            on every ``acquire()``; ``None`` disables it.
        :param max_inactive_time: delay handed to ``loop.call_later()``
            before an idle connection is terminated; a falsy value
            disables the inactivity timer.
        """

        self._pool = pool
        self._con = None
        self._proxy = None

        self._max_queries = max_queries
        self._max_inactive_time = max_inactive_time
        self._setup = setup
        self._inactive_callback = None
        # Pending future while the holder is checked out, None otherwise.
        self._in_use = None  # type: asyncio.Future
        self._timeout = None
        self._generation = None

    async def connect(self):
        """Open a new connection for this holder.

        Raises ``InternalClientError`` if the holder already has one.
        """
        if self._con is not None:
            raise exceptions.InternalClientError(
                'PoolConnectionHolder.connect() called while another '
                'connection already exists')

        self._con = await self._pool._get_new_connection()
        # Remember which pool generation this connection belongs to.
        self._generation = self._pool._generation
        self._maybe_cancel_inactive_callback()
        self._setup_inactive_callback()

    async def acquire(self) -> 'PoolConnectionProxy':
        """Check the connection out and return a proxy for it.

        (Re)connects first when there is no usable connection or when
        the connection belongs to an older pool generation.  If the
        user-supplied ``setup`` fails or is cancelled, the connection is
        closed and the original exception is re-raised.
        """
        if self._con is None or self._con.is_closed():
            self._con = None
            await self.connect()

        elif self._generation != self._pool._generation:
            # Connections have been expired, re-connect the holder.
            # The stale connection is closed in the background.
            self._pool._loop.create_task(
                self._con.close(timeout=self._timeout))
            self._con = None
            await self.connect()

        self._maybe_cancel_inactive_callback()

        self._proxy = proxy = PoolConnectionProxy(self, self._con)

        if self._setup is not None:
            try:
                await self._setup(proxy)
            except (Exception, asyncio.CancelledError) as ex:
                # CancelledError is listed explicitly because it derives
                # from BaseException on Python 3.8+.
                #
                # If a user-defined `setup` function fails, we don't
                # know if the connection is safe for re-use, hence
                # we close it.  A new connection will be created
                # when `acquire` is called again.
                try:
                    # Use `close()` to close the connection gracefully.
                    # An exception in `setup` isn't necessarily caused
                    # by an IO or a protocol error.  close() will
                    # do the necessary cleanup via _release_on_close().
                    await self._con.close()
                finally:
                    raise ex

        self._in_use = self._pool._loop.create_future()

        return proxy

    async def release(self, timeout):
        """Return the checked-out connection to the pool.

        :param timeout: time budget passed to ``close()`` / ``reset()``;
            time spent waiting for a pending cancellation is subtracted
            from it.  ``None`` means no limit is passed on.

        The connection is closed rather than recycled when it reached
        ``max_queries`` or belongs to an older pool generation.  If the
        reset fails or is cancelled, the connection is terminated and
        the original exception is re-raised.

        Raises ``InternalClientError`` if the holder is not checked out.
        """
        if self._in_use is None:
            raise exceptions.InternalClientError(
                'PoolConnectionHolder.release() called on '
                'a free connection holder')

        if self._con.is_closed():
            # When closing, pool connections perform the necessary
            # cleanup, so we don't have to do anything else here.
            return

        self._timeout = None

        if self._con._protocol.queries_count >= self._max_queries:
            # The connection has reached its maximum utilization limit,
            # so close it.  Connection.close() will call _release().
            await self._con.close(timeout=timeout)
            return

        if self._generation != self._pool._generation:
            # The connection has expired because it belongs to
            # an older generation (Pool.expire_connections() has
            # been called.)
            await self._con.close(timeout=timeout)
            return

        try:
            budget = timeout

            if self._con._protocol._is_cancelling():
                # If the connection is in cancellation state,
                # wait for the cancellation
                started = time.monotonic()
                # The `loop` argument is deliberately not passed: it was
                # deprecated in Python 3.8 and removed in 3.10.
                # wait_for() uses the running event loop.
                await asyncio.wait_for(
                    self._con._protocol._wait_for_cancellation(),
                    budget)
                if budget is not None:
                    budget -= time.monotonic() - started

            await self._con.reset(timeout=budget)
        except (Exception, asyncio.CancelledError) as ex:
            # CancelledError is listed explicitly because it derives
            # from BaseException on Python 3.8+.
            #
            # If the `reset` call failed, terminate the connection.
            # A new one will be created when `acquire` is called
            # again.
            try:
                # An exception in `reset` is most likely caused by
                # an IO error, so terminate the connection.
                self._con.terminate()
            finally:
                raise ex

        # Free this connection holder and invalidate the
        # connection proxy.
        self._release()

        # Rearm the connection inactivity timer.
        self._setup_inactive_callback()

    async def wait_until_released(self):
        """Wait until the holder is no longer checked out."""
        if self._in_use is None:
            return
        else:
            await self._in_use

    async def close(self):
        """Gracefully close the held connection, if any."""
        if self._con is not None:
            # Connection.close() will call _release_on_close() to
            # finish holder cleanup.
            await self._con.close()

    def terminate(self):
        """Terminate the held connection, if any."""
        if self._con is not None:
            # Connection.terminate() will call _release_on_close() to
            # finish holder cleanup.
            self._con.terminate()

    def _setup_inactive_callback(self):
        """Arm the inactivity timer (no-op if max_inactive_time is falsy).

        Raises ``InternalClientError`` if a timer is already armed.
        """
        if self._inactive_callback is not None:
            raise exceptions.InternalClientError(
                'pool connection inactivity timer already exists')

        if self._max_inactive_time:
            self._inactive_callback = self._pool._loop.call_later(
                self._max_inactive_time, self._deactivate_inactive_connection)

    def _maybe_cancel_inactive_callback(self):
        """Cancel and forget the inactivity timer, if one is armed."""
        if self._inactive_callback is not None:
            self._inactive_callback.cancel()
            self._inactive_callback = None

    def _deactivate_inactive_connection(self):
        """Inactivity timer callback: terminate the idle connection.

        Raises ``InternalClientError`` if the holder is checked out.
        """
        if self._in_use is not None:
            raise exceptions.InternalClientError(
                'attempting to deactivate an acquired connection')

        if self._con is not None:
            # The connection is idle and not in use, so it's fine to
            # use terminate() instead of close().
            self._con.terminate()
            # Must call _release_on_close() explicitly, because
            # _deactivate_inactive_connection() runs when the connection
            # is *not* checked out, and so terminate() above will not
            # call it for us.
            self._release_on_close()

    def _release_on_close(self):
        """Finish holder cleanup after its connection went away."""
        self._maybe_cancel_inactive_callback()
        self._release()
        self._con = None

    def _release(self):
        """Release this connection holder."""
        if self._in_use is None:
            # The holder is not checked out.
            return

        # Wake up anyone blocked in wait_until_released().
        if not self._in_use.done():
            self._in_use.set_result(None)
        self._in_use = None

        # Deinitialize the connection proxy.  All subsequent
        # operations on it will fail.
        if self._proxy is not None:
            self._proxy._detach()
            self._proxy = None

        # Put ourselves back to the pool queue.
        self._pool._queue.put_nowait(self)


class Pool:
    """A connection pool.

    Connection pool can be used to manage a set of connections to the database.
    Connections are first acquired from the pool, then used, and then released
    back to the pool.  Once a connection is released, it's reset to close all
    open cursors and other resources *except* prepared statements.

    Pools are created by calling :func:`~asyncpg.pool.create_pool`.
    """

    __slots__ = ('_queue', '_loop', '_minsize', '_maxsize',
                 '_init', '_connect_args', '_connect_kwargs',
                 '_working_addr', '_working_config', '_working_params',
                 '_holders', '_initialized', '_initializing', '_closing',
                 '_closed', '_connection_class', '_generation')

    def __init__(self, *connect_args,
                 min_size,
                 max_size,
                 max_queries,
                 max_inactive_connection_lifetime,
                 setup,
                 init,
                 loop,
                 connection_class,
                 **connect_kwargs):

        if len(connect_args) > 1:
            warnings.warn(
                "Passing multiple positional arguments to asyncpg.Pool "
                "constructor is deprecated and will be removed in "
                "asyncpg 0.17.0.  The non-deprecated form is "
                "asyncpg.Pool(<dsn>, **kwargs)",
                DeprecationWarning, stacklevel=2)

        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop

        if max_size <= 0:
            raise ValueError('max_size is expected to be greater than zero')

        if min_size < 0:
            raise ValueError(
                'min_size is expected to be greater or equal to zero')

        if min_size > max_size:
            raise ValueError('min_size is greater than max_size')
Loading ...