Skip to content

Add huge select example to basic example v2 #475

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions examples/basic_example_v2/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
formatter_class=argparse.RawDescriptionHelpFormatter,
description="""\033[92mYDB basic example.\x1b[0m\n""",
)
parser.add_argument("-d", "--database", help="Name of the database to use", default="/local")
parser.add_argument("-e", "--endpoint", help="Endpoint url to use", default="grpc://localhost:2136")
parser.add_argument("-p", "--path", default="")
parser.add_argument("-d", "--database", help="Name of the database to use", default="/local")
parser.add_argument("-v", "--verbose", default=False, action="store_true")
parser.add_argument("-m", "--mode", default="sync", help="Mode of example: sync or async")

Expand All @@ -28,15 +27,13 @@
run_sync(
args.endpoint,
args.database,
args.path,
)
elif args.mode == "async":
print("Running async example")
asyncio.run(
run_async(
args.endpoint,
args.database,
args.path,
)
)
else:
Expand Down
132 changes: 54 additions & 78 deletions examples/basic_example_v2/basic_example.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,15 @@
# -*- coding: utf-8 -*-
import posixpath
import ydb
import basic_example_data

# Table path prefix allows to put the working tables into the specific directory
# inside the YDB database. Putting `PRAGMA TablePathPrefix("some/path")`
# at the beginning of the query allows to reference the tables through
# their names "under" the specified directory.
#
# TablePathPrefix has to be defined as an absolute path, which has to be started
# with the current database location.
#
# https://ydb.tech/ru/docs/yql/reference/syntax/pragma#table-path-prefix

DropTablesQuery = """PRAGMA TablePathPrefix("{}");

DropTablesQuery = """
DROP TABLE IF EXISTS series;
DROP TABLE IF EXISTS seasons;
DROP TABLE IF EXISTS episodes;
"""

FillDataQuery = """PRAGMA TablePathPrefix("{}");

FillDataQuery = """
DECLARE $seriesData AS List<Struct<
series_id: Int64,
title: Utf8,
Expand Down Expand Up @@ -69,13 +58,11 @@
"""


def fill_tables_with_data(pool: ydb.QuerySessionPool, path: str):
def fill_tables_with_data(pool: ydb.QuerySessionPool):
print("\nFilling tables with data...")

query = FillDataQuery.format(path)

