Skip to content

Commit 7496942

Browse files
committed
sync query session pool
1 parent 4a5c58e commit 7496942

File tree

2 files changed

+119
-11
lines changed

2 files changed

+119
-11
lines changed

tests/query/test_query_session_pool.py

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ def test_checkout_provides_created_session(self, pool: QuerySessionPool):
99
with pool.checkout() as session:
1010
assert session._state._state == QuerySessionStateEnum.CREATED
1111

12-
assert session._state._state == QuerySessionStateEnum.CLOSED
13-
1412
def test_oneshot_query_normal(self, pool: QuerySessionPool):
1513
res = pool.execute_with_retries("select 1;")
1614
assert len(res) == 1
@@ -47,3 +45,58 @@ def callee(session: QuerySessionSync):
4745

4846
with pytest.raises(CustomException):
4947
pool.retry_operation_sync(callee)
48+
49+
def test_pool_size_limit_logic(self, pool: QuerySessionPool):
50+
target_size = 5
51+
pool._size = target_size
52+
ids = set()
53+
54+
for i in range(1, target_size + 1):
55+
session = pool.acquire(timeout=0.5)
56+
assert pool._current_size == i
57+
assert session._state.session_id not in ids
58+
ids.add(session._state.session_id)
59+
60+
with pytest.raises(ydb.SessionPoolEmpty):
61+
pool.acquire(timeout=0.5)
62+
63+
pool.release(session)
64+
65+
session = pool.acquire(timeout=0.5)
66+
assert pool._current_size == target_size
67+
assert session._state.session_id in ids
68+
69+
def test_checkout_do_not_increase_size(self, pool: QuerySessionPool):
70+
session_id = None
71+
for _ in range(10):
72+
with pool.checkout() as session:
73+
if session_id is None:
74+
session_id = session._state.session_id
75+
assert pool._current_size == 1
76+
assert session_id == session._state.session_id
77+
78+
def test_pool_recreates_bad_sessions(self, pool: QuerySessionPool):
79+
with pool.checkout() as session:
80+
session_id = session._state.session_id
81+
session.delete()
82+
83+
with pool.checkout() as session:
84+
assert session_id != session._state.session_id
85+
assert pool._current_size == 1
86+
87+
def test_acquire_from_closed_pool_raises(self, pool: QuerySessionPool):
88+
pool.stop()
89+
with pytest.raises(RuntimeError):
90+
pool.acquire(1)
91+
92+
def test_no_session_leak(self, driver_sync, docker_project):
93+
pool = ydb.QuerySessionPool(driver_sync, 1)
94+
docker_project.stop()
95+
try:
96+
pool.acquire(timeout=0.5)
97+
except ydb.Error:
98+
pass
99+
assert pool._current_size == 0
100+
101+
docker_project.start()
102+
pool.stop()

ydb/query/pool.py

Lines changed: 64 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
Optional,
55
List,
66
)
7+
import threading
8+
import queue
79

810
from .session import (
911
QuerySessionSync,
@@ -12,6 +14,7 @@
1214
RetrySettings,
1315
retry_operation_sync,
1416
)
17+
from .. import issues
1518
from .. import convert
1619
from .._grpc.grpcwrapper import common_utils
1720

@@ -22,20 +25,61 @@
2225
class QuerySessionPool:
2326
"""QuerySessionPool is an object to simplify operations with sessions of Query Service."""
2427

25-
def __init__(self, driver: common_utils.SupportedDriverType):
28+
def __init__(self, driver: common_utils.SupportedDriverType, size: int = 10):
2629
"""
2730
:param driver: A driver instance
2831
"""
2932

3033
logger.warning("QuerySessionPool is an experimental API, which could be changed.")
3134
self._driver = driver
32-
33-
def checkout(self) -> "SimpleQuerySessionCheckout":
35+
self._queue = queue.PriorityQueue()
36+
self._current_size = 0
37+
self._size = size
38+
self._should_stop = threading.Event()
39+
self._lock = threading.RLock()
40+
41+
def _create_new_session(self):
42+
session = QuerySessionSync(self._driver)
43+
session.create()
44+
logger.debug(f"New session was created for pool. Session id: {session._state.session_id}")
45+
return session
46+
47+
def acquire(self, timeout: float) -> QuerySessionSync:
48+
with self._lock:
49+
if self._should_stop.is_set():
50+
logger.error("An attempt to take session from closed session pool.")
51+
raise RuntimeError("An attempt to take session from closed session pool.")
52+
53+
try:
54+
_, session = self._queue.get_nowait()
55+
logger.debug(f"Acquired active session from queue: {session._state.session_id}")
56+
return session if session._state.attached else self._create_new_session()
57+
except queue.Empty:
58+
pass
59+
60+
if self._current_size < self._size:
61+
logger.debug(f"Session pool is not large enough: {self._current_size} < {self._size}, will create new one.")
62+
session = self._create_new_session()
63+
self._current_size += 1
64+
return session
65+
66+
try:
67+
_, session = self._queue.get(block=True, timeout=timeout)
68+
return session if session._state.attached else self._create_new_session()
69+
except queue.Empty:
70+
raise issues.SessionPoolEmpty("Timeout on acquire session")
71+
72+
def release(self, session: QuerySessionSync) -> None:
73+
with self._lock:
74+
self._queue.put_nowait((1, session))
75+
logger.debug("Session returned to queue: %s", session._state.session_id)
76+
77+
def checkout(self, timeout: float = 10) -> "SimpleQuerySessionCheckout":
3478
"""WARNING: This API is experimental and could be changed.
3579
Return a Session context manager, that opens session on enter and closes session on exit.
3680
"""
3781

38-
return SimpleQuerySessionCheckout(self)
82+
return SimpleQuerySessionCheckout(self, timeout)
3983

4084
def retry_operation_sync(self, callee: Callable, retry_settings: Optional[RetrySettings] = None, *args, **kwargs):
4185
"""WARNING: This API is experimental and could be changed.
@@ -85,7 +129,17 @@ def wrapped_callee():
85129
return retry_operation_sync(wrapped_callee, retry_settings)
86130

87131
def stop(self, timeout=None):
88-
pass # TODO: implement
132+
with self._lock:
133+
self._should_stop.set()
134+
while True:
135+
try:
136+
_, session = self._queue.get_nowait()
137+
session.delete()
138+
except queue.Empty:
139+
break
140+
141+
logger.debug("All session were deleted.")
142+
89143

90144
def __enter__(self):
91145
return self
@@ -95,13 +149,14 @@ def __exit__(self, exc_type, exc_val, exc_tb):
95149

96150

97151
class SimpleQuerySessionCheckout:
98-
def __init__(self, pool: QuerySessionPool):
152+
def __init__(self, pool: QuerySessionPool, timeout: float):
99153
self._pool = pool
100-
self._session = QuerySessionSync(pool._driver)
154+
self._timeout = timeout
155+
self._session = None
101156

102157
def __enter__(self) -> QuerySessionSync:
103-
self._session.create()
158+
self._session = self._pool.acquire(self._timeout)
104159
return self._session
105160

106161
def __exit__(self, exc_type, exc_val, exc_tb):
107-
self._session.delete()
162+
self._pool.release(self._session)

0 commit comments

Comments
 (0)