Skip to content

Commit 145054f

Browse files
committed
review fixes
1 parent af4d5d9 commit 145054f

File tree

6 files changed

+41
-35
lines changed

6 files changed

+41
-35
lines changed

ydb/aio/query/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ async def __aenter__(self) -> "AsyncResponseContextIterator":
66
return self
77

88
async def __aexit__(self, exc_type, exc_val, exc_tb):
9+
# To close stream on YDB it is necessary to scroll through it to the end
910
async for _ in self:
1011
pass

ydb/aio/query/session.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,9 @@ async def _check_session_status_loop(self) -> None:
5858
self._state.reset()
5959
self._state._change_state(QuerySessionStateEnum.CLOSED)
6060
except Exception:
61-
pass
61+
if not self._state._already_in(QuerySessionStateEnum.CLOSED):
62+
self._state.reset()
63+
self._state._change_state(QuerySessionStateEnum.CLOSED)
6264

6365
async def delete(self) -> None:
6466
"""WARNING: This API is experimental and could be changed.

ydb/aio/query/transaction.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ async def __aexit__(self, *args, **kwargs):
2828
Closes a transaction context manager and rollbacks transaction if
2929
it is not finished explicitly
3030
"""
31-
self._ensure_prev_stream_finished()
31+
await self._ensure_prev_stream_finished()
3232
if self._tx_state._state == QueryTxStateEnum.BEGINED:
3333
# It's strictly recommended to close transactions directly
3434
# by using commit_tx=True flag while executing statement or by
@@ -42,7 +42,7 @@ async def __aexit__(self, *args, **kwargs):
4242

4343
async def _ensure_prev_stream_finished(self) -> None:
4444
if self._prev_stream is not None:
45-
async for _ in self._prev_stream:
45+
async with self._prev_stream:
4646
pass
4747
self._prev_stream = None
4848

ydb/query/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ def __enter__(self) -> "SyncResponseContextIterator":
4848
return self
4949

5050
def __exit__(self, exc_type, exc_val, exc_tb):
51+
# To close stream on YDB it is necessary to scroll through it to the end
5152
for _ in self:
5253
pass
5354

ydb/query/session.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,9 @@ def _check_session_status_loop(self, status_stream: _utilities.SyncResponseItera
224224
self._state.reset()
225225
self._state._change_state(QuerySessionStateEnum.CLOSED)
226226
except Exception:
227-
pass
227+
if not self._state._already_in(QuerySessionStateEnum.CLOSED):
228+
self._state.reset()
229+
self._state._change_state(QuerySessionStateEnum.CLOSED)
228230

229231
def delete(self) -> None:
230232
"""WARNING: This API is experimental and could be changed.

ydb/query/transaction.py

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -196,31 +196,6 @@ def __init__(self, driver, session_state, session, tx_mode):
196196
self.session = session
197197
self._prev_stream = None
198198

199-
def __enter__(self) -> "BaseQueryTxContext":
200-
"""
201-
Enters a context manager and returns a transaction
202-
203-
:return: A transaction instance
204-
"""
205-
return self
206-
207-
def __exit__(self, *args, **kwargs):
208-
"""
209-
Closes a transaction context manager and rollbacks transaction if
210-
it is not finished explicitly
211-
"""
212-
self._ensure_prev_stream_finished()
213-
if self._tx_state._state == QueryTxStateEnum.BEGINED:
214-
# It's strictly recommended to close transactions directly
215-
# by using commit_tx=True flag while executing statement or by
216-
# .commit() or .rollback() methods, but here we trying to do best
217-
# effort to avoid useless open transactions
218-
logger.warning("Potentially leaked tx: %s", self._tx_state.tx_id)
219-
try:
220-
self.rollback()
221-
except issues.Error:
222-
logger.warning("Failed to rollback leaked tx: %s", self._tx_state.tx_id)
223-
224199
@property
225200
def session_id(self) -> str:
226201
"""
@@ -304,12 +279,6 @@ def _execute_call(
304279
_apis.QueryService.ExecuteQuery,
305280
)
306281

307-
def _ensure_prev_stream_finished(self) -> None:
308-
if self._prev_stream is not None:
309-
for _ in self._prev_stream:
310-
pass
311-
self._prev_stream = None
312-
313282
def _move_to_beginned(self, tx_id: str) -> None:
314283
if self._tx_state._already_in(QueryTxStateEnum.BEGINED):
315284
return
@@ -323,6 +292,37 @@ def _move_to_commited(self) -> None:
323292

324293

325294
class QueryTxContextSync(BaseQueryTxContext):
295+
def __enter__(self) -> "BaseQueryTxContext":
296+
"""
297+
Enters a context manager and returns a transaction
298+
299+
:return: A transaction instance
300+
"""
301+
return self
302+
303+
def __exit__(self, *args, **kwargs):
304+
"""
305+
Closes a transaction context manager and rollbacks transaction if
306+
it is not finished explicitly
307+
"""
308+
self._ensure_prev_stream_finished()
309+
if self._tx_state._state == QueryTxStateEnum.BEGINED:
310+
# It's strictly recommended to close transactions directly
311+
# by using commit_tx=True flag while executing statement or by
312+
# .commit() or .rollback() methods, but here we trying to do best
313+
# effort to avoid useless open transactions
314+
logger.warning("Potentially leaked tx: %s", self._tx_state.tx_id)
315+
try:
316+
self.rollback()
317+
except issues.Error:
318+
logger.warning("Failed to rollback leaked tx: %s", self._tx_state.tx_id)
319+
320+
def _ensure_prev_stream_finished(self) -> None:
321+
if self._prev_stream is not None:
322+
with self._prev_stream:
323+
pass
324+
self._prev_stream = None
325+
326326
def begin(self, settings: Optional[base.QueryClientSettings] = None) -> "QueryTxContextSync":
327327
"""WARNING: This API is experimental and could be changed.
328328

0 commit comments

Comments
 (0)