Skip to content

Commit 7bfb416

Browse files
authored
Merge pull request #467 from ydb-platform/query_service_async
Query Service AIO
2 parents 663a283 + 145054f commit 7bfb416

21 files changed

+966
-304
lines changed
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
import asyncio
2+
import ydb
3+
4+
5+
async def main():
6+
driver_config = ydb.DriverConfig(
7+
endpoint="grpc://localhost:2136",
8+
database="/local",
9+
# credentials=ydb.credentials_from_env_variables(),
10+
# root_certificates=ydb.load_ydb_root_certificate(),
11+
)
12+
try:
13+
driver = ydb.aio.Driver(driver_config)
14+
await driver.wait(timeout=5)
15+
except TimeoutError:
16+
raise RuntimeError("Connect failed to YDB")
17+
18+
pool = ydb.aio.QuerySessionPoolAsync(driver)
19+
20+
print("=" * 50)
21+
print("DELETE TABLE IF EXISTS")
22+
await pool.execute_with_retries("DROP TABLE IF EXISTS example")
23+
24+
print("=" * 50)
25+
print("CREATE TABLE")
26+
await pool.execute_with_retries("CREATE TABLE example(key UInt64, value String, PRIMARY KEY (key))")
27+
28+
await pool.execute_with_retries("INSERT INTO example (key, value) VALUES (1, 'onepieceisreal')")
29+
30+
async def callee(session):
31+
print("=" * 50)
32+
async with await session.execute("DELETE FROM example"):
33+
pass
34+
35+
print("BEFORE ACTION")
36+
async with await session.execute("SELECT COUNT(*) AS rows_count FROM example") as results:
37+
async for result_set in results:
38+
print(f"rows: {str(result_set.rows)}")
39+
40+
print("=" * 50)
41+
print("INSERT WITH COMMIT TX")
42+
43+
async with session.transaction() as tx:
44+
await tx.begin()
45+
46+
async with await tx.execute("INSERT INTO example (key, value) VALUES (1, 'onepieceisreal')"):
47+
pass
48+
49+
async with await tx.execute("SELECT COUNT(*) AS rows_count FROM example") as results:
50+
async for result_set in results:
51+
print(f"rows: {str(result_set.rows)}")
52+
53+
await tx.commit()
54+
55+
print("=" * 50)
56+
print("AFTER COMMIT TX")
57+
58+
async with await session.execute("SELECT COUNT(*) AS rows_count FROM example") as results:
59+
async for result_set in results:
60+
print(f"rows: {str(result_set.rows)}")
61+
62+
print("=" * 50)
63+
print("INSERT WITH ROLLBACK TX")
64+
65+
async with session.transaction() as tx:
66+
await tx.begin()
67+
68+
async with await tx.execute("INSERT INTO example (key, value) VALUES (2, 'onepieceisreal')"):
69+
pass
70+
71+
async with await tx.execute("SELECT COUNT(*) AS rows_count FROM example") as results:
72+
async for result_set in results:
73+
print(f"rows: {str(result_set.rows)}")
74+
75+
await tx.rollback()
76+
77+
print("=" * 50)
78+
print("AFTER ROLLBACK TX")
79+
80+
async with await session.execute("SELECT COUNT(*) AS rows_count FROM example") as results:
81+
async for result_set in results:
82+
print(f"rows: {str(result_set.rows)}")
83+
84+
await pool.retry_operation_async(callee)
85+
86+
async def callee(session: ydb.aio.QuerySessionAsync):
87+
query_print = """select $a"""
88+
89+
print("=" * 50)
90+
print("Check implicit typed parameters")
91+
92+
values = [
93+
1,
94+
1.0,
95+
True,
96+
"text",
97+
{"4": 8, "15": 16, "23": 42},
98+
[{"name": "Michael"}, {"surname": "Scott"}],
99+
]
100+
101+
for value in values:
102+
print(f"value: {value}")
103+
async with await session.transaction().execute(
104+
query=query_print,
105+
parameters={"$a": value},
106+
commit_tx=True,
107+
) as results:
108+
async for result_set in results:
109+
print(f"rows: {str(result_set.rows)}")
110+
111+
print("=" * 50)
112+
print("Check typed parameters as tuple pair")
113+
114+
typed_value = ([1, 2, 3], ydb.ListType(ydb.PrimitiveType.Int64))
115+
print(f"value: {typed_value}")
116+
117+
async with await session.transaction().execute(
118+
query=query_print,
119+
parameters={"$a": typed_value},
120+
commit_tx=True,
121+
) as results:
122+
async for result_set in results:
123+
print(f"rows: {str(result_set.rows)}")
124+
125+
print("=" * 50)
126+
print("Check typed parameters as ydb.TypedValue")
127+
128+
typed_value = ydb.TypedValue(111, ydb.PrimitiveType.Int64)
129+
print(f"value: {typed_value}")
130+
131+
async with await session.transaction().execute(
132+
query=query_print,
133+
parameters={"$a": typed_value},
134+
commit_tx=True,
135+
) as results:
136+
async for result_set in results:
137+
print(f"rows: {str(result_set.rows)}")
138+
139+
await pool.retry_operation_async(callee)
140+
141+
142+
if __name__ == "__main__":
143+
asyncio.run(main())

