# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
import asyncio
import functools
import inspect
import time
from . import connection
from . import connect_utils
from . import exceptions
class PoolConnectionProxyMeta(type):
    """Metaclass that can mirror ``Connection`` methods onto a proxy class.

    When a class is declared with the ``wrap=True`` class keyword, every
    public plain function found on ``connection.Connection`` that the class
    body does not already define is added to the class namespace as a
    forwarding wrapper.  The class also inherits ``Connection.__doc__``
    unless its namespace already has a ``__doc__`` entry.
    """

    def __new__(mcls, name, bases, dct, *, wrap=False):
        if wrap:
            con_cls = connection.Connection

            # Public names only; anything defined explicitly in the class
            # body takes precedence over a generated wrapper.
            candidates = (
                attrname for attrname in dir(con_cls)
                if not attrname.startswith('_') and attrname not in dct
            )

            for attrname in candidates:
                target = getattr(con_cls, attrname)
                if inspect.isfunction(target):
                    # Copy name, docstring, etc. from the real method so
                    # that the wrapper introspects like the original.
                    dct[attrname] = functools.update_wrapper(
                        mcls._wrap_connection_method(attrname), target)

            dct.setdefault('__doc__', con_cls.__doc__)

        return super().__new__(mcls, name, bases, dct)

    def __init__(cls, name, bases, dct, *, wrap=False):
        # Python 3.5 passes class keyword arguments to the metaclass
        # ``__init__`` as well, so `wrap` has to be accepted (and dropped)
        # here.
        super().__init__(name, bases, dct)

    @staticmethod
    def _wrap_connection_method(meth_name):
        """Return a function forwarding ``meth_name`` to ``self._con``.

        The returned function is meant to live on a proxy class whose
        instances expose the wrapped connection as ``_con``.  It raises
        ``exceptions.InterfaceError`` when ``_con`` is ``None``.
        """

        def call_con_method(self, *args, **kwargs):
            # This method will be owned by PoolConnectionProxy class.
            con = self._con
            if con is None:
                raise exceptions.InterfaceError(
                    'cannot call Connection.{}(): '
                    'connection has been released back to the pool'.format(
                        meth_name))

            # Look the method up on the connection's own class and call
            # it with the connection passed explicitly as `self`.
            unbound = getattr(con.__class__, meth_name)
            return unbound(con, *args, **kwargs)

        return call_con_method
class PoolConnectionProxy(connection._ConnectionProxy,
                          metaclass=PoolConnectionProxyMeta,
                          wrap=True):
    # Stand-in object that wraps a Connection together with the holder it
    # came from.  Public Connection methods are forwarded by the wrappers
    # the metaclass generates (wrap=True); everything else falls through
    # to __getattr__ below.
    #
    # NOTE: no class docstring is declared on purpose -- the metaclass
    # copies Connection.__doc__ onto this class only when the class body
    # does not define `__doc__` itself.

    __slots__ = ('_con', '_holder')

    def __init__(self, holder: 'PoolConnectionHolder',
                 con: connection.Connection):
        self._holder = holder
        self._con = con
        # Register this proxy on the connection through its `_set_proxy`
        # hook.
        con._set_proxy(self)

    def __getattr__(self, attr):
        # Only invoked when regular attribute lookup fails: delegate the
        # lookup to the wrapped Connection object.
        wrapped = self._con
        return getattr(wrapped, attr)

    def _detach(self) -> connection.Connection:
        """Unlink the proxy from its connection and return the connection.

        Raises ``exceptions.InterfaceError`` if the proxy no longer wraps
        a connection.
        """
        detached = self._con
        if detached is None:
            raise exceptions.InterfaceError(
                'cannot detach PoolConnectionProxy: already detached')

        self._con = None
        detached._set_proxy(None)
        return detached

    def __repr__(self):
        cls_name = self.__class__.__name__

        if self._con is not None:
            return '<{classname} {con!r} {id:#x}>'.format(
                classname=cls_name, con=self._con, id=id(self))

        # No wrapped connection any more.
        return '<{classname} [released] {id:#x}>'.format(
            classname=cls_name, id=id(self))
