Skip to content

Commit f3bd613

Browse files
committed
Extend example with async mode
1 parent 145054f commit f3bd613

File tree

7 files changed

+355
-10
lines changed

7 files changed

+355
-10
lines changed

examples/basic_example_v2/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# YDB Python SDK Example: basic_example_v2
22

33
Example code demonstrating the basic YDB Python SDK operations.
4+
Example is awailable in two modes:
5+
1. `sync` - synchronous implementation;
6+
1. `async` - asynchronous implementation using asyncio.
7+
8+
To spesify mode, use argument `-m async` or `--mode async`.
49

510
See the top-level [README.md](../README.md) file for instructions on running this example.

examples/basic_example_v2/__main__.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# -*- coding: utf-8 -*-
22
import argparse
3-
import basic_example
3+
import asyncio
4+
from basic_example import run as run_sync
5+
from basic_example_async import run as run_async
46
import logging
57

68

@@ -13,16 +15,29 @@
1315
parser.add_argument("-e", "--endpoint", help="Endpoint url to use", default="grpc://localhost:2136")
1416
parser.add_argument("-p", "--path", default="")
1517
parser.add_argument("-v", "--verbose", default=False, action="store_true")
18+
parser.add_argument("-m", "--mode", default="sync", help="Mode of example: sync or async")
1619

1720
args = parser.parse_args()
1821