tests/aio/query/__init__.py

Whitespace-only changes.

tests/aio/query/conftest.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import pytest
2+
from ydb.aio.query.session import QuerySessionAsync
3+
from ydb.aio.query.pool import QuerySessionPoolAsync
4+
5+
6+
@pytest.fixture
7+
async def session(driver):
8+
session = QuerySessionAsync(driver)
9+
10+
yield session
11+
12+
try:
13+
await session.delete()
14+
except BaseException:
15+
pass
16+
17+
18+
@pytest.fixture
19+
async def tx(session):
20+
await session.create()
21+
transaction = session.transaction()
22+
23+
yield transaction
24+
25+
try:
26+
await transaction.rollback()
27+
except BaseException:
28+
pass
29+
30+
31+
@pytest.fixture
32+
def pool(driver):
33+
pool = QuerySessionPoolAsync(driver)
34+
yield pool

tests/aio/query/test_query_session.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import pytest
2+
from ydb.aio.query.session import QuerySessionAsync
3+
4+
5+
def _check_session_state_empty(session: QuerySessionAsync):
6+
assert session._state.session_id is None
7+
assert session._state.node_id is None
8+
assert not session._state.attached
9+
10+
11+
def _check_session_state_full(session: QuerySessionAsync):
12+
assert session._state.session_id is not None
13+
assert session._state.node_id is not None
14+
assert session._state.attached
15+
16+
17+
class TestAsyncQuerySession:
18+
@pytest.mark.asyncio
19+
async def test_session_normal_lifecycle(self, session: QuerySessionAsync):
20+
_check_session_state_empty(session)
21+
22+
await session.create()
23+
_check_session_state_full(session)
24+
25+
await session.delete()
26+
_check_session_state_empty(session)
27+
28+
@pytest.mark.asyncio
29+
async def test_second_create_do_nothing(self, session: QuerySessionAsync):
30+
await session.create()
31+
_check_session_state_full(session)
32+
33+
session_id_before = session._state.session_id
34+
node_id_before = session._state.node_id
35+
36+
await session.create()
37+
_check_session_state_full(session)
38+
39+
assert session._state.session_id == session_id_before
40+
assert session._state.node_id == node_id_before
41+
42+
@pytest.mark.asyncio
43+
async def test_second_delete_do_nothing(self, session: QuerySessionAsync):
44+
await session.create()
45+
46+
await session.delete()
47+
await session.delete()
48+
49+
@pytest.mark.asyncio
50+
async def test_delete_before_create_not_possible(self, session: QuerySessionAsync):
51+
with pytest.raises(RuntimeError):
52+
await session.delete()
53+
54+
@pytest.mark.asyncio
55+
async def test_create_after_delete_not_possible(self, session: QuerySessionAsync):
56+
await session.create()
57+
await session.delete()
58+
with pytest.raises(RuntimeError):
59+
await session.create()
60+
61+
def test_transaction_before_create_raises(self, session: QuerySessionAsync):
62+
with pytest.raises(RuntimeError):
63+
session.transaction()
64+
65+
@pytest.mark.asyncio
66+
async def test_transaction_after_delete_raises(self, session: QuerySessionAsync):
67+
await session.create()
68+
69+
await session.delete()
70+
71+
with pytest.raises(RuntimeError):
72+
session.transaction()
73+
74+
@pytest.mark.asyncio
75+
async def test_transaction_after_create_not_raises(self, session: QuerySessionAsync):
76+
await session.create()
77+
session.transaction()
78+
79+
@pytest.mark.asyncio
80+
async def test_execute_before_create_raises(self, session: QuerySessionAsync):
81+
with pytest.raises(RuntimeError):
82+
await session.execute("select 1;")
83+
84+
@pytest.mark.asyncio
85+
async def test_execute_after_delete_raises(self, session: QuerySessionAsync):
86+
await session.create()
87+
await session.delete()
88+
with pytest.raises(RuntimeError):
89+
await session.execute("select 1;")
90+
91+
@pytest.mark.asyncio
92+
async def test_basic_execute(self, session: QuerySessionAsync):
93+
await session.create()
94+
it = await session.execute("select 1;")
95+
result_sets = [result_set async for result_set in it]
96+
97+
assert len(result_sets) == 1
98+
assert len(result_sets[0].rows) == 1
99+
assert len(result_sets[0].columns) == 1
100+
assert list(result_sets[0].rows[0].values()) == [1]
101+
102+
@pytest.mark.asyncio
103+
async def test_two_results(self, session: QuerySessionAsync):
104+
await session.create()
105+
res = []
106+
107+
async with await session.execute("select 1; select 2") as results:
108+
async for result_set in results:
109+
if len(result_set.rows) > 0:
110+
res.append(list(result_set.rows[0].values()))
111+
112+
assert res == [[1], [2]]
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import pytest
2+
import ydb
3+
from ydb.aio.query.pool import QuerySessionPoolAsync
4+
from ydb.aio.query.session import QuerySessionAsync, QuerySessionStateEnum
5+
6+
7+
class TestQuerySessionPoolAsync:
8+
@pytest.mark.asyncio
9+
async def test_checkout_provides_created_session(self, pool: QuerySessionPoolAsync):
10+
async with pool.checkout() as session:
11+
assert session._state._state == QuerySessionStateEnum.CREATED
12+
13+
assert session._state._state == QuerySessionStateEnum.CLOSED
14+
15+
@pytest.mark.asyncio
16+
async def test_oneshot_query_normal(self, pool: QuerySessionPoolAsync):
17+
res = await pool.execute_with_retries("select 1;")
18+
assert len(res) == 1
19+
20+
@pytest.mark.asyncio
21+
async def test_oneshot_ddl_query(self, pool: QuerySessionPoolAsync):
22+
await pool.execute_with_retries("create table Queen(key UInt64, PRIMARY KEY (key));")
23+
await pool.execute_with_retries("drop table Queen;")
24+
25+
@pytest.mark.asyncio
26+
async def test_oneshot_query_raises(self, pool: QuerySessionPoolAsync):
27+
with pytest.raises(ydb.GenericError):
28+
await pool.execute_with_retries("Is this the real life? Is this just fantasy?")
29+
30+
@pytest.mark.asyncio
31+
async def test_retry_op_uses_created_session(self, pool: QuerySessionPoolAsync):
32+
async def callee(session: QuerySessionAsync):
33+
assert session._state._state == QuerySessionStateEnum.CREATED
34+
35+
await pool.retry_operation_async(callee)
36+
37+
@pytest.mark.asyncio
38+
async def test_retry_op_normal(self, pool: QuerySessionPoolAsync):
39+
async def callee(session: QuerySessionAsync):
40+
async with session.transaction() as tx:
41+
iterator = await tx.execute("select 1;", commit_tx=True)
42+
return [result_set async for result_set in iterator]
43+
44+
res = await pool.retry_operation_async(callee)
45+
assert len(res) == 1
46+
47+
@pytest.mark.asyncio
48+
async def test_retry_op_raises(self, pool: QuerySessionPoolAsync):
49+
class CustomException(Exception):
50+
pass
51+
52+
async def callee(session: QuerySessionAsync):
53+
raise CustomException()
54+
55+
with pytest.raises(CustomException):
56+
await pool.retry_operation_async(callee)

0 commit comments

Comments
 (0)