Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / asyncpg   python

Repository URL to install this package:

/ pool.py

# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0


import asyncio
import functools
import inspect
import time

from . import connection
from . import connect_utils
from . import exceptions


class PoolConnectionProxyMeta(type):
    """Metaclass that can mirror the public ``Connection`` API onto a class.

    When a class is created with the ``wrap=True`` class keyword, every
    public attribute of ``connection.Connection`` that is a plain function
    and is not already defined in the class body is added to the new class
    as a forwarding wrapper (see :meth:`_wrap_connection_method`).  The new
    class also receives ``Connection.__doc__`` unless its body already
    provides a ``__doc__`` entry.

    Without ``wrap=True`` the metaclass behaves like plain ``type``.
    """

    def __new__(mcls, name, bases, dct, *, wrap=False):
        if wrap:
            target = connection.Connection

            for method_name in dir(target):
                # Leave private/dunder names alone, and never override
                # something the class body defines explicitly.
                if method_name in dct or method_name.startswith('_'):
                    continue

                original = getattr(target, method_name)
                if inspect.isfunction(original):
                    # Copy __name__, __doc__, etc. from the real method so
                    # the forwarder looks like the method it stands in for.
                    dct[method_name] = functools.update_wrapper(
                        mcls._wrap_connection_method(method_name), original)

            # Only fall back to the Connection docstring when the class
            # body did not supply one of its own.
            dct.setdefault('__doc__', target.__doc__)

        return super().__new__(mcls, name, bases, dct)

    def __init__(cls, name, bases, dct, *, wrap=False):
        # Python 3.5 passes class keyword arguments to __init__ as well;
        # swallow `wrap` here so that type.__init__ does not see it.
        super().__init__(name, bases, dct)

    @staticmethod
    def _wrap_connection_method(meth_name):
        """Build a function forwarding ``meth_name`` to ``self._con``.

        The returned function is meant to be installed as a method on a
        class whose instances keep the wrapped object in ``_con``.  It
        raises ``InterfaceError`` when ``_con`` is ``None``.
        """
        def call_con_method(self, *args, **kwargs):
            # This method will be owned by PoolConnectionProxy class.
            con = self._con
            if con is None:
                raise exceptions.InterfaceError(
                    'cannot call Connection.{}(): '
                    'connection has been released back to the pool'.format(
                        meth_name))

            # Look the method up on the class and bind it by hand.
            return getattr(con.__class__, meth_name)(con, *args, **kwargs)

        return call_con_method


class PoolConnectionProxy(connection._ConnectionProxy,
                          metaclass=PoolConnectionProxyMeta,
                          wrap=True):
    # Object that stands in front of a Connection owned by a
    # PoolConnectionHolder.  Public Connection methods are installed on
    # this class by the metaclass (wrap=True); anything else is forwarded
    # through __getattr__.
    #
    # NOTE: there is intentionally no class docstring -- the metaclass
    # copies Connection.__doc__ only when the class body defines none.

    __slots__ = ('_con', '_holder')

    def __init__(self, holder: 'PoolConnectionHolder',
                 con: connection.Connection):
        """Bind this proxy to ``con`` and remember the owning ``holder``."""
        self._holder = holder
        self._con = con
        # Tell the connection which proxy currently fronts it.
        con._set_proxy(self)

    def __getattr__(self, attr):
        """Forward any attribute not found on the proxy to the connection."""
        wrapped = self._con
        return getattr(wrapped, attr)

    def _detach(self) -> connection.Connection:
        """Unlink the proxy from its connection and return the connection.

        Raises ``InterfaceError`` if the proxy has already been detached.
        """
        detached = self._con
        if detached is None:
            raise exceptions.InterfaceError(
                'cannot detach PoolConnectionProxy: already detached')

        self._con = None
        detached._set_proxy(None)
        return detached

    def __repr__(self):
        cls_name = self.__class__.__name__
        if self._con is not None:
            return '<{classname} {con!r} {id:#x}>'.format(
                classname=cls_name, con=self._con, id=id(self))

        return '<{classname} [released] {id:#x}>'.format(
            classname=cls_name, id=id(self))