class PoolConnectionHolder:
    """One slot of the pool: owns at most one connection at a time.

    The holder lazily opens its connection on `acquire()`, wraps it in a
    `PoolConnectionProxy`, and on `release()` either resets it for re-use
    or discards it (already closed, `max_queries` reached, or reset
    failed).  When `max_inactive_time` is non-zero, a released connection
    is terminated after sitting unused for that many seconds.
    """

    __slots__ = ('_con', '_pool', '_loop',
                 '_connect_args', '_connect_kwargs',
                 '_max_queries', '_setup', '_init',
                 '_max_inactive_time', '_in_use',
                 '_inactive_callback', '_timeout')

    def __init__(self, pool, *, connect_args, connect_kwargs,
                 max_queries, setup, init, max_inactive_time):
        """Store the configuration; no connection is opened here.

        :param pool: owning pool; its `_loop`, `_connection_class` and
            `_working_*` attributes are read and written by `connect()`.
        :param connect_args: positional arguments for `connection.connect`.
        :param connect_kwargs: keyword arguments for `connection.connect`.
        :param max_queries: query count at which a released connection is
            closed instead of being re-used.
        :param setup: optional coroutine function awaited with the proxy
            on every `acquire()`.
        :param init: optional coroutine function awaited with the raw
            connection right after it is opened.
        :param max_inactive_time: seconds a released connection may stay
            unused before being terminated; a falsy value disables this.
        """
        self._pool = pool
        self._con = None

        self._connect_args = connect_args
        self._connect_kwargs = connect_kwargs
        self._max_queries = max_queries
        self._max_inactive_time = max_inactive_time
        self._setup = setup
        self._init = init
        self._inactive_callback = None
        self._in_use = False
        self._timeout = None

    async def connect(self):
        """Open a new connection for this holder and run `init` on it.

        If the `init` callback fails (or is cancelled), the new connection
        is closed before the error is re-raised, so that it is not leaked.
        """
        assert self._con is None

        if self._pool._working_addr is None:
            # First connection attempt on this pool.
            con = await connection.connect(
                *self._connect_args,
                loop=self._pool._loop,
                connection_class=self._pool._connection_class,
                **self._connect_kwargs)

            # Remember what worked so that later connections can skip
            # address resolution and option parsing.
            self._pool._working_addr = con._addr
            self._pool._working_config = con._config
            self._pool._working_params = con._params

        else:
            # We've connected before and have a resolved address,
            # and parsed options and config.
            con = await connect_utils._connect_addr(
                loop=self._pool._loop,
                addr=self._pool._working_addr,
                timeout=self._pool._working_params.connect_timeout,
                config=self._pool._working_config,
                params=self._pool._working_params,
                connection_class=self._pool._connection_class)

        if self._init is not None:
            try:
                await self._init(con)
            except (Exception, asyncio.CancelledError) as ex:
                # If a user-defined `init` function fails, we don't know
                # if the connection is safe for re-use, and nothing else
                # holds a reference to it yet, hence we close it here.
                # CancelledError is listed explicitly because it is not
                # an `Exception` subclass on Python 3.8+.
                try:
                    # Use `close` to close the connection gracefully.
                    # An exception in `init` isn't necessarily caused
                    # by an IO or a protocol error.
                    await con.close()
                finally:
                    raise ex

        self._con = con

    async def acquire(self) -> 'PoolConnectionProxy':
        """Return a proxy for this holder's connection, connecting if needed.

        Runs the optional `setup` callback on the proxy.  If `setup` fails
        (or is cancelled), the connection is closed, the holder is left
        empty, and the original error is re-raised.
        """
        if self._con is None or self._con.is_closed():
            self._con = None
            await self.connect()

        self._maybe_cancel_inactive_callback()

        proxy = PoolConnectionProxy(self, self._con)

        if self._setup is not None:
            try:
                await self._setup(proxy)
            except (Exception, asyncio.CancelledError) as ex:
                # If a user-defined `setup` function fails, we don't
                # know if the connection is safe for re-use, hence
                # we close it.  A new connection will be created
                # when `acquire` is called again.
                try:
                    proxy._detach()
                    # Use `close` to close the connection gracefully.
                    # An exception in `setup` isn't necessarily caused
                    # by an IO or a protocol error.
                    await self._con.close()
                finally:
                    self._con = None
                    raise ex

        self._in_use = True
        return proxy

    async def release(self, timeout):
        """Take the connection back after use.

        :param timeout: passed to `Connection.close()` / `reset()`; it
            also bounds the wait for an in-progress query cancellation.
            `None` means no limit.

        A connection that is already closed is dropped; one that reached
        `max_queries` is closed; otherwise it is reset.  If the reset
        fails (or is cancelled) the connection is terminated and the
        error re-raised.
        """
        assert self._in_use
        self._in_use = False
        self._timeout = None

        if self._con.is_closed():
            self._con = None

        elif self._con._protocol.queries_count >= self._max_queries:
            try:
                await self._con.close(timeout=timeout)
            finally:
                self._con = None

        else:
            try:
                budget = timeout

                if self._con._protocol._is_cancelling():
                    # If the connection is in cancellation state,
                    # wait for the cancellation
                    started = time.monotonic()
                    # NOTE(review): the `loop` argument of
                    # asyncio.wait_for() was removed in Python 3.10 --
                    # confirm the interpreter versions this file targets.
                    await asyncio.wait_for(
                        self._con._protocol._wait_for_cancellation(),
                        budget, loop=self._pool._loop)
                    if budget is not None:
                        # Charge the time spent waiting against the
                        # time left for `reset`.
                        budget -= time.monotonic() - started

                await self._con.reset(timeout=budget)
            except (Exception, asyncio.CancelledError) as ex:
                # If the `reset` call failed, terminate the connection.
                # A new one will be created when `acquire` is called
                # again.  CancelledError is listed explicitly because it
                # is not an `Exception` subclass on Python 3.8+.
                try:
                    # An exception in `reset` is most likely caused by
                    # an IO error, so terminate the connection.
                    self._con.terminate()
                finally:
                    self._con = None
                    raise ex

        assert self._inactive_callback is None
        if self._max_inactive_time and self._con is not None:
            # Schedule termination of the connection should it stay
            # unused for too long.
            self._inactive_callback = self._pool._loop.call_later(
                self._max_inactive_time, self._deactivate_connection)

    async def close(self):
        """Gracefully close the held connection, if there is an open one."""
        self._maybe_cancel_inactive_callback()
        if self._con is None:
            return
        if self._con.is_closed():
            self._con = None
            return

        try:
            await self._con.close()
        finally:
            self._con = None

    def terminate(self):
        """Terminate the held connection, if there is an open one."""
        self._maybe_cancel_inactive_callback()
        if self._con is None:
            return
        if self._con.is_closed():
            self._con = None
            return

        try:
            self._con.terminate()
        finally:
            self._con = None

    def _maybe_cancel_inactive_callback(self):
        """Cancel and forget the pending inactivity timer, if any."""
        if self._inactive_callback is not None:
            self._inactive_callback.cancel()
            self._inactive_callback = None

    def _deactivate_connection(self):
        """Inactivity timer callback: terminate the idle connection."""
        assert not self._in_use
        if self._con is None or self._con.is_closed():
            return
        self._con.terminate()
        self._con = None
