Skip to content

Commit 766e863

Browse files
authored
Merge pull request #471 from ydb-platform/extend_basic_example_async
Extend basic example async
2 parents 9247707 + b15a1dd commit 766e863

File tree

7 files changed

+357
-10
lines changed

7 files changed

+357
-10
lines changed

examples/basic_example_v2/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# YDB Python SDK Example: basic_example_v2
22

33
Example code demonstrating the basic YDB Python SDK operations.
4+
Example is awailable in two modes:
5+
1. `sync` - synchronous implementation;
6+
1. `async` - asynchronous implementation using asyncio.
7+
8+
To spesify mode, use argument `-m async` or `--mode async`.
49

510
See the top-level [README.md](../README.md) file for instructions on running this example.

examples/basic_example_v2/__main__.py

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# -*- coding: utf-8 -*-
22
import argparse
3-
import basic_example
3+
import asyncio
4+
from basic_example import run as run_sync
5+
from basic_example_async import run as run_async
46
import logging
57

68

@@ -13,16 +15,29 @@
1315
parser.add_argument("-e", "--endpoint", help="Endpoint url to use", default="grpc://localhost:2136")
1416
parser.add_argument("-p", "--path", default="")
1517
parser.add_argument("-v", "--verbose", default=False, action="store_true")
18+
parser.add_argument("-m", "--mode", default="sync", help="Mode of example: sync or async")
1619

1720
args = parser.parse_args()
1821

