# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
import asyncio
import functools
import inspect
import logging
import time
import warnings
from . import connection
from . import connect_utils
from . import exceptions
logger = logging.getLogger(__name__)
class PoolConnectionProxyMeta(type):
    """Metaclass that can mirror the public ``Connection`` API on a class.

    When a class is created with the ``wrap=True`` class keyword, every
    public attribute of ``connection.Connection`` that is a plain function
    and is not defined in the class body gets a forwarding wrapper added
    to the class namespace.  The ``Connection`` docstring is reused when
    the class body does not provide one.
    """

    def __new__(mcls, name, bases, dct, *, wrap=False):
        if wrap:
            target = connection.Connection

            for attrname in dir(target):
                # Private names and names the class body defines itself
                # are never overridden.
                if attrname in dct or attrname.startswith('_'):
                    continue

                original = getattr(target, attrname)
                if inspect.isfunction(original):
                    forwarder = mcls._wrap_connection_method(attrname)
                    # Copy name, docstring, etc. from the real method.
                    dct[attrname] = functools.update_wrapper(
                        forwarder, original)

            dct.setdefault('__doc__', target.__doc__)

        return super().__new__(mcls, name, bases, dct)

    def __init__(cls, name, bases, dct, *, wrap=False):
        # ``wrap`` is consumed by __new__; it is accepted here because
        # Python 3.5 also passes class keyword arguments to __init__.
        super().__init__(name, bases, dct)

    @staticmethod
    def _wrap_connection_method(meth_name):
        """Return a function forwarding *meth_name* to ``self._con``."""

        def call_con_method(self, *args, **kwargs):
            # This function ends up as a method of the proxy class, so
            # ``self`` is the proxy, not the connection.
            con = self._con
            if con is None:
                raise exceptions.InterfaceError(
                    'cannot call Connection.{}(): '
                    'connection has been released back to the pool'.format(
                        meth_name))

            # Look the method up on the connection's class and call it
            # with the connection as the explicit first argument.
            return getattr(con.__class__, meth_name)(con, *args, **kwargs)

        return call_con_method
# Proxy object handed out in place of a pooled connection.
#
# NOTE: this class deliberately has no docstring of its own.  The
# metaclass copies ``Connection.__doc__`` onto the class only when the
# class body does not define ``__doc__``.
class PoolConnectionProxy(connection._ConnectionProxy,
                          metaclass=PoolConnectionProxyMeta,
                          wrap=True):

    __slots__ = ('_con', '_holder')

    def __init__(self, holder: 'PoolConnectionHolder',
                 con: connection.Connection):
        self._holder = holder
        self._con = con
        # Let the connection know which proxy currently fronts it.
        con._set_proxy(self)

    def __getattr__(self, attr):
        # Only reached for attributes not found on the proxy itself;
        # those are looked up on the wrapped Connection object.
        return getattr(self._con, attr)

    def _detach(self) -> connection.Connection:
        # Disconnect the proxy from its connection and return the
        # connection; returns None when the proxy is already detached.
        con = self._con
        if con is None:
            return None

        self._con = None
        con._set_proxy(None)
        return con

    def __repr__(self):
        classname = type(self).__name__

        if self._con is not None:
            return '<{classname} {con!r} {id:#x}>'.format(
                classname=classname, con=self._con, id=id(self))

        return '<{classname} [released] {id:#x}>'.format(
            classname=classname, id=id(self))
