Skip to content

Commit de55153

Browse files
committed
review fixes
1 parent c237015 commit de55153

File tree

4 files changed

+28
-29
lines changed

4 files changed

+28
-29
lines changed

tests/aio/query/test_query_session_pool.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,14 @@ async def test_pool_size_limit_logic(self, pool: QuerySessionPoolAsync):
6868
ids.add(session._state.session_id)
6969

7070
with pytest.raises(asyncio.TimeoutError):
71-
await asyncio.wait_for(pool.acquire(), timeout=0.5)
71+
await asyncio.wait_for(pool.acquire(), timeout=0.1)
7272

73+
last_id = session._state.session_id
7374
await pool.release(session)
7475

7576
session = await pool.acquire()
77+
assert session._state.session_id == last_id
7678
assert pool._current_size == target_size
77-
assert session._state.session_id in ids
7879

7980
@pytest.mark.asyncio
8081
async def test_checkout_do_not_increase_size(self, pool: QuerySessionPoolAsync):
@@ -106,14 +107,14 @@ async def test_acquire_from_closed_pool_raises(self, pool: QuerySessionPoolAsync
106107
async def test_acquire_with_timeout_from_closed_pool_raises(self, pool: QuerySessionPoolAsync):
107108
await pool.stop()
108109
with pytest.raises(RuntimeError):
109-
await asyncio.wait_for(pool.acquire(), timeout=0.5)
110+
await asyncio.wait_for(pool.acquire(), timeout=0.1)
110111

111112
@pytest.mark.asyncio
112113
async def test_no_session_leak(self, driver, docker_project):
113114
pool = ydb.aio.QuerySessionPoolAsync(driver, 1)
114115
docker_project.stop()
115116
try:
116-
await asyncio.wait_for(pool.acquire(), timeout=0.5)
117+
await asyncio.wait_for(pool.acquire(), timeout=0.1)
117118
except ydb.Error:
118119
pass
119120
assert pool._current_size == 0

tests/query/test_query_session_pool.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,19 +52,20 @@ def test_pool_size_limit_logic(self, pool: QuerySessionPool):
5252
ids = set()
5353

5454
for i in range(1, target_size + 1):
55-
session = pool.acquire(timeout=0.5)
55+
session = pool.acquire(timeout=0.1)
5656
assert pool._current_size == i
5757
assert session._state.session_id not in ids
5858
ids.add(session._state.session_id)
5959

6060
with pytest.raises(ydb.SessionPoolEmpty):
61-
pool.acquire(timeout=0.5)
61+
pool.acquire(timeout=0.1)
6262

63+
last_id = session._state.session_id
6364
pool.release(session)
6465

65-
session = pool.acquire(timeout=0.5)
66+
session = pool.acquire(timeout=0.1)
67+
assert session._state.session_id == last_id
6668
assert pool._current_size == target_size
67-
assert session._state.session_id in ids
6869

6970
def test_checkout_do_not_increase_size(self, pool: QuerySessionPool):
7071
session_id = None
@@ -93,7 +94,7 @@ def test_no_session_leak(self, driver_sync, docker_project):
9394
pool = ydb.QuerySessionPool(driver_sync, 1)
9495
docker_project.stop()
9596
try:
96-
pool.acquire(timeout=0.5)
97+
pool.acquire(timeout=0.1)
9798
except ydb.Error:
9899
pass
99100
assert pool._current_size == 0

ydb/aio/query/pool.py

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
3232
self._driver = driver
3333
self._size = size
3434
self._should_stop = asyncio.Event()
35-
self._queue = asyncio.PriorityQueue()
35+
self._queue = asyncio.Queue()
3636
self._current_size = 0
3737
self._waiters = 0
3838

@@ -49,7 +49,7 @@ async def acquire(self) -> QuerySessionAsync:
4949

5050
session = None
5151
try:
52-
_, session = self._queue.get_nowait()
52+
session = self._queue.get_nowait()
5353
except asyncio.QueueEmpty:
5454
pass
5555

@@ -59,8 +59,9 @@ async def acquire(self) -> QuerySessionAsync:
5959
done, _ = await asyncio.wait((queue_get, task_stop), return_when=asyncio.FIRST_COMPLETED)
6060
if task_stop in done:
6161
queue_get.cancel()
62-
return await self._create_new_session() # TODO: not sure why
63-
_, session = queue_get.result()
62+
return await self._create_new_session()
63+
task_stop.cancel()
64+
session = queue_get.result()
6465

6566
if session is not None:
6667
if session._state.attached:
@@ -76,15 +77,15 @@ async def acquire(self) -> QuerySessionAsync:
7677
return session
7778

7879
async def release(self, session: QuerySessionAsync) -> None:
79-
self._queue.put_nowait((1, session))
80+
self._queue.put_nowait(session)
8081
logger.debug("Session returned to queue: %s", session._state.session_id)
8182

82-
def checkout(self, timeout: Optional[float] = None) -> "SimpleQuerySessionCheckoutAsync":
83+
def checkout(self) -> "SimpleQuerySessionCheckoutAsync":
8384
"""WARNING: This API is experimental and could be changed.
8485
Return a Session context manager, that opens session on enter and closes session on exit.
8586
"""
8687

87-
return SimpleQuerySessionCheckoutAsync(self, timeout)
88+
return SimpleQuerySessionCheckoutAsync(self)
8889

8990
async def retry_operation_async(
9091
self, callee: Callable, retry_settings: Optional[RetrySettings] = None, *args, **kwargs
@@ -135,13 +136,13 @@ async def wrapped_callee():
135136

136137
return await retry_operation_async(wrapped_callee, retry_settings)
137138

138-
async def stop(self, timeout=None):
139+
async def stop(self):
139140
self._should_stop.set()
140141

141142
tasks = []
142143
while True:
143144
try:
144-
_, session = self._queue.get_nowait()
145+
session = self._queue.get_nowait()
145146
tasks.append(session.delete())
146147
except asyncio.QueueEmpty:
147148
break
@@ -158,16 +159,12 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
158159

159160

160161
class SimpleQuerySessionCheckoutAsync:
161-
def __init__(self, pool: QuerySessionPoolAsync, timeout: Optional[float] = None):
162+
def __init__(self, pool: QuerySessionPoolAsync):
162163
self._pool = pool
163-
self._timeout = timeout
164164
self._session = None
165165

166166
async def __aenter__(self) -> QuerySessionAsync:
167-
if self._timeout and self._timeout > 0:
168-
self._session = await self._pool.acquire_with_timeout(self._timeout)
169-
else:
170-
self._session = await self._pool.acquire()
167+
self._session = await self._pool.acquire()
171168
return self._session
172169

173170
async def __aexit__(self, exc_type, exc_val, exc_tb):

ydb/query/pool.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
3434

3535
logger.warning("QuerySessionPool is an experimental API, which could be changed.")
3636
self._driver = driver
37-
self._queue = queue.PriorityQueue()
37+
self._queue = queue.Queue()
3838
self._current_size = 0
3939
self._size = size
4040
self._should_stop = threading.Event()
@@ -54,14 +54,14 @@ def acquire(self, timeout: float) -> QuerySessionSync:
5454

5555
session = None
5656
try:
57-
_, session = self._queue.get_nowait()
57+
session = self._queue.get_nowait()
5858
except queue.Empty:
5959
pass
6060

6161
start = time.monotonic()
6262
if session is None and self._current_size == self._size:
6363
try:
64-
_, session = self._queue.get(block=True, timeout=timeout)
64+
session = self._queue.get(block=True, timeout=timeout)
6565
except queue.Empty:
6666
raise issues.SessionPoolEmpty("Timeout on acquire session")
6767

@@ -83,7 +83,7 @@ def acquire(self, timeout: float) -> QuerySessionSync:
8383

8484
def release(self, session: QuerySessionSync) -> None:
8585
with self._lock:
86-
self._queue.put_nowait((1, session))
86+
self._queue.put_nowait(session)
8787
logger.debug("Session returned to queue: %s", session._state.session_id)
8888

8989
def checkout(self, timeout: float = 10) -> "SimpleQuerySessionCheckout":
@@ -145,7 +145,7 @@ def stop(self, timeout=None):
145145
self._should_stop.set()
146146
while True:
147147
try:
148-
_, session = self._queue.get_nowait()
148+
session = self._queue.get_nowait()
149149
session.delete()
150150
except queue.Empty:
151151
break

0 commit comments

Comments
 (0)