Skip to content

Commit 496d4c1

Browse files
committed
aio query session
1 parent 786e044 commit 496d4c1

File tree

5 files changed

+255
-0
lines changed

5 files changed

+255
-0
lines changed

tests/aio/query/__init__.py

Whitespace-only changes.

tests/aio/query/conftest.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import pytest
2+
from ydb.aio.query.session import QuerySessionAsync
3+
4+
@pytest.fixture
5+
async def session(driver):
6+
session = QuerySessionAsync(driver)
7+
8+
yield session
9+
10+
try:
11+
await session.delete()
12+
except BaseException:
13+
pass

tests/aio/query/test_query_session.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import pytest
2+
from ydb.aio.query.session import QuerySessionAsync
3+
4+
5+
def _check_session_state_empty(session: QuerySessionAsync):
6+
assert session._state.session_id is None
7+
assert session._state.node_id is None
8+
assert not session._state.attached
9+
10+
11+
def _check_session_state_full(session: QuerySessionAsync):
12+
assert session._state.session_id is not None
13+
assert session._state.node_id is not None
14+
assert session._state.attached
15+
16+
17+
class TestAsyncQuerySession:
18+
@pytest.mark.asyncio
19+
async def test_session_normal_lifecycle(self, session: QuerySessionAsync):
20+
_check_session_state_empty(session)
21+
22+
await session.create()
23+
_check_session_state_full(session)
24+
25+
await session.delete()
26+
_check_session_state_empty(session)
27+
28+
@pytest.mark.asyncio
29+
async def test_second_create_do_nothing(self, session: QuerySessionAsync):
30+
await session.create()
31+
_check_session_state_full(session)
32+
33+
session_id_before = session._state.session_id
34+
node_id_before = session._state.node_id
35+
36+
await session.create()
37+
_check_session_state_full(session)
38+
39+
assert session._state.session_id == session_id_before
40+
assert session._state.node_id == node_id_before
41+
42+
@pytest.mark.asyncio
43+
async def test_second_delete_do_nothing(self, session: QuerySessionAsync):
44+
await session.create()
45+
46+
await session.delete()
47+
await session.delete()
48+
49+
@pytest.mark.asyncio
50+
async def test_delete_before_create_not_possible(self, session: QuerySessionAsync):
51+
with pytest.raises(RuntimeError):
52+
await session.delete()
53+
54+
@pytest.mark.asyncio
55+
async def test_create_after_delete_not_possible(self, session: QuerySessionAsync):
56+
await session.create()
57+
await session.delete()
58+
with pytest.raises(RuntimeError):
59+
await session.create()
60+
61+
# def test_transaction_before_create_raises(self, session: QuerySessionAsync):
62+
# with pytest.raises(RuntimeError):
63+
# session.transaction()
64+
65+
# def test_transaction_after_delete_raises(self, session: QuerySessionAsync):
66+
# session.create()
67+
68+
# session.delete()
69+
70+
# with pytest.raises(RuntimeError):
71+
# session.transaction()
72+
73+
# def test_transaction_after_create_not_raises(self, session: QuerySessionAsync):
74+
# session.create()
75+
# session.transaction()
76+
77+
@pytest.mark.asyncio
78+
async def test_execute_before_create_raises(self, session: QuerySessionAsync):
79+
with pytest.raises(RuntimeError):
80+
await session.execute("select 1;")
81+
82+
@pytest.mark.asyncio
83+
async def test_execute_after_delete_raises(self, session: QuerySessionAsync):
84+
await session.create()
85+
await session.delete()
86+
with pytest.raises(RuntimeError):
87+
await session.execute("select 1;")
88+
89+
@pytest.mark.asyncio
90+
async def test_basic_execute(self, session: QuerySessionAsync):
91+
await session.create()
92+
it = await session.execute("select 1;")
93+
result_sets = [result_set async for result_set in it]
94+
95+
assert len(result_sets) == 1
96+
assert len(result_sets[0].rows) == 1
97+
assert len(result_sets[0].columns) == 1
98+
assert list(result_sets[0].rows[0].values()) == [1]
99+
100+
@pytest.mark.asyncio
101+
async def test_two_results(self, session: QuerySessionAsync):
102+
await session.create()
103+
res = []
104+
105+
async with await session.execute("select 1; select 2") as results:
106+
async for result_set in results:
107+
if len(result_set.rows) > 0:
108+
res.append(list(result_set.rows[0].values()))
109+
110+
assert res == [[1], [2]]

ydb/aio/query/__init__.py

Whitespace-only changes.

