Skip to content

Commit fb15242

Browse files
committed
temp
1 parent db621aa commit fb15242

File tree

6 files changed

+237
-5
lines changed

6 files changed

+237
-5
lines changed

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
version: "3.3"
22
services:
33
ydb:
4-
image: cr.yandex/yc/yandex-docker-local-ydb:latest
4+
image: ydbplatform/local-ydb:latest
55
restart: always
66
ports:
77
- 2136:2136

examples/query-service/basic_example.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ def main():
1919
session = QuerySessionSync(driver)
2020
session.create()
2121

22-
it = session.execute("select 1; select 2;")
22+
it = session.execute("select 1; select 2;", commit_tx=False)
2323
for result_set in it:
24-
print(f"columns: {str(result_set.columns)}")
25-
print(f"rows: {str(result_set.rows)}")
24+
pass
25+
# print(f"columns: {str(result_set.columns)}")
26+
# print(f"rows: {str(result_set.rows)}")
2627

2728

2829

test-requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,5 @@ pylint-protobuf
4747
cython
4848
freezegun==1.2.2
4949
pytest-cov
50-
yandexcloud
50+
# yandexcloud
5151
-e .

tests/query/test_query_transaction.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import pytest
2+
3+
import ydb.query.session
4+
5+
class TestQuerySession:
6+
def test_transaction_begin(self, driver_sync):
7+
session = ydb.query.session.QuerySessionSync(driver_sync)
8+
9+
session.create()
10+
11+
tx = session.transaction()
12+
13+
assert tx._tx_state.tx_id == None
14+
15+
tx.begin()
16+
17+
assert tx._tx_state.tx_id != None

ydb/query/base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,6 @@ def create_execute_query_request(query: str, session_id: str, commit_tx: bool):
103103

104104
def wrap_execute_query_response(rpc_state, response_pb):
105105

106+
print(response_pb)
107+
106108
return convert.ResultSet.from_message(response_pb.result_set)