class PoolConnectionHolder:
    """A pool slot that owns at most one connection at a time.

    The holder obtains its connection from the pool on demand, hands it
    out wrapped in a :class:`PoolConnectionProxy`, and puts itself back on
    the pool queue when the connection is released.  ``_in_use`` holds a
    future while the holder is checked out and is ``None`` otherwise.
    """

    __slots__ = ('_con', '_pool', '_loop', '_proxy',
                 '_max_queries', '_setup',
                 '_max_inactive_time', '_in_use',
                 '_inactive_callback', '_timeout',
                 '_generation')

    def __init__(self, pool, *, max_queries, setup, max_inactive_time):
        """Create an empty holder bound to *pool*.

        :param pool: the owning pool; its ``_loop``, ``_queue``,
            ``_generation`` and ``_get_new_connection()`` are used.
        :param max_queries: once the connection's protocol reports at
            least this many queries, the connection is closed on release.
        :param setup: optional coroutine function awaited with the proxy
            on every :meth:`acquire`; ``None`` to skip.
        :param max_inactive_time: delay passed to ``loop.call_later()``
            after which an idle connection is terminated; a falsy value
            disables the inactivity timer.
        """
        self._pool = pool
        self._con = None
        self._proxy = None

        self._max_queries = max_queries
        self._max_inactive_time = max_inactive_time
        self._setup = setup
        self._inactive_callback = None
        self._in_use = None  # type: asyncio.Future
        self._timeout = None
        self._generation = None

    async def connect(self):
        """Obtain a new connection from the pool and arm the idle timer.

        :raises InternalClientError: if the holder already has a
            connection.
        """
        if self._con is not None:
            raise exceptions.InternalClientError(
                'PoolConnectionHolder.connect() called while another '
                'connection already exists')

        self._con = await self._pool._get_new_connection()
        # Remember which pool generation this connection belongs to.
        self._generation = self._pool._generation
        self._maybe_cancel_inactive_callback()
        self._setup_inactive_callback()

    async def acquire(self) -> 'PoolConnectionProxy':
        """Check the connection out and return a proxy wrapping it.

        A new connection is made first when the holder has none, when the
        current one is closed, or when its generation differs from the
        pool's.  If the ``setup`` callback fails or is cancelled, the
        connection is closed and the original error is re-raised.
        """
        if self._con is None or self._con.is_closed():
            self._con = None
            await self.connect()

        elif self._generation != self._pool._generation:
            # Connections have been expired, re-connect the holder.
            # The stale connection is closed in a background task.
            self._pool._loop.create_task(
                self._con.close(timeout=self._timeout))
            self._con = None
            await self.connect()

        # The connection is about to be used: stop the idle timer.
        self._maybe_cancel_inactive_callback()

        self._proxy = proxy = PoolConnectionProxy(self, self._con)

        if self._setup is not None:
            try:
                await self._setup(proxy)
            except (Exception, asyncio.CancelledError) as ex:
                # If a user-defined `setup` function fails, we don't
                # know if the connection is safe for re-use, hence
                # we close it.  A new connection will be created
                # when `acquire` is called again.
                #
                # CancelledError is listed explicitly because it is not
                # an Exception subclass on Python 3.8+; a cancelled
                # `setup` leaves the connection in an equally unknown
                # state.
                try:
                    # Use `close()` to close the connection gracefully.
                    # An exception in `setup` isn't necessarily caused
                    # by an IO or a protocol error.  close() will
                    # do the necessary cleanup via _release_on_close().
                    await self._con.close()
                finally:
                    raise ex

        self._in_use = self._pool._loop.create_future()

        return proxy

    async def release(self, timeout):
        """Return the checked-out connection to the pool.

        The connection is closed instead of being reused when it has
        reached ``max_queries`` or belongs to a different pool
        generation.  Otherwise any in-progress cancellation is awaited,
        the connection is reset, and the holder is put back on the pool
        queue with its idle timer re-armed.

        :param timeout: overall time budget in seconds, or ``None`` for
            no limit.  Time spent waiting for cancellation is deducted
            from the budget given to ``reset()``.
        :raises InternalClientError: if the holder is not checked out.
        """
        if self._in_use is None:
            raise exceptions.InternalClientError(
                'PoolConnectionHolder.release() called on '
                'a free connection holder')

        if self._con.is_closed():
            # When closing, pool connections perform the necessary
            # cleanup, so we don't have to do anything else here.
            return

        self._timeout = None

        if self._con._protocol.queries_count >= self._max_queries:
            # The connection has reached its maximum utilization limit,
            # so close it.  Connection.close() will call _release().
            await self._con.close(timeout=timeout)
            return

        if self._generation != self._pool._generation:
            # The connection has expired because it belongs to
            # an older generation (Pool.expire_connections() has
            # been called.)
            await self._con.close(timeout=timeout)
            return

        try:
            budget = timeout

            if self._con._protocol._is_cancelling():
                # If the connection is in cancellation state,
                # wait for the cancellation
                started = time.monotonic()
                # No ``loop=`` argument: it was removed from
                # asyncio.wait_for() in Python 3.10, and the running
                # loop is used by default.
                await asyncio.wait_for(
                    self._con._protocol._wait_for_cancellation(),
                    budget)
                if budget is not None:
                    budget -= time.monotonic() - started

            await self._con.reset(timeout=budget)
        except (Exception, asyncio.CancelledError) as ex:
            # If the `reset` call failed (or was cancelled, which is not
            # an Exception subclass on Python 3.8+), terminate the
            # connection.  A new one will be created when `acquire` is
            # called again.
            try:
                # An exception in `reset` is most likely caused by
                # an IO error, so terminate the connection.
                self._con.terminate()
            finally:
                raise ex

        # Free this connection holder and invalidate the
        # connection proxy.
        self._release()

        # Rearm the connection inactivity timer.
        self._setup_inactive_callback()

    async def wait_until_released(self):
        """Wait until the holder is no longer checked out."""
        if self._in_use is None:
            return
        else:
            await self._in_use

    async def close(self):
        """Gracefully close the held connection, if there is one."""
        if self._con is not None:
            # Connection.close() will call _release_on_close() to
            # finish holder cleanup.
            await self._con.close()

    def terminate(self):
        """Terminate the held connection, if there is one."""
        if self._con is not None:
            # Connection.terminate() will call _release_on_close() to
            # finish holder cleanup.
            self._con.terminate()

    def _setup_inactive_callback(self):
        """Schedule idle-connection termination if a time limit is set.

        :raises InternalClientError: if a timer is already scheduled.
        """
        if self._inactive_callback is not None:
            raise exceptions.InternalClientError(
                'pool connection inactivity timer already exists')

        if self._max_inactive_time:
            self._inactive_callback = self._pool._loop.call_later(
                self._max_inactive_time, self._deactivate_inactive_connection)

    def _maybe_cancel_inactive_callback(self):
        """Cancel and forget the inactivity timer, if one is scheduled."""
        if self._inactive_callback is not None:
            self._inactive_callback.cancel()
            self._inactive_callback = None

    def _deactivate_inactive_connection(self):
        """Inactivity timer callback: terminate the idle connection.

        :raises InternalClientError: if the holder is checked out.
        """
        if self._in_use is not None:
            raise exceptions.InternalClientError(
                'attempting to deactivate an acquired connection')

        if self._con is not None:
            # The connection is idle and not in use, so it's fine to
            # use terminate() instead of close().
            self._con.terminate()
            # _release_on_close() is called explicitly here: per the
            # original note, this path runs while the connection is
            # *not* checked out, and terminate() above will not do the
            # holder cleanup in that case.
            self._release_on_close()

    def _release_on_close(self):
        """Finish holder cleanup after its connection has gone away."""
        self._maybe_cancel_inactive_callback()
        self._release()
        self._con = None

    def _release(self):
        """Release this connection holder."""
        if self._in_use is None:
            # The holder is not checked out.
            return

        # Wake up anyone blocked in wait_until_released().
        if not self._in_use.done():
            self._in_use.set_result(None)
        self._in_use = None

        # Deinitialize the connection proxy.  All subsequent
        # operations on it will fail.
        if self._proxy is not None:
            self._proxy._detach()
            self._proxy = None

        # Put ourselves back to the pool queue.
        self._pool._queue.put_nowait(self)
