Skip to content

Commit c90ccf4

Browse files
committed
ability to execute ddl queries
1 parent 424f831 commit c90ccf4

File tree

5 files changed

+96
-43
lines changed

5 files changed

+96
-43
lines changed

examples/query-service/basic_example.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,22 @@ def main():
1414
except TimeoutError:
1515
raise RuntimeError("Connect failed to YDB")
1616

17-
# client = ydb.QueryClientSync(driver)
18-
# session = client.session().create()
1917
pool = ydb.QuerySessionPool(driver)
20-
# with pool.checkout() as session:
18+
19+
print("=" * 50)
20+
print("DELETE TABLE IF EXISTS")
21+
pool.execute_with_retries("drop table if exists example;", ddl=True)
22+
23+
print("=" * 50)
24+
print("CREATE TABLE")
25+
pool.execute_with_retries("CREATE TABLE example(key UInt64, value String, PRIMARY KEY (key));", ddl=True)
2126

2227
def callee(session):
2328
print("=" * 50)
29+
session.execute("""delete from example;""")
30+
2431
print("BEFORE ACTION")
25-
it = session.execute("""SELECT COUNT(*) FROM example;""")
32+
it = session.execute("""SELECT COUNT(*) as rows_count FROM example;""")
2633
for result_set in it:
2734
print(f"rows: {str(result_set.rows)}")
2835

@@ -34,15 +41,15 @@ def callee(session):
3441

3542
tx.execute("""INSERT INTO example (key, value) VALUES (0055, "onepieceisreal");""")
3643

37-
for result_set in tx.execute("""SELECT COUNT(*) FROM example;"""):
44+
for result_set in tx.execute("""SELECT COUNT(*) as rows_count FROM example;"""):
3845
print(f"rows: {str(result_set.rows)}")
3946

4047
tx.commit()
4148

4249
print("=" * 50)
4350
print("AFTER COMMIT TX")
4451

45-
for result_set in session.execute("""SELECT COUNT(*) FROM example;"""):
52+
for result_set in session.execute("""SELECT COUNT(*) as rows_count FROM example;"""):
4653
print(f"rows: {str(result_set.rows)}")
4754

4855
print("=" * 50)
@@ -54,15 +61,15 @@ def callee(session):
5461

5562
tx.execute("""INSERT INTO example (key, value) VALUES (0066, "onepieceisreal");""")
5663

57-
for result_set in tx.execute("""SELECT COUNT(*) FROM example;"""):
64+
for result_set in tx.execute("""SELECT COUNT(*) as rows_count FROM example;"""):
5865
print(f"rows: {str(result_set.rows)}")
5966

6067
tx.rollback()
6168

6269
print("=" * 50)
6370
print("AFTER ROLLBACK TX")
6471

65-
for result_set in session.execute("""SELECT COUNT(*) FROM example;"""):
72+
for result_set in session.execute("""SELECT COUNT(*) as rows_count FROM example;"""):
6673
print(f"rows: {str(result_set.rows)}")
6774

6875
pool.retry_operation_sync(callee)

ydb/_grpc/grpcwrapper/ydb_query.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,16 +161,17 @@ def to_proto(self) -> ydb_query_pb2.TransactionControl:
161161
class ExecuteQueryRequest(IToProto):
162162
session_id: str
163163
query_content: QueryContent
164-
tx_control: TransactionControl
164+
tx_control: Optional[TransactionControl] = None
165165
concurrent_result_sets: Optional[bool] = False
166166
exec_mode: Optional[int] = None
167167
parameters: Optional[dict] = None
168168
stats_mode: Optional[int] = None
169169

