Skip to content

Commit 2dfcba7

Browse files
committed
new tests for session pool
1 parent 4d576d0 commit 2dfcba7

File tree

3 files changed

+47
-5
lines changed

3 files changed

+47
-5
lines changed

tests/aio/query/conftest.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,6 @@ async def tx(session):
2929

3030

3131
@pytest.fixture
32-
def pool(driver):
33-
pool = QuerySessionPoolAsync(driver)
34-
yield pool
32+
async def pool(driver):
33+
async with QuerySessionPoolAsync(driver) as pool:
34+
yield pool

tests/aio/query/test_query_session_pool.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,45 @@ async def callee(session: QuerySessionAsync):
5353

5454
with pytest.raises(CustomException):
5555
await pool.retry_operation_async(callee)
56+
57+
58+
@pytest.mark.asyncio
59+
async def test_pool_size_limit_logic(self, pool: QuerySessionPoolAsync):
60+
target_size = 5
61+
pool._size = target_size
62+
ids = set()
63+
64+
for i in range(1, target_size + 1):
65+
session = await pool.acquire(timeout=0.5)
66+
assert pool._current_size == i
67+
assert session._state.session_id not in ids
68+
ids.add(session._state.session_id)
69+
70+
with pytest.raises(ydb.SessionPoolEmpty):
71+
await pool.acquire(timeout=0.5)
72+
73+
await pool.release(session)
74+
75+
session = await pool.acquire(timeout=0.5)
76+
assert pool._current_size == target_size
77+
assert session._state.session_id in ids
78+
79+
@pytest.mark.asyncio
80+
async def test_checkout_do_not_increase_size(self, pool: QuerySessionPoolAsync):
81+
session_id = None
82+
for _ in range(10):
83+
async with pool.checkout() as session:
84+
if session_id is None:
85+
session_id = session._state.session_id
86+
assert pool._current_size == 1
87+
assert session_id == session._state.session_id
88+
89+
@pytest.mark.asyncio
90+
async def test_pool_recreates_bad_sessions(self, pool: QuerySessionPoolAsync):
91+
async with pool.checkout() as session:
92+
session_id = session._state.session_id
93+
await session.delete()
94+
95+
async with pool.checkout() as session:
96+
assert session_id != session._state.session_id
97+
assert pool._current_size == 1

ydb/aio/query/pool.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ async def acquire(self, timeout: float) -> QuerySessionAsync:
5151
try:
5252
_, session = self._queue.get_nowait()
5353
logger.debug(f"Acquired active session from queue: {session._state.session_id}")
54-
return session
54+
return session if session._state.attached else await self._create_new_session()
5555
except asyncio.QueueEmpty:
5656
pass
5757

@@ -64,7 +64,7 @@ async def acquire(self, timeout: float) -> QuerySessionAsync:
6464
try:
6565
self._waiters += 1
6666
session = await self._get_session_with_timeout(timeout)
67-
return session
67+
return session if session._state.attached else await self._create_new_session()
6868
except asyncio.TimeoutError:
6969
raise issues.SessionPoolEmpty("Timeout on acquire session")
7070
finally:

0 commit comments

Comments
 (0)