diff --git a/examples/basic_example_v2/README.md b/examples/basic_example_v2/README.md index 92ebe21a..396e0e7f 100644 --- a/examples/basic_example_v2/README.md +++ b/examples/basic_example_v2/README.md @@ -1,5 +1,10 @@ # YDB Python SDK Example: basic_example_v2 Example code demonstrating the basic YDB Python SDK operations. +Example is awailable in two modes: +1. `sync` - synchronous implementation; +1. `async` - asynchronous implementation using asyncio. + +To spesify mode, use argument `-m async` or `--mode async`. See the top-level [README.md](../README.md) file for instructions on running this example. diff --git a/examples/basic_example_v2/__main__.py b/examples/basic_example_v2/__main__.py index 2397a2c1..9c68c00d 100644 --- a/examples/basic_example_v2/__main__.py +++ b/examples/basic_example_v2/__main__.py @@ -1,6 +1,8 @@ # -*- coding: utf-8 -*- import argparse -import basic_example +import asyncio +from basic_example import run as run_sync +from basic_example_async import run as run_async import logging @@ -13,6 +15,7 @@ parser.add_argument("-e", "--endpoint", help="Endpoint url to use", default="grpc://localhost:2136") parser.add_argument("-p", "--path", default="") parser.add_argument("-v", "--verbose", default=False, action="store_true") + parser.add_argument("-m", "--mode", default="sync", help="Mode of example: sync or async") args = parser.parse_args() @@ -20,9 +23,21 @@ logger = logging.getLogger("ydb.pool.Discovery") logger.setLevel(logging.INFO) logger.addHandler(logging.StreamHandler()) - - basic_example.run( - args.endpoint, - args.database, - args.path, - ) + if args.mode == "sync": + print("Running sync example") + run_sync( + args.endpoint, + args.database, + args.path, + ) + elif args.mode == "async": + print("Running async example") + asyncio.run( + run_async( + args.endpoint, + args.database, + args.path, + ) + ) + else: + raise ValueError(f"Unsupported mode: {args.mode}, use one of sync|async") diff --git a/examples/basic_example_v2/basic_example_async.py b/examples/basic_example_v2/basic_example_async.py new file mode 100644 index 00000000..0966b592 --- /dev/null +++ b/examples/basic_example_v2/basic_example_async.py @@ -0,0 +1,305 @@ +# -*- coding: utf-8 -*- +import posixpath +import ydb +import basic_example_data + +# Table path prefix allows to put the working tables into the specific directory +# inside the YDB database. Putting `PRAGMA TablePathPrefix("some/path")` +# at the beginning of the query allows to reference the tables through +# their names "under" the specified directory. +# +# TablePathPrefix has to be defined as an absolute path, which has to be started +# with the current database location. +# +# https://ydb.tech/ru/docs/yql/reference/syntax/pragma#table-path-prefix + +DropTablesQuery = """PRAGMA TablePathPrefix("{}"); +DROP TABLE IF EXISTS series; +DROP TABLE IF EXISTS seasons; +DROP TABLE IF EXISTS episodes; +""" + +FillDataQuery = """PRAGMA TablePathPrefix("{}"); + +DECLARE $seriesData AS List>; + +DECLARE $seasonsData AS List>; + +DECLARE $episodesData AS List>; + +REPLACE INTO series +SELECT + series_id, + title, + series_info, + release_date +FROM AS_TABLE($seriesData); + +REPLACE INTO seasons +SELECT + series_id, + season_id, + title, + first_aired, + last_aired +FROM AS_TABLE($seasonsData); + +REPLACE INTO episodes +SELECT + series_id, + season_id, + episode_id, + title, + air_date +FROM AS_TABLE($episodesData); +""" + + +async def fill_tables_with_data(pool: ydb.aio.QuerySessionPoolAsync, path: str): + print("\nFilling tables with data...") + + query = FillDataQuery.format(path) + + await pool.execute_with_retries( + query, + { + "$seriesData": (basic_example_data.get_series_data(), basic_example_data.get_series_data_type()), + "$seasonsData": (basic_example_data.get_seasons_data(), basic_example_data.get_seasons_data_type()), + "$episodesData": (basic_example_data.get_episodes_data(), basic_example_data.get_episodes_data_type()), + }, + ) + + +async def select_simple(pool: ydb.aio.QuerySessionPoolAsync, path: str): + print("\nCheck series table...") + result_sets = await pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + SELECT + series_id, + title, + release_date + FROM series + WHERE series_id = 1; + """.format( + path + ), + ) + first_set = result_sets[0] + for row in first_set.rows: + print( + "series, id: ", + row.series_id, + ", title: ", + row.title, + ", release date: ", + row.release_date, + ) + + return first_set + + +async def upsert_simple(pool: ydb.aio.QuerySessionPoolAsync, path: str): + print("\nPerforming UPSERT into episodes...") + + await pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD"); + """.format( + path + ) + ) + + +async def select_with_parameters(pool: ydb.aio.QuerySessionPoolAsync, path: str, series_id, season_id, episode_id): + result_sets = await pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + SELECT + title, + air_date + FROM episodes + WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; + """.format( + path + ), + { + "$seriesId": series_id, # could be defined implicit + "$seasonId": (season_id, ydb.PrimitiveType.Int64), # could be defined via tuple + "$episodeId": ydb.TypedValue(episode_id, ydb.PrimitiveType.Int64), # could be defined via special class + }, + ) + + print("\n> select_with_parameters:") + first_set = result_sets[0] + for row in first_set.rows: + print("episode title:", row.title, ", air date:", row.air_date) + + return first_set + + +# Show usage of explicit Begin/Commit transaction control calls. +# In most cases it's better to use transaction control settings in session.transaction +# calls instead to avoid additional hops to YDB cluster and allow more efficient +# execution of queries. +async def explicit_transaction_control( + pool: ydb.aio.QuerySessionPoolAsync, path: str, series_id, season_id, episode_id +): + async def callee(session: ydb.aio.QuerySessionAsync): + query = """ + PRAGMA TablePathPrefix("{}"); + UPDATE episodes + SET air_date = CurrentUtcDate() + WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; + """.format( + path + ) + + # Get newly created transaction id + tx = await session.transaction(ydb.QuerySerializableReadWrite()).begin() + + # Execute data query. + # Transaction control settings continues active transaction (tx) + async with await tx.execute( + query, + { + "$seriesId": (series_id, ydb.PrimitiveType.Int64), + "$seasonId": (season_id, ydb.PrimitiveType.Int64), + "$episodeId": (episode_id, ydb.PrimitiveType.Int64), + }, + ) as _: + pass + + print("\n> explicit TCL call") + + # Commit active transaction(tx) + await tx.commit() + + return await pool.retry_operation_async(callee) + + +async def drop_tables(pool: ydb.aio.QuerySessionPoolAsync, path: str): + print("\nCleaning up existing tables...") + await pool.execute_with_retries(DropTablesQuery.format(path)) + + +async def create_tables(pool: ydb.aio.QuerySessionPoolAsync, path: str): + print("\nCreating table series...") + await pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + CREATE table `series` ( + `series_id` Int64, + `title` Utf8, + `series_info` Utf8, + `release_date` Date, + PRIMARY KEY (`series_id`) + ) + """.format( + path + ) + ) + + print("\nCreating table seasons...") + await pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + CREATE table `seasons` ( + `series_id` Int64, + `season_id` Int64, + `title` Utf8, + `first_aired` Date, + `last_aired` Date, + PRIMARY KEY (`series_id`, `season_id`) + ) + """.format( + path + ) + ) + + print("\nCreating table episodes...") + await pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + CREATE table `episodes` ( + `series_id` Int64, + `season_id` Int64, + `episode_id` Int64, + `title` Utf8, + `air_date` Date, + PRIMARY KEY (`series_id`, `season_id`, `episode_id`) + ) + """.format( + path + ) + ) + + +async def is_directory_exists(driver: ydb.aio.Driver, path: str): + try: + return await driver.scheme_client.describe_path(path).is_directory() + except ydb.SchemeError: + return False + + +async def ensure_path_exists(driver: ydb.aio.Driver, database, path): + paths_to_create = list() + path = path.rstrip("/") + while path not in ("", database): + full_path = posixpath.join(database, path) + if await is_directory_exists(driver, full_path): + break + paths_to_create.append(full_path) + path = posixpath.dirname(path).rstrip("/") + + while len(paths_to_create) > 0: + full_path = paths_to_create.pop(-1) + await driver.scheme_client.make_directory(full_path) + + +async def run(endpoint, database, path): + async with ydb.aio.Driver( + endpoint=endpoint, + database=database, + credentials=ydb.credentials_from_env_variables(), + ) as driver: + await driver.wait(timeout=5, fail_fast=True) + + async with ydb.aio.QuerySessionPoolAsync(driver) as pool: + + await ensure_path_exists(driver, database, path) + + # absolute path - prefix to the table's names, + # including the database location + full_path = posixpath.join(database, path) + + await drop_tables(pool, full_path) + + await create_tables(pool, full_path) + + await fill_tables_with_data(pool, full_path) + + await select_simple(pool, full_path) + + await upsert_simple(pool, full_path) + + await select_with_parameters(pool, full_path, 2, 3, 7) + await select_with_parameters(pool, full_path, 2, 3, 8) + + await explicit_transaction_control(pool, full_path, 2, 6, 1) + await select_with_parameters(pool, full_path, 2, 6, 1) diff --git a/ydb/aio/query/pool.py b/ydb/aio/query/pool.py index 53f11a03..f91f7465 100644 --- a/ydb/aio/query/pool.py +++ b/ydb/aio/query/pool.py @@ -57,7 +57,12 @@ async def wrapped_callee(): return await retry_operation_async(wrapped_callee, retry_settings) async def execute_with_retries( - self, query: str, retry_settings: Optional[RetrySettings] = None, *args, **kwargs + self, + query: str, + parameters: Optional[dict] = None, + retry_settings: Optional[RetrySettings] = None, + *args, + **kwargs, ) -> List[convert.ResultSet]: """WARNING: This API is experimental and could be changed. Special interface to execute a one-shot queries in a safe, retriable way. @@ -65,6 +70,7 @@ async def execute_with_retries( method with huge read queries. :param query: A query, yql or sql text. + :param parameters: dict with parameters and YDB types; :param retry_settings: RetrySettings object. :return: Result sets or exception in case of execution errors. @@ -74,11 +80,20 @@ async def execute_with_retries( async def wrapped_callee(): async with self.checkout() as session: - it = await session.execute(query, *args, **kwargs) + it = await session.execute(query, parameters, *args, **kwargs) return [result_set async for result_set in it] return await retry_operation_async(wrapped_callee, retry_settings) + async def stop(self, timeout=None): + pass # TODO: implement + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.stop() + class SimpleQuerySessionCheckoutAsync: def __init__(self, pool: QuerySessionPoolAsync): diff --git a/ydb/aio/query/session.py b/ydb/aio/query/session.py index 3b918e61..627a41d8 100644 --- a/ydb/aio/query/session.py +++ b/ydb/aio/query/session.py @@ -136,5 +136,9 @@ async def execute( return AsyncResponseContextIterator( stream_it, - lambda resp: base.wrap_execute_query_response(rpc_state=None, response_pb=resp), + lambda resp: base.wrap_execute_query_response( + rpc_state=None, + response_pb=resp, + settings=self._settings, + ), ) diff --git a/ydb/aio/query/transaction.py b/ydb/aio/query/transaction.py index f8e332fa..e9993fcc 100644 --- a/ydb/aio/query/transaction.py +++ b/ydb/aio/query/transaction.py @@ -56,6 +56,7 @@ async def begin(self, settings: Optional[base.QueryClientSettings] = None) -> "Q :return: None or exception if begin is failed """ await self._begin_call(settings) + return self async def commit(self, settings: Optional[base.QueryClientSettings] = None) -> None: """WARNING: This API is experimental and could be changed. @@ -146,6 +147,7 @@ async def execute( response_pb=resp, tx=self, commit_tx=commit_tx, + settings=settings, ), ) return self._prev_stream diff --git a/ydb/query/pool.py b/ydb/query/pool.py index bf868352..afe39f06 100644 --- a/ydb/query/pool.py +++ b/ydb/query/pool.py @@ -69,6 +69,7 @@ def execute_with_retries( method with huge read queries. :param query: A query, yql or sql text. + :param parameters: dict with parameters and YDB types; :param retry_settings: RetrySettings object. :return: Result sets or exception in case of execution errors.