From 3e2440f966a19569219f257cd6816f9094a5e076 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 13 Aug 2024 11:03:50 +0300 Subject: [PATCH 01/13] basic example with query service --- examples/basic_example_v2/README.md | 5 + examples/basic_example_v2/__main__.py | 28 ++ examples/basic_example_v2/basic_example.py | 342 ++++++++++++++++++ .../basic_example_v2/basic_example_data.py | 182 ++++++++++ ydb/convert.py | 2 +- ydb/query/base.py | 4 +- ydb/query/pool.py | 9 + ydb/query/session.py | 2 +- ydb/query/transaction.py | 8 +- 9 files changed, 575 insertions(+), 7 deletions(-) create mode 100644 examples/basic_example_v2/README.md create mode 100644 examples/basic_example_v2/__main__.py create mode 100644 examples/basic_example_v2/basic_example.py create mode 100644 examples/basic_example_v2/basic_example_data.py diff --git a/examples/basic_example_v2/README.md b/examples/basic_example_v2/README.md new file mode 100644 index 00000000..92ebe21a --- /dev/null +++ b/examples/basic_example_v2/README.md @@ -0,0 +1,5 @@ +# YDB Python SDK Example: basic_example_v2 + +Example code demonstrating the basic YDB Python SDK operations. + +See the top-level [README.md](../README.md) file for instructions on running this example. diff --git a/examples/basic_example_v2/__main__.py b/examples/basic_example_v2/__main__.py new file mode 100644 index 00000000..2397a2c1 --- /dev/null +++ b/examples/basic_example_v2/__main__.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +import argparse +import basic_example +import logging + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description="""\033[92mYDB basic example.\x1b[0m\n""", + ) + parser.add_argument("-d", "--database", help="Name of the database to use", default="/local") + parser.add_argument("-e", "--endpoint", help="Endpoint url to use", default="grpc://localhost:2136") + parser.add_argument("-p", "--path", default="") + parser.add_argument("-v", "--verbose", default=False, action="store_true") + + args = parser.parse_args() + + if args.verbose: + logger = logging.getLogger("ydb.pool.Discovery") + logger.setLevel(logging.INFO) + logger.addHandler(logging.StreamHandler()) + + basic_example.run( + args.endpoint, + args.database, + args.path, + ) diff --git a/examples/basic_example_v2/basic_example.py b/examples/basic_example_v2/basic_example.py new file mode 100644 index 00000000..f691e85a --- /dev/null +++ b/examples/basic_example_v2/basic_example.py @@ -0,0 +1,342 @@ +# -*- coding: utf-8 -*- +import posixpath +import ydb +import basic_example_data + +# Table path prefix allows to put the working tables into the specific directory +# inside the YDB database. Putting `PRAGMA TablePathPrefix("some/path")` +# at the beginning of the query allows to reference the tables through +# their names "under" the specified directory. +# +# TablePathPrefix has to be defined as an absolute path, which has to be started +# with the current database location. +# +# https://ydb.tech/ru/docs/yql/reference/syntax/pragma#table-path-prefix + +DropTablesQuery = """PRAGMA TablePathPrefix("{}"); +DROP TABLE IF EXISTS series; +DROP TABLE IF EXISTS seasons; +DROP TABLE IF EXISTS episodes; +""" + +FillDataQuery = """PRAGMA TablePathPrefix("{}"); + +DECLARE $seriesData AS List>; + +DECLARE $seasonsData AS List>; + +DECLARE $episodesData AS List>; + +REPLACE INTO series +SELECT + series_id, + title, + series_info, + CAST(release_date AS Uint64) AS release_date +FROM AS_TABLE($seriesData); + +REPLACE INTO seasons +SELECT + series_id, + season_id, + title, + CAST(first_aired AS Uint64) AS first_aired, + CAST(last_aired AS Uint64) AS last_aired +FROM AS_TABLE($seasonsData); + +REPLACE INTO episodes +SELECT + series_id, + season_id, + episode_id, + title, + CAST(air_date AS Uint64) AS air_date +FROM AS_TABLE($episodesData); +""" + + +def fill_tables_with_data(pool, path): + print("\nFilling tables with data...") + + global FillDataQuery + + def callee(session): + + prepared_query = FillDataQuery.format(path) + with session.transaction(ydb.QuerySerializableReadWrite()).execute( + prepared_query, + { + "$seriesData": (basic_example_data.get_series_data(), basic_example_data.get_series_data_type()), + "$seasonsData": (basic_example_data.get_seasons_data(), basic_example_data.get_seasons_data_type()), + "$episodesData": (basic_example_data.get_episodes_data(), basic_example_data.get_episodes_data_type()), + }, + commit_tx=True, + ) as result_sets: + pass + + return pool.retry_operation_sync(callee) + + +def select_simple(pool, path): + print("\nCheck series table...") + + def callee(session): + # new transaction in serializable read write mode + # if query successfully completed you will get result sets. + # otherwise exception will be raised + with session.transaction(ydb.QuerySerializableReadWrite()).execute( + """ + PRAGMA TablePathPrefix("{}"); + $format = DateTime::Format("%Y-%m-%d"); + SELECT + series_id, + title, + $format(DateTime::FromSeconds(CAST(DateTime::ToSeconds(DateTime::IntervalFromDays(CAST(release_date AS Int16))) AS Uint32))) AS release_date + FROM series + WHERE series_id = 1; + """.format( + path + ), + commit_tx=True, + ) as result_sets: + first_set = next(result_sets) + for row in first_set.rows: + print( + "series, id: ", + row.series_id, + ", title: ", + row.title, + ", release date: ", + row.release_date, + ) + + return first_set + + return pool.retry_operation_sync(callee) + + +def upsert_simple(pool, path): + print(f"\nPerforming UPSERT into episodes...") + + def callee(session): + with session.transaction().execute( + """ + PRAGMA TablePathPrefix("{}"); + + DECLARE $seriesId AS Uint64; + DECLARE $seasonId AS Uint64; + DECLARE $episodeId AS Uint64; + DECLARE $title AS Utf8; + + UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD"); + """.format( + path + ), + commit_tx=True, + ) as result_sets: + pass + + return pool.retry_operation_sync(callee) + + +def select_with_parameters(pool, path, series_id, season_id, episode_id): + def callee(session): + query = """ + PRAGMA TablePathPrefix("{}"); + + DECLARE $seriesId AS Uint64; + DECLARE $seasonId AS Uint64; + DECLARE $episodeId AS Uint64; + + $format = DateTime::Format("%Y-%m-%d"); + SELECT + title, + $format(DateTime::FromSeconds(CAST(DateTime::ToSeconds(DateTime::IntervalFromDays(CAST(air_date AS Int16))) AS Uint32))) AS air_date + FROM episodes + WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; + """.format( + path + ) + + with session.transaction(ydb.QuerySerializableReadWrite()).execute( + query, + { + "$seriesId": series_id, # could be defined implicit in case of int, str, bool + "$seasonId": (season_id, ydb.PrimitiveType.Uint64), # could be defined via tuple + "$episodeId": ydb.TypedValue(episode_id, ydb.PrimitiveType.Uint64), # could be defined via special class + }, + commit_tx=True, + ) as result_sets: + print("\n> select_prepared_transaction:") + first_set = next(result_sets) + for row in first_set.rows: + print("episode title:", row.title, ", air date:", row.air_date) + + return first_set + + return pool.retry_operation_sync(callee) + + +# Show usage of explicit Begin/Commit transaction control calls. +# In most cases it's better to use transaction control settings in session.transaction +# calls instead to avoid additional hops to YDB cluster and allow more efficient +# execution of queries. +def explicit_tcl(pool, path, series_id, season_id, episode_id): + def callee(session): + query = """ + PRAGMA TablePathPrefix("{}"); + + DECLARE $seriesId AS Uint64; + DECLARE $seasonId AS Uint64; + DECLARE $episodeId AS Uint64; + + UPDATE episodes + SET air_date = CAST(CurrentUtcDate() AS Uint64) + WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; + """.format( + path + ) + + # Get newly created transaction id + tx = session.transaction(ydb.QuerySerializableReadWrite()).begin() + + # Execute data query. + # Transaction control settings continues active transaction (tx) + with tx.execute( + query, + {"$seriesId": series_id, "$seasonId": season_id, "$episodeId": episode_id}, + ) as result_sets: + pass + + print("\n> explicit TCL call") + + # Commit active transaction(tx) + tx.commit() + + return pool.retry_operation_sync(callee) + + +def drop_tables(pool, path): + print("\nCleaning up existing tables...") + pool.execute_with_retries(DropTablesQuery.format(path)) + + +def create_tables(pool, path): + print("\nCreating table series...") + pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + CREATE table `series` ( + `series_id` Uint64, + `title` Utf8, + `series_info` Utf8, + `release_date` Uint64, + PRIMARY KEY (`series_id`) + ) + """.format( + path + ) + ) + + print("\nCreating table seasons...") + pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + CREATE table `seasons` ( + `series_id` Uint64, + `season_id` Uint64, + `title` Utf8, + `first_aired` Uint64, + `last_aired` Uint64, + PRIMARY KEY (`series_id`, `season_id`) + ) + """.format( + path + ) + ) + + print("\nCreating table episodes...") + pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + CREATE table `episodes` ( + `series_id` Uint64, + `season_id` Uint64, + `episode_id` Uint64, + `title` Utf8, + `air_date` Uint64, + PRIMARY KEY (`series_id`, `season_id`, `episode_id`) + ) + """.format( + path + ) + ) + + +def is_directory_exists(driver, path): + try: + return driver.scheme_client.describe_path(path).is_directory() + except ydb.SchemeError: + return False + + +def ensure_path_exists(driver, database, path): + paths_to_create = list() + path = path.rstrip("/") + while path not in ("", database): + full_path = posixpath.join(database, path) + if is_directory_exists(driver, full_path): + break + paths_to_create.append(full_path) + path = posixpath.dirname(path).rstrip("/") + + while len(paths_to_create) > 0: + full_path = paths_to_create.pop(-1) + driver.scheme_client.make_directory(full_path) + + +def run(endpoint, database, path): + with ydb.Driver( + endpoint=endpoint, + database=database, + # credentials=ydb.credentials_from_env_variables() + ) as driver: + driver.wait(timeout=5, fail_fast=True) + + with ydb.QuerySessionPool(driver) as pool: + + ensure_path_exists(driver, database, path) + + # absolute path - prefix to the table's names, + # including the database location + full_path = posixpath.join(database, path) + + drop_tables(pool, full_path) + + create_tables(pool, full_path) + + fill_tables_with_data(pool, full_path) + + select_simple(pool, full_path) + + upsert_simple(pool, full_path) + + select_with_parameters(pool, full_path, 2, 3, 7) + select_with_parameters(pool, full_path, 2, 3, 8) + + explicit_tcl(pool, full_path, 2, 6, 1) + select_with_parameters(pool, full_path, 2, 6, 1) diff --git a/examples/basic_example_v2/basic_example_data.py b/examples/basic_example_v2/basic_example_data.py new file mode 100644 index 00000000..2363fe2e --- /dev/null +++ b/examples/basic_example_v2/basic_example_data.py @@ -0,0 +1,182 @@ +# -*- coding: utf-8 -*- +import iso8601 +import ydb + + +def to_days(date): + timedelta = iso8601.parse_date(date) - iso8601.parse_date("1970-1-1") + return timedelta.days + + +class Series(object): + __slots__ = ("series_id", "title", "release_date", "series_info") + + def __init__(self, series_id, title, release_date, series_info): + self.series_id = series_id + self.title = title + self.release_date = to_days(release_date) + self.series_info = series_info + + +class Season(object): + __slots__ = ("series_id", "season_id", "title", "first_aired", "last_aired") + + def __init__(self, series_id, season_id, title, first_aired, last_aired): + self.series_id = series_id + self.season_id = season_id + self.title = title + self.first_aired = to_days(first_aired) + self.last_aired = to_days(last_aired) + + +class Episode(object): + __slots__ = ("series_id", "season_id", "episode_id", "title", "air_date") + + def __init__(self, series_id, season_id, episode_id, title, air_date): + self.series_id = series_id + self.season_id = season_id + self.episode_id = episode_id + self.title = title + self.air_date = to_days(air_date) + + +def get_series_data(): + return [ + Series( + 1, + "IT Crowd", + "2006-02-03", + "The IT Crowd is a British sitcom produced by Channel 4, written by Graham Linehan, produced by " + "Ash Atalla and starring Chris O'Dowd, Richard Ayoade, Katherine Parkinson, and Matt Berry.", + ), + Series( + 2, + "Silicon Valley", + "2014-04-06", + "Silicon Valley is an American comedy television series created by Mike Judge, John Altschuler and " + "Dave Krinsky. The series focuses on five young men who founded a startup company in Silicon Valley.", + ), + ] + + +def get_series_data_type(): + struct_type = ydb.StructType() + struct_type.add_member("series_id", ydb.PrimitiveType.Uint64) + struct_type.add_member("title", ydb.PrimitiveType.Utf8) + struct_type.add_member("series_info", ydb.PrimitiveType.Utf8) + struct_type.add_member("release_date", ydb.PrimitiveType.Date) + return ydb.ListType(struct_type) + + +def get_seasons_data(): + return [ + Season(1, 1, "Season 1", "2006-02-03", "2006-03-03"), + Season(1, 2, "Season 2", "2007-08-24", "2007-09-28"), + Season(1, 3, "Season 3", "2008-11-21", "2008-12-26"), + Season(1, 4, "Season 4", "2010-06-25", "2010-07-30"), + Season(2, 1, "Season 1", "2014-04-06", "2014-06-01"), + Season(2, 2, "Season 2", "2015-04-12", "2015-06-14"), + Season(2, 3, "Season 3", "2016-04-24", "2016-06-26"), + Season(2, 4, "Season 4", "2017-04-23", "2017-06-25"), + Season(2, 5, "Season 5", "2018-03-25", "2018-05-13"), + ] + + +def get_seasons_data_type(): + struct_type = ydb.StructType() + struct_type.add_member("series_id", ydb.PrimitiveType.Uint64) + struct_type.add_member("season_id", ydb.PrimitiveType.Uint64) + struct_type.add_member("title", ydb.PrimitiveType.Utf8) + struct_type.add_member("first_aired", ydb.PrimitiveType.Date) + struct_type.add_member("last_aired", ydb.PrimitiveType.Date) + return ydb.ListType(struct_type) + + +def get_episodes_data(): + return [ + Episode(1, 1, 1, "Yesterday's Jam", "2006-02-03"), + Episode(1, 1, 2, "Calamity Jen", "2006-02-03"), + Episode(1, 1, 3, "Fifty-Fifty", "2006-02-10"), + Episode(1, 1, 4, "The Red Door", "2006-02-17"), + Episode(1, 1, 5, "The Haunting of Bill Crouse", "2006-02-24"), + Episode(1, 1, 6, "Aunt Irma Visits", "2006-03-03"), + Episode(1, 2, 1, "The Work Outing", "2006-08-24"), + Episode(1, 2, 2, "Return of the Golden Child", "2007-08-31"), + Episode(1, 2, 3, "Moss and the German", "2007-09-07"), + Episode(1, 2, 4, "The Dinner Party", "2007-09-14"), + Episode(1, 2, 5, "Smoke and Mirrors", "2007-09-21"), + Episode(1, 2, 6, "Men Without Women", "2007-09-28"), + Episode(1, 3, 1, "From Hell", "2008-11-21"), + Episode(1, 3, 2, "Are We Not Men?", "2008-11-28"), + Episode(1, 3, 3, "Tramps Like Us", "2008-12-05"), + Episode(1, 3, 4, "The Speech", "2008-12-12"), + Episode(1, 3, 5, "Friendface", "2008-12-19"), + Episode(1, 3, 6, "Calendar Geeks", "2008-12-26"), + Episode(1, 4, 1, "Jen The Fredo", "2010-06-25"), + Episode(1, 4, 2, "The Final Countdown", "2010-07-02"), + Episode(1, 4, 3, "Something Happened", "2010-07-09"), + Episode(1, 4, 4, "Italian For Beginners", "2010-07-16"), + Episode(1, 4, 5, "Bad Boys", "2010-07-23"), + Episode(1, 4, 6, "Reynholm vs Reynholm", "2010-07-30"), + ] + + +def get_episodes_data_type(): + struct_type = ydb.StructType() + struct_type.add_member("series_id", ydb.PrimitiveType.Uint64) + struct_type.add_member("season_id", ydb.PrimitiveType.Uint64) + struct_type.add_member("episode_id", ydb.PrimitiveType.Uint64) + struct_type.add_member("title", ydb.PrimitiveType.Utf8) + struct_type.add_member("air_date", ydb.PrimitiveType.Date) + return ydb.ListType(struct_type) + + +def get_episodes_data_for_bulk_upsert(): + return [ + Episode(2, 1, 1, "Minimum Viable Product", "2014-04-06"), + Episode(2, 1, 2, "The Cap Table", "2014-04-13"), + Episode(2, 1, 3, "Articles of Incorporation", "2014-04-20"), + Episode(2, 1, 4, "Fiduciary Duties", "2014-04-27"), + Episode(2, 1, 5, "Signaling Risk", "2014-05-04"), + Episode(2, 1, 6, "Third Party Insourcing", "2014-05-11"), + Episode(2, 1, 7, "Proof of Concept", "2014-05-18"), + Episode(2, 1, 8, "Optimal Tip-to-Tip Efficiency", "2014-06-01"), + Episode(2, 2, 1, "Sand Hill Shuffle", "2015-04-12"), + Episode(2, 2, 2, "Runaway Devaluation", "2015-04-19"), + Episode(2, 2, 3, "Bad Money", "2015-04-26"), + Episode(2, 2, 4, "The Lady", "2015-05-03"), + Episode(2, 2, 5, "Server Space", "2015-05-10"), + Episode(2, 2, 6, "Homicide", "2015-05-17"), + Episode(2, 2, 7, "Adult Content", "2015-05-24"), + Episode(2, 2, 8, "White Hat/Black Hat", "2015-05-31"), + Episode(2, 2, 9, "Binding Arbitration", "2015-06-07"), + Episode(2, 2, 10, "Two Days of the Condor", "2015-06-14"), + Episode(2, 3, 1, "Founder Friendly", "2016-04-24"), + Episode(2, 3, 2, "Two in the Box", "2016-05-01"), + Episode(2, 3, 3, "Meinertzhagen's Haversack", "2016-05-08"), + Episode(2, 3, 4, "Maleant Data Systems Solutions", "2016-05-15"), + Episode(2, 3, 5, "The Empty Chair", "2016-05-22"), + Episode(2, 3, 6, "Bachmanity Insanity", "2016-05-29"), + Episode(2, 3, 7, "To Build a Better Beta", "2016-06-05"), + Episode(2, 3, 8, "Bachman's Earnings Over-Ride", "2016-06-12"), + Episode(2, 3, 9, "Daily Active Users", "2016-06-19"), + Episode(2, 3, 10, "The Uptick", "2016-06-26"), + Episode(2, 4, 1, "Success Failure", "2017-04-23"), + Episode(2, 4, 2, "Terms of Service", "2017-04-30"), + Episode(2, 4, 3, "Intellectual Property", "2017-05-07"), + Episode(2, 4, 4, "Teambuilding Exercise", "2017-05-14"), + Episode(2, 4, 5, "The Blood Boy", "2017-05-21"), + Episode(2, 4, 6, "Customer Service", "2017-05-28"), + Episode(2, 4, 7, "The Patent Troll", "2017-06-04"), + Episode(2, 4, 8, "The Keenan Vortex", "2017-06-11"), + Episode(2, 4, 9, "Hooli-Con", "2017-06-18"), + Episode(2, 4, 10, "Server Error", "2017-06-25"), + Episode(2, 5, 1, "Grow Fast or Die Slow", "2018-03-25"), + Episode(2, 5, 2, "Reorientation", "2018-04-01"), + Episode(2, 5, 3, "Chief Operating Officer", "2018-04-08"), + Episode(2, 5, 4, "Tech Evangelist", "2018-04-15"), + Episode(2, 5, 5, "Facial Recognition", "2018-04-22"), + Episode(2, 5, 6, "Artificial Emotional Intelligence", "2018-04-29"), + Episode(2, 5, 7, "Initial Coin Offering", "2018-05-06"), + Episode(2, 5, 8, "Fifty-One Percent", "2018-05-13"), + ] diff --git a/ydb/convert.py b/ydb/convert.py index 63a5dbe4..9e1478b5 100644 --- a/ydb/convert.py +++ b/ydb/convert.py @@ -303,7 +303,7 @@ def query_parameters_to_pb(parameters): _from_python_type_map = { - int: types.PrimitiveType.Int64, + int: types.PrimitiveType.Uint64, float: types.PrimitiveType.Float, bool: types.PrimitiveType.Bool, str: types.PrimitiveType.Utf8, diff --git a/ydb/query/base.py b/ydb/query/base.py index e08d9f52..9c6462ce 100644 --- a/ydb/query/base.py +++ b/ydb/query/base.py @@ -238,14 +238,14 @@ def tx_id(self) -> Optional[str]: pass @abc.abstractmethod - def begin(self, settings: Optional[QueryClientSettings] = None) -> None: + def begin(self, settings: Optional[QueryClientSettings] = None) -> "IQueryTxContext": """WARNING: This API is experimental and could be changed. Explicitly begins a transaction :param settings: A request settings - :return: None or exception if begin is failed + :return: Transaction object or exception if begin is failed """ pass diff --git a/ydb/query/pool.py b/ydb/query/pool.py index e7514cdf..51efcb25 100644 --- a/ydb/query/pool.py +++ b/ydb/query/pool.py @@ -77,6 +77,15 @@ def wrapped_callee(): return retry_operation_sync(wrapped_callee, retry_settings) + def stop(self, timeout=None): + pass # TODO: implement + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + class SimpleQuerySessionCheckout: def __init__(self, pool: QuerySessionPool): diff --git a/ydb/query/session.py b/ydb/query/session.py index d6034d34..87351b75 100644 --- a/ydb/query/session.py +++ b/ydb/query/session.py @@ -283,9 +283,9 @@ def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> base.IQ def execute( self, query: str, + parameters: dict = None, syntax: base.QuerySyntax = None, exec_mode: base.QueryExecMode = None, - parameters: dict = None, concurrent_result_sets: bool = False, ) -> base.SyncResponseContextIterator: """WARNING: This API is experimental and could be changed. diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py index 0a493202..ba0d4a4d 100644 --- a/ydb/query/transaction.py +++ b/ydb/query/transaction.py @@ -313,19 +313,21 @@ def _move_to_commited(self) -> None: return self._tx_state._change_state(QueryTxStateEnum.COMMITTED) - def begin(self, settings: Optional[base.QueryClientSettings] = None) -> None: + def begin(self, settings: Optional[base.QueryClientSettings] = None) -> "BaseQueryTxContext": """WARNING: This API is experimental and could be changed. Explicitly begins a transaction :param settings: A request settings - :return: None or exception if begin is failed + :return: Transaction object or exception if begin is failed """ self._tx_state._check_invalid_transition(QueryTxStateEnum.BEGINED) self._begin_call(settings) + return self + def commit(self, settings: Optional[base.QueryClientSettings] = None) -> None: """WARNING: This API is experimental and could be changed. @@ -365,10 +367,10 @@ def rollback(self, settings: Optional[base.QueryClientSettings] = None) -> None: def execute( self, query: str, + parameters: Optional[dict] = None, commit_tx: Optional[bool] = False, syntax: Optional[base.QuerySyntax] = None, exec_mode: Optional[base.QueryExecMode] = None, - parameters: Optional[dict] = None, concurrent_result_sets: Optional[bool] = False, ) -> base.SyncResponseContextIterator: """WARNING: This API is experimental and could be changed. From a299e2291e3805aeeee99c8bfb196c8f28fb6508 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 13 Aug 2024 11:08:11 +0300 Subject: [PATCH 02/13] style fixes --- examples/basic_example_v2/basic_example.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/examples/basic_example_v2/basic_example.py b/examples/basic_example_v2/basic_example.py index f691e85a..b008d73c 100644 --- a/examples/basic_example_v2/basic_example.py +++ b/examples/basic_example_v2/basic_example.py @@ -177,7 +177,10 @@ def callee(session): { "$seriesId": series_id, # could be defined implicit in case of int, str, bool "$seasonId": (season_id, ydb.PrimitiveType.Uint64), # could be defined via tuple - "$episodeId": ydb.TypedValue(episode_id, ydb.PrimitiveType.Uint64), # could be defined via special class + "$episodeId": ydb.TypedValue( + episode_id, + ydb.PrimitiveType.Uint64 + ), # could be defined via special class }, commit_tx=True, ) as result_sets: @@ -282,7 +285,7 @@ def create_tables(pool, path): PRIMARY KEY (`series_id`, `season_id`, `episode_id`) ) """.format( - path + path ) ) From e207c998bc06f1012ad7be6e2d9db586b076bb34 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 13 Aug 2024 11:18:43 +0300 Subject: [PATCH 03/13] style fixes --- examples/basic_example_v2/basic_example.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/basic_example_v2/basic_example.py b/examples/basic_example_v2/basic_example.py index b008d73c..2f638f4f 100644 --- a/examples/basic_example_v2/basic_example.py +++ b/examples/basic_example_v2/basic_example.py @@ -178,8 +178,7 @@ def callee(session): "$seriesId": series_id, # could be defined implicit in case of int, str, bool "$seasonId": (season_id, ydb.PrimitiveType.Uint64), # could be defined via tuple "$episodeId": ydb.TypedValue( - episode_id, - ydb.PrimitiveType.Uint64 + episode_id, ydb.PrimitiveType.Uint64 ), # could be defined via special class }, commit_tx=True, From 565451259987a0509062968ce3a4be2bbecf6520 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 13 Aug 2024 11:26:19 +0300 Subject: [PATCH 04/13] style fixes --- examples/basic_example_v2/basic_example.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/basic_example_v2/basic_example.py b/examples/basic_example_v2/basic_example.py index 2f638f4f..2a6fe504 100644 --- a/examples/basic_example_v2/basic_example.py +++ b/examples/basic_example_v2/basic_example.py @@ -85,7 +85,7 @@ def callee(session): "$episodesData": (basic_example_data.get_episodes_data(), basic_example_data.get_episodes_data_type()), }, commit_tx=True, - ) as result_sets: + ) as _: pass return pool.retry_operation_sync(callee) @@ -130,7 +130,7 @@ def callee(session): def upsert_simple(pool, path): - print(f"\nPerforming UPSERT into episodes...") + print("\nPerforming UPSERT into episodes...") def callee(session): with session.transaction().execute( @@ -147,7 +147,7 @@ def callee(session): path ), commit_tx=True, - ) as result_sets: + ) as _: pass return pool.retry_operation_sync(callee) @@ -221,7 +221,7 @@ def callee(session): with tx.execute( query, {"$seriesId": series_id, "$seasonId": season_id, "$episodeId": episode_id}, - ) as result_sets: + ) as _: pass print("\n> explicit TCL call") From 5491395fe4d0b7b0ca1d5293b445860231bcceab Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 13 Aug 2024 12:49:13 +0300 Subject: [PATCH 05/13] revert Uint64->Int64 --- examples/basic_example_v2/basic_example.py | 24 ++++++---------------- ydb/convert.py | 2 +- 2 files changed, 7 insertions(+), 19 deletions(-) diff --git a/examples/basic_example_v2/basic_example.py b/examples/basic_example_v2/basic_example.py index 2a6fe504..1cded980 100644 --- a/examples/basic_example_v2/basic_example.py +++ b/examples/basic_example_v2/basic_example.py @@ -136,12 +136,6 @@ def callee(session): with session.transaction().execute( """ PRAGMA TablePathPrefix("{}"); - - DECLARE $seriesId AS Uint64; - DECLARE $seasonId AS Uint64; - DECLARE $episodeId AS Uint64; - DECLARE $title AS Utf8; - UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD"); """.format( path @@ -157,11 +151,6 @@ def select_with_parameters(pool, path, series_id, season_id, episode_id): def callee(session): query = """ PRAGMA TablePathPrefix("{}"); - - DECLARE $seriesId AS Uint64; - DECLARE $seasonId AS Uint64; - DECLARE $episodeId AS Uint64; - $format = DateTime::Format("%Y-%m-%d"); SELECT title, @@ -175,7 +164,7 @@ def callee(session): with session.transaction(ydb.QuerySerializableReadWrite()).execute( query, { - "$seriesId": series_id, # could be defined implicit in case of int, str, bool + "$seriesId": (series_id, ydb.PrimitiveType.Uint64), "$seasonId": (season_id, ydb.PrimitiveType.Uint64), # could be defined via tuple "$episodeId": ydb.TypedValue( episode_id, ydb.PrimitiveType.Uint64 @@ -201,11 +190,6 @@ def explicit_tcl(pool, path, series_id, season_id, episode_id): def callee(session): query = """ PRAGMA TablePathPrefix("{}"); - - DECLARE $seriesId AS Uint64; - DECLARE $seasonId AS Uint64; - DECLARE $episodeId AS Uint64; - UPDATE episodes SET air_date = CAST(CurrentUtcDate() AS Uint64) WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; @@ -220,7 +204,11 @@ def callee(session): # Transaction control settings continues active transaction (tx) with tx.execute( query, - {"$seriesId": series_id, "$seasonId": season_id, "$episodeId": episode_id}, + { + "$seriesId": (series_id, ydb.PrimitiveType.Uint64), + "$seasonId": (season_id, ydb.PrimitiveType.Uint64), + "$episodeId": (episode_id, ydb.PrimitiveType.Uint64), + }, ) as _: pass diff --git a/ydb/convert.py b/ydb/convert.py index 9e1478b5..63a5dbe4 100644 --- a/ydb/convert.py +++ b/ydb/convert.py @@ -303,7 +303,7 @@ def query_parameters_to_pb(parameters): _from_python_type_map = { - int: types.PrimitiveType.Uint64, + int: types.PrimitiveType.Int64, float: types.PrimitiveType.Float, bool: types.PrimitiveType.Bool, str: types.PrimitiveType.Utf8, From eaa8df2df3603fab15970c31974d716cf1b7aad2 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 13 Aug 2024 16:02:24 +0300 Subject: [PATCH 06/13] move bulk upsert data to simple upsert --- examples/basic_example_v2/basic_example.py | 2 +- .../basic_example_v2/basic_example_data.py | 25 ++++++++----------- 2 files changed, 11 insertions(+), 16 deletions(-) diff --git a/examples/basic_example_v2/basic_example.py b/examples/basic_example_v2/basic_example.py index 1cded980..443a32b5 100644 --- a/examples/basic_example_v2/basic_example.py +++ b/examples/basic_example_v2/basic_example.py @@ -303,7 +303,7 @@ def run(endpoint, database, path): with ydb.Driver( endpoint=endpoint, database=database, - # credentials=ydb.credentials_from_env_variables() + credentials=ydb.credentials_from_env_variables() ) as driver: driver.wait(timeout=5, fail_fast=True) diff --git a/examples/basic_example_v2/basic_example_data.py b/examples/basic_example_v2/basic_example_data.py index 2363fe2e..1aebde89 100644 --- a/examples/basic_example_v2/basic_example_data.py +++ b/examples/basic_example_v2/basic_example_data.py @@ -118,21 +118,6 @@ def get_episodes_data(): Episode(1, 4, 4, "Italian For Beginners", "2010-07-16"), Episode(1, 4, 5, "Bad Boys", "2010-07-23"), Episode(1, 4, 6, "Reynholm vs Reynholm", "2010-07-30"), - ] - - -def get_episodes_data_type(): - struct_type = ydb.StructType() - struct_type.add_member("series_id", ydb.PrimitiveType.Uint64) - struct_type.add_member("season_id", ydb.PrimitiveType.Uint64) - struct_type.add_member("episode_id", ydb.PrimitiveType.Uint64) - struct_type.add_member("title", ydb.PrimitiveType.Utf8) - struct_type.add_member("air_date", ydb.PrimitiveType.Date) - return ydb.ListType(struct_type) - - -def get_episodes_data_for_bulk_upsert(): - return [ Episode(2, 1, 1, "Minimum Viable Product", "2014-04-06"), Episode(2, 1, 2, "The Cap Table", "2014-04-13"), Episode(2, 1, 3, "Articles of Incorporation", "2014-04-20"), @@ -180,3 +165,13 @@ def get_episodes_data_for_bulk_upsert(): Episode(2, 5, 7, "Initial Coin Offering", "2018-05-06"), Episode(2, 5, 8, "Fifty-One Percent", "2018-05-13"), ] + + +def get_episodes_data_type(): + struct_type = ydb.StructType() + struct_type.add_member("series_id", ydb.PrimitiveType.Uint64) + struct_type.add_member("season_id", ydb.PrimitiveType.Uint64) + struct_type.add_member("episode_id", ydb.PrimitiveType.Uint64) + struct_type.add_member("title", ydb.PrimitiveType.Utf8) + struct_type.add_member("air_date", ydb.PrimitiveType.Date) + return ydb.ListType(struct_type) From 45807f5e8ff0a64545ceed749ddcebd139352a4a Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Wed, 14 Aug 2024 10:25:30 +0300 Subject: [PATCH 07/13] review fixes --- examples/basic_example_v2/basic_example.py | 122 ++++++++---------- .../basic_example_v2/basic_example_data.py | 12 +- 2 files changed, 62 insertions(+), 72 deletions(-) diff --git a/examples/basic_example_v2/basic_example.py b/examples/basic_example_v2/basic_example.py index 443a32b5..fab88702 100644 --- a/examples/basic_example_v2/basic_example.py +++ b/examples/basic_example_v2/basic_example.py @@ -22,22 +22,22 @@ FillDataQuery = """PRAGMA TablePathPrefix("{}"); DECLARE $seriesData AS List>; DECLARE $seasonsData AS List>; DECLARE $episodesData AS List>; @@ -46,7 +46,7 @@ series_id, title, series_info, - CAST(release_date AS Uint64) AS release_date + release_date FROM AS_TABLE($seriesData); REPLACE INTO seasons @@ -54,8 +54,8 @@ series_id, season_id, title, - CAST(first_aired AS Uint64) AS first_aired, - CAST(last_aired AS Uint64) AS last_aired + first_aired, + last_aired FROM AS_TABLE($seasonsData); REPLACE INTO episodes @@ -64,7 +64,7 @@ season_id, episode_id, title, - CAST(air_date AS Uint64) AS air_date + air_date FROM AS_TABLE($episodesData); """ @@ -76,9 +76,9 @@ def fill_tables_with_data(pool, path): def callee(session): - prepared_query = FillDataQuery.format(path) + query = FillDataQuery.format(path) with session.transaction(ydb.QuerySerializableReadWrite()).execute( - prepared_query, + query, { "$seriesData": (basic_example_data.get_series_data(), basic_example_data.get_series_data_type()), "$seasonsData": (basic_example_data.get_seasons_data(), basic_example_data.get_seasons_data_type()), @@ -93,40 +93,31 @@ def callee(session): def select_simple(pool, path): print("\nCheck series table...") + result_sets = pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + SELECT + series_id, + title, + release_date + FROM series + WHERE series_id = 1; + """.format( + path + ), + ) + first_set = result_sets[0] + for row in first_set.rows: + print( + "series, id: ", + row.series_id, + ", title: ", + row.title, + ", release date: ", + row.release_date, + ) - def callee(session): - # new transaction in serializable read write mode - # if query successfully completed you will get result sets. - # otherwise exception will be raised - with session.transaction(ydb.QuerySerializableReadWrite()).execute( - """ - PRAGMA TablePathPrefix("{}"); - $format = DateTime::Format("%Y-%m-%d"); - SELECT - series_id, - title, - $format(DateTime::FromSeconds(CAST(DateTime::ToSeconds(DateTime::IntervalFromDays(CAST(release_date AS Int16))) AS Uint32))) AS release_date - FROM series - WHERE series_id = 1; - """.format( - path - ), - commit_tx=True, - ) as result_sets: - first_set = next(result_sets) - for row in first_set.rows: - print( - "series, id: ", - row.series_id, - ", title: ", - row.title, - ", release date: ", - row.release_date, - ) - - return first_set - - return pool.retry_operation_sync(callee) + return first_set def upsert_simple(pool, path): @@ -151,10 +142,9 @@ def select_with_parameters(pool, path, series_id, season_id, episode_id): def callee(session): query = """ PRAGMA TablePathPrefix("{}"); - $format = DateTime::Format("%Y-%m-%d"); SELECT title, - $format(DateTime::FromSeconds(CAST(DateTime::ToSeconds(DateTime::IntervalFromDays(CAST(air_date AS Int16))) AS Uint32))) AS air_date + air_date FROM episodes WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; """.format( @@ -164,10 +154,10 @@ def callee(session): with session.transaction(ydb.QuerySerializableReadWrite()).execute( query, { - "$seriesId": (series_id, ydb.PrimitiveType.Uint64), - "$seasonId": (season_id, ydb.PrimitiveType.Uint64), # could be defined via tuple + "$seriesId": series_id, # could be defined implicit + "$seasonId": (season_id, ydb.PrimitiveType.Int64), # could be defined via tuple "$episodeId": ydb.TypedValue( - episode_id, ydb.PrimitiveType.Uint64 + episode_id, ydb.PrimitiveType.Int64 ), # could be defined via special class }, commit_tx=True, @@ -186,12 +176,12 @@ def callee(session): # In most cases it's better to use transaction control settings in session.transaction # calls instead to avoid additional hops to YDB cluster and allow more efficient # execution of queries. -def explicit_tcl(pool, path, series_id, season_id, episode_id): +def explicit_transaction_control(pool, path, series_id, season_id, episode_id): def callee(session): query = """ PRAGMA TablePathPrefix("{}"); UPDATE episodes - SET air_date = CAST(CurrentUtcDate() AS Uint64) + SET air_date = CurrentUtcDate() WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; """.format( path @@ -205,9 +195,9 @@ def callee(session): with tx.execute( query, { - "$seriesId": (series_id, ydb.PrimitiveType.Uint64), - "$seasonId": (season_id, ydb.PrimitiveType.Uint64), - "$episodeId": (episode_id, ydb.PrimitiveType.Uint64), + "$seriesId": (series_id, ydb.PrimitiveType.Int64), + "$seasonId": (season_id, ydb.PrimitiveType.Int64), + "$episodeId": (episode_id, ydb.PrimitiveType.Int64), }, ) as _: pass @@ -231,10 +221,10 @@ def create_tables(pool, path): """ PRAGMA TablePathPrefix("{}"); CREATE table `series` ( - `series_id` Uint64, + `series_id` Int64, `title` Utf8, `series_info` Utf8, - `release_date` Uint64, + `release_date` Date, PRIMARY KEY (`series_id`) ) """.format( @@ -247,11 +237,11 @@ def create_tables(pool, path): """ PRAGMA TablePathPrefix("{}"); CREATE table `seasons` ( - `series_id` Uint64, - `season_id` Uint64, + `series_id` Int64, + `season_id` Int64, `title` Utf8, - `first_aired` Uint64, - `last_aired` Uint64, + `first_aired` Date, + `last_aired` Date, PRIMARY KEY (`series_id`, `season_id`) ) """.format( @@ -264,11 +254,11 @@ def create_tables(pool, path): """ PRAGMA TablePathPrefix("{}"); CREATE table `episodes` ( - `series_id` Uint64, - `season_id` Uint64, - `episode_id` Uint64, + `series_id` Int64, + `season_id` Int64, + `episode_id` Int64, `title` Utf8, - `air_date` Uint64, + `air_date` Date, PRIMARY KEY (`series_id`, `season_id`, `episode_id`) ) """.format( @@ -328,5 +318,5 @@ def run(endpoint, database, path): select_with_parameters(pool, full_path, 2, 3, 7) select_with_parameters(pool, full_path, 2, 3, 8) - explicit_tcl(pool, full_path, 2, 6, 1) + explicit_transaction_control(pool, full_path, 2, 6, 1) select_with_parameters(pool, full_path, 2, 6, 1) diff --git a/examples/basic_example_v2/basic_example_data.py b/examples/basic_example_v2/basic_example_data.py index 1aebde89..3c9616f4 100644 --- a/examples/basic_example_v2/basic_example_data.py +++ b/examples/basic_example_v2/basic_example_data.py @@ -61,7 +61,7 @@ def get_series_data(): def get_series_data_type(): struct_type = ydb.StructType() - struct_type.add_member("series_id", ydb.PrimitiveType.Uint64) + struct_type.add_member("series_id", ydb.PrimitiveType.Int64) struct_type.add_member("title", ydb.PrimitiveType.Utf8) struct_type.add_member("series_info", ydb.PrimitiveType.Utf8) struct_type.add_member("release_date", ydb.PrimitiveType.Date) @@ -84,8 +84,8 @@ def get_seasons_data(): def get_seasons_data_type(): struct_type = ydb.StructType() - struct_type.add_member("series_id", ydb.PrimitiveType.Uint64) - struct_type.add_member("season_id", ydb.PrimitiveType.Uint64) + struct_type.add_member("series_id", ydb.PrimitiveType.Int64) + struct_type.add_member("season_id", ydb.PrimitiveType.Int64) struct_type.add_member("title", ydb.PrimitiveType.Utf8) struct_type.add_member("first_aired", ydb.PrimitiveType.Date) struct_type.add_member("last_aired", ydb.PrimitiveType.Date) @@ -169,9 +169,9 @@ def get_episodes_data(): def get_episodes_data_type(): struct_type = ydb.StructType() - struct_type.add_member("series_id", ydb.PrimitiveType.Uint64) - struct_type.add_member("season_id", ydb.PrimitiveType.Uint64) - struct_type.add_member("episode_id", ydb.PrimitiveType.Uint64) + struct_type.add_member("series_id", ydb.PrimitiveType.Int64) + struct_type.add_member("season_id", ydb.PrimitiveType.Int64) + struct_type.add_member("episode_id", ydb.PrimitiveType.Int64) struct_type.add_member("title", ydb.PrimitiveType.Utf8) struct_type.add_member("air_date", ydb.PrimitiveType.Date) return ydb.ListType(struct_type) From e4a2fd895168ff30631b67568ae87a656d65b23f Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Wed, 14 Aug 2024 10:28:59 +0300 Subject: [PATCH 08/13] style fixes --- examples/basic_example_v2/basic_example.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/examples/basic_example_v2/basic_example.py b/examples/basic_example_v2/basic_example.py index fab88702..0f810698 100644 --- a/examples/basic_example_v2/basic_example.py +++ b/examples/basic_example_v2/basic_example.py @@ -96,10 +96,11 @@ def select_simple(pool, path): result_sets = pool.execute_with_retries( """ PRAGMA TablePathPrefix("{}"); + $format = DateTime::Format("%Y-%m-%d"); SELECT series_id, title, - release_date + $format(DateTime::FromSeconds(CAST(DateTime::ToSeconds(DateTime::IntervalFromDays(CAST(release_date AS Int16))) AS Uint32))) AS release_date FROM series WHERE series_id = 1; """.format( @@ -142,9 +143,10 @@ def select_with_parameters(pool, path, series_id, season_id, episode_id): def callee(session): query = """ PRAGMA TablePathPrefix("{}"); + $format = DateTime::Format("%Y-%m-%d"); SELECT title, - air_date + $format(DateTime::FromSeconds(CAST(DateTime::ToSeconds(DateTime::IntervalFromDays(CAST(air_date AS Int16))) AS Uint32))) AS air_date FROM episodes WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; """.format( @@ -156,9 +158,7 @@ def callee(session): { "$seriesId": series_id, # could be defined implicit "$seasonId": (season_id, ydb.PrimitiveType.Int64), # could be defined via tuple - "$episodeId": ydb.TypedValue( - episode_id, ydb.PrimitiveType.Int64 - ), # could be defined via special class + "$episodeId": ydb.TypedValue(episode_id, ydb.PrimitiveType.Int64), # could be defined via special class }, commit_tx=True, ) as result_sets: @@ -293,7 +293,7 @@ def run(endpoint, database, path): with ydb.Driver( endpoint=endpoint, database=database, - credentials=ydb.credentials_from_env_variables() + credentials=ydb.credentials_from_env_variables(), ) as driver: driver.wait(timeout=5, fail_fast=True) From c74e46861026907f531753ae983134588558013b Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Wed, 14 Aug 2024 13:30:32 +0300 Subject: [PATCH 09/13] make to native converters by default --- examples/basic_example_v2/basic_example.py | 6 ++-- ydb/query/base.py | 32 ++++++++++++++++++++-- ydb/query/session.py | 6 +++- ydb/query/transaction.py | 5 ++++ 4 files changed, 42 insertions(+), 7 deletions(-) diff --git a/examples/basic_example_v2/basic_example.py b/examples/basic_example_v2/basic_example.py index 0f810698..42802b61 100644 --- a/examples/basic_example_v2/basic_example.py +++ b/examples/basic_example_v2/basic_example.py @@ -96,11 +96,10 @@ def select_simple(pool, path): result_sets = pool.execute_with_retries( """ PRAGMA TablePathPrefix("{}"); - $format = DateTime::Format("%Y-%m-%d"); SELECT series_id, title, - $format(DateTime::FromSeconds(CAST(DateTime::ToSeconds(DateTime::IntervalFromDays(CAST(release_date AS Int16))) AS Uint32))) AS release_date + release_date FROM series WHERE series_id = 1; """.format( @@ -143,10 +142,9 @@ def select_with_parameters(pool, path, series_id, season_id, episode_id): def callee(session): query = """ PRAGMA TablePathPrefix("{}"); - $format = DateTime::Format("%Y-%m-%d"); SELECT title, - $format(DateTime::FromSeconds(CAST(DateTime::ToSeconds(DateTime::IntervalFromDays(CAST(air_date AS Int16))) AS Uint32))) AS air_date + air_date FROM episodes WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; """.format( diff --git a/ydb/query/base.py b/ydb/query/base.py index 9c6462ce..49c8f42d 100644 --- a/ydb/query/base.py +++ b/ydb/query/base.py @@ -53,7 +53,32 @@ def __exit__(self, exc_type, exc_val, exc_tb): class QueryClientSettings: - pass + def __init__(self): + self._native_datetime_in_result_sets = True + self._native_date_in_result_sets = True + self._native_json_in_result_sets = True + self._native_interval_in_result_sets = True + self._native_timestamp_in_result_sets = True + + def with_native_timestamp_in_result_sets(self, enabled: bool) -> "QueryClientSettings": + self._native_timestamp_in_result_sets = enabled + return self + + def with_native_interval_in_result_sets(self, enabled: bool) -> "QueryClientSettings": + self._native_interval_in_result_sets = enabled + return self + + def with_native_json_in_result_sets(self, enabled: bool) -> "QueryClientSettings": + self._native_json_in_result_sets = enabled + return self + + def with_native_date_in_result_sets(self, enabled: bool) -> "QueryClientSettings": + self._native_date_in_result_sets = enabled + return self + + def with_native_datetime_in_result_sets(self, enabled: bool) -> "QueryClientSettings": + self._native_datetime_in_result_sets = enabled + return self class IQuerySessionState(abc.ABC): @@ -284,6 +309,7 @@ def execute( exec_mode: Optional[QueryExecMode] = None, parameters: Optional[dict] = None, concurrent_result_sets: Optional[bool] = False, + settings: Optional[QueryClientSettings] = None, ) -> Iterator: """WARNING: This API is experimental and could be changed. @@ -300,6 +326,7 @@ def execute( 4) QueryExecMode.PARSE. :param parameters: dict with parameters and YDB types; :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + :param settings: An additional request settings QueryClientSettings; :return: Iterator with result sets """ @@ -367,13 +394,14 @@ def wrap_execute_query_response( response_pb: _apis.ydb_query.ExecuteQueryResponsePart, tx: Optional[IQueryTxContext] = None, commit_tx: Optional[bool] = False, + settings: Optional[QueryClientSettings] = None, ) -> convert.ResultSet: issues._process_response(response_pb) if tx and response_pb.tx_meta and not tx.tx_id: tx._move_to_beginned(response_pb.tx_meta.id) if tx and commit_tx: tx._move_to_commited() - return convert.ResultSet.from_message(response_pb.result_set) + return convert.ResultSet.from_message(response_pb.result_set, settings) def bad_session_handler(func): diff --git a/ydb/query/session.py b/ydb/query/session.py index 87351b75..1cc08ec6 100644 --- a/ydb/query/session.py +++ b/ydb/query/session.py @@ -313,5 +313,9 @@ def execute( return base.SyncResponseContextIterator( stream_it, - lambda resp: base.wrap_execute_query_response(rpc_state=None, response_pb=resp), + lambda resp: base.wrap_execute_query_response( + rpc_state=None, + response_pb=resp, + settings=self._settings + ), ) diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py index ba0d4a4d..cecc612c 100644 --- a/ydb/query/transaction.py +++ b/ydb/query/transaction.py @@ -372,6 +372,7 @@ def execute( syntax: Optional[base.QuerySyntax] = None, exec_mode: Optional[base.QueryExecMode] = None, concurrent_result_sets: Optional[bool] = False, + settings: Optional[base.QueryClientSettings] = None, ) -> base.SyncResponseContextIterator: """WARNING: This API is experimental and could be changed. @@ -388,6 +389,7 @@ def execute( 4) QueryExecMode.PARSE. :param parameters: dict with parameters and YDB types; :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + :param settings: An additional request settings QueryClientSettings; :return: Iterator with result sets """ @@ -402,6 +404,8 @@ def execute( parameters=parameters, concurrent_result_sets=concurrent_result_sets, ) + + settings = settings if settings is not None else self.session._settings self._prev_stream = base.SyncResponseContextIterator( stream_it, lambda resp: base.wrap_execute_query_response( @@ -409,6 +413,7 @@ def execute( response_pb=resp, tx=self, commit_tx=commit_tx, + settings=settings ), ) return self._prev_stream From 34d91cd7f3b277f855590335aa9da96aadb5af15 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Wed, 14 Aug 2024 13:37:38 +0300 Subject: [PATCH 10/13] style fixes --- ydb/query/session.py | 2 +- ydb/query/transaction.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/query/session.py b/ydb/query/session.py index 1cc08ec6..1fa3025d 100644 --- a/ydb/query/session.py +++ b/ydb/query/session.py @@ -316,6 +316,6 @@ def execute( lambda resp: base.wrap_execute_query_response( rpc_state=None, response_pb=resp, - settings=self._settings + settings=self._settings, ), ) diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py index cecc612c..f42571c2 100644 --- a/ydb/query/transaction.py +++ b/ydb/query/transaction.py @@ -413,7 +413,7 @@ def execute( response_pb=resp, tx=self, commit_tx=commit_tx, - settings=settings + settings=settings, ), ) return self._prev_stream From 3ddf96f44046a04f45a8a126957a33d503c7897d Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Wed, 14 Aug 2024 15:19:13 +0300 Subject: [PATCH 11/13] simplify examples --- examples/basic_example_v2/basic_example.py | 85 +++++++++------------- ydb/query/base.py | 2 +- ydb/query/pool.py | 9 ++- 3 files changed, 42 insertions(+), 54 deletions(-) diff --git a/examples/basic_example_v2/basic_example.py b/examples/basic_example_v2/basic_example.py index 42802b61..53b2b428 100644 --- a/examples/basic_example_v2/basic_example.py +++ b/examples/basic_example_v2/basic_example.py @@ -69,29 +69,22 @@ """ -def fill_tables_with_data(pool, path): +def fill_tables_with_data(pool: ydb.QuerySessionPool, path: str): print("\nFilling tables with data...") - global FillDataQuery + query = FillDataQuery.format(path) - def callee(session): - - query = FillDataQuery.format(path) - with session.transaction(ydb.QuerySerializableReadWrite()).execute( - query, - { - "$seriesData": (basic_example_data.get_series_data(), basic_example_data.get_series_data_type()), - "$seasonsData": (basic_example_data.get_seasons_data(), basic_example_data.get_seasons_data_type()), - "$episodesData": (basic_example_data.get_episodes_data(), basic_example_data.get_episodes_data_type()), - }, - commit_tx=True, - ) as _: - pass - - return pool.retry_operation_sync(callee) + pool.execute_with_retries( + query, + { + "$seriesData": (basic_example_data.get_series_data(), basic_example_data.get_series_data_type()), + "$seasonsData": (basic_example_data.get_seasons_data(), basic_example_data.get_seasons_data_type()), + "$episodesData": (basic_example_data.get_episodes_data(), basic_example_data.get_episodes_data_type()), + }, + ) -def select_simple(pool, path): +def select_simple(pool: ydb.QuerySessionSync, path): print("\nCheck series table...") result_sets = pool.execute_with_retries( """ @@ -120,27 +113,22 @@ def select_simple(pool, path): return first_set -def upsert_simple(pool, path): +def upsert_simple(pool: ydb.QuerySessionPool, path): print("\nPerforming UPSERT into episodes...") - def callee(session): - with session.transaction().execute( - """ - PRAGMA TablePathPrefix("{}"); - UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD"); - """.format( - path - ), - commit_tx=True, - ) as _: - pass - - return pool.retry_operation_sync(callee) + pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD"); + """.format( + path + ) + ) def select_with_parameters(pool, path, series_id, season_id, episode_id): - def callee(session): - query = """ + result_sets = pool.execute_with_retries( + """ PRAGMA TablePathPrefix("{}"); SELECT title, @@ -149,25 +137,20 @@ def callee(session): WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; """.format( path - ) - - with session.transaction(ydb.QuerySerializableReadWrite()).execute( - query, - { - "$seriesId": series_id, # could be defined implicit - "$seasonId": (season_id, ydb.PrimitiveType.Int64), # could be defined via tuple - "$episodeId": ydb.TypedValue(episode_id, ydb.PrimitiveType.Int64), # could be defined via special class - }, - commit_tx=True, - ) as result_sets: - print("\n> select_prepared_transaction:") - first_set = next(result_sets) - for row in first_set.rows: - print("episode title:", row.title, ", air date:", row.air_date) + ), + { + "$seriesId": series_id, # could be defined implicit + "$seasonId": (season_id, ydb.PrimitiveType.Int64), # could be defined via tuple + "$episodeId": ydb.TypedValue(episode_id, ydb.PrimitiveType.Int64), # could be defined via special class + } + ) - return first_set + print("\n> select_with_parameters:") + first_set = result_sets[0] + for row in first_set.rows: + print("episode title:", row.title, ", air date:", row.air_date) - return pool.retry_operation_sync(callee) + return first_set # Show usage of explicit Begin/Commit transaction control calls. diff --git a/ydb/query/base.py b/ydb/query/base.py index 49c8f42d..eef51ee6 100644 --- a/ydb/query/base.py +++ b/ydb/query/base.py @@ -166,9 +166,9 @@ def transaction(self, tx_mode: Optional[BaseQueryTxMode] = None) -> "IQueryTxCon def execute( self, query: str, + parameters: Optional[dict] = None, syntax: Optional[QuerySyntax] = None, exec_mode: Optional[QueryExecMode] = None, - parameters: Optional[dict] = None, concurrent_result_sets: Optional[bool] = False, ) -> Iterator: """WARNING: This API is experimental and could be changed. diff --git a/ydb/query/pool.py b/ydb/query/pool.py index 51efcb25..bc214ecb 100644 --- a/ydb/query/pool.py +++ b/ydb/query/pool.py @@ -55,7 +55,12 @@ def wrapped_callee(): return retry_operation_sync(wrapped_callee, retry_settings) def execute_with_retries( - self, query: str, retry_settings: Optional[RetrySettings] = None, *args, **kwargs + self, + query: str, + parameters: Optional[dict] = None, + retry_settings: Optional[RetrySettings] = None, + *args, + **kwargs, ) -> List[convert.ResultSet]: """WARNING: This API is experimental and could be changed. Special interface to execute a one-shot queries in a safe, retriable way. @@ -72,7 +77,7 @@ def execute_with_retries( def wrapped_callee(): with self.checkout() as session: - it = session.execute(query, *args, **kwargs) + it = session.execute(query, parameters, *args, **kwargs) return [result_set for result_set in it] return retry_operation_sync(wrapped_callee, retry_settings) From da4cad85de189555bc62bfdb0085399cc931a0f3 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Wed, 14 Aug 2024 15:24:06 +0300 Subject: [PATCH 12/13] style fixes --- examples/basic_example_v2/basic_example.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/examples/basic_example_v2/basic_example.py b/examples/basic_example_v2/basic_example.py index 53b2b428..3deed483 100644 --- a/examples/basic_example_v2/basic_example.py +++ b/examples/basic_example_v2/basic_example.py @@ -84,7 +84,7 @@ def fill_tables_with_data(pool: ydb.QuerySessionPool, path: str): ) -def select_simple(pool: ydb.QuerySessionSync, path): +def select_simple(pool: ydb.QuerySessionPool, path: str): print("\nCheck series table...") result_sets = pool.execute_with_retries( """ @@ -113,7 +113,7 @@ def select_simple(pool: ydb.QuerySessionSync, path): return first_set -def upsert_simple(pool: ydb.QuerySessionPool, path): +def upsert_simple(pool: ydb.QuerySessionPool, path: str): print("\nPerforming UPSERT into episodes...") pool.execute_with_retries( @@ -126,7 +126,7 @@ def upsert_simple(pool: ydb.QuerySessionPool, path): ) -def select_with_parameters(pool, path, series_id, season_id, episode_id): +def select_with_parameters(pool: ydb.QuerySessionPool, path: str, series_id, season_id, episode_id): result_sets = pool.execute_with_retries( """ PRAGMA TablePathPrefix("{}"); @@ -142,7 +142,7 @@ def select_with_parameters(pool, path, series_id, season_id, episode_id): "$seriesId": series_id, # could be defined implicit "$seasonId": (season_id, ydb.PrimitiveType.Int64), # could be defined via tuple "$episodeId": ydb.TypedValue(episode_id, ydb.PrimitiveType.Int64), # could be defined via special class - } + }, ) print("\n> select_with_parameters:") @@ -157,8 +157,8 @@ def select_with_parameters(pool, path, series_id, season_id, episode_id): # In most cases it's better to use transaction control settings in session.transaction # calls instead to avoid additional hops to YDB cluster and allow more efficient # execution of queries. -def explicit_transaction_control(pool, path, series_id, season_id, episode_id): - def callee(session): +def explicit_transaction_control(pool: ydb.QuerySessionPool, path: str, series_id, season_id, episode_id): + def callee(session: ydb.QuerySessionSync): query = """ PRAGMA TablePathPrefix("{}"); UPDATE episodes @@ -191,12 +191,12 @@ def callee(session): return pool.retry_operation_sync(callee) -def drop_tables(pool, path): +def drop_tables(pool: ydb.QuerySessionPool, path: str): print("\nCleaning up existing tables...") pool.execute_with_retries(DropTablesQuery.format(path)) -def create_tables(pool, path): +def create_tables(pool: ydb.QuerySessionPool, path: str): print("\nCreating table series...") pool.execute_with_retries( """ @@ -248,7 +248,7 @@ def create_tables(pool, path): ) -def is_directory_exists(driver, path): +def is_directory_exists(driver: ydb.Driver, path: str): try: return driver.scheme_client.describe_path(path).is_directory() except ydb.SchemeError: From 61c40fdd0ffa04767c34e8856faabd959a263c02 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Wed, 14 Aug 2024 16:11:14 +0300 Subject: [PATCH 13/13] add notes about outdated docs --- examples/basic_example_v1/README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/basic_example_v1/README.md b/examples/basic_example_v1/README.md index 3985a3e4..ac3c3978 100644 --- a/examples/basic_example_v1/README.md +++ b/examples/basic_example_v1/README.md @@ -1,5 +1,7 @@ # YDB Python SDK Example: basic_example_v1 +**This example is outdated, please see [example with new API](../basic_example_v2/)** + Example code demonstrating the basic YDB Python SDK operations. See the top-level [README.md](../README.md) file for instructions on running this example.