Skip to content

Commit e37cae2

Browse files
committed
query session pool timeout refactor
1 parent 40d0d08 commit e37cae2

File tree

3 files changed

+31
-20
lines changed

3 files changed

+31
-20
lines changed

ydb/aio/query/session.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from .transaction import QueryTxContextAsync
99
from .. import _utilities
1010
from ... import issues
11+
from ...settings import BaseRequestSettings
1112
from ..._grpc.grpcwrapper import common_utils
1213
from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public
1314

@@ -62,7 +63,7 @@ async def _check_session_status_loop(self) -> None:
6263
self._state.reset()
6364
self._state._change_state(QuerySessionStateEnum.CLOSED)
6465

65-
async def delete(self) -> None:
66+
async def delete(self, settings: Optional[BaseRequestSettings] = None) -> None:
6667
"""WARNING: This API is experimental and could be changed.
6768
6869
Deletes a Session of Query Service on server side and releases resources.
@@ -73,10 +74,10 @@ async def delete(self) -> None:
7374
return
7475

7576
self._state._check_invalid_transition(QuerySessionStateEnum.CLOSED)
76-
await self._delete_call()
77+
await self._delete_call(settings=settings)
7778
self._stream.cancel()
7879

79-
async def create(self) -> "QuerySessionAsync":
80+
async def create(self, settings: Optional[BaseRequestSettings] = None) -> "QuerySessionAsync":
8081
"""WARNING: This API is experimental and could be changed.
8182
8283
Creates a Session of Query Service on server side and attaches it.
@@ -87,7 +88,7 @@ async def create(self) -> "QuerySessionAsync":
8788
return
8889

8990
self._state._check_invalid_transition(QuerySessionStateEnum.CREATED)
90-
await self._create_call()
91+
await self._create_call(settings=settings)
9192
await self._attach()
9293

