diff --git a/CHANGELOG.md b/CHANGELOG.md index 67e4f9ca..62ee6a73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Fixed leak sessions on asyncio timeout + ## 3.12.2 ## * Added support ydb github repo with own auth protobuf diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index dd969c7e..007c8a54 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -537,7 +537,11 @@ async def _send_loop(self, writer: "WriterAsyncIOStream"): m = await self._new_messages.get() # type: InternalMessage if m.seq_no > last_seq_no: writer.write([m]) - except Exception as e: + except asyncio.CancelledError: + # the loop task cancelled be parent code, for example for reconnection + # no need to stop all work. + raise + except BaseException as e: self._stop(e) raise diff --git a/ydb/aio/credentials.py b/ydb/aio/credentials.py index 18e1b7e0..08db1fd0 100644 --- a/ydb/aio/credentials.py +++ b/ydb/aio/credentials.py @@ -86,6 +86,10 @@ async def _refresh(self): await asyncio.sleep(1) self._tp.submit(self._refresh) + except BaseException as e: + self.last_error = str(e) + raise + async def token(self): current_time = time.time() if current_time > self._refresh_in: diff --git a/ydb/aio/pool.py b/ydb/aio/pool.py index c637a7ca..c8fbb904 100644 --- a/ydb/aio/pool.py +++ b/ydb/aio/pool.py @@ -247,7 +247,7 @@ async def __call__( wait_timeout = settings.timeout if settings else 10 try: connection = await self._store.get(preferred_endpoint, fast_fail=fast_fail, wait_timeout=wait_timeout) - except Exception: + except BaseException: self._discovery.notify_disconnected() raise diff --git a/ydb/aio/resolver.py b/ydb/aio/resolver.py index e8d27bac..623d11ca 100644 --- a/ydb/aio/resolver.py +++ b/ydb/aio/resolver.py @@ -30,7 +30,7 @@ async def resolve(self): connection = conn_impl.Connection(endpoint, self._driver_config) try: await connection.connection_ready() - except Exception: + except BaseException: self._add_debug_details( 'Failed to establish connection to YDB discovery endpoint: "%s". Check endpoint correctness.' % endpoint ) @@ -53,7 +53,7 @@ async def resolve(self): ) return resolved - except Exception as e: + except BaseException as e: self._add_debug_details( 'Failed to resolve endpoints for database %s. Endpoint: "%s". Error details:\n %s', diff --git a/ydb/aio/table.py b/ydb/aio/table.py index 2a33cf78..3c25f7d2 100644 --- a/ydb/aio/table.py +++ b/ydb/aio/table.py @@ -221,7 +221,7 @@ async def retry_operation(callee, retry_settings=None, *args, **kwargs): # pyli else: try: return await next_opt.result - except Exception as e: # pylint: disable=W0703 + except BaseException as e: # pylint: disable=W0703 next_opt.set_exception(e) @@ -236,7 +236,7 @@ def __init__(self, pool, timeout, retry_timeout): :param blocking: A flag that specifies that session acquire method should blocks :param timeout: A timeout in seconds for session acquire """ - self._pool = pool + self._pool: SessionPool = pool self._acquired = None self._timeout = timeout self._retry_timeout = retry_timeout @@ -251,7 +251,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): class SessionPool: - def __init__(self, driver: ydb.pool.IConnectionPool, size: int, min_pool_size: int = 0): + def __init__(self, driver: "ydb.aio.Driver", size: int, min_pool_size: int = 0): self._driver_await_timeout = 3 self._should_stop = asyncio.Event() self._waiters = 0 @@ -286,7 +286,7 @@ async def wrapper_callee(): return await retry_operation(wrapper_callee, retry_settings) - def _create(self) -> ydb.ISession: + def _create(self) -> Session: self._active_count += 1 session = self._driver.table_client.session() self._logger.debug("Created session %s", session) @@ -301,6 +301,9 @@ async def _init_session_logic(self, session: ydb.ISession) -> typing.Optional[yd self._logger.error("Failed to create session. Reason: %s", str(e)) except Exception as e: # pylint: disable=W0703 self._logger.exception("Failed to create session. Reason: %s", str(e)) + except BaseException as e: # pylint: disable=W0703 + self._logger.exception("Failed to create session. Reason (base exception): %s", str(e)) + raise return None @@ -324,7 +327,7 @@ async def _prepare_session(self, timeout, retry_num) -> ydb.ISession: if not new_sess: self._destroy(session) return new_sess - except Exception as e: + except BaseException as e: self._destroy(session) raise e @@ -338,7 +341,7 @@ async def _get_session_from_queue(self, timeout: float): _, session = task_wait.result() return session - async def acquire(self, timeout: float = None, retry_timeout: float = None, retry_num: int = None) -> ydb.ISession: + async def acquire(self, timeout: float = None, retry_timeout: float = None, retry_num: int = None) -> Session: if self._should_stop.is_set(): self._logger.error("Take session from closed session pool") @@ -408,7 +411,10 @@ def _destroy(self, session: ydb.ISession, wait_for_del: bool = False): asyncio.ensure_future(coro) return None - async def release(self, session: ydb.ISession): + async def release(self, session: Session): + self._release_nowait(session) + + def _release_nowait(self, session: Session): self._logger.debug("Put on session %s", session.session_id) if session.closing(): self._destroy(session) @@ -421,7 +427,8 @@ async def release(self, session: ydb.ISession): self._destroy(session) return False - await self._active_queue.put((time.time() + 10 * 60, session)) + # self._active_queue has no size limit, it means that put_nowait will be successfully always + self._active_queue.put_nowait((time.time() + 10 * 60, session)) self._logger.debug("Session returned to queue: %s", session.session_id) async def _pick_for_keepalive(self): @@ -445,7 +452,7 @@ async def _send_keep_alive(self, session: ydb.ISession): await session.keep_alive(self._req_settings) try: await self.release(session) - except Exception: # pylint: disable=W0703 + except BaseException: # pylint: disable=W0703 self._destroy(session) async def _keep_alive_loop(self):