Skip to content

Commit 9dca466

Browse files
committed
simple execute on session
1 parent de683c0 commit 9dca466

File tree

4 files changed

+104
-21
lines changed

4 files changed

+104
-21
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import ydb
2+
3+
from ydb.query.session import QuerySessionSync
4+
5+
6+
def main():
7+
driver_config = ydb.DriverConfig(
8+
endpoint="grpc://localhost:2136",
9+
database="/local",
10+
# credentials=ydb.credentials_from_env_variables(),
11+
# root_certificates=ydb.load_ydb_root_certificate(),
12+
)
13+
try:
14+
driver = ydb.Driver(driver_config)
15+
driver.wait(timeout=5)
16+
except TimeoutError:
17+
raise RuntimeError("Connect failed to YDB")
18+
19+
session = QuerySessionSync(driver)
20+
session.create()
21+
22+
it = session.execute("select 1; select 2;")
23+
for result_set in it:
24+
print(f"columns: {str(result_set.columns)}")
25+
print(f"rows: {str(result_set.rows)}")
26+
27+
28+
29+
if __name__ == '__main__':
30+
main()

ydb/_grpc/grpcwrapper/ydb_query.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ def to_proto(self) -> ydb_query_pb2.TransactionSettings:
9292
return ydb_query_pb2.TransactionSettings(online_read_only=self.tx_mode.to_proto())
9393
if self.tx_mode.name == 'stale_read_only':
9494
return ydb_query_pb2.TransactionSettings(stale_read_only=self.tx_mode.to_proto())
95-
# TODO: add exception
9695

9796
@dataclass
9897
class BeginTransactionRequest(IToProto):
@@ -102,7 +101,7 @@ class BeginTransactionRequest(IToProto):
102101
def to_proto(self) -> ydb_query_pb2.BeginTransactionRequest:
103102
return ydb_query_pb2.BeginTransactionRequest(
104103
session_id=self.session_id,
105-
tx_settings=self.tx_settings
104+
tx_settings=self.tx_settings.to_proto(),
106105
)
107106

108107
@dataclass
@@ -120,7 +119,7 @@ def from_proto(msg: ydb_query_pb2.BeginTransactionResponse) -> "BeginTransaction
120119
@dataclass
121120
class QueryContent(IFromPublic, IToProto):
122121
text: str
123-
syntax: Optional[str]
122+
syntax: Optional[str] = None
124123

125124
@staticmethod
126125
def from_public(query: str) -> "QueryContent":
@@ -132,9 +131,9 @@ def to_proto(self) -> ydb_query_pb2.QueryContent:
132131

133132
@dataclass
134133
class TransactionControl(IToProto):
135-
begin_tx: Optional[TransactionSettings]
136-
commit_tx: Optional[bool]
137-
tx_id: Optional[str]
134+
begin_tx: Optional[TransactionSettings] = None
135+
commit_tx: Optional[bool] = None
136+
tx_id: Optional[str] = None
138137

139138
def to_proto(self) -> ydb_query_pb2.TransactionControl:
140139
if self.tx_id:
@@ -143,19 +142,28 @@ def to_proto(self) -> ydb_query_pb2.TransactionControl:
143142
commit_tx=self.commit_tx,
144143
)
145144
return ydb_query_pb2.TransactionControl(
146-
begin_tx=self.begin_tx,
145+
begin_tx=self.begin_tx.to_proto(),
147146
commit_tx=self.commit_tx
148147
)
149148

150149

151150
@dataclass
152-
class ExecuteQueryRequest:
153-
exec_mode: Optional[str]
154-
concurrent_result_sets: bool = False
155-
parameters: Optional[dict]
156-
query_content: QueryContent
151+
class ExecuteQueryRequest(IToProto):
157152
session_id: str
158-
stats_mode: Optional[str]
153+
query_content: QueryContent
159154
tx_control: TransactionControl
155+
concurrent_result_sets: Optional[bool] = False
156+
exec_mode: Optional[str] = None
157+
parameters: Optional[dict] = None
158+
stats_mode: Optional[str] = None
160159

