Skip to content

Commit 663a283

Browse files
authored
Merge pull request #469 from ydb-platform/basic_example_v2
basic example with query service
2 parents 85ca8cf + 61c40fd commit 663a283

File tree

9 files changed

+580
-12
lines changed

9 files changed

+580
-12
lines changed

examples/basic_example_v1/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# YDB Python SDK Example: basic_example_v1
22

3+
**This example is outdated, please see [example with new API](../basic_example_v2/)**
4+
35
Example code demonstrating the basic YDB Python SDK operations.
46

57
See the top-level [README.md](../README.md) file for instructions on running this example.

examples/basic_example_v2/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# YDB Python SDK Example: basic_example_v2
2+
3+
Example code demonstrating the basic YDB Python SDK operations.
4+
5+
See the top-level [README.md](../README.md) file for instructions on running this example.

examples/basic_example_v2/__main__.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# -*- coding: utf-8 -*-
2+
import argparse
3+
import basic_example
4+
import logging
5+
6+
7+
if __name__ == "__main__":
8+
parser = argparse.ArgumentParser(
9+
formatter_class=argparse.RawDescriptionHelpFormatter,
10+
description="""\033[92mYDB basic example.\x1b[0m\n""",
11+
)
12+
parser.add_argument("-d", "--database", help="Name of the database to use", default="/local")
13+
parser.add_argument("-e", "--endpoint", help="Endpoint url to use", default="grpc://localhost:2136")
14+
parser.add_argument("-p", "--path", default="")
15+
parser.add_argument("-v", "--verbose", default=False, action="store_true")
16+
17+
args = parser.parse_args()
18+
19+
if args.verbose:
20+
logger = logging.getLogger("ydb.pool.Discovery")
21+
logger.setLevel(logging.INFO)
22+
logger.addHandler(logging.StreamHandler())
23+
24+
basic_example.run(
25+
args.endpoint,
26+
args.database,
27+
args.path,
28+
)
Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
# -*- coding: utf-8 -*-
2+
import posixpath
3+
import ydb
4+
import basic_example_data
5+
6+
# Table path prefix allows to put the working tables into the specific directory
7+
# inside the YDB database. Putting `PRAGMA TablePathPrefix("some/path")`
8+
# at the beginning of the query allows to reference the tables through
9+
# their names "under" the specified directory.
10+
#
11+
# TablePathPrefix has to be defined as an absolute path, which has to be started
12+
# with the current database location.
13+
#
14+
# https://ydb.tech/ru/docs/yql/reference/syntax/pragma#table-path-prefix
15+
16+
DropTablesQuery = """PRAGMA TablePathPrefix("{}");
17+
DROP TABLE IF EXISTS series;
18+
DROP TABLE IF EXISTS seasons;
19+
DROP TABLE IF EXISTS episodes;
20+
"""
21+
22+
FillDataQuery = """PRAGMA TablePathPrefix("{}");
23+
24+
DECLARE $seriesData AS List<Struct<
25+
series_id: Int64,
26+
title: Utf8,
27+
series_info: Utf8,
28+
release_date: Date>>;
29+
30+
DECLARE $seasonsData AS List<Struct<
31+
series_id: Int64,
32+
season_id: Int64,
33+
title: Utf8,
34+
first_aired: Date,
35+
last_aired: Date>>;
36+
37+
DECLARE $episodesData AS List<Struct<
38+
series_id: Int64,
39+
season_id: Int64,
40+
episode_id: Int64,
41+
title: Utf8,
42+
air_date: Date>>;
43+
44+
REPLACE INTO series
45+
SELECT
46+
series_id,
47+
title,
48+
series_info,
49+
release_date
50+
FROM AS_TABLE($seriesData);
51+
52+
REPLACE INTO seasons
53+
SELECT
54+
series_id,
55+
season_id,
56+
title,
57+
first_aired,
58+
last_aired
59+
FROM AS_TABLE($seasonsData);
60+
61+
REPLACE INTO episodes
62+
SELECT
63+
series_id,
64+
season_id,
65+
episode_id,
66+
title,
67+
air_date
68+
FROM AS_TABLE($episodesData);
69+
"""
70+
71+
72+
def fill_tables_with_data(pool: ydb.QuerySessionPool, path: str):
73+
print("\nFilling tables with data...")
74+
75+
query = FillDataQuery.format(path)
76+
77+
pool.execute_with_retries(
78+
query,
79+
{
80+
"$seriesData": (basic_example_data.get_series_data(), basic_example_data.get_series_data_type()),
81+
"$seasonsData": (basic_example_data.get_seasons_data(), basic_example_data.get_seasons_data_type()),
82+
"$episodesData": (basic_example_data.get_episodes_data(), basic_example_data.get_episodes_data_type()),
83+
},
84+
)
85+
86+
87+
def select_simple(pool: ydb.QuerySessionPool, path: str):
88+
print("\nCheck series table...")
89+
result_sets = pool.execute_with_retries(
90+
"""
91+
PRAGMA TablePathPrefix("{}");
92+
SELECT
93+
series_id,
94+
title,
95+
release_date
96+
FROM series
97+
WHERE series_id = 1;
98+
""".format(
99+
path
100+
),
101+
)
102+
first_set = result_sets[0]
103+
for row in first_set.rows:
104+
print(
105+
"series, id: ",
106+
row.series_id,
107+
", title: ",
108+
row.title,
109+
", release date: ",
110+
row.release_date,
111+
)
112+
113+
return first_set
114+
115+
116+
def upsert_simple(pool: ydb.QuerySessionPool, path: str):
117+
print("\nPerforming UPSERT into episodes...")
118+
119+
pool.execute_with_retries(
120+
"""
121+
PRAGMA TablePathPrefix("{}");
122+
UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD");
123+
""".format(
124+
path
125+
)
126+
)
127+
128+
129+
def select_with_parameters(pool: ydb.QuerySessionPool, path: str, series_id, season_id, episode_id):
130+
result_sets = pool.execute_with_retries(
131+
"""
132+
PRAGMA TablePathPrefix("{}");
133+
SELECT
134+
title,
135+
air_date
136+
FROM episodes
137+
WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId;
138+
""".format(
139+
path
140+
),
141+
{
142+
"$seriesId": series_id, # could be defined implicit
143+
"$seasonId": (season_id, ydb.PrimitiveType.Int64), # could be defined via tuple
144+
"$episodeId": ydb.TypedValue(episode_id, ydb.PrimitiveType.Int64), # could be defined via special class
145+
},
146+
)
147+
148+
print("\n> select_with_parameters:")
149+
first_set = result_sets[0]
150+
for row in first_set.rows:
151+
print("episode title:", row.title, ", air date:", row.air_date)
152+
153+
return first_set
154+
155+
156+
# Show usage of explicit Begin/Commit transaction control calls.
157+
# In most cases it's better to use transaction control settings in session.transaction
158+
# calls instead to avoid additional hops to YDB cluster and allow more efficient
159+
# execution of queries.
160+
def explicit_transaction_control(pool: ydb.QuerySessionPool, path: str, series_id, season_id, episode_id):
161+
def callee(session: ydb.QuerySessionSync):
162+
query = """
163+
PRAGMA TablePathPrefix("{}");
164+
UPDATE episodes
165+
SET air_date = CurrentUtcDate()
166+
WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId;
167+
""".format(
168+
path
169+
)
170+
171+
# Get newly created transaction id
172+
tx = session.transaction(ydb.QuerySerializableReadWrite()).begin()
173+
174+
# Execute data query.
175+
# Transaction control settings continues active transaction (tx)
176+
with tx.execute(
177+
query,
178+
{
179+
"$seriesId": (series_id, ydb.PrimitiveType.Int64),
180+
"$seasonId": (season_id, ydb.PrimitiveType.Int64),
181+
"$episodeId": (episode_id, ydb.PrimitiveType.Int64),
182+
},
183+
) as _:
184+
pass
185+
186+
print("\n> explicit TCL call")
187+
188+
# Commit active transaction(tx)
189+
tx.commit()
190+
191+
return pool.retry_operation_sync(callee)
192+
193+
194+
def drop_tables(pool: ydb.QuerySessionPool, path: str):
195+
print("\nCleaning up existing tables...")
196+
pool.execute_with_retries(DropTablesQuery.format(path))
197+
198+
199+
def create_tables(pool: ydb.QuerySessionPool, path: str):
200+
print("\nCreating table series...")
201+
pool.execute_with_retries(
202+
"""
203+
PRAGMA TablePathPrefix("{}");
204+
CREATE table `series` (
205+
`series_id` Int64,
206+
`title` Utf8,
207+
`series_info` Utf8,
208+
`release_date` Date,
209+
PRIMARY KEY (`series_id`)
210+
)
211+
""".format(
212+
path
213+
)
214+
)
215+
216+
print("\nCreating table seasons...")
217+
pool.execute_with_retries(
218+
"""
219+
PRAGMA TablePathPrefix("{}");
220+
CREATE table `seasons` (
221+
`series_id` Int64,
222+
`season_id` Int64,
223+
`title` Utf8,
224+
`first_aired` Date,
225+
`last_aired` Date,
226+
PRIMARY KEY (`series_id`, `season_id`)
227+
)
228+
""".format(
229+
path
230+
)
231+
)
232+
233+
print("\nCreating table episodes...")
234+
pool.execute_with_retries(
235+
"""
236+
PRAGMA TablePathPrefix("{}");
237+
CREATE table `episodes` (
238+
`series_id` Int64,
239+
`season_id` Int64,
240+
`episode_id` Int64,
241+
`title` Utf8,
242+
`air_date` Date,
243+
PRIMARY KEY (`series_id`, `season_id`, `episode_id`)
244+
)
245+
""".format(
246+
path
247+
)
248+
)
249+
250+
251+
def is_directory_exists(driver: ydb.Driver, path: str):
252+
try:
253+
return driver.scheme_client.describe_path(path).is_directory()
254+
except ydb.SchemeError:
255+
return False
256+
257+
258+
def ensure_path_exists(driver, database, path):
259+
paths_to_create = list()
260+
path = path.rstrip("/")
261+
while path not in ("", database):
262+
full_path = posixpath.join(database, path)
263+
if is_directory_exists(driver, full_path):
264+
break
265+
paths_to_create.append(full_path)
266+
path = posixpath.dirname(path).rstrip("/")
267+
268+
while len(paths_to_create) > 0:
269+
full_path = paths_to_create.pop(-1)
270+
driver.scheme_client.make_directory(full_path)
271+
272+
273+
def run(endpoint, database, path):
274+
with ydb.Driver(
275+
endpoint=endpoint,
276+
database=database,
277+
credentials=ydb.credentials_from_env_variables(),
278+
) as driver:
279+
driver.wait(timeout=5, fail_fast=True)
280+
281+
with ydb.QuerySessionPool(driver) as pool:
282+
283+
ensure_path_exists(driver, database, path)
284+
285+
# absolute path - prefix to the table's names,
286+
# including the database location
287+
full_path = posixpath.join(database, path)
288+
289+
drop_tables(pool, full_path)
290+
291+
create_tables(pool, full_path)
292+
293+
fill_tables_with_data(pool, full_path)
294+
295+
select_simple(pool, full_path)
296+
297+
upsert_simple(pool, full_path)
298+
299+
select_with_parameters(pool, full_path, 2, 3, 7)
300+
select_with_parameters(pool, full_path, 2, 3, 8)
301+
302+
explicit_transaction_control(pool, full_path, 2, 6, 1)
303+
select_with_parameters(pool, full_path, 2, 6, 1)

0 commit comments

Comments
 (0)