pool.execute_with_retries(
query,
FillDataQuery,
{
"$seriesData": (basic_example_data.get_series_data(), basic_example_data.get_series_data_type()),
"$seasonsData": (basic_example_data.get_seasons_data(), basic_example_data.get_seasons_data_type()),
Expand All @@ -84,11 +71,10 @@ def fill_tables_with_data(pool: ydb.QuerySessionPool, path: str):
)


def select_simple(pool: ydb.QuerySessionPool, path: str):
def select_simple(pool: ydb.QuerySessionPool):
print("\nCheck series table...")
result_sets = pool.execute_with_retries(
f"""
PRAGMA TablePathPrefix("{path}");
"""
SELECT
series_id,
title,
Expand All @@ -111,21 +97,23 @@ def select_simple(pool: ydb.QuerySessionPool, path: str):
return first_set


def upsert_simple(pool: ydb.QuerySessionPool, path: str):
def upsert_simple(pool: ydb.QuerySessionPool):
print("\nPerforming UPSERT into episodes...")

pool.execute_with_retries(
f"""
PRAGMA TablePathPrefix("{path}");
"""
UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD");
"""
)


def select_with_parameters(pool: ydb.QuerySessionPool, path: str, series_id, season_id, episode_id):
def select_with_parameters(pool: ydb.QuerySessionPool, series_id, season_id, episode_id):
result_sets = pool.execute_with_retries(
f"""
PRAGMA TablePathPrefix("{path}");
"""
DECLARE $seriesId AS Int64;
DECLARE $seasonId AS Int64;
DECLARE $episodeId AS Int64;

SELECT
title,
air_date
Expand All @@ -151,10 +139,13 @@ def select_with_parameters(pool: ydb.QuerySessionPool, path: str, series_id, sea
# In most cases it's better to use transaction control settings in session.transaction
# calls instead to avoid additional hops to YDB cluster and allow more efficient
# execution of queries.
def explicit_transaction_control(pool: ydb.QuerySessionPool, path: str, series_id, season_id, episode_id):
def explicit_transaction_control(pool: ydb.QuerySessionPool, series_id, season_id, episode_id):
def callee(session: ydb.QuerySessionSync):
query = f"""
PRAGMA TablePathPrefix("{path}");
query = """
DECLARE $seriesId AS Int64;
DECLARE $seasonId AS Int64;
DECLARE $episodeId AS Int64;

UPDATE episodes
SET air_date = CurrentUtcDate()
WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId;
Expand Down Expand Up @@ -183,16 +174,31 @@ def callee(session: ydb.QuerySessionSync):
return pool.retry_operation_sync(callee)


def drop_tables(pool: ydb.QuerySessionPool, path: str):
def huge_select(pool: ydb.QuerySessionPool):
def callee(session: ydb.QuerySessionSync):
query = """SELECT * from episodes;"""

with session.transaction().execute(
query,
commit_tx=True,
) as result_sets:
print("\n> Huge SELECT call")
for result_set in result_sets:
for row in result_set.rows:
print("episode title:", row.title, ", air date:", row.air_date)

return pool.retry_operation_sync(callee)


def drop_tables(pool: ydb.QuerySessionPool):
print("\nCleaning up existing tables...")
pool.execute_with_retries(DropTablesQuery.format(path))
pool.execute_with_retries(DropTablesQuery)


def create_tables(pool: ydb.QuerySessionPool, path: str):
def create_tables(pool: ydb.QuerySessionPool):
print("\nCreating table series...")
pool.execute_with_retries(
f"""
PRAGMA TablePathPrefix("{path}");
"""
CREATE table `series` (
`series_id` Int64,
`title` Utf8,
Expand All @@ -205,8 +211,7 @@ def create_tables(pool: ydb.QuerySessionPool, path: str):

print("\nCreating table seasons...")
pool.execute_with_retries(
f"""
PRAGMA TablePathPrefix("{path}");
"""
CREATE table `seasons` (
`series_id` Int64,
`season_id` Int64,
Expand All @@ -220,8 +225,7 @@ def create_tables(pool: ydb.QuerySessionPool, path: str):

print("\nCreating table episodes...")
pool.execute_with_retries(
f"""
PRAGMA TablePathPrefix("{path}");
"""
CREATE table `episodes` (
`series_id` Int64,
`season_id` Int64,
Expand All @@ -234,29 +238,7 @@ def create_tables(pool: ydb.QuerySessionPool, path: str):
)


def is_directory_exists(driver: ydb.Driver, path: str):
try:
return driver.scheme_client.describe_path(path).is_directory()
except ydb.SchemeError:
return False


def ensure_path_exists(driver, database, path):
paths_to_create = list()
path = path.rstrip("/")
while path not in ("", database):
full_path = posixpath.join(database, path)
if is_directory_exists(driver, full_path):
break
paths_to_create.append(full_path)
path = posixpath.dirname(path).rstrip("/")

while len(paths_to_create) > 0:
full_path = paths_to_create.pop(-1)
driver.scheme_client.make_directory(full_path)


def run(endpoint, database, path):
def run(endpoint, database):
with ydb.Driver(
endpoint=endpoint,
database=database,
Expand All @@ -265,25 +247,19 @@ def run(endpoint, database, path):
driver.wait(timeout=5, fail_fast=True)

with ydb.QuerySessionPool(driver) as pool:
drop_tables(pool)

ensure_path_exists(driver, database, path)

# absolute path - prefix to the table's names,
# including the database location
full_path = posixpath.join(database, path)

drop_tables(pool, full_path)

create_tables(pool, full_path)
create_tables(pool)

fill_tables_with_data(pool, full_path)
fill_tables_with_data(pool)

select_simple(pool, full_path)
select_simple(pool)

upsert_simple(pool, full_path)
upsert_simple(pool)

select_with_parameters(pool, full_path, 2, 3, 7)
select_with_parameters(pool, full_path, 2, 3, 8)
select_with_parameters(pool, 2, 3, 7)
select_with_parameters(pool, 2, 3, 8)

explicit_transaction_control(pool, full_path, 2, 6, 1)
select_with_parameters(pool, full_path, 2, 6, 1)
explicit_transaction_control(pool, 2, 6, 1)
select_with_parameters(pool, 2, 6, 1)
huge_select(pool)
Loading
Loading