diff --git a/ydb/docs/en/core/dev/example-app/python/index.md b/ydb/docs/en/core/dev/example-app/python/index.md index c165c1dd921d..844fb06d4c57 100644 --- a/ydb/docs/en/core/dev/example-app/python/index.md +++ b/ydb/docs/en/core/dev/example-app/python/index.md @@ -1,6 +1,6 @@ # App in Python -This page contains a detailed description of the code of a [test app](https://github.com/ydb-platform/ydb-python-sdk/tree/master/examples/basic_example_v1) that is available as part of the {{ ydb-short-name }} [Python SDK](https://github.com/ydb-platform/ydb-python-sdk). +This page contains a detailed description of the code of a [test app](https://github.com/ydb-platform/ydb-python-sdk/tree/master/examples/basic_example_v2) that is available as part of the {{ ydb-short-name }} [Python SDK](https://github.com/ydb-platform/ydb-python-sdk). ## Downloading and starting {#download} @@ -21,217 +21,549 @@ Next, from the same working directory, run the command to start the test app. Th App code snippet for driver initialization: -```python -def run(endpoint, database, path): - driver_config = ydb.DriverConfig( - endpoint, database, credentials=ydb.credentials_from_env_variables(), - root_certificates=ydb.load_ydb_root_certificate(), - ) - with ydb.Driver(driver_config) as driver: - try: - driver.wait(timeout=5) - except TimeoutError: - print("Connect failed to YDB") - print("Last reported errors by discovery:") - print(driver.discovery_debug_details()) - exit(1) -``` - -App code snippet for creating a session: - -```python -session = driver.table_client.session().create() -``` +{% list tabs %} + +- Synchronous + + ```python + def run(endpoint, database): + driver_config = ydb.DriverConfig( + endpoint, database, credentials=ydb.credentials_from_env_variables(), + root_certificates=ydb.load_ydb_root_certificate(), + ) + with ydb.Driver(driver_config) as driver: + try: + driver.wait(timeout=5) + except TimeoutError: + print("Connect failed to YDB") + print("Last reported errors by discovery:") + print(driver.discovery_debug_details()) + exit(1) + ``` + +- Asynchronous + + ```python + async def run(endpoint, database): + driver_config = ydb.DriverConfig( + endpoint, database, credentials=ydb.credentials_from_env_variables(), + root_certificates=ydb.load_ydb_root_certificate(), + ) + async with ydb.aio.Driver(driver_config) as driver: + try: + await driver.wait(timeout=5) + except TimeoutError: + print("Connect failed to YDB") + print("Last reported errors by discovery:") + print(driver.discovery_debug_details()) + exit(1) + ``` + +{% endlist %} + +App code snippet for session pool initialization: + +{% list tabs %} + +- Synchronous + + ```python + with ydb.QuerySessionPool(driver) as pool: + pass # operations with pool here + ``` + +- Asynchronous + + ```python + async with ydb.aio.QuerySessionPoolAsync(driver) as pool: + pass # operations with pool here + ``` + +{% endlist %} + +## Executing queries + +{{ ydb-short-name }} Python SDK supports queries described by YQL syntax. +There are two primary methods for executing queries, each with different properties and use cases: + +* `pool.execute_with_retries`: + * Buffers the entire result set in client memory. + * Automatically retries execution in case of retriable issues. + * Does not allow specifying a transaction execution mode. + * Recommended for one-off queries that are expected to produce small result sets. + +* `tx.execute`: + * Returns an iterator over the query results, allowing processing of results that may not fit into client memory. + * Retries must be handled manually via `pool.retry_operation_sync`. + * Allows specifying a transaction execution mode. + * Recommended for scenarios where `pool.execute_with_retries` is insufficient. {% include [create_table.md](../_includes/steps/02_create_table.md) %} -To create tables, use the `session.create_table()` method: - -```python -def create_tables(session, path): - session.create_table( - os.path.join(path, 'series'), - ydb.TableDescription() - .with_column(ydb.Column('series_id', ydb.PrimitiveType.Uint64)) # not null column - .with_column(ydb.Column('title', ydb.OptionalType(ydb.PrimitiveType.Utf8))) - .with_column(ydb.Column('series_info', ydb.OptionalType(ydb.PrimitiveType.Utf8))) - .with_column(ydb.Column('release_date', ydb.OptionalType(ydb.PrimitiveType.Uint64))) - .with_primary_key('series_id') - ) -``` - -The path parameter accepts the absolute path starting from the root: +To execute `CREATE TABLE` queries, use the `pool.execute_with_retries()` method: + +{% list tabs %} + +- Synchronous + + ```python + def create_tables(pool: ydb.QuerySessionPool): + print("\nCreating table series...") + pool.execute_with_retries( + """ + CREATE TABLE `series` ( + `series_id` Int64, + `title` Utf8, + `series_info` Utf8, + `release_date` Date, + PRIMARY KEY (`series_id`) + ) + """ + ) + + print("\nCreating table seasons...") + pool.execute_with_retries( + """ + CREATE TABLE `seasons` ( + `series_id` Int64, + `season_id` Int64, + `title` Utf8, + `first_aired` Date, + `last_aired` Date, + PRIMARY KEY (`series_id`, `season_id`) + ) + """ + ) + + print("\nCreating table episodes...") + pool.execute_with_retries( + """ + CREATE TABLE `episodes` ( + `series_id` Int64, + `season_id` Int64, + `episode_id` Int64, + `title` Utf8, + `air_date` Date, + PRIMARY KEY (`series_id`, `season_id`, `episode_id`) + ) + """ + ) + ``` + +- Asynchronous + + ```python + async def create_tables(pool: ydb.aio.QuerySessionPoolAsync): + print("\nCreating table series...") + await pool.execute_with_retries( + """ + CREATE TABLE `series` ( + `series_id` Int64, + `title` Utf8, + `series_info` Utf8, + `release_date` Date, + PRIMARY KEY (`series_id`) + ) + """ + ) + + print("\nCreating table seasons...") + await pool.execute_with_retries( + """ + CREATE TABLE `seasons` ( + `series_id` Int64, + `season_id` Int64, + `title` Utf8, + `first_aired` Date, + `last_aired` Date, + PRIMARY KEY (`series_id`, `season_id`) + ) + """ + ) + + print("\nCreating table episodes...") + await pool.execute_with_retries( + """ + CREATE TABLE `episodes` ( + `series_id` Int64, + `season_id` Int64, + `episode_id` Int64, + `title` Utf8, + `air_date` Date, + PRIMARY KEY (`series_id`, `season_id`, `episode_id`) + ) + """ + ) + ``` + +{% endlist %} -```python -full_path = os.path.join(database, path) -``` +{% include [steps/03_write_queries.md](../_includes/steps/03_write_queries.md) %} -You can use the `session.describe_table()` method to output information about the table structure and make sure that it was properly created: +Code snippet for data insert/update: -```python -def describe_table(session, path, name): - result = session.describe_table(os.path.join(path, name)) - print("\n> describe table: series") - for column in result.columns: - print("column, name:", column.name, ",", str(column.type.item).strip()) -``` +{% list tabs %} -The given code snippet prints the following text to the console at startup: +- Synchronous -```bash -> describe table: series -('column, name:', 'series_id', ',', 'type_id: UINT64') -('column, name:', 'title', ',', 'type_id: UTF8') -('column, name:', 'series_info', ',', 'type_id: UTF8') -('column, name:', 'release_date', ',', 'type_id: UINT64') -``` -{% include [steps/03_write_queries.md](../_includes/steps/03_write_queries.md) %} + ```python + def upsert_simple(pool: ydb.QuerySessionPool): + print("\nPerforming UPSERT into episodes...") + pool.execute_with_retries( + """ + UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD"); + """ + ) + ``` -Code snippet for data insert/update: +- Asynchronous -```python -def upsert_simple(session, path): - session.transaction().execute( - """ - PRAGMA TablePathPrefix("{}"); - UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES - (2, 6, 1, "TBD"); - """.format(path), - commit_tx=True, - ) -``` + ```python + async def upsert_simple(pool: ydb.aio.QuerySessionPoolAsync): + print("\nPerforming UPSERT into episodes...") + await pool.execute_with_retries( + """ + UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD"); + """ + ) + ``` -{% include [pragmatablepathprefix.md](../_includes/auxilary/pragmatablepathprefix.md) %} +{% endlist %} {% include [steps/04_query_processing.md](../_includes/steps/04_query_processing.md) %} -To execute YQL queries, use the `session.transaction().execute()` method. -The SDK lets you explicitly control the execution of transactions and configure the transaction execution mode using the `TxControl` class. - -In the code snippet below, the transaction is executed using the `transaction().execute()` method. The transaction execution mode set is `ydb.SerializableReadWrite()`. When all the queries in the transaction are completed, the transaction is automatically committed by explicitly setting the flag `commit_tx=True`. The query body is described using YQL syntax and is passed to the `execute` method as a parameter. - -```python -def select_simple(session, path): - result_sets = session.transaction(ydb.SerializableReadWrite()).execute( - """ - PRAGMA TablePathPrefix("{}"); - $format = DateTime::Format("%Y-%m-%d"); - SELECT - series_id, - title, - $format(DateTime::FromSeconds(CAST(DateTime::ToSeconds(DateTime::IntervalFromDays(CAST(release_date AS Int16))) AS Uint32))) AS release_date - FROM series - WHERE series_id = 1; - """.format(path), - commit_tx=True, - ) - print("\n> select_simple_transaction:") - for row in result_sets[0].rows: - print("series, id: ", row.series_id, ", title: ", row.title, ", release date: ", row.release_date) - - return result_sets[0] -``` - -When the query is executed, `result_set` is returned whose iteration outputs the following text to the console: +To execute YQL queries, the `pool.execute_with_retries()` method is often sufficient. + +{% list tabs %} + +- Synchronous + + ```python + def select_simple(pool: ydb.QuerySessionPool): + print("\nCheck series table...") + result_sets = pool.execute_with_retries( + """ + SELECT + series_id, + title, + release_date + FROM series + WHERE series_id = 1; + """, + ) + first_set = result_sets[0] + for row in first_set.rows: + print( + "series, id: ", + row.series_id, + ", title: ", + row.title, + ", release date: ", + row.release_date, + ) + return first_set + ``` + +- Asynchronous + + ```python + async def select_simple(pool: ydb.aio.QuerySessionPoolAsync): + print("\nCheck series table...") + result_sets = await pool.execute_with_retries( + """ + SELECT + series_id, + title, + release_date + FROM series + WHERE series_id = 1; + """, + ) + first_set = result_sets[0] + for row in first_set.rows: + print( + "series, id: ", + row.series_id, + ", title: ", + row.title, + ", release date: ", + row.release_date, + ) + return first_set + ``` + +{% endlist %} + +As the result of executing the query, a list of `result_set` is returned, iterating on which the text is output to the console: ```bash > SelectSimple: series, Id: 1, title: IT Crowd, Release date: 2006-02-03 ``` - -{% include [param_prep_queries.md](../_includes/steps/07_param_prep_queries.md) %} - -```python -def select_prepared(session, path, series_id, season_id, episode_id): - query = """ - PRAGMA TablePathPrefix("{}"); - DECLARE $seriesId AS Uint64; - DECLARE $seasonId AS Uint64; - DECLARE $episodeId AS Uint64; - $format = DateTime::Format("%Y-%m-%d"); - SELECT - title, - $format(DateTime::FromSeconds(CAST(DateTime::ToSeconds(DateTime::IntervalFromDays(CAST(air_date AS Int16))) AS Uint32))) AS air_date - FROM episodes - WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; - """.format(path) - - prepared_query = session.prepare(query) - result_sets = session.transaction(ydb.SerializableReadWrite()).execute( - prepared_query, { - '$seriesId': series_id, - '$seasonId': season_id, - '$episodeId': episode_id, - }, - commit_tx=True - ) - print("\n> select_prepared_transaction:") - for row in result_sets[0].rows: - print("episode title:", row.title, ", air date:", row.air_date) - - return result_sets[0] -``` - -The given code snippet prints the following text to the console at startup: +## Parameterized queries {#param-queries} + +For parameterized query execution, `pool.execute_with_retries()` and `tx.execute()` behave similarly. To execute parameterized queries, you need to pass a dictionary with parameters to one of these functions, where each key is the parameter name, and the value can be one of the following: + +1. A value of a basic Python type +2. A tuple containing the value and its type +3. A special type, `ydb.TypedValue(value=value, value_type=value_type)` + +If you specify a value without an explicit type, the conversion takes place according to the following rules: + +| Python type | {{ ydb-short-name }} type | +|------------|------------------------------| +| `int` | `ydb.PrimitiveType.Int64` | +| `float` | `ydb.PrimitiveType.Double` | +| `str` | `ydb.PrimitiveType.Utf8` | +| `bytes` | `ydb.PrimitiveType.String` | +| `bool` | `ydb.PrimitiveType.Bool` | +| `list` | `ydb.ListType` | +| `dict` | `ydb.DictType` | + +{% note warning %} + +Automatic conversion of lists and dictionaries is possible only if the structures are homogeneous. The type of nested values will be determined recursively according to the rules explained above. In case of using heterogeneous structures, requests will raise `TypeError`. + +{% endnote %} + +A code snippet demonstrating the parameterized query execution: + +{% list tabs %} + +- Synchronous + + ```python + def select_with_parameters(pool: ydb.QuerySessionPool, series_id, season_id, episode_id): + result_sets = pool.execute_with_retries( + """ + DECLARE $seriesId AS Int64; + DECLARE $seasonId AS Int64; + DECLARE $episodeId AS Int64; + + SELECT + title, + air_date + FROM episodes + WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; + """, + { + "$seriesId": series_id, # data type could be defined implicitly + "$seasonId": (season_id, ydb.PrimitiveType.Int64), # could be defined via a tuple + "$episodeId": ydb.TypedValue(episode_id, ydb.PrimitiveType.Int64), # could be defined via a special class + }, + ) + + print("\n> select_with_parameters:") + first_set = result_sets[0] + for row in first_set.rows: + print("episode title:", row.title, ", air date:", row.air_date) + + return first_set + ``` + +- Asynchronous + + ```python + async def select_with_parameters(pool: ydb.aio.QuerySessionPoolAsync, series_id, season_id, episode_id): + result_sets = await pool.execute_with_retries( + """ + DECLARE $seriesId AS Int64; + DECLARE $seasonId AS Int64; + DECLARE $episodeId AS Int64; + + SELECT + title, + air_date + FROM episodes + WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; + """, + { + "$seriesId": series_id, # could be defined implicitly + "$seasonId": (season_id, ydb.PrimitiveType.Int64), # could be defined via a tuple + "$episodeId": ydb.TypedValue(episode_id, ydb.PrimitiveType.Int64), # could be defined via a special class + }, + ) + + print("\n> select_with_parameters:") + first_set = result_sets[0] + for row in first_set.rows: + print("episode title:", row.title, ", air date:", row.air_date) + + return first_set + ``` + +{% endlist %} + +The code snippet above outputs the following text to the console: ```bash > select_prepared_transaction: ('episode title:', u'To Build a Better Beta', ', air date:', '2016-06-05') ``` -{% include [scan_query.md](../_includes/steps/08_scan_query.md) %} - -```python -def executeScanQuery(driver): - query = ydb.ScanQuery(""" - SELECT series_id, season_id, COUNT(*) AS episodes_count - FROM episodes - GROUP BY series_id, season_id - ORDER BY series_id, season_id - """, {}) - - it = driver.table_client.scan_query(query) - - while True: - try: - result = next(it) - print result.result_set.rows - except StopIteration: - break -``` - {% include [transaction_control.md](../_includes/steps/10_transaction_control.md) %} -Code snippet for `transaction().begin()` and `tx.Commit()` calls: +The `session.transaction().execute()` method can also be used to execute YQL queries. Unlike `pool.execute_with_retries`, this method allows explicit control of transaction execution by configuring the desired transaction mode using the `TxControl` class. -```python -def explicit_tcl(session, path, series_id, season_id, episode_id): - query = """ - PRAGMA TablePathPrefix("{}"); +Available transaction modes: +* `ydb.QuerySerializableReadWrite()` (default); +* `ydb.QueryOnlineReadOnly(allow_inconsistent_reads=False)`; +* `ydb.QuerySnapshotReadOnly()`; +* `ydb.QueryStaleReadOnly()`. - DECLARE $seriesId AS Uint64; - DECLARE $seasonId AS Uint64; - DECLARE $episodeId AS Uint64; +For more information about transaction modes, see [{#T}](../../../concepts/transactions.md#modes). - UPDATE episodes - SET air_date = CAST(CurrentUtcDate() AS Uint64) - WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; - """.format(path) - prepared_query = session.prepare(query) +The result of executing `tx.execute()` is an iterator. This iterator allows you to read result rows without loading the entire result set into memory. However, the iterator must be read to the end after each request to correctly maintain the transaction state on the {{ ydb-short-name }} server side. If this is not done, write queries could not be applied on the {{ ydb-short-name }} server side. For convenience, the result of the `tx.execute()` function can be used as a context manager that automatically iterates to the end upon exit. - tx = session.transaction(ydb.SerializableReadWrite()).begin() +{% list tabs %} - tx.execute( - prepared_query, { - '$seriesId': series_id, - '$seasonId': season_id, - '$episodeId': episode_id - } - ) +- Synchronous - print("\n> explicit TCL call") + ```python + with tx.execute(query) as _: + pass + ``` - tx.commit() -``` +- Asynchronous + + ```python + async with await tx.execute(query) as _: + pass + ``` + +{% endlist %} + +The code snippet below demonstrates the explicit use of `transaction().begin()` and `tx.commit()`: + +{% list tabs %} + +- Synchronous + + ```python + def explicit_transaction_control(pool: ydb.QuerySessionPool, series_id, season_id, episode_id): + def callee(session: ydb.QuerySessionSync): + query = """ + DECLARE $seriesId AS Int64; + DECLARE $seasonId AS Int64; + DECLARE $episodeId AS Int64; + + UPDATE episodes + SET air_date = CurrentUtcDate() + WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; + """ + + # Get newly created transaction id + tx = session.transaction().begin() + + # Execute data query. + # Transaction control settings continues active transaction (tx) + with tx.execute( + query, + { + "$seriesId": (series_id, ydb.PrimitiveType.Int64), + "$seasonId": (season_id, ydb.PrimitiveType.Int64), + "$episodeId": (episode_id, ydb.PrimitiveType.Int64), + }, + ) as _: + pass + + print("\n> explicit TCL call") + + # Commit active transaction(tx) + tx.commit() + + return pool.retry_operation_sync(callee) + ``` + +- Asynchronous + + ```python + async def explicit_transaction_control( + pool: ydb.aio.QuerySessionPoolAsync, series_id, season_id, episode_id + ): + async def callee(session: ydb.aio.QuerySessionAsync): + query = """ + DECLARE $seriesId AS Int64; + DECLARE $seasonId AS Int64; + DECLARE $episodeId AS Int64; + + UPDATE episodes + SET air_date = CurrentUtcDate() + WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; + """ + + # Get newly created transaction id + tx = await session.transaction().begin() + + # Execute data query. + # Transaction control settings continues active transaction (tx) + async with await tx.execute( + query, + { + "$seriesId": (series_id, ydb.PrimitiveType.Int64), + "$seasonId": (season_id, ydb.PrimitiveType.Int64), + "$episodeId": (episode_id, ydb.PrimitiveType.Int64), + }, + ) as _: + pass + + print("\n> explicit TCL call") + + # Commit active transaction(tx) + await tx.commit() + + return await pool.retry_operation_async(callee) + ``` + +{% endlist %} + +However, a transaction can be opened implicitly with the first request and can be committed automatically by setting the `commit_tx=True` flag in arguments. Implicit transaction management is preferable because it requires fewer server calls. + +## Iterating over query results {#iterating} + +If a `SELECT` query is expected to return a potentially large number of rows, it is recommended to use the `tx.execute` method instead of `pool.execute_with_retries` to avoid excessive memory consumption on the client side. Instead of buffering the entire result set into memory, `tx.execute` returns an iterator for each top-level `SELECT` statement in the query. + +Example of a `SELECT` with unlimited data and implicit transaction control: + +{% list tabs %} + +- Synchronous + + ```python + def huge_select(pool: ydb.QuerySessionPool): + def callee(session: ydb.QuerySessionSync): + query = """SELECT * from episodes;""" + + with session.transaction(ydb.QuerySnapshotReadOnly()).execute( + query, + commit_tx=True, + ) as result_sets: + print("\n> Huge SELECT call") + for result_set in result_sets: + for row in result_set.rows: + print("episode title:", row.title, ", air date:", row.air_date) + + return pool.retry_operation_sync(callee) + ``` + +- Asynchronous + + ```python + async def huge_select(pool: ydb.aio.QuerySessionPoolAsync): + async def callee(session: ydb.aio.QuerySessionAsync): + query = """SELECT * from episodes;""" + + async with await session.transaction(ydb.QuerySnapshotReadOnly()).execute( + query, + commit_tx=True, + ) as result_sets: + print("\n> Huge SELECT call") + async for result_set in result_sets: + for row in result_set.rows: + print("episode title:", row.title, ", air date:", row.air_date) + + return await pool.retry_operation_async(callee) + ``` +{% endlist %} diff --git a/ydb/docs/ru/core/dev/example-app/python/index.md b/ydb/docs/ru/core/dev/example-app/python/index.md index 366284eaced2..2f25d8230d83 100644 --- a/ydb/docs/ru/core/dev/example-app/python/index.md +++ b/ydb/docs/ru/core/dev/example-app/python/index.md @@ -1,6 +1,6 @@ # Приложение на Python -На этой странице подробно разбирается код [тестового приложения](https://github.com/ydb-platform/ydb-python-sdk/tree/master/examples/basic_example_v1), доступного в составе [Python SDK](https://github.com/ydb-platform/ydb-python-sdk) {{ ydb-short-name }}. +На этой странице подробно разбирается код [тестового приложения](https://github.com/ydb-platform/ydb-python-sdk/tree/master/examples/basic_example_v2), доступного в составе [Python SDK](https://github.com/ydb-platform/ydb-python-sdk) {{ ydb-short-name }}. ## Скачивание и запуск {#download} @@ -21,217 +21,551 @@ python3 -m pip install iso8601 Фрагмент кода приложения для инициализации драйвера: -```python -def run(endpoint, database, path): - driver_config = ydb.DriverConfig( - endpoint, database, credentials=ydb.credentials_from_env_variables(), - root_certificates=ydb.load_ydb_root_certificate(), - ) - with ydb.Driver(driver_config) as driver: - try: - driver.wait(timeout=5) - except TimeoutError: - print("Connect failed to YDB") - print("Last reported errors by discovery:") - print(driver.discovery_debug_details()) - exit(1) -``` - -Фрагмент кода приложения для создания сессии: - -```python -session = driver.table_client.session().create() -``` +{% list tabs %} + +- Синхронный + + ```python + def run(endpoint, database): + driver_config = ydb.DriverConfig( + endpoint, database, credentials=ydb.credentials_from_env_variables(), + root_certificates=ydb.load_ydb_root_certificate(), + ) + with ydb.Driver(driver_config) as driver: + try: + driver.wait(timeout=5) + except TimeoutError: + print("Connect failed to YDB") + print("Last reported errors by discovery:") + print(driver.discovery_debug_details()) + exit(1) + ``` + +- Асинхронный + + ```python + async def run(endpoint, database): + driver_config = ydb.DriverConfig( + endpoint, database, credentials=ydb.credentials_from_env_variables(), + root_certificates=ydb.load_ydb_root_certificate(), + ) + async with ydb.aio.Driver(driver_config) as driver: + try: + await driver.wait(timeout=5) + except TimeoutError: + print("Connect failed to YDB") + print("Last reported errors by discovery:") + print(driver.discovery_debug_details()) + exit(1) + ``` + +{% endlist %} + +Фрагмент кода приложения для создания пула сессий: + +{% list tabs %} + +- Синхронный + + ```python + with ydb.QuerySessionPool(driver) as pool: + pass # operations with pool here + ``` + +- Асинхронный + + ```python + async with ydb.aio.QuerySessionPoolAsync(driver) as pool: + pass # operations with pool here + ``` + +{% endlist %} + +## Выполнение запросов + +{{ ydb-short-name }} Python SDK поддерживает выполнение запросов с использованием синтаксиса YQL. +Существует два основных метода для выполнения запросов, которые имеют различные свойства и области применения: + +* `pool.execute_with_retries`: + * Буферизует весь результат в памяти клиента. + * Автоматически перезапускает выполнение в случае ошибок, которые можно устранить перезапуском. + * Не позволяет указать режим выполнения транзакции. + * Рекомендуется для разовых запросов, которые возвращают небольшой по размеру результат. + +* `tx.execute`: + * Возвращает итератор над результатом запроса, что позволяет обработать результат, который может не поместиться в памяти клиента. + * Перезапуски в случае ошибок должны обрабатываться вручную с помощью `pool.retry_operation_sync`. + * Позволяет указать режим выполнения транзакции. + * Рекомендуется для сценариев, где `pool.execute_with_retries` неэффективен. {% include [create_table.md](../_includes/steps/02_create_table.md) %} -Для создания таблиц используется метод `session.create_table()`: - -```python -def create_tables(session, path): - session.create_table( - os.path.join(path, 'series'), - ydb.TableDescription() - .with_column(ydb.Column('series_id', ydb.PrimitiveType.Uint64)) # not null column - .with_column(ydb.Column('title', ydb.OptionalType(ydb.PrimitiveType.Utf8))) - .with_column(ydb.Column('series_info', ydb.OptionalType(ydb.PrimitiveType.Utf8))) - .with_column(ydb.Column('release_date', ydb.OptionalType(ydb.PrimitiveType.Uint64))) - .with_primary_key('series_id') - ) -``` +Для выполнения запросов `CREATE TABLE` стоит использовать метод `pool.execute_with_retries()`: + +{% list tabs %} + +- Синхронный + + ```python + def create_tables(pool: ydb.QuerySessionPool): + print("\nCreating table series...") + pool.execute_with_retries( + """ + CREATE TABLE `series` ( + `series_id` Int64, + `title` Utf8, + `series_info` Utf8, + `release_date` Date, + PRIMARY KEY (`series_id`) + ) + """ + ) + + print("\nCreating table seasons...") + pool.execute_with_retries( + """ + CREATE TABLE `seasons` ( + `series_id` Int64, + `season_id` Int64, + `title` Utf8, + `first_aired` Date, + `last_aired` Date, + PRIMARY KEY (`series_id`, `season_id`) + ) + """ + ) + + print("\nCreating table episodes...") + pool.execute_with_retries( + """ + CREATE TABLE `episodes` ( + `series_id` Int64, + `season_id` Int64, + `episode_id` Int64, + `title` Utf8, + `air_date` Date, + PRIMARY KEY (`series_id`, `season_id`, `episode_id`) + ) + """ + ) + ``` + +- Асинхронный + + ```python + async def create_tables(pool: ydb.aio.QuerySessionPoolAsync): + print("\nCreating table series...") + await pool.execute_with_retries( + """ + CREATE TABLE `series` ( + `series_id` Int64, + `title` Utf8, + `series_info` Utf8, + `release_date` Date, + PRIMARY KEY (`series_id`) + ) + """ + ) + + print("\nCreating table seasons...") + await pool.execute_with_retries( + """ + CREATE TABLE `seasons` ( + `series_id` Int64, + `season_id` Int64, + `title` Utf8, + `first_aired` Date, + `last_aired` Date, + PRIMARY KEY (`series_id`, `season_id`) + ) + """ + ) + + print("\nCreating table episodes...") + await pool.execute_with_retries( + """ + CREATE TABLE `episodes` ( + `series_id` Int64, + `season_id` Int64, + `episode_id` Int64, + `title` Utf8, + `air_date` Date, + PRIMARY KEY (`series_id`, `season_id`, `episode_id`) + ) + """ + ) + ``` + +{% endlist %} -В параметр path передаётся абсолютный путь от корня: +{% include [steps/03_write_queries.md](../_includes/steps/03_write_queries.md) %} -```python -full_path = os.path.join(database, path) -``` +Фрагмент кода, демонстрирующий выполнение запроса на запись/изменение данных: -С помощью метода `session.describe_table()` можно вывести информацию о структуре таблицы и убедиться, что она успешно создалась: +{% list tabs %} -```python -def describe_table(session, path, name): - result = session.describe_table(os.path.join(path, name)) - print("\n> describe table: series") - for column in result.columns: - print("column, name:", column.name, ",", str(column.type.item).strip()) -``` +- Синхронный -Приведенный фрагмент кода при запуске выводит на консоль текст: + ```python + def upsert_simple(pool: ydb.QuerySessionPool): + print("\nPerforming UPSERT into episodes...") + pool.execute_with_retries( + """ + UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD"); + """ + ) + ``` -```bash -> describe table: series -('column, name:', 'series_id', ',', 'type_id: UINT64') -('column, name:', 'title', ',', 'type_id: UTF8') -('column, name:', 'series_info', ',', 'type_id: UTF8') -('column, name:', 'release_date', ',', 'type_id: UINT64') -``` -{% include [steps/03_write_queries.md](../_includes/steps/03_write_queries.md) %} +- Асинхронный -Фрагмент кода, демонстрирующий выполнение запроса на запись/изменение данных: + ```python + async def upsert_simple(pool: ydb.aio.QuerySessionPoolAsync): + print("\nPerforming UPSERT into episodes...") + await pool.execute_with_retries( + """ + UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD"); + """ + ) + ``` -```python -def upsert_simple(session, path): - session.transaction().execute( - """ - PRAGMA TablePathPrefix("{}"); - UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES - (2, 6, 1, "TBD"); - """.format(path), - commit_tx=True, - ) -``` - -{% include [pragmatablepathprefix.md](../_includes/auxilary/pragmatablepathprefix.md) %} +{% endlist %} {% include [steps/04_query_processing.md](../_includes/steps/04_query_processing.md) %} -Для выполнения YQL-запросов используется метод `session.transaction().execute()`. -SDK позволяет в явном виде контролировать выполнение транзакций и настраивать необходимый режим выполнения транзакций с помощью класса `TxControl`. - -В фрагменте кода, приведенном ниже, транзакция выполняется с помощью метода `transaction().execute()`. Устанавливается режим выполнения транзакции `ydb.SerializableReadWrite()`. После завершения всех запросов транзакции она будет автоматически завершена с помощью явного указания флага: `commit_tx=True`. Тело запроса описано с помощью синтаксиса YQL и как параметр передается методу `execute`. - -```python -def select_simple(session, path): - result_sets = session.transaction(ydb.SerializableReadWrite()).execute( - """ - PRAGMA TablePathPrefix("{}"); - $format = DateTime::Format("%Y-%m-%d"); - SELECT - series_id, - title, - $format(DateTime::FromSeconds(CAST(DateTime::ToSeconds(DateTime::IntervalFromDays(CAST(release_date AS Int16))) AS Uint32))) AS release_date - FROM series - WHERE series_id = 1; - """.format(path), - commit_tx=True, - ) - print("\n> select_simple_transaction:") - for row in result_sets[0].rows: - print("series, id: ", row.series_id, ", title: ", row.title, ", release date: ", row.release_date) - - return result_sets[0] -``` - -В качестве результата выполнения запроса возвращается `result_set`, итерирование по которому выводит на консоль текст: +Для выполнения YQL-запросов метод часто эффективен метод `pool.execute_with_retries()`. + +{% list tabs %} + +- Синхронный + + ```python + def select_simple(pool: ydb.QuerySessionPool): + print("\nCheck series table...") + result_sets = pool.execute_with_retries( + """ + SELECT + series_id, + title, + release_date + FROM series + WHERE series_id = 1; + """, + ) + first_set = result_sets[0] + for row in first_set.rows: + print( + "series, id: ", + row.series_id, + ", title: ", + row.title, + ", release date: ", + row.release_date, + ) + return first_set + ``` + +- Асинхронный + + ```python + async def select_simple(pool: ydb.aio.QuerySessionPoolAsync): + print("\nCheck series table...") + result_sets = await pool.execute_with_retries( + """ + SELECT + series_id, + title, + release_date + FROM series + WHERE series_id = 1; + """, + ) + first_set = result_sets[0] + for row in first_set.rows: + print( + "series, id: ", + row.series_id, + ", title: ", + row.title, + ", release date: ", + row.release_date, + ) + return first_set + ``` + +{% endlist %} + +В качестве результата выполнения запроса возвращается список из `result_set`, итерирование по которым выводит на консоль текст: ```bash > SelectSimple: series, Id: 1, title: IT Crowd, Release date: 2006-02-03 ``` - -{% include [param_prep_queries.md](../_includes/steps/07_param_prep_queries.md) %} - -```python -def select_prepared(session, path, series_id, season_id, episode_id): - query = """ - PRAGMA TablePathPrefix("{}"); - DECLARE $seriesId AS Uint64; - DECLARE $seasonId AS Uint64; - DECLARE $episodeId AS Uint64; - $format = DateTime::Format("%Y-%m-%d"); - SELECT - title, - $format(DateTime::FromSeconds(CAST(DateTime::ToSeconds(DateTime::IntervalFromDays(CAST(air_date AS Int16))) AS Uint32))) AS air_date - FROM episodes - WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; - """.format(path) - - prepared_query = session.prepare(query) - result_sets = session.transaction(ydb.SerializableReadWrite()).execute( - prepared_query, { - '$seriesId': series_id, - '$seasonId': season_id, - '$episodeId': episode_id, - }, - commit_tx=True - ) - print("\n> select_prepared_transaction:") - for row in result_sets[0].rows: - print("episode title:", row.title, ", air date:", row.air_date) - - return result_sets[0] -``` - -Приведенный фрагмент кода при запуске выводит на консоль текст: +## Параметризованные запросы {#param-queries} + +Для выполнения параметризованных запросов методы `pool.execute_with_retries()` и `tx.execute()` работают схожим образом - необходимо передать словарь с параметрами специального вида, где ключом служит имя параметра, а значение может быть одним из следующих: + +1. Обычное значение +2. Кортеж со значением и типом +3. Специальный тип `ydb.TypedValue(value=value, value_type=value_type)` + +В случае указания значения без типа, конвертация происходит по следующим правилам: + +| Python type | {{ ydb-short-name }} type | +|------------|------------------------------| +| `int` | `ydb.PrimitiveType.Int64` | +| `float` | `ydb.PrimitiveType.Double` | +| `str` | `ydb.PrimitiveType.Utf8` | +| `bytes` | `ydb.PrimitiveType.String` | +| `bool` | `ydb.PrimitiveType.Bool` | +| `list` | `ydb.ListType` | +| `dict` | `ydb.DictType` | + +{% note warning %} + +Автоматическая конвертация списков и словарей возможна только в случае однородных структур. Тип вложенного значения будет вычисляться рекурсивно по вышеупомянутым правилам. В случае использования неоднородной структуры запросы будут падать с ошибкой типа `TypeError`. + +{% endnote %} + +Фрагмент кода, демонстрирующий возможность использования параметризованных запросов: + +{% list tabs %} + +- Синхронный + + ```python + def select_with_parameters(pool: ydb.QuerySessionPool, series_id, season_id, episode_id): + result_sets = pool.execute_with_retries( + """ + DECLARE $seriesId AS Int64; + DECLARE $seasonId AS Int64; + DECLARE $episodeId AS Int64; + + SELECT + title, + air_date + FROM episodes + WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; + """, + { + "$seriesId": series_id, # data type could be defined implicitly + "$seasonId": (season_id, ydb.PrimitiveType.Int64), # could be defined via a tuple + "$episodeId": ydb.TypedValue(episode_id, ydb.PrimitiveType.Int64), # could be defined via a special class + }, + ) + + print("\n> select_with_parameters:") + first_set = result_sets[0] + for row in first_set.rows: + print("episode title:", row.title, ", air date:", row.air_date) + + return first_set + ``` + +- Асинхронный + + ```python + async def select_with_parameters(pool: ydb.aio.QuerySessionPoolAsync, series_id, season_id, episode_id): + result_sets = await pool.execute_with_retries( + """ + DECLARE $seriesId AS Int64; + DECLARE $seasonId AS Int64; + DECLARE $episodeId AS Int64; + + SELECT + title, + air_date + FROM episodes + WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; + """, + { + "$seriesId": series_id, # could be defined implicitly + "$seasonId": (season_id, ydb.PrimitiveType.Int64), # could be defined via a tuple + "$episodeId": ydb.TypedValue(episode_id, ydb.PrimitiveType.Int64), # could be defined via a special class + }, + ) + + print("\n> select_with_parameters:") + first_set = result_sets[0] + for row in first_set.rows: + print("episode title:", row.title, ", air date:", row.air_date) + + return first_set + ``` + +{% endlist %} + +Фрагмент кода выше при запуске выводит на консоль текст: ```bash > select_prepared_transaction: ('episode title:', u'To Build a Better Beta', ', air date:', '2016-06-05') ``` -{% include [scan_query.md](../_includes/steps/08_scan_query.md) %} - -```python -def executeScanQuery(driver): - query = ydb.ScanQuery(""" - SELECT series_id, season_id, COUNT(*) AS episodes_count - FROM episodes - GROUP BY series_id, season_id - ORDER BY series_id, season_id - """, {}) - - it = driver.table_client.scan_query(query) - - while True: - try: - result = next(it) - print result.result_set.rows - except StopIteration: - break -``` {% include [transaction_control.md](../_includes/steps/10_transaction_control.md) %} -Фрагмент кода, демонстрирующий явное использование вызовов `transaction().begin()` и `tx.Commit()`: +Метод `session.transaction().execute()` так же может быть использован для выполнения YQL запросов. В отличие от `pool.execute_with_retries`, данный метод позволяет в явном виде контролировать выполнение транзакций и настраивать необходимый режим выполнения транзакций с помощью класса `TxControl`. -```python -def explicit_tcl(session, path, series_id, season_id, episode_id): - query = """ - PRAGMA TablePathPrefix("{}"); +Доступные режимы транзакции: +* `ydb.QuerySerializableReadWrite()` (по умолчанию); +* `ydb.QueryOnlineReadOnly(allow_inconsistent_reads=False)`; +* `ydb.QuerySnapshotReadOnly()`; +* `ydb.QueryStaleReadOnly()`. - DECLARE $seriesId AS Uint64; - DECLARE $seasonId AS Uint64; - DECLARE $episodeId AS Uint64; +Подробнее про режимы транзакций описано в [{#T}](../../../concepts/transactions.md#modes). - UPDATE episodes - SET air_date = CAST(CurrentUtcDate() AS Uint64) - WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; - """.format(path) - prepared_query = session.prepare(query) +Результатом выполнения `tx.execute()` является итератор. Итератор позволяет считать неограниченное количество строк и объем данных, не загружая в память весь результат. Однако, для корректного сохранения состояния транзакции на стороне {{ ydb-short-name }} итератор необходимо прочитывать до конца после каждого запроса. Если этого не сделать, пишущие запросы могут не выполниться на стороне {{ ydb-short-name }}. Для удобства результат функции `tx.execute()` представлен в виде контекстного менеджера, который долистывает итератор до конца после выхода. - tx = session.transaction(ydb.SerializableReadWrite()).begin() +{% list tabs %} - tx.execute( - prepared_query, { - '$seriesId': series_id, - '$seasonId': season_id, - '$episodeId': episode_id - } - ) +- Синхронный - print("\n> explicit TCL call") + ```python + with tx.execute(query) as _: + pass + ``` - tx.commit() -``` +- Асинхронный + + ```python + async with await tx.execute(query) as _: + pass + ``` + +{% endlist %} + +Фрагмент кода, демонстрирующий явное использование вызовов `transaction().begin()` и `tx.commit()`: + +{% list tabs %} + +- Синхронный + + ```python + def explicit_transaction_control(pool: ydb.QuerySessionPool, series_id, season_id, episode_id): + def callee(session: ydb.QuerySessionSync): + query = """ + DECLARE $seriesId AS Int64; + DECLARE $seasonId AS Int64; + DECLARE $episodeId AS Int64; + + UPDATE episodes + SET air_date = CurrentUtcDate() + WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; + """ + + # Get newly created transaction id + tx = session.transaction().begin() + + # Execute data query. + # Transaction control settings continues active transaction (tx) + with tx.execute( + query, + { + "$seriesId": (series_id, ydb.PrimitiveType.Int64), + "$seasonId": (season_id, ydb.PrimitiveType.Int64), + "$episodeId": (episode_id, ydb.PrimitiveType.Int64), + }, + ) as _: + pass + + print("\n> explicit TCL call") + + # Commit active transaction(tx) + tx.commit() + + return pool.retry_operation_sync(callee) + ``` + +- Асинхронный + + ```python + async def explicit_transaction_control( + pool: ydb.aio.QuerySessionPoolAsync, series_id, season_id, episode_id + ): + async def callee(session: ydb.aio.QuerySessionAsync): + query = """ + DECLARE $seriesId AS Int64; + DECLARE $seasonId AS Int64; + DECLARE $episodeId AS Int64; + + UPDATE episodes + SET air_date = CurrentUtcDate() + WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; + """ + + # Get newly created transaction id + tx = await session.transaction().begin() + + # Execute data query. + # Transaction control settings continues active transaction (tx) + async with await tx.execute( + query, + { + "$seriesId": (series_id, ydb.PrimitiveType.Int64), + "$seasonId": (season_id, ydb.PrimitiveType.Int64), + "$episodeId": (episode_id, ydb.PrimitiveType.Int64), + }, + ) as _: + pass + + print("\n> explicit TCL call") + + # Commit active transaction(tx) + await tx.commit() + + return await pool.retry_operation_async(callee) + ``` + +{% endlist %} + +Однако стоит помнить, что транзакция может быть открыта неявно при первом запросе. Завершиться же она может автоматически с явным указанием флага `commit_tx=True`. +Неявное управление транзакцией предпочтительно, так как требует меньше обращений к серверу. Пример неявного управления будет продемонстрирован в следующем блоке. + +## Итерирование по результатам запроса {#iterating} + +Если ожидается, что результат `SELECT` запроса будет иметь потенциально большое количество найденных строк, рекомендуется использовать метод `tx.execute` вместо `pool.execute_with_retries` для избежания чрезмерного потребления памяти на стороне клиента. + +Пример `SELECT` с неограниченным количеством данных и неявным контролем транзакции: + +{% list tabs %} + +- Синхронный + + ```python + def huge_select(pool: ydb.QuerySessionPool): + def callee(session: ydb.QuerySessionSync): + query = """SELECT * from episodes;""" + + with session.transaction(ydb.QuerySnapshotReadOnly()).execute( + query, + commit_tx=True, + ) as result_sets: + print("\n> Huge SELECT call") + for result_set in result_sets: + for row in result_set.rows: + print("episode title:", row.title, ", air date:", row.air_date) + + return pool.retry_operation_sync(callee) + ``` + +- Асинхронный + + ```python + async def huge_select(pool: ydb.aio.QuerySessionPoolAsync): + async def callee(session: ydb.aio.QuerySessionAsync): + query = """SELECT * from episodes;""" + + async with await session.transaction(ydb.QuerySnapshotReadOnly()).execute( + query, + commit_tx=True, + ) as result_sets: + print("\n> Huge SELECT call") + async for result_set in result_sets: + for row in result_set.rows: + print("episode title:", row.title, ", air date:", row.air_date) + + return await pool.retry_operation_async(callee) + ``` +{% endlist %}