class Pool:
"""A connection pool.
Connection pool can be used to manage a set of connections to the database.
Connections are first acquired from the pool, then used, and then released
back to the pool. Once a connection is released, it's reset to close all
open cursors and other resources *except* prepared statements.
Pools are created by calling :func:`~asyncpg.pool.create_pool`.
"""
__slots__ = ('_queue', '_loop', '_minsize', '_maxsize',
'_working_addr', '_working_config', '_working_params',
'_holders', '_initialized', '_closed',
'_connection_class')
def __init__(self, *connect_args,
             min_size,
             max_size,
             max_queries,
             max_inactive_connection_lifetime,
             setup,
             init,
             loop,
             connection_class,
             **connect_kwargs):
    """Validate the sizing options and pre-create all connection holders.

    No connection is opened here: ``max_size`` empty
    ``PoolConnectionHolder`` objects are created and queued.  ``loop``
    defaults to ``asyncio.get_event_loop()`` when ``None``.

    Raises ``ValueError`` when ``max_size`` or ``max_queries`` is not
    positive, when ``min_size`` is negative or exceeds ``max_size``, or
    when ``max_inactive_connection_lifetime`` is negative.
    """
    if loop is None:
        loop = asyncio.get_event_loop()
    self._loop = loop

    # --- argument validation -------------------------------------------
    if max_size <= 0:
        raise ValueError('max_size is expected to be greater than zero')

    if min_size < 0:
        raise ValueError(
            'min_size is expected to be greater or equal to zero')

    if min_size > max_size:
        raise ValueError('min_size is greater than max_size')

    if max_queries <= 0:
        raise ValueError('max_queries is expected to be greater than zero')

    if max_inactive_connection_lifetime < 0:
        raise ValueError(
            'max_inactive_connection_lifetime is expected to be greater '
            'or equal to zero')

    # --- pool state -----------------------------------------------------
    self._minsize = min_size
    self._maxsize = max_size
    self._initialized = False
    self._closed = False
    self._connection_class = connection_class

    # Filled in by the first holder that connects successfully.
    self._working_addr = None
    self._working_config = None
    self._working_params = None

    self._holders = []
    self._queue = asyncio.LifoQueue(maxsize=self._maxsize, loop=self._loop)

    # Every holder receives the same configuration.
    holder_options = dict(
        connect_args=connect_args,
        connect_kwargs=connect_kwargs,
        max_queries=max_queries,
        max_inactive_time=max_inactive_connection_lifetime,
        setup=setup,
        init=init)

    for _ in range(max_size):
        holder = PoolConnectionHolder(self, **holder_options)
        self._holders.append(holder)
        self._queue.put_nowait(holder)
async def _async__init__(self):
if self._initialized:
return
if self._closed:
raise exceptions.InterfaceError('pool is closed')
Loading ...