Closed
Show file tree
Hide file tree
Changes from all commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Failed to load files.
Original file line numberDiff line numberDiff line change
Expand Up@@ -38,6 +38,7 @@
'statement_cache_size',
'max_cached_statement_lifetime',
'max_cacheable_statement_size',
'max_consecutive_exceptions',
])


Expand DownExpand Up@@ -210,6 +211,7 @@ def _parse_connect_arguments(*, dsn, host, port, user, password, database,
timeout, command_timeout, statement_cache_size,
max_cached_statement_lifetime,
max_cacheable_statement_size,
max_consecutive_exceptions,
ssl, server_settings):

local_vars = locals()
Expand DownExpand Up@@ -245,7 +247,8 @@ def _parse_connect_arguments(*, dsn, host, port, user, password, database,
command_timeout=command_timeout,
statement_cache_size=statement_cache_size,
max_cached_statement_lifetime=max_cached_statement_lifetime,
max_cacheable_statement_size=max_cacheable_statement_size,)
max_cacheable_statement_size=max_cacheable_statement_size,
max_consecutive_exceptions=max_consecutive_exceptions,)

return addrs, params, config

Expand Down
Original file line numberDiff line numberDiff line change
Expand Up@@ -44,7 +44,7 @@ class Connection(metaclass=ConnectionMeta):
'_listeners', '_server_version', '_server_caps',
'_intro_query', '_reset_query', '_proxy',
'_stmt_exclusive_section', '_config', '_params', '_addr',
'_log_listeners', '_cancellations')
'_log_listeners', '_cancellations', '_consecutive_exceptions')

def __init__(self, protocol, transport, loop,
addr: (str, int) or str,
Expand DownExpand Up@@ -97,6 +97,7 @@ def __init__(self, protocol, transport, loop,
# Used for `con.fetchval()`, `con.fetch()`, `con.fetchrow()`,
# `con.execute()`, and `con.executemany()`.
self._stmt_exclusive_section = _Atomic()
self._consecutive_exceptions = 0

async def add_listener(self, channel, callback):
"""Add a listener for Postgres notifications.
Expand DownExpand Up@@ -1331,6 +1332,7 @@ async def _do_execute(self, query, executor, timeout, retry=True):
# It is not possible to recover (the statement is already done at
# the server's side), the only way is to drop our caches and
# reraise the exception to the caller.
#
await self.reload_schema_state()
raise
except exceptions.InvalidCachedStatementError:
Expand DownExpand Up@@ -1362,9 +1364,21 @@ async def _do_execute(self, query, executor, timeout, retry=True):
else:
return await self._do_execute(
query, executor, timeout, retry=False)
except:
await self._maybe_close_bad_connection()
raise

self._consecutive_exceptions = 0
return result, stmt

async def _maybe_close_bad_connection(self):
if self._config.max_consecutive_exceptions > 0:
self._consecutive_exceptions += 1

if self._consecutive_exceptions > \
self._config.max_consecutive_exceptions:
await self.close()


async def connect(dsn=None, *,
host=None, port=None,
Expand All@@ -1375,6 +1389,7 @@ async def connect(dsn=None, *,
statement_cache_size=100,
max_cached_statement_lifetime=300,
max_cacheable_statement_size=1024 * 15,
max_consecutive_exceptions=0,
command_timeout=None,
ssl=None,
connection_class=Connection,
Expand DownExpand Up@@ -1431,6 +1446,11 @@ async def connect(dsn=None, *,
default). Pass ``0`` to allow all statements to be cached
regardless of their size.

:param int max_consecutive_exceptions:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably important to allow users to disable this, ie. by sending 0 in the same way as many of the rest of these parameters.

@asyncpg/maintainers: personally, I think the default of 5ish is pretty reasonable here, but depending on your preference in backwards-compatibility we might want to default this to 0 for now behavior change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great idea with 0 setting

the maximum number of consecutive exceptions that may be raised by a
single connection before that connection is assumed corrupt (ex.
pointing to an old DB after a failover). Pass ``0`` to disable.

:param float command_timeout:
the default timeout for operations on this connection
(the default is ``None``: no timeout).
Expand DownExpand Up@@ -1495,7 +1515,8 @@ class of the returned connection object. Must be a subclass of
command_timeout=command_timeout,
statement_cache_size=statement_cache_size,
max_cached_statement_lifetime=max_cached_statement_lifetime,
max_cacheable_statement_size=max_cacheable_statement_size)
max_cacheable_statement_size=max_cacheable_statement_size,
max_consecutive_exceptions=max_consecutive_exceptions)


class _StatementCacheEntry:
Expand Down