Skip to content

Commit a812257

Browse files
committed
update async query session pool
1 parent a309fe5 commit a812257

File tree

1 file changed

+18
-16
lines changed

1 file changed

+18
-16
lines changed

ydb/aio/query/pool.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -48,31 +48,33 @@ async def acquire(self, timeout: float) -> QuerySessionAsync:
4848
logger.error("An attempt to take session from closed session pool.")
4949
raise RuntimeError("An attempt to take session from closed session pool.")
5050

51+
session = None
5152
try:
5253
_, session = self._queue.get_nowait()
54+
except asyncio.QueueEmpty:
55+
pass
56+
57+
if session is None and self._current_size == self._size:
58+
try:
59+
self._waiters += 1
60+
session = await self._get_session_with_timeout(timeout)
61+
except asyncio.TimeoutError:
62+
raise issues.SessionPoolEmpty("Timeout on acquire session")
63+
finally:
64+
self._waiters -= 1
65+
66+
if session is not None:
5367
if session._state.attached:
5468
logger.debug(f"Acquired active session from queue: {session._state.session_id}")
5569
return session
5670
else:
5771
self._current_size -= 1
5872
logger.debug(f"Acquired dead session from queue: {session._state.session_id}")
59-
except asyncio.QueueEmpty:
60-
pass
61-
62-
if self._current_size < self._size:
63-
logger.debug(f"Session pool is not large enough: {self._current_size} < {self._size}, will create new one.")
64-
session = await self._create_new_session()
65-
self._current_size += 1
66-
return session
6773

68-
try:
69-
self._waiters += 1
70-
session = await self._get_session_with_timeout(timeout)
71-
return session if session._state.attached else await self._create_new_session()
72-
except asyncio.TimeoutError:
73-
raise issues.SessionPoolEmpty("Timeout on acquire session")
74-
finally:
75-
self._waiters -= 1
74+
logger.debug(f"Session pool is not large enough: {self._current_size} < {self._size}, will create new one.")
75+
session = await self._create_new_session()
76+
self._current_size += 1
77+
return session
7678

7779
async def _get_session_with_timeout(self, timeout: float):
7880
task_wait = asyncio.ensure_future(asyncio.wait_for(self._queue.get(), timeout=timeout))

0 commit comments

Comments
 (0)