Skip to content

Commit e1bafec

Browse files
committed
review fixes
1 parent b848168 commit e1bafec

File tree

2 files changed

+24
-6
lines changed

2 files changed

+24
-6
lines changed

ydb/aio/query/pool.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import logging
3+
import functools
34
from typing import (
45
Callable,
56
Optional,
@@ -35,6 +36,7 @@ def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
3536
self._queue = asyncio.Queue()
3637
self._current_size = 0
3738
self._waiters = 0
39+
self._loop = asyncio.get_running_loop()
3840

3941
async def _create_new_session(self):
4042
session = QuerySessionAsync(self._driver)
@@ -157,6 +159,12 @@ async def __aenter__(self):
157159
async def __aexit__(self, exc_type, exc_val, exc_tb):
158160
await self.stop()
159161

162+
def __del__(self):
163+
if self._should_stop.is_set() or self._loop.is_closed():
164+
return
165+
166+
self._loop.call_soon(functools.partial(self.stop))
167+
160168

161169
class SimpleQuerySessionCheckoutAsync:
162170
def __init__(self, pool: QuerySessionPoolAsync):

ydb/query/pool.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ def _create_new_session(self, timeout: float):
4747
return session
4848

4949
def acquire(self, timeout: float) -> QuerySessionSync:
50-
with self._lock:
50+
acquired = self._lock.acquire(timeout=timeout)
51+
try:
5152
if self._should_stop.is_set():
5253
logger.error("An attempt to take session from closed session pool.")
5354
raise RuntimeError("An attempt to take session from closed session pool.")
@@ -80,11 +81,13 @@ def acquire(self, timeout: float) -> QuerySessionSync:
8081

8182
self._current_size += 1
8283
return session
84+
finally:
85+
if acquired:
86+
self._lock.release()
8387

8488
def release(self, session: QuerySessionSync) -> None:
85-
with self._lock:
86-
self._queue.put_nowait(session)
87-
logger.debug("Session returned to queue: %s", session._state.session_id)
89+
self._queue.put_nowait(session)
90+
logger.debug("Session returned to queue: %s", session._state.session_id)
8891

8992
def checkout(self, timeout: float = 10) -> "SimpleQuerySessionCheckout":
9093
"""WARNING: This API is experimental and could be changed.
@@ -140,8 +143,9 @@ def wrapped_callee():
140143

141144
return retry_operation_sync(wrapped_callee, retry_settings)
142145

143-
def stop(self, timeout=None):
144-
with self._lock:
146+
def stop(self, timeout=-1):
147+
acquired = self._lock.acquire(timeout=timeout)
148+
try:
145149
self._should_stop.set()
146150
while True:
147151
try:
@@ -151,13 +155,19 @@ def stop(self, timeout=None):
151155
break
152156

153157
logger.debug("All session were deleted.")
158+
finally:
159+
if acquired:
160+
self._lock.release()
154161

155162
def __enter__(self):
156163
return self
157164

158165
def __exit__(self, exc_type, exc_val, exc_tb):
159166
self.stop()
160167

168+
def __del__(self):
169+
self.stop()
170+
161171

162172
class SimpleQuerySessionCheckout:
163173
def __init__(self, pool: QuerySessionPool, timeout: float):

0 commit comments

Comments
 (0)