class Pool:
"""A connection pool.
Connection pool can be used to manage a set of connections to the database.
Connections are first acquired from the pool, then used, and then released
back to the pool. Once a connection is released, it's reset to close all
open cursors and other resources *except* prepared statements.
Pools are created by calling :func:`~asyncpg.pool.create_pool`.
"""
__slots__ = ('_queue', '_loop', '_minsize', '_maxsize',
'_init', '_connect_args', '_connect_kwargs',
'_working_addr', '_working_config', '_working_params',
'_holders', '_initialized', '_initializing', '_closing',
'_closed', '_connection_class', '_generation')
def __init__(self, *connect_args,
min_size,
max_size,
max_queries,
max_inactive_connection_lifetime,
setup,
init,
loop,
connection_class,
**connect_kwargs):
if len(connect_args) > 1:
warnings.warn(
"Passing multiple positional arguments to asyncpg.Pool "
"constructor is deprecated and will be removed in "
"asyncpg 0.17.0. The non-deprecated form is "
"asyncpg.Pool(<dsn>, **kwargs)",
DeprecationWarning, stacklevel=2)
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
if max_size <= 0:
raise ValueError('max_size is expected to be greater than zero')
if min_size < 0:
raise ValueError(
'min_size is expected to be greater or equal to zero')
if min_size > max_size:
raise ValueError('min_size is greater than max_size')
Loading ...