ydb/query/transaction.py

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
import abc
2+
import logging
3+
4+
from .. import (
5+
_apis,
6+
issues,
7+
_utilities,
8+
)
9+
from .._grpc.grpcwrapper import ydb_query as _ydb_query
10+
from .._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public
11+
12+
from .._tx_ctx_impl import TxState, reset_tx_id_handler
13+
from .._session_impl import bad_session_handler
14+
from ..table import (
15+
AbstractTransactionModeBuilder,
16+
ITxContext,
17+
SerializableReadWrite
18+
)
19+
20+
logger = logging.getLogger(__name__)
21+
22+
def patch_table_service_tx_mode_to_query_service(tx_mode: AbstractTransactionModeBuilder):
23+
if tx_mode.name == 'snapshot_read_only':
24+
tx_mode = _ydb_query_public.QuerySnapshotReadOnly()
25+
elif tx_mode.name == 'serializable_read_write':
26+
tx_mode = _ydb_query_public.QuerySerializableReadWrite()
27+
elif tx_mode.name =='online_read_only':
28+
tx_mode = _ydb_query_public.QueryOnlineReadOnly()
29+
elif tx_mode.name == 'stale_read_only':
30+
tx_mode = _ydb_query_public.QueryStaleReadOnly()
31+
else:
32+
raise issues.YDBInvalidArgumentError(f'Unknown transaction mode: {tx_mode.name}')
33+
34+
return tx_mode
35+
36+
37+
def _construct_tx_settings(tx_state):
38+
tx_settings = _ydb_query.TransactionSettings.from_public(tx_state.tx_mode)
39+
return tx_settings
40+
41+
42+
def _create_begin_transaction_request(session_state, tx_state):
43+
request = _ydb_query.BeginTransactionRequest(
44+
session_id=session_state.session_id,
45+
tx_settings=_construct_tx_settings(tx_state),
46+
).to_proto()
47+
48+
print(request)
49+
50+
return request
51+
52+
53+
def _create_commit_transaction_request(session_state, tx_state):
54+
request = _apis.ydb_query.CommitTransactionRequest()
55+
request.tx_id = tx_state.tx_id
56+
request.session_id = session_state.session_id
57+
return request
58+
59+
def _create_rollback_transaction_request(session_state, tx_state):
60+
request = _apis.ydb_query.RollbackTransactionRequest()
61+
request.tx_id = tx_state.tx_id
62+
request.session_id = session_state.session_id
63+
return request
64+
65+
66+
@bad_session_handler
67+
def wrap_tx_begin_response(rpc_state, response_pb, session_state, tx_state, tx):
68+
# session_state.complete_query()
69+
# issues._process_response(response_pb.operation)
70+
print("wrap result")
71+
message = _ydb_query.BeginTransactionResponse.from_proto(response_pb)
72+
tx_state.tx_id = message.tx_meta.id
73+
return tx
74+
75+
76+
@bad_session_handler
77+
@reset_tx_id_handler
78+
def wrap_result_on_rollback_or_commit_tx(rpc_state, response_pb, session_state, tx_state, tx):
79+
80+
# issues._process_response(response_pb.operation)
81+
# transaction successfully committed or rolled back
82+
tx_state.tx_id = None
83+
return tx
84+
85+
86+
class BaseTxContext(ITxContext):
87+
88+
_COMMIT = "commit"
89+
_ROLLBACK = "rollback"
90+
91+
def __init__(self, driver, session_state, session, tx_mode=None):
92+
"""
93+
An object that provides a simple transaction context manager that allows statements execution
94+
in a transaction. You don't have to open transaction explicitly, because context manager encapsulates
95+
transaction control logic, and opens new transaction if:
96+
97+
1) By explicit .begin() and .async_begin() methods;
98+
2) On execution of a first statement, which is strictly recommended method, because that avoids useless round trip
99+
100+
This context manager is not thread-safe, so you should not manipulate on it concurrently.
101+
102+
:param driver: A driver instance
103+
:param session_state: A state of session
104+
:param tx_mode: A transaction mode, which is a one from the following choices:
105+
1) SerializableReadWrite() which is default mode;
106+
2) OnlineReadOnly();
107+
3) StaleReadOnly().
108+
"""
109+
self._driver = driver
110+
if tx_mode is None:
111+
tx_mode = patch_table_service_tx_mode_to_query_service(SerializableReadWrite())
112+
else:
113+
tx_mode = patch_table_service_tx_mode_to_query_service(tx_mode)
114+
self._tx_state = TxState(tx_mode)
115+
self._session_state = session_state
116+
self.session = session
117+
self._finished = ""
118+
119+
def __enter__(self):
120+
"""
121+
Enters a context manager and returns a session
122+
123+
:return: A session instance
124+
"""
125+
return self
126+
127+
def __exit__(self, *args, **kwargs):
128+
"""
129+
Closes a transaction context manager and rollbacks transaction if
130+
it is not rolled back explicitly
131+
"""
132+
if self._tx_state.tx_id is not None:
133+
# It's strictly recommended to close transactions directly
134+
# by using commit_tx=True flag while executing statement or by
135+
# .commit() or .rollback() methods, but here we trying to do best
136+
# effort to avoid useless open transactions
137+
logger.warning("Potentially leaked tx: %s", self._tx_state.tx_id)
138+
try:
139+
self.rollback()
140+
except issues.Error:
141+
logger.warning("Failed to rollback leaked tx: %s", self._tx_state.tx_id)
142+
143+
self._tx_state.tx_id = None
144+
145+
@property
146+
def session_id(self):
147+
"""
148+
A transaction's session id
149+
150+
:return: A transaction's session id
151+
"""
152+
return self._session_state.session_id
153+
154+
@property
155+
def tx_id(self):
156+
"""
157+
Returns a id of open transaction or None otherwise
158+
159+
:return: A id of open transaction or None otherwise
160+
"""
161+
return self._tx_state.tx_id
162+
163+
def begin(self, settings=None):
164+
"""
165+
Explicitly begins a transaction
166+
167+
:param settings: A request settings
168+
169+
:return: An open transaction
170+
"""
171+
if self._tx_state.tx_id is not None:
172+
return self
173+
174+
print('try to begin tx')
175+
176+
return self._driver(
177+
_create_begin_transaction_request(self._session_state, self._tx_state),
178+
_apis.QueryService.Stub,
179+
_apis.QueryService.BeginTransaction,
180+
wrap_result=wrap_tx_begin_response,
181+
wrap_args=(self._session_state, self._tx_state, self),
182+
)
183+
184+
def commit(self, settings=None):
185+
"""
186+
Calls commit on a transaction if it is open otherwise is no-op. If transaction execution
187+
failed then this method raises PreconditionFailed.
188+
189+
:param settings: A request settings
190+
191+
:return: A committed transaction or exception if commit is failed
192+
"""
193+
194+
self._set_finish(self._COMMIT)
195+
196+
if self._tx_state.tx_id is None and not self._tx_state.dead:
197+
return self
198+
199+
return self._driver(
200+
_create_commit_transaction_request(self._session_state, self._tx_state),
201+
_apis.QueryService.Stub,
202+
_apis.QueryService.CommitTransaction,
203+
wrap_result_on_rollback_or_commit_tx,
204+
settings,
205+
(self._session_state, self._tx_state, self),
206+
)
207+
208+
def rollback(self, settings=None):
209+
pass
210+
211+
def execute(self, query, parameters=None, commit_tx=False, settings=None):
212+
pass

0 commit comments

Comments
 (0)