1
1
import abc
2
2
import enum
3
3
import logging
4
+ import time
4
5
import threading
5
6
from typing import (
6
7
Iterable ,
10
11
from . import base
11
12
12
13
from .. import _apis , issues , _utilities
14
+ from ..settings import BaseRequestSettings
13
15
from ..connection import _RpcState as RpcState
14
16
from .._grpc .grpcwrapper import common_utils
15
17
from .._grpc .grpcwrapper import ydb_query as _ydb_query
@@ -136,29 +138,32 @@ def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[
136
138
self ._settings = settings if settings is not None else base .QueryClientSettings ()
137
139
self ._state = QuerySessionState (settings )
138
140
139
- def _create_call (self ) -> "BaseQuerySession" :
141
+ def _create_call (self , settings : Optional [ BaseRequestSettings ] = None ) -> "BaseQuerySession" :
140
142
return self ._driver (
141
143
_apis .ydb_query .CreateSessionRequest (),
142
144
_apis .QueryService .Stub ,
143
145
_apis .QueryService .CreateSession ,
144
146
wrap_result = wrapper_create_session ,
145
147
wrap_args = (self ._state , self ),
148
+ settings = settings ,
146
149
)
147
150
148
- def _delete_call (self ) -> "BaseQuerySession" :
151
+ def _delete_call (self , settings : Optional [ BaseRequestSettings ] = None ) -> "BaseQuerySession" :
149
152
return self ._driver (
150
153
_apis .ydb_query .DeleteSessionRequest (session_id = self ._state .session_id ),
151
154
_apis .QueryService .Stub ,
152
155
_apis .QueryService .DeleteSession ,
153
156
wrap_result = wrapper_delete_session ,
154
157
wrap_args = (self ._state , self ),
158
+ settings = settings ,
155
159
)
156
160
157
- def _attach_call (self ) -> Iterable [_apis .ydb_query .SessionState ]:
161
+ def _attach_call (self , settings : Optional [ BaseRequestSettings ] = None ) -> Iterable [_apis .ydb_query .SessionState ]:
158
162
return self ._driver (
159
163
_apis .ydb_query .AttachSessionRequest (session_id = self ._state .session_id ),
160
164
_apis .QueryService .Stub ,
161
165
_apis .QueryService .AttachSession ,
166
+ settings = settings ,
162
167
)
163
168
164
169
def _execute_call (
@@ -169,6 +174,7 @@ def _execute_call(
169
174
exec_mode : base .QueryExecMode = None ,
170
175
parameters : dict = None ,
171
176
concurrent_result_sets : bool = False ,
177
+ settings : Optional [BaseRequestSettings ] = None ,
172
178
) -> Iterable [_apis .ydb_query .ExecuteQueryResponsePart ]:
173
179
request = base .create_execute_query_request (
174
180
query = query ,
@@ -186,6 +192,7 @@ def _execute_call(
186
192
request .to_proto (),
187
193
_apis .QueryService .Stub ,
188
194
_apis .QueryService .ExecuteQuery ,
195
+ settings = settings ,
189
196
)
190
197
191
198
@@ -196,8 +203,8 @@ class QuerySessionSync(BaseQuerySession):
196
203
197
204
_stream = None
198
205
199
- def _attach (self ) -> None :
200
- self ._stream = self ._attach_call ()
206
+ def _attach (self , settings : Optional [ BaseRequestSettings ] = None ) -> None :
207
+ self ._stream = self ._attach_call (settings = settings )
201
208
status_stream = _utilities .SyncResponseIterator (
202
209
self ._stream ,
203
210
lambda response : common_utils .ServerStatus .from_proto (response ),
@@ -228,7 +235,7 @@ def _check_session_status_loop(self, status_stream: _utilities.SyncResponseItera
228
235
self ._state .reset ()
229
236
self ._state ._change_state (QuerySessionStateEnum .CLOSED )
230
237
231
- def delete (self ) -> None :
238
+ def delete (self , settings : Optional [ BaseRequestSettings ] = None ) -> None :
232
239
"""WARNING: This API is experimental and could be changed.
233
240
234
241
Deletes a Session of Query Service on server side and releases resources.
@@ -239,10 +246,10 @@ def delete(self) -> None:
239
246
return
240
247
241
248
self ._state ._check_invalid_transition (QuerySessionStateEnum .CLOSED )
242
- self ._delete_call ()
249
+ self ._delete_call (settings = settings )
243
250
self ._stream .cancel ()
244
251
245
- def create (self ) -> "QuerySessionSync" :
252
+ def create (self , settings : Optional [ BaseRequestSettings ] = None ) -> "QuerySessionSync" :
246
253
"""WARNING: This API is experimental and could be changed.
247
254
248
255
Creates a Session of Query Service on server side and attaches it.
@@ -253,7 +260,8 @@ def create(self) -> "QuerySessionSync":
253
260
return
254
261
255
262
self ._state ._check_invalid_transition (QuerySessionStateEnum .CREATED )
256
- self ._create_call ()
263
+
264
+ self ._create_call (settings = settings )
257
265
self ._attach ()
258
266
259
267
return self
@@ -289,6 +297,7 @@ def execute(
289
297
syntax : base .QuerySyntax = None ,
290
298
exec_mode : base .QueryExecMode = None ,
291
299
concurrent_result_sets : bool = False ,
300
+ settings : Optional [BaseRequestSettings ] = None ,
292
301
) -> base .SyncResponseContextIterator :
293
302
"""WARNING: This API is experimental and could be changed.
294
303
@@ -311,6 +320,7 @@ def execute(
311
320
exec_mode = exec_mode ,
312
321
parameters = parameters ,
313
322
concurrent_result_sets = concurrent_result_sets ,
323
+ settings = settings ,
314
324
)
315
325
316
326
return base .SyncResponseContextIterator (
0 commit comments