class PoolConnectionHolder:
    """One slot of a pool, owning at most one connection at a time.

    A holder opens its connection lazily in :meth:`acquire`, hands it out
    wrapped in a :class:`PoolConnectionProxy`, and in :meth:`release`
    either resets it for reuse or discards it.  While idle, the connection
    may be dropped by a timer scheduled with ``max_inactive_time``.
    """

    # NOTE(review): '_loop' is declared as a slot but never assigned in the
    # visible code; the loop is always read from ``self._pool._loop``.
    __slots__ = ('_con', '_pool', '_loop',
                 '_connect_args', '_connect_kwargs',
                 '_max_queries', '_setup', '_init',
                 '_max_inactive_time', '_in_use',
                 '_inactive_callback', '_timeout')

    def __init__(self, pool, *, connect_args, connect_kwargs,
                 max_queries, setup, init, max_inactive_time):
        """Store the pool reference and per-connection configuration.

        :param pool: The owning pool; its ``_loop``, ``_connection_class``
            and ``_working_*`` attributes are read when connecting.
        :param connect_args: Positional arguments for ``connection.connect``.
        :param connect_kwargs: Keyword arguments for ``connection.connect``.
        :param max_queries: Query count at or above which the connection
            is closed on release instead of being reused.
        :param setup: Optional coroutine function awaited with the proxy on
            every :meth:`acquire`.
        :param init: Optional coroutine function awaited with the raw
            connection once, right after it is opened.
        :param max_inactive_time: Delay passed to ``loop.call_later`` before
            an idle connection is terminated; a falsy value disables it.
        """

        self._pool = pool
        self._con = None

        self._connect_args = connect_args
        self._connect_kwargs = connect_kwargs
        self._max_queries = max_queries
        self._max_inactive_time = max_inactive_time
        self._setup = setup
        self._init = init
        # Handle returned by ``loop.call_later`` while the idle timer is armed.
        self._inactive_callback = None
        self._in_use = False
        # Only ever reset to None in this class; presumably assigned by the
        # pool when the holder is acquired -- TODO confirm against Pool.
        self._timeout = None

    async def connect(self):
        """Open a new connection and store it in ``self._con``.

        Must only be called while the holder has no connection.
        """
        assert self._con is None

        if self._pool._working_addr is None:
            # First connection attempt on this pool.
            con = await connection.connect(
                *self._connect_args,
                loop=self._pool._loop,
                connection_class=self._pool._connection_class,
                **self._connect_kwargs)

            # Cache what this connection resolved so that later connects
            # from any holder of the same pool can take the branch below.
            self._pool._working_addr = con._addr
            self._pool._working_config = con._config
            self._pool._working_params = con._params

        else:
            # We've connected before and have a resolved address,
            # and parsed options and config.
            con = await connect_utils._connect_addr(
                loop=self._pool._loop,
                addr=self._pool._working_addr,
                timeout=self._pool._working_params.connect_timeout,
                config=self._pool._working_config,
                params=self._pool._working_params,
                connection_class=self._pool._connection_class)

        # Run the user-supplied one-time initializer before publishing the
        # connection; if it raises, ``self._con`` stays None.
        if self._init is not None:
            await self._init(con)

        self._con = con

    async def acquire(self) -> PoolConnectionProxy:
        """Return a proxy for this holder's connection, connecting if needed.

        If the user-supplied ``setup`` coroutine raises, the connection is
        closed and dropped, and the same exception is re-raised.
        """
        if self._con is None or self._con.is_closed():
            # Drop the stale reference first: ``connect`` asserts that the
            # holder is empty.
            self._con = None
            await self.connect()

        # The connection is about to be used; disarm the idle timer.
        self._maybe_cancel_inactive_callback()

        proxy = PoolConnectionProxy(self, self._con)

        if self._setup is not None:
            try:
                await self._setup(proxy)
            except Exception as ex:
                # If a user-defined `setup` function fails, we don't
                # know if the connection is safe for re-use, hence
                # we close it.  A new connection will be created
                # when `acquire` is called again.
                try:
                    proxy._detach()
                    # Use `close` to close the connection gracefully.
                    # An exception in `setup` isn't necessarily caused
                    # by an IO or a protocol error.
                    await self._con.close()
                finally:
                    # Re-raise the original `setup` error even if the
                    # cleanup above raised something else.
                    self._con = None
                    raise ex

        # Only marked as in use once `setup` has succeeded.
        self._in_use = True
        return proxy

    async def release(self, timeout):
        """Take the connection back after use.

        The connection is dropped if it is already closed, closed if it has
        reached ``max_queries``, and otherwise reset for reuse.  If waiting
        for cancellation or the reset raises, the connection is terminated
        and the exception is re-raised.

        :param timeout: Passed to ``close``/``reset``, and used as the
            budget shared by the cancellation wait and the reset; ``None``
            is passed through unchanged.
        """
        assert self._in_use
        self._in_use = False
        self._timeout = None

        if self._con.is_closed():
            self._con = None

        elif self._con._protocol.queries_count >= self._max_queries:
            # The connection has served its quota of queries; retire it.
            try:
                await self._con.close(timeout=timeout)
            finally:
                self._con = None

        else:
            try:
                # Time left for the whole release operation.
                budget = timeout

                if self._con._protocol._is_cancelling():
                    # If the connection is in cancellation state,
                    # wait for the cancellation
                    started = time.monotonic()
                    # NOTE(review): the `loop` argument of asyncio.wait_for
                    # was removed in Python 3.10 -- confirm the Python
                    # versions this module is expected to run on.
                    await asyncio.wait_for(
                        self._con._protocol._wait_for_cancellation(),
                        budget, loop=self._pool._loop)
                    if budget is not None:
                        # Charge the time spent waiting against the budget
                        # left for `reset`.
                        budget -= time.monotonic() - started

                await self._con.reset(timeout=budget)
            except Exception as ex:
                # If the `reset` call failed, terminate the connection.
                # A new one will be created when `acquire` is called
                # again.
                try:
                    # An exception in `reset` is most likely caused by
                    # an IO error, so terminate the connection.
                    self._con.terminate()
                finally:
                    # Re-raise the original error even if `terminate`
                    # itself raised.
                    self._con = None
                    raise ex

        # `acquire` cancels and clears the timer, so none may be pending.
        assert self._inactive_callback is None
        if self._max_inactive_time and self._con is not None:
            # Arm the idle timer for the connection that was kept.
            self._inactive_callback = self._pool._loop.call_later(
                self._max_inactive_time, self._deactivate_connection)

    async def close(self):
        """Cancel the idle timer and close the connection, if there is one.

        ``self._con`` is cleared even when ``Connection.close`` raises.
        """
        self._maybe_cancel_inactive_callback()
        if self._con is None:
            return
        if self._con.is_closed():
            self._con = None
            return

        try:
            await self._con.close()
        finally:
            self._con = None

    def terminate(self):
        """Synchronous counterpart of :meth:`close`.

        Cancels the idle timer and calls ``Connection.terminate`` on an
        open connection; ``self._con`` is cleared even if that call raises.
        """
        self._maybe_cancel_inactive_callback()
        if self._con is None:
            return
        if self._con.is_closed():
            self._con = None
            return

        try:
            self._con.terminate()
        finally:
            self._con = None

    def _maybe_cancel_inactive_callback(self):
        """Cancel and forget the idle timer if one is stored."""
        if self._inactive_callback is not None:
            self._inactive_callback.cancel()
            self._inactive_callback = None

    def _deactivate_connection(self):
        """Idle-timer callback: terminate the connection that sat unused.

        Scheduled by :meth:`release` via ``loop.call_later``.  The stored
        timer handle is not cleared here; :meth:`acquire` clears it through
        ``_maybe_cancel_inactive_callback`` before the next release.
        """
        assert not self._in_use
        if self._con is None or self._con.is_closed():
            return
        self._con.terminate()
        self._con = None