1922
if args.verbose:
2023
logger = logging.getLogger("ydb.pool.Discovery")
2124
logger.setLevel(logging.INFO)
2225
logger.addHandler(logging.StreamHandler())
23-
24-
basic_example.run(
25-
args.endpoint,
26-
args.database,
27-
args.path,
28-
)
26+
if args.mode == "sync":
27+
print("Running sync example")
28+
run_sync(
29+
args.endpoint,
30+
args.database,
31+
args.path,
32+
)
33+
elif args.mode == "async":
34+
print("Running async example")
35+
asyncio.run(
36+
run_async(
37+
args.endpoint,
38+
args.database,
39+
args.path,
40+
)
41+
)
42+
else:
43+
raise ValueError(f"Unsupported mode: {args.mode}, use one of sync|async")
Lines changed: 305 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,305 @@
1+
# -*- coding: utf-8 -*-
2+
import posixpath
3+
import ydb
4+
import basic_example_data
5+
6+
# Table path prefix allows to put the working tables into the specific directory
7+
# inside the YDB database. Putting `PRAGMA TablePathPrefix("some/path")`
8+
# at the beginning of the query allows to reference the tables through
9+
# their names "under" the specified directory.
10+
#
11+
# TablePathPrefix has to be defined as an absolute path, which has to be started
12+
# with the current database location.
13+
#
14+
# https://ydb.tech/ru/docs/yql/reference/syntax/pragma#table-path-prefix
15+
16+
DropTablesQuery = """PRAGMA TablePathPrefix("{}");
17+
DROP TABLE IF EXISTS series;
18+
DROP TABLE IF EXISTS seasons;
19+
DROP TABLE IF EXISTS episodes;
20+
"""
21+
22+
FillDataQuery = """PRAGMA TablePathPrefix("{}");
23+
24+
DECLARE $seriesData AS List<Struct<
25+
series_id: Int64,
26+
title: Utf8,
27+
series_info: Utf8,
28+
release_date: Date>>;
29+
30+
DECLARE $seasonsData AS List<Struct<
31+
series_id: Int64,
32+
season_id: Int64,
33+
title: Utf8,
34+
first_aired: Date,
35+
last_aired: Date>>;
36+
37+
DECLARE $episodesData AS List<Struct<
38+
series_id: Int64,
39+
season_id: Int64,
40+
episode_id: Int64,
41+
title: Utf8,
42+
air_date: Date>>;
43+
44+
REPLACE INTO series
45+
SELECT
46+
series_id,
47+
title,
48+
series_info,
49+
release_date
50+
FROM AS_TABLE($seriesData);
51+
52+
REPLACE INTO seasons
53+
SELECT
54+
series_id,
55+
season_id,
56+
title,
57+
first_aired,
58+
last_aired
59+
FROM AS_TABLE($seasonsData);
60+
61+
REPLACE INTO episodes
62+
SELECT
63+
series_id,
64+
season_id,
65+
episode_id,
66+
title,
67+
air_date
68+
FROM AS_TABLE($episodesData);
69+
"""
70+
71+
72+
async def fill_tables_with_data(pool: ydb.aio.QuerySessionPoolAsync, path: str):
73+
print("\nFilling tables with data...")
74+
75+
query = FillDataQuery.format(path)
76+
77+
await pool.execute_with_retries(
78+
query,
79+
{
80+
"$seriesData": (basic_example_data.get_series_data(), basic_example_data.get_series_data_type()),
81+
"$seasonsData": (basic_example_data.get_seasons_data(), basic_example_data.get_seasons_data_type()),
82+
"$episodesData": (basic_example_data.get_episodes_data(), basic_example_data.get_episodes_data_type()),
83+
},
84+
)
85+
86+
87+
async def select_simple(pool: ydb.aio.QuerySessionPoolAsync, path: str):
88+
print("\nCheck series table...")
89+
result_sets = await pool.execute_with_retries(
90+
"""
91+
PRAGMA TablePathPrefix("{}");
92+
SELECT
93+
series_id,
94+
title,
95+
release_date
96+
FROM series
97+
WHERE series_id = 1;
98+
""".format(
99+
path
100+
),
101+
)
102+
first_set = result_sets[0]
103+
for row in first_set.rows:
104+
print(
105+
"series, id: ",
106+
row.series_id,
107+
", title: ",
108+
row.title,
109+
", release date: ",
110+
row.release_date,
111+
)
112+
113+
return first_set
114+
115+
116+
async def upsert_simple(pool: ydb.aio.QuerySessionPoolAsync, path: str):
117+
print("\nPerforming UPSERT into episodes...")
118+
119+
await pool.execute_with_retries(
120+
"""
121+
PRAGMA TablePathPrefix("{}");
122+
UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD");
123+
""".format(
124+
path
125+
)
126+
)
127+
128+
129+
async def select_with_parameters(pool: ydb.aio.QuerySessionPoolAsync, path: str, series_id, season_id, episode_id):
130+
result_sets = await pool.execute_with_retries(
131+
"""
132+
PRAGMA TablePathPrefix("{}");
133+
SELECT
134+
title,
135+
air_date
136+
FROM episodes
137+
WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId;
138+
""".format(
139+
path
140+
),
141+
{
142+
"$seriesId": series_id, # could be defined implicit
143+
"$seasonId": (season_id, ydb.PrimitiveType.Int64), # could be defined via tuple
144+
"$episodeId": ydb.TypedValue(episode_id, ydb.PrimitiveType.Int64), # could be defined via special class
145+
},
146+
)
147+
148+
print("\n> select_with_parameters:")
149+
first_set = result_sets[0]
150+
for row in first_set.rows:
151+
print("episode title:", row.title, ", air date:", row.air_date)
152+
153+
return first_set
154+
155+
156+
# Show usage of explicit Begin/Commit transaction control calls.
157+
# In most cases it's better to use transaction control settings in session.transaction
158+
# calls instead to avoid additional hops to YDB cluster and allow more efficient
159+
# execution of queries.
160+
async def explicit_transaction_control(
161+
pool: ydb.aio.QuerySessionPoolAsync, path: str, series_id, season_id, episode_id
162+
):
163+
async def callee(session: ydb.aio.QuerySessionAsync):
164+
query = """
165+
PRAGMA TablePathPrefix("{}");
166+
UPDATE episodes
167+
SET air_date = CurrentUtcDate()
168+
WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId;
169+
""".format(
170+
path
171+
)
172+
173+
# Get newly created transaction id
174+
tx = await session.transaction(ydb.QuerySerializableReadWrite()).begin()
175+
176+
# Execute data query.
177+
# Transaction control settings continues active transaction (tx)
178+
async with await tx.execute(
179+
query,
180+
{
181+
"$seriesId": (series_id, ydb.PrimitiveType.Int64),
182+
"$seasonId": (season_id, ydb.PrimitiveType.Int64),
183+
"$episodeId": (episode_id, ydb.PrimitiveType.Int64),
184+
},
185+
) as _:
186+
pass
187+
188+
print("\n> explicit TCL call")
189+
190+
# Commit active transaction(tx)
191+
await tx.commit()
192+
193+
return await pool.retry_operation_async(callee)
194+
195+
196+
async def drop_tables(pool: ydb.aio.QuerySessionPoolAsync, path: str):
197+
print("\nCleaning up existing tables...")
198+
await pool.execute_with_retries(DropTablesQuery.format(path))
199+
200+
201+
async def create_tables(pool: ydb.aio.QuerySessionPoolAsync, path: str):
202+
print("\nCreating table series...")
203+
await pool.execute_with_retries(
204+
"""
205+
PRAGMA TablePathPrefix("{}");
206+
CREATE table `series` (
207+
`series_id` Int64,
208+
`title` Utf8,
209+
`series_info` Utf8,
210+
`release_date` Date,
211+
PRIMARY KEY (`series_id`)
212+
)
213+
""".format(
214+
path
215+
)
216+
)
217+
218+
print("\nCreating table seasons...")
219+
await pool.execute_with_retries(
220+
"""
221+
PRAGMA TablePathPrefix("{}");
222+
CREATE table `seasons` (
223+
`series_id` Int64,
224+
`season_id` Int64,
225+
`title` Utf8,
226+
`first_aired` Date,
227+
`last_aired` Date,
228+
PRIMARY KEY (`series_id`, `season_id`)
229+
)
230+
""".format(
231+
path
232+
)
233+
)
234+
235+
print("\nCreating table episodes...")
236+
await pool.execute_with_retries(
237+
"""
238+
PRAGMA TablePathPrefix("{}");
239+
CREATE table `episodes` (
240+
`series_id` Int64,
241+
`season_id` Int64,
242+
`episode_id` Int64,
243+
`title` Utf8,
244+
`air_date` Date,
245+
PRIMARY KEY (`series_id`, `season_id`, `episode_id`)
246+
)
247+
""".format(
248+
path
249+
)
250+
)
251+
252+
253+
async def is_directory_exists(driver: ydb.aio.Driver, path: str):
254+
try:
255+
return await driver.scheme_client.describe_path(path).is_directory()
256+
except ydb.SchemeError:
257+
return False
258+
259+
260+
async def ensure_path_exists(driver: ydb.aio.Driver, database, path):
261+
paths_to_create = list()
262+
path = path.rstrip("/")
263+
while path not in ("", database):
264+
full_path = posixpath.join(database, path)
265+
if await is_directory_exists(driver, full_path):
266+
break
267+
paths_to_create.append(full_path)
268+
path = posixpath.dirname(path).rstrip("/")
269+
270+
while len(paths_to_create) > 0:
271+
full_path = paths_to_create.pop(-1)
272+
await driver.scheme_client.make_directory(full_path)
273+
274+
275+
async def run(endpoint, database, path):
276+
async with ydb.aio.Driver(
277+
endpoint=endpoint,
278+
database=database,
279+
credentials=ydb.credentials_from_env_variables(),
280+
) as driver:
281+
await driver.wait(timeout=5, fail_fast=True)
282+
283+
async with ydb.aio.QuerySessionPoolAsync(driver) as pool:
284+
285+
await ensure_path_exists(driver, database, path)
286+
287+
# absolute path - prefix to the table's names,
288+
# including the database location
289+
full_path = posixpath.join(database, path)
290+
291+
await drop_tables(pool, full_path)
292+
293+
await create_tables(pool, full_path)
294+
295+
await fill_tables_with_data(pool, full_path)
296+
297+
await select_simple(pool, full_path)
298+
299+
await upsert_simple(pool, full_path)
300+
301+
await select_with_parameters(pool, full_path, 2, 3, 7)
302+
await select_with_parameters(pool, full_path, 2, 3, 8)
303+
304+
await explicit_transaction_control(pool, full_path, 2, 6, 1)
305+
await select_with_parameters(pool, full_path, 2, 6, 1)

0 commit comments

Comments
 (0)