Skip to content

Commit d227971

Browse files
committed
query service asyncio example
1 parent 6266f08 commit d227971

File tree

4 files changed

+154
-1
lines changed

4 files changed

+154
-1
lines changed
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
import asyncio
2+
import ydb
3+
4+
5+
async def main():
6+
driver_config = ydb.DriverConfig(
7+
endpoint="grpc://localhost:2136",
8+
database="/local",
9+
# credentials=ydb.credentials_from_env_variables(),
10+
# root_certificates=ydb.load_ydb_root_certificate(),
11+
)
12+
try:
13+
driver = ydb.aio.Driver(driver_config)
14+
await driver.wait(timeout=5)
15+
except TimeoutError:
16+
raise RuntimeError("Connect failed to YDB")
17+
18+
pool = ydb.aio.QuerySessionPoolAsync(driver)
19+
20+
print("=" * 50)
21+
print("DELETE TABLE IF EXISTS")
22+
await pool.execute_with_retries("DROP TABLE IF EXISTS example")
23+
24+
print("=" * 50)
25+
print("CREATE TABLE")
26+
await pool.execute_with_retries("CREATE TABLE example(key UInt64, value String, PRIMARY KEY (key))")
27+
28+
await pool.execute_with_retries("INSERT INTO example (key, value) VALUES (1, 'onepieceisreal')")
29+
30+
async def callee(session):
31+
print("=" * 50)
32+
async with await session.execute("DELETE FROM example"):
33+
pass
34+
35+
print("BEFORE ACTION")
36+
async with await session.execute("SELECT COUNT(*) AS rows_count FROM example") as results:
37+
async for result_set in results:
38+
print(f"rows: {str(result_set.rows)}")
39+
40+
print("=" * 50)
41+
print("INSERT WITH COMMIT TX")
42+
43+
async with session.transaction() as tx:
44+
await tx.begin()
45+
46+
async with await tx.execute("INSERT INTO example (key, value) VALUES (1, 'onepieceisreal')"):
47+
pass
48+
49+
async with await tx.execute("SELECT COUNT(*) AS rows_count FROM example") as results:
50+
async for result_set in results:
51+
print(f"rows: {str(result_set.rows)}")
52+
53+
await tx.commit()
54+
55+
print("=" * 50)
56+
print("AFTER COMMIT TX")
57+
58+
async with await session.execute("SELECT COUNT(*) AS rows_count FROM example") as results:
59+
async for result_set in results:
60+
print(f"rows: {str(result_set.rows)}")
61+
62+
print("=" * 50)
63+
print("INSERT WITH ROLLBACK TX")
64+
65+
async with session.transaction() as tx:
66+
await tx.begin()
67+
68+
async with await tx.execute("INSERT INTO example (key, value) VALUES (2, 'onepieceisreal')"):
69+
pass
70+
71+
async with await tx.execute("SELECT COUNT(*) AS rows_count FROM example") as results:
72+
async for result_set in results:
73+
print(f"rows: {str(result_set.rows)}")
74+
75+
await tx.rollback()
76+
77+
print("=" * 50)
78+
print("AFTER ROLLBACK TX")
79+
80+
async with await session.execute("SELECT COUNT(*) AS rows_count FROM example") as results:
81+
async for result_set in results:
82+
print(f"rows: {str(result_set.rows)}")
83+
84+
await pool.retry_operation_async(callee)
85+
86+
async def callee(session: ydb.aio.QuerySessionAsync):
87+
query_print = """select $a"""
88+
89+
print("=" * 50)
90+
print("Check implicit typed parameters")
91+
92+
values = [
93+
1,
94+
1.0,
95+
True,
96+
"text",
97+
{"4": 8, "15": 16, "23": 42},
98+
[{"name": "Michael"}, {"surname": "Scott"}],
99+
]
100+
101+
for value in values:
102+
print(f"value: {value}")
103+
async with await session.transaction().execute(
104+
query=query_print,
105+
parameters={"$a": value},
106+
commit_tx=True,
107+
) as results:
108+
async for result_set in results:
109+
print(f"rows: {str(result_set.rows)}")
110+
111+
print("=" * 50)
112+
print("Check typed parameters as tuple pair")
113+
114+
typed_value = ([1, 2, 3], ydb.ListType(ydb.PrimitiveType.Int64))
115+
print(f"value: {typed_value}")
116+
117+
async with await session.transaction().execute(
118+
query=query_print,
119+
parameters={"$a": typed_value},
120+
commit_tx=True,
121+
) as results:
122+
async for result_set in results:
123+
print(f"rows: {str(result_set.rows)}")
124+
125+
print("=" * 50)
126+
print("Check typed parameters as ydb.TypedValue")
127+
128+
typed_value = ydb.TypedValue(111, ydb.PrimitiveType.Int64)
129+
print(f"value: {typed_value}")
130+
131+
async with await session.transaction().execute(
132+
query=query_print,
133+
parameters={"$a": typed_value},
134+
commit_tx=True,
135+
) as results:
136+
async for result_set in results:
137+
print(f"rows: {str(result_set.rows)}")
138+
139+
await pool.retry_operation_async(callee)
140+
141+
142+
if __name__ == "__main__":
143+
asyncio.run(main())

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
from google.protobuf.duration_pb2 import Duration as ProtoDuration
2525
from google.protobuf.timestamp_pb2 import Timestamp as ProtoTimeStamp
2626

27+
from ...driver import Driver
28+
from ...aio.driver import Driver as DriverIO
2729
import ydb.aio
2830

2931
# Workaround for good IDE and universal for runtime
@@ -142,7 +144,7 @@ def close(self):
142144
...
143145

144146

145-
SupportedDriverType = Union[ydb.Driver, ydb.aio.Driver]
147+
SupportedDriverType = Union[Driver, DriverIO]
146148

147149

148150
class GrpcWrapperAsyncIO(IGrpcWrapperAsyncIO):

ydb/aio/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
from .driver import Driver # noqa
22
from .table import SessionPool, retry_operation # noqa
3+
from .query import QuerySessionPoolAsync, QuerySessionAsync # noqa

ydb/aio/query/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
__all__ = [
2+
"QuerySessionPoolAsync",
3+
"QuerySessionAsync",
4+
]
5+
6+
from .pool import QuerySessionPoolAsync
7+
from .session import QuerySessionAsync

0 commit comments

Comments
 (0)