161-
160+
def to_proto(self) -> ydb_query_pb2.ExecuteQueryRequest:
161+
return ydb_query_pb2.ExecuteQueryRequest(
162+
session_id=self.session_id,
163+
tx_control=self.tx_control.to_proto(),
164+
query_content=self.query_content.to_proto(),
165+
exec_mode=ydb_query_pb2.EXEC_MODE_EXECUTE,
166+
stats_mode=self.stats_mode,
167+
concurrent_result_sets=self.concurrent_result_sets,
168+
parameters=self.parameters,
169+
)

ydb/query/base.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,12 @@
77
from .._grpc.grpcwrapper.common_utils import (
88
SupportedDriverType,
99
)
10-
11-
from .._grpc.grpcwrapper.ydb_query_public_types import BaseQueryTxMode
12-
10+
from .._grpc.grpcwrapper import ydb_query
11+
from .._grpc.grpcwrapper.ydb_query_public_types import (
12+
BaseQueryTxMode,
13+
QuerySerializableReadWrite
14+
)
15+
from .. import convert
1316

1417
class QueryClientSettings: ...
1518

@@ -82,4 +85,22 @@ def session(self) -> IQuerySession:
8285
pass
8386

8487

88+
def create_execute_query_request(query: str, session_id: str, commit_tx: bool):
89+
req = ydb_query.ExecuteQueryRequest(
90+
session_id=session_id,
91+
query_content=ydb_query.QueryContent.from_public(
92+
query=query,
93+
),
94+
tx_control=ydb_query.TransactionControl(
95+
begin_tx=ydb_query.TransactionSettings(
96+
tx_mode=QuerySerializableReadWrite(),
97+
),
98+
commit_tx=commit_tx
99+
),
100+
)
101+
102+
return req.to_proto()
103+
104+
def wrap_execute_query_response(rpc_state, response_pb):
85105

106+
return convert.ResultSet.from_message(response_pb.result_set)

ydb/query/session.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,19 @@ def _attach_call(self):
7171
_apis.QueryService.AttachSession,
7272
)
7373

74+
def _execute_call(self, query: str, commit_tx: bool):
75+
request = base.create_execute_query_request(
76+
query=query,
77+
session_id=self._state.session_id,
78+
commit_tx=commit_tx
79+
)
80+
return self._driver(
81+
request,
82+
_apis.QueryService.Stub,
83+
_apis.QueryService.ExecuteQuery,
84+
# base.wrap_execute_query_response
85+
)
86+
7487
class QuerySessionSync(BaseQuerySession):
7588
_stream = None
7689

@@ -96,15 +109,13 @@ def _attach(self):
96109
).start()
97110

98111
def _chech_session_status_loop(self, status_stream):
99-
print("CHECK STATUS")
100112
try:
101113
for status in status_stream:
102114
if status.status != issues.StatusCode.SUCCESS:
103115
print("STATUS NOT SUCCESS")
104116
self._state.reset(False)
105117
except Exception as e:
106118
pass
107-
print("CHECK STATUS STOP")
108119

109120

110121
def delete(self) -> None:
@@ -119,7 +130,20 @@ def create(self) -> None:
119130
self._create_call()
120131
self._attach()
121132

122-
def transaction(self, tx_mode: base.BaseQueryTxMode) -> base.IQueryTxContext:
133+
def transaction(self, tx_mode: base.BaseQueryTxMode = None) -> base.IQueryTxContext:
123134
if not self._state.session_id:
124135
return
125-
return BaseTxContext(tx_mode)
136+
return BaseTxContext(
137+
self._driver,
138+
self._state,
139+
self,
140+
tx_mode,
141+
)
142+
143+
def execute(self, query: str, commit_tx: bool = True):
144+
stream_it = self._execute_call(query, commit_tx)
145+
146+
return _utilities.SyncResponseIterator(
147+
stream_it,
148+
lambda resp: base.wrap_execute_query_response(rpc_state=None, response_pb=resp),
149+
)

0 commit comments

Comments
 (0)