170170
def to_proto(self) -> ydb_query_pb2.ExecuteQueryRequest:
171+
tx_control = self.tx_control.to_proto() if self.tx_control is not None else self.tx_control
171172
return ydb_query_pb2.ExecuteQueryRequest(
172173
session_id=self.session_id,
173-
tx_control=self.tx_control.to_proto(),
174+
tx_control=tx_control,
174175
query_content=self.query_content.to_proto(),
175176
exec_mode=self.exec_mode,
176177
stats_mode=self.stats_mode,

ydb/query/base.py

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -155,43 +155,40 @@ def create_execute_query_request(
155155
exec_mode: QueryExecMode = None,
156156
parameters: dict = None,
157157
concurrent_result_sets: bool = False,
158+
empty_tx_control: bool = False,
158159
):
159160
syntax = QuerySyntax.YQL_V1 if not syntax else syntax
160161
exec_mode = QueryExecMode.EXECUTE if not exec_mode else exec_mode
161-
if tx_id:
162-
req = ydb_query.ExecuteQueryRequest(
163-
session_id=session_id,
164-
query_content=ydb_query.QueryContent.from_public(
165-
query=query,
166-
syntax=syntax,
167-
),
168-
tx_control=ydb_query.TransactionControl(
169-
tx_id=tx_id,
170-
commit_tx=commit_tx,
171-
),
172-
exec_mode=exec_mode,
173-
parameters=parameters,
174-
concurrent_result_sets=concurrent_result_sets,
162+
163+
tx_control = None
164+
if empty_tx_control:
165+
tx_control = None
166+
elif tx_id:
167+
tx_control = ydb_query.TransactionControl(
168+
tx_id=tx_id,
169+
commit_tx=commit_tx,
175170
)
176171
else:
177172
tx_mode = tx_mode if tx_mode is not None else QuerySerializableReadWrite()
178-
req = ydb_query.ExecuteQueryRequest(
179-
session_id=session_id,
180-
query_content=ydb_query.QueryContent.from_public(
181-
query=query,
182-
syntax=syntax,
173+
tx_control = ydb_query.TransactionControl(
174+
begin_tx=ydb_query.TransactionSettings(
175+
tx_mode=tx_mode,
183176
),
184-
tx_control=ydb_query.TransactionControl(
185-
begin_tx=ydb_query.TransactionSettings(
186-
tx_mode=tx_mode,
187-
),
188-
commit_tx=commit_tx,
189-
),
190-
exec_mode=exec_mode,
191-
parameters=parameters,
192-
concurrent_result_sets=concurrent_result_sets,
177+
commit_tx=commit_tx,
193178
)
194179

180+
req = ydb_query.ExecuteQueryRequest(
181+
session_id=session_id,
182+
query_content=ydb_query.QueryContent.from_public(
183+
query=query,
184+
syntax=syntax,
185+
),
186+
tx_control=tx_control,
187+
exec_mode=exec_mode,
188+
parameters=parameters,
189+
concurrent_result_sets=concurrent_result_sets,
190+
)
191+
195192
return req.to_proto()
196193

197194

ydb/query/pool.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ def retry_operation_impl(callee: Callable, retry_settings: RetrySettings = None,
9191
# you should provide your own handler you want
9292
retry_settings.unknown_error_handler(e)
9393
raise
94-
95-
raise status
94+
if status:
95+
raise status
9696

9797

9898
class QuerySessionPool:
@@ -116,6 +116,20 @@ def wrapped_callee():
116116
else:
117117
return next_opt.result
118118

119+
def execute_with_retries(self, query: str, ddl: bool = False, retry_settings: RetrySettings = None, *args, **kwargs):
120+
retry_settings = RetrySettings() if retry_settings is None else retry_settings
121+
with self.checkout() as session:
122+
def wrapped_callee():
123+
it = session.execute(query, empty_tx_control=ddl)
124+
return [result_set for result_set in it]
125+
126+
opt_generator = retry_operation_impl(wrapped_callee, retry_settings, *args, **kwargs)
127+
for next_opt in opt_generator:
128+
if isinstance(next_opt, YdbRetryOperationSleepOpt):
129+
time.sleep(next_opt.timeout)
130+
else:
131+
return next_opt.result
132+
119133

120134
class SimpleQuerySessionCheckout:
121135
def __init__(self, pool: QuerySessionPool):

ydb/query/session.py

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,11 +150,27 @@ def _attach_call(self):
150150
_apis.QueryService.AttachSession,
151151
)
152152

153-
def _execute_call(self, query: str, commit_tx: bool):
153+
def _execute_call(
154+
self,
155+
query: str,
156+
commit_tx: bool = False,
157+
tx_mode: base.BaseQueryTxMode = None,
158+
syntax: base.QuerySyntax = None,
159+
exec_mode: base.QueryExecMode = None,
160+
parameters: dict = None,
161+
concurrent_result_sets: bool = False,
162+
empty_tx_control: bool = False,
163+
):
154164
request = base.create_execute_query_request(
155165
query=query,
156166
session_id=self._state.session_id,
157167
commit_tx=commit_tx,
168+
tx_mode=tx_mode,
169+
syntax=syntax,
170+
exec_mode=exec_mode,
171+
parameters=parameters,
172+
concurrent_result_sets=concurrent_result_sets,
173+
empty_tx_control=empty_tx_control,
158174
)
159175

160176
return self._driver(
@@ -225,10 +241,28 @@ def transaction(self, tx_mode: base.BaseQueryTxMode = None) -> base.IQueryTxCont
225241
tx_mode,
226242
)
227243

228-
def execute(self, query: str, parameters=None):
244+
def execute(
245+
self,
246+
query: str,
247+
tx_mode: base.BaseQueryTxMode = None,
248+
syntax: base.QuerySyntax = None,
249+
exec_mode: base.QueryExecMode = None,
250+
parameters: dict = None,
251+
concurrent_result_sets: bool = False,
252+
empty_tx_control: bool = False
253+
):
229254
self._state._check_session_ready_to_use()
230255

231-
stream_it = self._execute_call(query, commit_tx=True)
256+
stream_it = self._execute_call(
257+
query=query,
258+
commit_tx=True,
259+
tx_mode=tx_mode,
260+
syntax=syntax,
261+
exec_mode=exec_mode,
262+
parameters=parameters,
263+
concurrent_result_sets=concurrent_result_sets,
264+
empty_tx_control=empty_tx_control,
265+
)
232266

233267
return _utilities.SyncResponseIterator(
234268
stream_it,

0 commit comments

Comments
 (0)