class Pool:
    """A connection pool.

    Connection pool can be used to manage a set of connections to the database.
    Connections are first acquired from the pool, then used, and then released
    back to the pool.  Once a connection is released, it's reset to close all
    open cursors and other resources *except* prepared statements.

    Pools are created by calling :func:`~asyncpg.pool.create_pool`.
    """

    __slots__ = ('_queue', '_loop', '_minsize', '_maxsize',
                 '_working_addr', '_working_config', '_working_params',
                 '_holders', '_initialized', '_closed',
                 '_connection_class')

    def __init__(self, *connect_args,
                 min_size,
                 max_size,
                 max_queries,
                 max_inactive_connection_lifetime,
                 setup,
                 init,
                 loop,
                 connection_class,
                 **connect_kwargs):

        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop

        if max_size <= 0:
            raise ValueError('max_size is expected to be greater than zero')

        if min_size < 0:
            raise ValueError(
                'min_size is expected to be greater or equal to zero')

        if min_size > max_size:
            raise ValueError('min_size is greater than max_size')

        if max_queries <= 0:
            raise ValueError('max_queries is expected to be greater than zero')

        if max_inactive_connection_lifetime < 0:
            raise ValueError(
                'max_inactive_connection_lifetime is expected to be greater '
                'or equal to zero')

        self._minsize = min_size
        self._maxsize = max_size

        self._holders = []
        self._initialized = False
        self._queue = asyncio.LifoQueue(maxsize=self._maxsize, loop=self._loop)

        self._working_addr = None
        self._working_config = None
        self._working_params = None

        self._connection_class = connection_class

        self._closed = False

        for _ in range(max_size):
            ch = PoolConnectionHolder(
                self,
                connect_args=connect_args,
                connect_kwargs=connect_kwargs,
                max_queries=max_queries,
                max_inactive_time=max_inactive_connection_lifetime,
                setup=setup,
                init=init)

            self._holders.append(ch)
            self._queue.put_nowait(ch)

    async def _async__init__(self):
        if self._initialized:
            return
        if self._closed:
            raise exceptions.InterfaceError('pool is closed')
Loading ...