ydb/aio/query/session.py

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import asyncio
2+
3+
from typing import (
4+
Optional,
5+
)
6+
7+
from .. import _utilities
8+
from ... import issues
9+
from ..._grpc.grpcwrapper import common_utils
10+
from ...query import base
11+
from ...query.session import (
12+
BaseQuerySession,
13+
QuerySessionStateEnum,
14+
)
15+
16+
17+
class AsyncResponseContextIterator(_utilities.AsyncResponseIterator):
18+
async def __aenter__(self) -> "AsyncResponseContextIterator":
19+
return self
20+
21+
async def __aexit__(self, exc_type, exc_val, exc_tb):
22+
async for _ in self:
23+
pass
24+
25+
26+
class QuerySessionAsync(BaseQuerySession):
27+
"""Session object for Query Service. It is not recommended to control
28+
session's lifecycle manually - use a QuerySessionPool is always a better choise.
29+
"""
30+
31+
_loop: asyncio.AbstractEventLoop
32+
_status_stream: _utilities.AsyncResponseIterator = None
33+
34+
def __init__(self, driver: base.SupportedDriverType, settings: Optional[base.QueryClientSettings] = None):
35+
super(QuerySessionAsync, self).__init__(driver, settings)
36+
self._loop = asyncio.get_running_loop()
37+
38+
async def _attach(self) -> None:
39+
self._stream = await self._attach_call()
40+
self._status_stream = _utilities.AsyncResponseIterator(
41+
self._stream,
42+
lambda response: common_utils.ServerStatus.from_proto(response),
43+
)
44+
45+
first_response = await self._status_stream.next()
46+
if first_response.status != issues.StatusCode.SUCCESS:
47+
pass
48+
49+
self._state.set_attached(True)
50+
self._state._change_state(QuerySessionStateEnum.CREATED)
51+
52+
self._loop.create_task(self._check_session_status_loop(), name="check session status task")
53+
54+
55+
async def _check_session_status_loop(self) -> None:
56+
try:
57+
async for status in self._status_stream:
58+
if status.status != issues.StatusCode.SUCCESS:
59+
self._state.reset()
60+
self._state._change_state(QuerySessionStateEnum.CLOSED)
61+
except Exception:
62+
pass
63+
64+
async def delete(self) -> None:
65+
"""WARNING: This API is experimental and could be changed.
66+
67+
Deletes a Session of Query Service on server side and releases resources.
68+
69+
:return: None
70+
"""
71+
if self._state._already_in(QuerySessionStateEnum.CLOSED):
72+
return
73+
74+
self._state._check_invalid_transition(QuerySessionStateEnum.CLOSED)
75+
await self._delete_call()
76+
self._stream.cancel()
77+
78+
async def create(self) -> "QuerySessionAsync":
79+
"""WARNING: This API is experimental and could be changed.
80+
81+
Creates a Session of Query Service on server side and attaches it.
82+
83+
:return: QuerySessionSync object.
84+
"""
85+
if self._state._already_in(QuerySessionStateEnum.CREATED):
86+
return
87+
88+
self._state._check_invalid_transition(QuerySessionStateEnum.CREATED)
89+
await self._create_call()
90+
await self._attach()
91+
92+
return self
93+
94+
async def transaction(self, tx_mode) -> base.IQueryTxContext:
95+
return super().transaction(tx_mode)
96+
97+
async def execute(
98+
self,
99+
query: str,
100+
parameters: dict = None,
101+
commit_tx: bool = False,
102+
syntax: base.QuerySyntax = None,
103+
exec_mode: base.QueryExecMode = None,
104+
concurrent_result_sets: bool = False,
105+
) -> AsyncResponseContextIterator:
106+
"""WARNING: This API is experimental and could be changed.
107+
108+
Sends a query to Query Service
109+
:param query: (YQL or SQL text) to be executed.
110+
:param syntax: Syntax of the query, which is a one from the following choises:
111+
1) QuerySyntax.YQL_V1, which is default;
112+
2) QuerySyntax.PG.
113+
:param parameters: dict with parameters and YDB types;
114+
:param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False;
115+
116+
:return: Iterator with result sets
117+
"""
118+
self._state._check_session_ready_to_use()
119+
120+
stream_it = await self._execute_call(
121+
query=query,
122+
commit_tx=True,
123+
syntax=syntax,
124+
exec_mode=exec_mode,
125+
parameters=parameters,
126+
concurrent_result_sets=concurrent_result_sets,
127+
)
128+
129+
return AsyncResponseContextIterator(
130+
stream_it,
131+
lambda resp: base.wrap_execute_query_response(rpc_state=None, response_pb=resp),
132+
)

0 commit comments

Comments
 (0)