9394
return self
@@ -110,6 +111,7 @@ async def execute(
110111
syntax: base.QuerySyntax = None,
111112
exec_mode: base.QueryExecMode = None,
112113
concurrent_result_sets: bool = False,
114+
settings: Optional[BaseRequestSettings] = None,
113115
) -> AsyncResponseContextIterator:
114116
"""WARNING: This API is experimental and could be changed.
115117
@@ -132,6 +134,7 @@ async def execute(
132134
exec_mode=exec_mode,
133135
parameters=parameters,
134136
concurrent_result_sets=concurrent_result_sets,
137+
settings=settings,
135138
)
136139

137140
return AsyncResponseContextIterator(

ydb/query/pool.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
)
1818
from .. import issues
1919
from .. import convert
20+
from ..settings import BaseRequestSettings
2021
from .._grpc.grpcwrapper import common_utils
2122

2223

@@ -39,9 +40,9 @@ def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
3940
self._should_stop = threading.Event()
4041
self._lock = threading.RLock()
4142

42-
def _create_new_session(self):
43+
def _create_new_session(self, timeout: float):
4344
session = QuerySessionSync(self._driver)
44-
session.create()
45+
session.create(settings=BaseRequestSettings().with_timeout(timeout))
4546
logger.debug(f"New session was created for pool. Session id: {session._state.session_id}")
4647
return session
4748

@@ -73,12 +74,9 @@ def acquire(self, timeout: float) -> QuerySessionSync:
7374
logger.debug(f"Acquired dead session from queue: {session._state.session_id}")
7475

7576
logger.debug(f"Session pool is not large enough: {self._current_size} < {self._size}, will create new one.")
76-
session = self._create_new_session()
77-
7877
finish = time.monotonic()
79-
if finish - start > timeout:
80-
session.delete()
81-
raise issues.SessionPoolEmpty("Timeout on acquire session")
78+
time_left = timeout - (finish - start)
79+
session = self._create_new_session(time_left)
8280

8381
self._current_size += 1
8482
return session

ydb/query/session.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import abc
22
import enum
33
import logging
4+
import time
45
import threading
56
from typing import (
67
Iterable,
@@ -10,6 +11,7 @@
1011
from . import base
1112

1213
from .. import _apis, issues, _utilities
14+
from ..settings import BaseRequestSettings
1315
from ..connection import _RpcState as RpcState
1416
from .._grpc.grpcwrapper import common_utils
1517
from .._grpc.grpcwrapper import ydb_query as _ydb_query
@@ -136,29 +138,32 @@ def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[
136138
self._settings = settings if settings is not None else base.QueryClientSettings()
137139
self._state = QuerySessionState(settings)
138140

139-
def _create_call(self) -> "BaseQuerySession":
141+
def _create_call(self, settings: Optional[BaseRequestSettings] = None) -> "BaseQuerySession":
140142
return self._driver(
141143
_apis.ydb_query.CreateSessionRequest(),
142144
_apis.QueryService.Stub,
143145
_apis.QueryService.CreateSession,
144146
wrap_result=wrapper_create_session,
145147
wrap_args=(self._state, self),
148+
settings=settings,
146149
)
147150

148-
def _delete_call(self) -> "BaseQuerySession":
151+
def _delete_call(self, settings: Optional[BaseRequestSettings] = None) -> "BaseQuerySession":
149152
return self._driver(
150153
_apis.ydb_query.DeleteSessionRequest(session_id=self._state.session_id),
151154
_apis.QueryService.Stub,
152155
_apis.QueryService.DeleteSession,
153156
wrap_result=wrapper_delete_session,
154157
wrap_args=(self._state, self),
158+
settings=settings,
155159
)
156160

157-
def _attach_call(self) -> Iterable[_apis.ydb_query.SessionState]:
161+
def _attach_call(self, settings: Optional[BaseRequestSettings] = None) -> Iterable[_apis.ydb_query.SessionState]:
158162
return self._driver(
159163
_apis.ydb_query.AttachSessionRequest(session_id=self._state.session_id),
160164
_apis.QueryService.Stub,
161165
_apis.QueryService.AttachSession,
166+
settings=settings,
162167
)
163168

164169
def _execute_call(
@@ -169,6 +174,7 @@ def _execute_call(
169174
exec_mode: base.QueryExecMode = None,
170175
parameters: dict = None,
171176
concurrent_result_sets: bool = False,
177+
settings: Optional[BaseRequestSettings] = None,
172178
) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]:
173179
request = base.create_execute_query_request(
174180
query=query,
@@ -186,6 +192,7 @@ def _execute_call(
186192
request.to_proto(),
187193
_apis.QueryService.Stub,
188194
_apis.QueryService.ExecuteQuery,
195+
settings=settings,
189196
)
190197

191198

@@ -196,8 +203,8 @@ class QuerySessionSync(BaseQuerySession):
196203

197204
_stream = None
198205

199-
def _attach(self) -> None:
200-
self._stream = self._attach_call()
206+
def _attach(self, settings: Optional[BaseRequestSettings] = None) -> None:
207+
self._stream = self._attach_call(settings=settings)
201208
status_stream = _utilities.SyncResponseIterator(
202209
self._stream,
203210
lambda response: common_utils.ServerStatus.from_proto(response),
@@ -228,7 +235,7 @@ def _check_session_status_loop(self, status_stream: _utilities.SyncResponseItera
228235
self._state.reset()
229236
self._state._change_state(QuerySessionStateEnum.CLOSED)
230237

231-
def delete(self) -> None:
238+
def delete(self, settings: Optional[BaseRequestSettings] = None) -> None:
232239
"""WARNING: This API is experimental and could be changed.
233240
234241
Deletes a Session of Query Service on server side and releases resources.
@@ -239,10 +246,10 @@ def delete(self) -> None:
239246
return
240247

241248
self._state._check_invalid_transition(QuerySessionStateEnum.CLOSED)
242-
self._delete_call()
249+
self._delete_call(settings=settings)
243250
self._stream.cancel()
244251

245-
def create(self) -> "QuerySessionSync":
252+
def create(self, settings: Optional[BaseRequestSettings] = None) -> "QuerySessionSync":
246253
"""WARNING: This API is experimental and could be changed.
247254
248255
Creates a Session of Query Service on server side and attaches it.
@@ -253,7 +260,8 @@ def create(self) -> "QuerySessionSync":
253260
return
254261

255262
self._state._check_invalid_transition(QuerySessionStateEnum.CREATED)
256-
self._create_call()
263+
264+
self._create_call(settings=settings)
257265
self._attach()
258266

259267
return self
@@ -289,6 +297,7 @@ def execute(
289297
syntax: base.QuerySyntax = None,
290298
exec_mode: base.QueryExecMode = None,
291299
concurrent_result_sets: bool = False,
300+
settings: Optional[BaseRequestSettings] = None,
292301
) -> base.SyncResponseContextIterator:
293302
"""WARNING: This API is experimental and could be changed.
294303
@@ -311,6 +320,7 @@ def execute(
311320
exec_mode=exec_mode,
312321
parameters=parameters,
313322
concurrent_result_sets=concurrent_result_sets,
323+
settings=settings,
314324
)
315325

316326
return base.SyncResponseContextIterator(

0 commit comments

Comments
 (0)