Skip to content

Commit bc285c1

Browse files
committed
New basic example with query service
1 parent 786e044 commit bc285c1

File tree

9 files changed

+575
-7
lines changed

9 files changed

+575
-7
lines changed

examples/basic_example_v2/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# YDB Python SDK Example: basic_example_v2
2+
3+
Example code demonstrating the basic YDB Python SDK operations.
4+
5+
See the top-level [README.md](../README.md) file for instructions on running this example.

examples/basic_example_v2/__main__.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# -*- coding: utf-8 -*-
2+
import argparse
3+
import basic_example
4+
import logging
5+
6+
7+
if __name__ == "__main__":
8+
parser = argparse.ArgumentParser(
9+
formatter_class=argparse.RawDescriptionHelpFormatter,
10+
description="""\033[92mYDB basic example.\x1b[0m\n""",
11+
)
12+
parser.add_argument("-d", "--database", help="Name of the database to use", default="/local")
13+
parser.add_argument("-e", "--endpoint", help="Endpoint url to use", default="grpc://localhost:2136")
14+
parser.add_argument("-p", "--path", default="")
15+
parser.add_argument("-v", "--verbose", default=False, action="store_true")
16+
17+
args = parser.parse_args()
18+
19+
if args.verbose:
20+
logger = logging.getLogger("ydb.pool.Discovery")
21+
logger.setLevel(logging.INFO)
22+
logger.addHandler(logging.StreamHandler())
23+
24+
basic_example.run(
25+
args.endpoint,
26+
args.database,
27+
args.path,
28+
)
Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
# -*- coding: utf-8 -*-
2+
import posixpath
3+
import ydb
4+
import basic_example_data
5+
6+
# Table path prefix allows to put the working tables into the specific directory
7+
# inside the YDB database. Putting `PRAGMA TablePathPrefix("some/path")`
8+
# at the beginning of the query allows to reference the tables through
9+
# their names "under" the specified directory.
10+
#
11+
# TablePathPrefix has to be defined as an absolute path, which has to be started
12+
# with the current database location.
13+
#
14+
# https://ydb.tech/ru/docs/yql/reference/syntax/pragma#table-path-prefix
15+
16+
DropTablesQuery = """PRAGMA TablePathPrefix("{}");
17+
DROP TABLE IF EXISTS series;
18+
DROP TABLE IF EXISTS seasons;
19+
DROP TABLE IF EXISTS episodes;
20+
"""
21+
22+
FillDataQuery = """PRAGMA TablePathPrefix("{}");
23+
24+
DECLARE $seriesData AS List<Struct<
25+
series_id: Uint64,
26+
title: Utf8,
27+
series_info: Utf8,
28+
release_date: Date>>;
29+
30+
DECLARE $seasonsData AS List<Struct<
31+
series_id: Uint64,
32+
season_id: Uint64,
33+
title: Utf8,
34+
first_aired: Date,
35+
last_aired: Date>>;
36+
37+
DECLARE $episodesData AS List<Struct<
38+
series_id: Uint64,
39+
season_id: Uint64,
40+
episode_id: Uint64,
41+
title: Utf8,
42+
air_date: Date>>;
43+
44+
REPLACE INTO series
45+
SELECT
46+
series_id,
47+
title,
48+
series_info,
49+
CAST(release_date AS Uint64) AS release_date
50+
FROM AS_TABLE($seriesData);
51+
52+
REPLACE INTO seasons
53+
SELECT
54+
series_id,
55+
season_id,
56+
title,
57+
CAST(first_aired AS Uint64) AS first_aired,
58+
CAST(last_aired AS Uint64) AS last_aired
59+
FROM AS_TABLE($seasonsData);
60+
61+
REPLACE INTO episodes
62+
SELECT
63+
series_id,
64+
season_id,
65+
episode_id,
66+
title,
67+
CAST(air_date AS Uint64) AS air_date
68+
FROM AS_TABLE($episodesData);
69+
"""
70+
71+
72+
def fill_tables_with_data(pool, path):
73+
print("\nFilling tables with data...")
74+
75+
global FillDataQuery
76+
77+
def callee(session):
78+
79+
prepared_query = FillDataQuery.format(path)
80+
with session.transaction(ydb.QuerySerializableReadWrite()).execute(
81+
prepared_query,
82+
{
83+
"$seriesData": (basic_example_data.get_series_data(), basic_example_data.get_series_data_type()),
84+
"$seasonsData": (basic_example_data.get_seasons_data(), basic_example_data.get_seasons_data_type()),
85+
"$episodesData": (basic_example_data.get_episodes_data(), basic_example_data.get_episodes_data_type()),
86+
},
87+
commit_tx=True,
88+
) as result_sets:
89+
pass
90+
91+
return pool.retry_operation_sync(callee)
92+
93+
94+
def select_simple(pool, path):
95+
print("\nCheck series table...")
96+
97+
def callee(session):
98+
# new transaction in serializable read write mode
99+
# if query successfully completed you will get result sets.
100+
# otherwise exception will be raised
101+
with session.transaction(ydb.QuerySerializableReadWrite()).execute(
102+
"""
103+
PRAGMA TablePathPrefix("{}");
104+
$format = DateTime::Format("%Y-%m-%d");
105+
SELECT
106+
series_id,
107+
title,
108+
$format(DateTime::FromSeconds(CAST(DateTime::ToSeconds(DateTime::IntervalFromDays(CAST(release_date AS Int16))) AS Uint32))) AS release_date
109+
FROM series
110+
WHERE series_id = 1;
111+
""".format(
112+
path
113+
),
114+
commit_tx=True,
115+
) as result_sets:
116+
first_set = next(result_sets)
117+
for row in first_set.rows:
118+
print(
119+
"series, id: ",
120+
row.series_id,
121+
", title: ",
122+
row.title,
123+
", release date: ",
124+
row.release_date,
125+
)
126+
127+
return first_set
128+
129+
return pool.retry_operation_sync(callee)
130+
131+
132+
def upsert_simple(pool, path):
133+
print(f"\nPerforming UPSERT into episodes...")
134+
135+
def callee(session):
136+
with session.transaction().execute(
137+
"""
138+
PRAGMA TablePathPrefix("{}");
139+
140+
DECLARE $seriesId AS Uint64;
141+
DECLARE $seasonId AS Uint64;
142+
DECLARE $episodeId AS Uint64;
143+
DECLARE $title AS Utf8;
144+
145+
UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD");
146+
""".format(
147+
path
148+
),
149+
commit_tx=True,
150+
) as result_sets:
151+
pass
152+
153+
return pool.retry_operation_sync(callee)
154+
155+
156+
def select_with_parameters(pool, path, series_id, season_id, episode_id):
157+
def callee(session):
158+
query = """
159+
PRAGMA TablePathPrefix("{}");
160+
161+
DECLARE $seriesId AS Uint64;
162+
DECLARE $seasonId AS Uint64;
163+
DECLARE $episodeId AS Uint64;
164+
165+
$format = DateTime::Format("%Y-%m-%d");
166+
SELECT
167+
title,
168+
$format(DateTime::FromSeconds(CAST(DateTime::ToSeconds(DateTime::IntervalFromDays(CAST(air_date AS Int16))) AS Uint32))) AS air_date
169+
FROM episodes
170+
WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId;
171+
""".format(
172+
path
173+
)
174+
175+
with session.transaction(ydb.QuerySerializableReadWrite()).execute(
176+
query,
177+
{
178+
"$seriesId": series_id, # could be defined implicit in case of int, str, bool
179+
"$seasonId": (season_id, ydb.PrimitiveType.Uint64), # could be defined via tuple
180+
"$episodeId": ydb.TypedValue(episode_id, ydb.PrimitiveType.Uint64), # could be defined via special class
181+
},
182+
commit_tx=True,
183+
) as result_sets:
184+
print("\n> select_prepared_transaction:")
185+
first_set = next(result_sets)
186+
for row in first_set.rows:
187+
print("episode title:", row.title, ", air date:", row.air_date)
188+
189+
return first_set
190+
191+
return pool.retry_operation_sync(callee)
192+
193+
194+
# Show usage of explicit Begin/Commit transaction control calls.
195+
# In most cases it's better to use transaction control settings in session.transaction
196+
# calls instead to avoid additional hops to YDB cluster and allow more efficient
197+
# execution of queries.
198+
def explicit_tcl(pool, path, series_id, season_id, episode_id):
199+
def callee(session):
200+
query = """
201+
PRAGMA TablePathPrefix("{}");
202+
203+
DECLARE $seriesId AS Uint64;
204+
DECLARE $seasonId AS Uint64;
205+
DECLARE $episodeId AS Uint64;
206+
207+
UPDATE episodes
208+
SET air_date = CAST(CurrentUtcDate() AS Uint64)
209+
WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId;
210+
""".format(
211+
path
212+
)
213+
214+
# Get newly created transaction id
215+
tx = session.transaction(ydb.QuerySerializableReadWrite()).begin()
216+
217+
# Execute data query.
218+
# Transaction control settings continues active transaction (tx)
219+
with tx.execute(
220+
query,
221+
{"$seriesId": series_id, "$seasonId": season_id, "$episodeId": episode_id},
222+
) as result_sets:
223+
pass
224+
225+
print("\n> explicit TCL call")
226+
227+
# Commit active transaction(tx)
228+
tx.commit()
229+
230+
return pool.retry_operation_sync(callee)
231+
232+
233+
def drop_tables(pool, path):
234+
print("\nCleaning up existing tables...")
235+
pool.execute_with_retries(DropTablesQuery.format(path))
236+
237+
238+
def create_tables(pool, path):
239+
print("\nCreating table series...")
240+
pool.execute_with_retries(
241+
"""
242+
PRAGMA TablePathPrefix("{}");
243+
CREATE table `series` (
244+
`series_id` Uint64,
245+
`title` Utf8,
246+
`series_info` Utf8,
247+
`release_date` Uint64,
248+
PRIMARY KEY (`series_id`)
249+
)
250+
""".format(
251+
path
252+
)
253+
)
254+
255+
print("\nCreating table seasons...")
256+
pool.execute_with_retries(
257+
"""
258+
PRAGMA TablePathPrefix("{}");
259+
CREATE table `seasons` (
260+
`series_id` Uint64,
261+
`season_id` Uint64,
262+
`title` Utf8,
263+
`first_aired` Uint64,
264+
`last_aired` Uint64,
265+
PRIMARY KEY (`series_id`, `season_id`)
266+
)
267+
""".format(
268+
path
269+
)
270+
)
271+
272+
print("\nCreating table episodes...")
273+
pool.execute_with_retries(
274+
"""
275+
PRAGMA TablePathPrefix("{}");
276+
CREATE table `episodes` (
277+
`series_id` Uint64,
278+
`season_id` Uint64,
279+
`episode_id` Uint64,
280+
`title` Utf8,
281+
`air_date` Uint64,
282+
PRIMARY KEY (`series_id`, `season_id`, `episode_id`)
283+
)
284+
""".format(
285+
path
286+
)
287+
)
288+
289+
290+
def is_directory_exists(driver, path):
291+
try:
292+
return driver.scheme_client.describe_path(path).is_directory()
293+
except ydb.SchemeError:
294+
return False
295+
296+
297+
def ensure_path_exists(driver, database, path):
298+
paths_to_create = list()
299+
path = path.rstrip("/")
300+
while path not in ("", database):
301+
full_path = posixpath.join(database, path)
302+
if is_directory_exists(driver, full_path):
303+
break
304+
paths_to_create.append(full_path)
305+
path = posixpath.dirname(path).rstrip("/")
306+
307+
while len(paths_to_create) > 0:
308+
full_path = paths_to_create.pop(-1)
309+
driver.scheme_client.make_directory(full_path)
310+
311+
312+
def run(endpoint, database, path):
313+
with ydb.Driver(
314+
endpoint=endpoint,
315+
database=database,
316+
# credentials=ydb.credentials_from_env_variables()
317+
) as driver:
318+
driver.wait(timeout=5, fail_fast=True)
319+
320+
with ydb.QuerySessionPool(driver) as pool:
321+
322+
ensure_path_exists(driver, database, path)
323+
324+
# absolute path - prefix to the table's names,
325+
# including the database location
326+
full_path = posixpath.join(database, path)
327+
328+
drop_tables(pool, full_path)
329+
330+
create_tables(pool, full_path)
331+
332+
fill_tables_with_data(pool, full_path)
333+
334+
select_simple(pool, full_path)
335+
336+
upsert_simple(pool, full_path)
337+
338+
select_with_parameters(pool, full_path, 2, 3, 7)
339+
select_with_parameters(pool, full_path, 2, 3, 8)
340+
341+
explicit_tcl(pool, full_path, 2, 6, 1)
342+
select_with_parameters(pool, full_path, 2, 6, 1)

0 commit comments

Comments
 (0)