1922
if args.verbose:
2023
logger = logging.getLogger("ydb.pool.Discovery")
2124
logger.setLevel(logging.INFO)
2225
logger.addHandler(logging.StreamHandler())
23-
24-
basic_example.run(
25-
args.endpoint,
26-
args.database,
27-
args.path,
28-
)
26+
if args.mode == "sync":
27+
print("Running sync example")
28+
run_sync(
29+
args.endpoint,
30+
args.database,
31+
args.path,
32+
)
33+
elif args.mode == "async":
34+
print("Running async example")
35+
asyncio.run(
36+
run_async(
37+
args.endpoint,
38+
args.database,
39+
args.path,
40+
)
41+
)
42+
else:
43+
raise ValueError(f"Unsupported mode: {args.mode}, use one of sync|async")
Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
# -*- coding: utf-8 -*-
2+
import posixpath
3+
import ydb
4+
import basic_example_data
5+
6+
# Table path prefix allows to put the working tables into the specific directory
7+
# inside the YDB database. Putting `PRAGMA TablePathPrefix("some/path")`
8+
# at the beginning of the query allows to reference the tables through
9+
# their names "under" the specified directory.
10+
#
11+
# TablePathPrefix has to be defined as an absolute path, which has to be started
12+
# with the current database location.
13+
#
14+
# https://ydb.tech/ru/docs/yql/reference/syntax/pragma#table-path-prefix
15+
16+
DropTablesQuery = """PRAGMA TablePathPrefix("{}");
17+
DROP TABLE IF EXISTS series;
18+
DROP TABLE IF EXISTS seasons;
19+
DROP TABLE IF EXISTS episodes;
20+
"""
21+
22+
FillDataQuery = """PRAGMA TablePathPrefix("{}");
23+
24+
DECLARE $seriesData AS List<Struct<
25+
series_id: Int64,
26+
title: Utf8,
27+
series_info: Utf8,
28+
release_date: Date>>;
29+
30+
DECLARE $seasonsData AS List<Struct<
31+
series_id: Int64,
32+
season_id: Int64,
33+
title: Utf8,
34+
first_aired: Date,
35+
last_aired: Date>>;
36+
37+
DECLARE $episodesData AS List<Struct<
38+
series_id: Int64,
39+
season_id: Int64,
40+
episode_id: Int64,
41+
title: Utf8,
42+
air_date: Date>>;
43+
44+
REPLACE INTO series
45+
SELECT
46+
series_id,
47+
title,
48+
series_info,
49+
release_date
50+
FROM AS_TABLE($seriesData);
51+
52+
REPLACE INTO seasons
53+
SELECT
54+
series_id,
55+
season_id,
56+
title,
57+
first_aired,
58+
last_aired
59+
FROM AS_TABLE($seasonsData);
60+
61+
REPLACE INTO episodes
62+
SELECT
63+
series_id,
64+
season_id,
65+
episode_id,
66+
title,
67+
air_date
68+
FROM AS_TABLE($episodesData);
69+
"""
70+
71+
72+
async def fill_tables_with_data(pool: ydb.aio.QuerySessionPoolAsync, path: str):
73+
print("\nFilling tables with data...")
74+
75+
query = FillDataQuery.format(path)
76+
77+
await pool.execute_with_retries(
78+
query,
79+
{
80+
"$seriesData": (basic_example_data.get_series_data(), basic_example_data.get_series_data_type()),
81+
"$seasonsData": (basic_example_data.get_seasons_data(), basic_example_data.get_seasons_data_type()),
82+
"$episodesData": (basic_example_data.get_episodes_data(), basic_example_data.get_episodes_data_type()),
83+
},
84+
)
85+
86+
87+
async def select_simple(pool: ydb.aio.QuerySessionPoolAsync, path: str):
88+
print("\nCheck series table...")
89+
result_sets = await pool.execute_with_retries(
90+
"""
91+
PRAGMA TablePathPrefix("{}");
92+
SELECT
93+
series_id,
94+
title,
95+
release_date
96+
FROM series
97+
WHERE series_id = 1;
98+
""".format(
99+
path
100+
),
101+
)
102+
first_set = result_sets[0]
103+
for row in first_set.rows:
104+
print(
105+
"series, id: ",
106+
row.series_id,
107+
", title: ",
108+
row.title,
109+
", release date: ",
110+
row.release_date,
111+
)
112+
113+
return first_set
114+
115+
116+
async def upsert_simple(pool: ydb.aio.QuerySessionPoolAsync, path: str):
117+
print("\nPerforming UPSERT into episodes...")
118+
119+
await pool.execute_with_retries(
120+
"""
121+
PRAGMA TablePathPrefix("{}");
122+
UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD");
123+
""".format(
124+
path
125+
)
126+
)
127+
128+
129+
async def select_with_parameters(pool: ydb.aio.QuerySessionPoolAsync, path: str, series_id, season_id, episode_id):
130+
result_sets = await pool.execute_with_retries(
131+
"""
132+
PRAGMA TablePathPrefix("{}");
133+
SELECT
134+
title,
135+
air_date
136+
FROM episodes
137+
WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId;
138+
""".format(
139+
path
140+
),
141+
{
142+
"$seriesId": series_id, # could be defined implicit
143+
"$seasonId": (season_id, ydb.PrimitiveType.Int64), # could be defined via tuple
144+
"$episodeId": ydb.TypedValue(episode_id, ydb.PrimitiveType.Int64), # could be defined via special class
145+
},
146+
)
147+
148+
print("\n> select_with_parameters:")
149+
first_set = result_sets[0]
150+
for row in first_set.rows:
151+
print("episode title:", row.title, ", air date:", row.air_date)
152+
153+
return first_set
154+
155+
156+
# Show usage of explicit Begin/Commit transaction control calls.
157+
# In most cases it's better to use transaction control settings in session.transaction
158+
# calls instead to avoid additional hops to YDB cluster and allow more efficient
159+
# execution of queries.
160+
async def explicit_transaction_control(pool: ydb.aio.QuerySessionPoolAsync, path: str, series_id, season_id, episode_id):
161+
async def callee(session: ydb.aio.QuerySessionAsync):
162+
query = """
163+
PRAGMA TablePathPrefix("{}");
164+
UPDATE episodes
165+
SET air_date = CurrentUtcDate()
166+
WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId;
167+
""".format(
168+
path
169+
)
170+
171+
# Get newly created transaction id
172+
tx = await session.transaction(ydb.QuerySerializableReadWrite()).begin()
173+
174+
# Execute data query.
175+
# Transaction control settings continues active transaction (tx)
176+
async with await tx.execute(
177+
query,
178+
{
179+
"$seriesId": (series_id, ydb.PrimitiveType.Int64),
180+
"$seasonId": (season_id, ydb.PrimitiveType.Int64),
181+
"$episodeId": (episode_id, ydb.PrimitiveType.Int64),
182+
},
183+
) as _:
184+
pass
185+
186+
print("\n> explicit TCL call")
187+
188+
# Commit active transaction(tx)
189+
await tx.commit()
190+
191+
return await pool.retry_operation_async(callee)
192+
193+
194+
async def drop_tables(pool: ydb.aio.QuerySessionPoolAsync, path: str):
195+
print("\nCleaning up existing tables...")
196+
await pool.execute_with_retries(DropTablesQuery.format(path))
197+
198+
199+
async def create_tables(pool: ydb.aio.QuerySessionPoolAsync, path: str):
200+
print("\nCreating table series...")
201+
await pool.execute_with_retries(
202+
"""
203+
PRAGMA TablePathPrefix("{}");
204+
CREATE table `series` (
205+
`series_id` Int64,
206+
`title` Utf8,
207+
`series_info` Utf8,
208+
`release_date` Date,
209+
PRIMARY KEY (`series_id`)
210+
)
211+
""".format(
212+
path
213+
)
214+
)
215+
216+
print("\nCreating table seasons...")
217+
await pool.execute_with_retries(
218+
"""
219+
PRAGMA TablePathPrefix("{}");
220+
CREATE table `seasons` (
221+
`series_id` Int64,
222+
`season_id` Int64,
223+
`title` Utf8,
224+
`first_aired` Date,
225+
`last_aired` Date,
226+
PRIMARY KEY (`series_id`, `season_id`)
227+
)
228+
""".format(
229+
path
230+
)
231+
)
232+
233+
print("\nCreating table episodes...")
234+
await pool.execute_with_retries(
235+
"""
236+
PRAGMA TablePathPrefix("{}");
237+
CREATE table `episodes` (
238+
`series_id` Int64,
239+
`season_id` Int64,
240+
`episode_id` Int64,
241+
`title` Utf8,
242+
`air_date` Date,
243+
PRIMARY KEY (`series_id`, `season_id`, `episode_id`)
244+
)
245+
""".format(
246+
path
247+
)
248+
)
249+
250+
251+
async def is_directory_exists(driver: ydb.aio.Driver, path: str):
252+
try:
253+
return await driver.scheme_client.describe_path(path).is_directory()
254+
except ydb.SchemeError:
255+
return False
256+
257+
258+
async def ensure_path_exists(driver: ydb.aio.Driver, database, path):
259+
paths_to_create = list()
260+
path = path.rstrip("/")
261+
while path not in ("", database):
262+
full_path = posixpath.join(database, path)
263+
if await is_directory_exists(driver, full_path):
264+
break
265+
paths_to_create.append(full_path)
266+
path = posixpath.dirname(path).rstrip("/")
267+
268+
while len(paths_to_create) > 0:
269+
full_path = paths_to_create.pop(-1)
270+
await driver.scheme_client.make_directory(full_path)
271+
272+
273+
async def run(endpoint, database, path):
274+
async with ydb.aio.Driver(
275+
endpoint=endpoint,
276+
database=database,
277+
credentials=ydb.credentials_from_env_variables(),
278+
) as driver:
279+
await driver.wait(timeout=5, fail_fast=True)
280+
281+
async with ydb.aio.QuerySessionPoolAsync(driver) as pool:
282+
283+
await ensure_path_exists(driver, database, path)
284+
285+
# absolute path - prefix to the table's names,
286+
# including the database location
287+
full_path = posixpath.join(database, path)
288+
289+
await drop_tables(pool, full_path)
290+
291+
await create_tables(pool, full_path)
292+
293+
await fill_tables_with_data(pool, full_path)
294+
295+
await select_simple(pool, full_path)
296+
297+
await upsert_simple(pool, full_path)
298+
299+
await select_with_parameters(pool, full_path, 2, 3, 7)
300+
await select_with_parameters(pool, full_path, 2, 3, 8)
301+
302+
await explicit_transaction_control(pool, full_path, 2, 6, 1)
303+
await select_with_parameters(pool, full_path, 2, 6, 1)

0 commit comments

Comments
 (0)