diff --git a/examples/basic_example_v1/README.md b/examples/basic_example_v1/README.md index 3985a3e4..ac3c3978 100644 --- a/examples/basic_example_v1/README.md +++ b/examples/basic_example_v1/README.md @@ -1,5 +1,7 @@ # YDB Python SDK Example: basic_example_v1 +**This example is outdated, please see [example with new API](../basic_example_v2/)** + Example code demonstrating the basic YDB Python SDK operations. See the top-level [README.md](../README.md) file for instructions on running this example. diff --git a/examples/basic_example_v2/README.md b/examples/basic_example_v2/README.md new file mode 100644 index 00000000..92ebe21a --- /dev/null +++ b/examples/basic_example_v2/README.md @@ -0,0 +1,5 @@ +# YDB Python SDK Example: basic_example_v2 + +Example code demonstrating the basic YDB Python SDK operations. + +See the top-level [README.md](../README.md) file for instructions on running this example. diff --git a/examples/basic_example_v2/__main__.py b/examples/basic_example_v2/__main__.py new file mode 100644 index 00000000..2397a2c1 --- /dev/null +++ b/examples/basic_example_v2/__main__.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +import argparse +import basic_example +import logging + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description="""\033[92mYDB basic example.\x1b[0m\n""", + ) + parser.add_argument("-d", "--database", help="Name of the database to use", default="/local") + parser.add_argument("-e", "--endpoint", help="Endpoint url to use", default="grpc://localhost:2136") + parser.add_argument("-p", "--path", default="") + parser.add_argument("-v", "--verbose", default=False, action="store_true") + + args = parser.parse_args() + + if args.verbose: + logger = logging.getLogger("ydb.pool.Discovery") + logger.setLevel(logging.INFO) + logger.addHandler(logging.StreamHandler()) + + basic_example.run( + args.endpoint, + args.database, + args.path, + ) diff --git a/examples/basic_example_v2/basic_example.py b/examples/basic_example_v2/basic_example.py new file mode 100644 index 00000000..3deed483 --- /dev/null +++ b/examples/basic_example_v2/basic_example.py @@ -0,0 +1,303 @@ +# -*- coding: utf-8 -*- +import posixpath +import ydb +import basic_example_data + +# Table path prefix allows to put the working tables into the specific directory +# inside the YDB database. Putting `PRAGMA TablePathPrefix("some/path")` +# at the beginning of the query allows to reference the tables through +# their names "under" the specified directory. +# +# TablePathPrefix has to be defined as an absolute path, which has to be started +# with the current database location. +# +# https://ydb.tech/ru/docs/yql/reference/syntax/pragma#table-path-prefix + +DropTablesQuery = """PRAGMA TablePathPrefix("{}"); +DROP TABLE IF EXISTS series; +DROP TABLE IF EXISTS seasons; +DROP TABLE IF EXISTS episodes; +""" + +FillDataQuery = """PRAGMA TablePathPrefix("{}"); + +DECLARE $seriesData AS List>; + +DECLARE $seasonsData AS List>; + +DECLARE $episodesData AS List>; + +REPLACE INTO series +SELECT + series_id, + title, + series_info, + release_date +FROM AS_TABLE($seriesData); + +REPLACE INTO seasons +SELECT + series_id, + season_id, + title, + first_aired, + last_aired +FROM AS_TABLE($seasonsData); + +REPLACE INTO episodes +SELECT + series_id, + season_id, + episode_id, + title, + air_date +FROM AS_TABLE($episodesData); +""" + + +def fill_tables_with_data(pool: ydb.QuerySessionPool, path: str): + print("\nFilling tables with data...") + + query = FillDataQuery.format(path) + + pool.execute_with_retries( + query, + { + "$seriesData": (basic_example_data.get_series_data(), basic_example_data.get_series_data_type()), + "$seasonsData": (basic_example_data.get_seasons_data(), basic_example_data.get_seasons_data_type()), + "$episodesData": (basic_example_data.get_episodes_data(), basic_example_data.get_episodes_data_type()), + }, + ) + + +def select_simple(pool: ydb.QuerySessionPool, path: str): + print("\nCheck series table...") + result_sets = pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + SELECT + series_id, + title, + release_date + FROM series + WHERE series_id = 1; + """.format( + path + ), + ) + first_set = result_sets[0] + for row in first_set.rows: + print( + "series, id: ", + row.series_id, + ", title: ", + row.title, + ", release date: ", + row.release_date, + ) + + return first_set + + +def upsert_simple(pool: ydb.QuerySessionPool, path: str): + print("\nPerforming UPSERT into episodes...") + + pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + UPSERT INTO episodes (series_id, season_id, episode_id, title) VALUES (2, 6, 1, "TBD"); + """.format( + path + ) + ) + + +def select_with_parameters(pool: ydb.QuerySessionPool, path: str, series_id, season_id, episode_id): + result_sets = pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + SELECT + title, + air_date + FROM episodes + WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; + """.format( + path + ), + { + "$seriesId": series_id, # could be defined implicit + "$seasonId": (season_id, ydb.PrimitiveType.Int64), # could be defined via tuple + "$episodeId": ydb.TypedValue(episode_id, ydb.PrimitiveType.Int64), # could be defined via special class + }, + ) + + print("\n> select_with_parameters:") + first_set = result_sets[0] + for row in first_set.rows: + print("episode title:", row.title, ", air date:", row.air_date) + + return first_set + + +# Show usage of explicit Begin/Commit transaction control calls. +# In most cases it's better to use transaction control settings in session.transaction +# calls instead to avoid additional hops to YDB cluster and allow more efficient +# execution of queries. +def explicit_transaction_control(pool: ydb.QuerySessionPool, path: str, series_id, season_id, episode_id): + def callee(session: ydb.QuerySessionSync): + query = """ + PRAGMA TablePathPrefix("{}"); + UPDATE episodes + SET air_date = CurrentUtcDate() + WHERE series_id = $seriesId AND season_id = $seasonId AND episode_id = $episodeId; + """.format( + path + ) + + # Get newly created transaction id + tx = session.transaction(ydb.QuerySerializableReadWrite()).begin() + + # Execute data query. + # Transaction control settings continues active transaction (tx) + with tx.execute( + query, + { + "$seriesId": (series_id, ydb.PrimitiveType.Int64), + "$seasonId": (season_id, ydb.PrimitiveType.Int64), + "$episodeId": (episode_id, ydb.PrimitiveType.Int64), + }, + ) as _: + pass + + print("\n> explicit TCL call") + + # Commit active transaction(tx) + tx.commit() + + return pool.retry_operation_sync(callee) + + +def drop_tables(pool: ydb.QuerySessionPool, path: str): + print("\nCleaning up existing tables...") + pool.execute_with_retries(DropTablesQuery.format(path)) + + +def create_tables(pool: ydb.QuerySessionPool, path: str): + print("\nCreating table series...") + pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + CREATE table `series` ( + `series_id` Int64, + `title` Utf8, + `series_info` Utf8, + `release_date` Date, + PRIMARY KEY (`series_id`) + ) + """.format( + path + ) + ) + + print("\nCreating table seasons...") + pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + CREATE table `seasons` ( + `series_id` Int64, + `season_id` Int64, + `title` Utf8, + `first_aired` Date, + `last_aired` Date, + PRIMARY KEY (`series_id`, `season_id`) + ) + """.format( + path + ) + ) + + print("\nCreating table episodes...") + pool.execute_with_retries( + """ + PRAGMA TablePathPrefix("{}"); + CREATE table `episodes` ( + `series_id` Int64, + `season_id` Int64, + `episode_id` Int64, + `title` Utf8, + `air_date` Date, + PRIMARY KEY (`series_id`, `season_id`, `episode_id`) + ) + """.format( + path + ) + ) + + +def is_directory_exists(driver: ydb.Driver, path: str): + try: + return driver.scheme_client.describe_path(path).is_directory() + except ydb.SchemeError: + return False + + +def ensure_path_exists(driver, database, path): + paths_to_create = list() + path = path.rstrip("/") + while path not in ("", database): + full_path = posixpath.join(database, path) + if is_directory_exists(driver, full_path): + break + paths_to_create.append(full_path) + path = posixpath.dirname(path).rstrip("/") + + while len(paths_to_create) > 0: + full_path = paths_to_create.pop(-1) + driver.scheme_client.make_directory(full_path) + + +def run(endpoint, database, path): + with ydb.Driver( + endpoint=endpoint, + database=database, + credentials=ydb.credentials_from_env_variables(), + ) as driver: + driver.wait(timeout=5, fail_fast=True) + + with ydb.QuerySessionPool(driver) as pool: + + ensure_path_exists(driver, database, path) + + # absolute path - prefix to the table's names, + # including the database location + full_path = posixpath.join(database, path) + + drop_tables(pool, full_path) + + create_tables(pool, full_path) + + fill_tables_with_data(pool, full_path) + + select_simple(pool, full_path) + + upsert_simple(pool, full_path) + + select_with_parameters(pool, full_path, 2, 3, 7) + select_with_parameters(pool, full_path, 2, 3, 8) + + explicit_transaction_control(pool, full_path, 2, 6, 1) + select_with_parameters(pool, full_path, 2, 6, 1) diff --git a/examples/basic_example_v2/basic_example_data.py b/examples/basic_example_v2/basic_example_data.py new file mode 100644 index 00000000..3c9616f4 --- /dev/null +++ b/examples/basic_example_v2/basic_example_data.py @@ -0,0 +1,177 @@ +# -*- coding: utf-8 -*- +import iso8601 +import ydb + + +def to_days(date): + timedelta = iso8601.parse_date(date) - iso8601.parse_date("1970-1-1") + return timedelta.days + + +class Series(object): + __slots__ = ("series_id", "title", "release_date", "series_info") + + def __init__(self, series_id, title, release_date, series_info): + self.series_id = series_id + self.title = title + self.release_date = to_days(release_date) + self.series_info = series_info + + +class Season(object): + __slots__ = ("series_id", "season_id", "title", "first_aired", "last_aired") + + def __init__(self, series_id, season_id, title, first_aired, last_aired): + self.series_id = series_id + self.season_id = season_id + self.title = title + self.first_aired = to_days(first_aired) + self.last_aired = to_days(last_aired) + + +class Episode(object): + __slots__ = ("series_id", "season_id", "episode_id", "title", "air_date") + + def __init__(self, series_id, season_id, episode_id, title, air_date): + self.series_id = series_id + self.season_id = season_id + self.episode_id = episode_id + self.title = title + self.air_date = to_days(air_date) + + +def get_series_data(): + return [ + Series( + 1, + "IT Crowd", + "2006-02-03", + "The IT Crowd is a British sitcom produced by Channel 4, written by Graham Linehan, produced by " + "Ash Atalla and starring Chris O'Dowd, Richard Ayoade, Katherine Parkinson, and Matt Berry.", + ), + Series( + 2, + "Silicon Valley", + "2014-04-06", + "Silicon Valley is an American comedy television series created by Mike Judge, John Altschuler and " + "Dave Krinsky. The series focuses on five young men who founded a startup company in Silicon Valley.", + ), + ] + + +def get_series_data_type(): + struct_type = ydb.StructType() + struct_type.add_member("series_id", ydb.PrimitiveType.Int64) + struct_type.add_member("title", ydb.PrimitiveType.Utf8) + struct_type.add_member("series_info", ydb.PrimitiveType.Utf8) + struct_type.add_member("release_date", ydb.PrimitiveType.Date) + return ydb.ListType(struct_type) + + +def get_seasons_data(): + return [ + Season(1, 1, "Season 1", "2006-02-03", "2006-03-03"), + Season(1, 2, "Season 2", "2007-08-24", "2007-09-28"), + Season(1, 3, "Season 3", "2008-11-21", "2008-12-26"), + Season(1, 4, "Season 4", "2010-06-25", "2010-07-30"), + Season(2, 1, "Season 1", "2014-04-06", "2014-06-01"), + Season(2, 2, "Season 2", "2015-04-12", "2015-06-14"), + Season(2, 3, "Season 3", "2016-04-24", "2016-06-26"), + Season(2, 4, "Season 4", "2017-04-23", "2017-06-25"), + Season(2, 5, "Season 5", "2018-03-25", "2018-05-13"), + ] + + +def get_seasons_data_type(): + struct_type = ydb.StructType() + struct_type.add_member("series_id", ydb.PrimitiveType.Int64) + struct_type.add_member("season_id", ydb.PrimitiveType.Int64) + struct_type.add_member("title", ydb.PrimitiveType.Utf8) + struct_type.add_member("first_aired", ydb.PrimitiveType.Date) + struct_type.add_member("last_aired", ydb.PrimitiveType.Date) + return ydb.ListType(struct_type) + + +def get_episodes_data(): + return [ + Episode(1, 1, 1, "Yesterday's Jam", "2006-02-03"), + Episode(1, 1, 2, "Calamity Jen", "2006-02-03"), + Episode(1, 1, 3, "Fifty-Fifty", "2006-02-10"), + Episode(1, 1, 4, "The Red Door", "2006-02-17"), + Episode(1, 1, 5, "The Haunting of Bill Crouse", "2006-02-24"), + Episode(1, 1, 6, "Aunt Irma Visits", "2006-03-03"), + Episode(1, 2, 1, "The Work Outing", "2006-08-24"), + Episode(1, 2, 2, "Return of the Golden Child", "2007-08-31"), + Episode(1, 2, 3, "Moss and the German", "2007-09-07"), + Episode(1, 2, 4, "The Dinner Party", "2007-09-14"), + Episode(1, 2, 5, "Smoke and Mirrors", "2007-09-21"), + Episode(1, 2, 6, "Men Without Women", "2007-09-28"), + Episode(1, 3, 1, "From Hell", "2008-11-21"), + Episode(1, 3, 2, "Are We Not Men?", "2008-11-28"), + Episode(1, 3, 3, "Tramps Like Us", "2008-12-05"), + Episode(1, 3, 4, "The Speech", "2008-12-12"), + Episode(1, 3, 5, "Friendface", "2008-12-19"), + Episode(1, 3, 6, "Calendar Geeks", "2008-12-26"), + Episode(1, 4, 1, "Jen The Fredo", "2010-06-25"), + Episode(1, 4, 2, "The Final Countdown", "2010-07-02"), + Episode(1, 4, 3, "Something Happened", "2010-07-09"), + Episode(1, 4, 4, "Italian For Beginners", "2010-07-16"), + Episode(1, 4, 5, "Bad Boys", "2010-07-23"), + Episode(1, 4, 6, "Reynholm vs Reynholm", "2010-07-30"), + Episode(2, 1, 1, "Minimum Viable Product", "2014-04-06"), + Episode(2, 1, 2, "The Cap Table", "2014-04-13"), + Episode(2, 1, 3, "Articles of Incorporation", "2014-04-20"), + Episode(2, 1, 4, "Fiduciary Duties", "2014-04-27"), + Episode(2, 1, 5, "Signaling Risk", "2014-05-04"), + Episode(2, 1, 6, "Third Party Insourcing", "2014-05-11"), + Episode(2, 1, 7, "Proof of Concept", "2014-05-18"), + Episode(2, 1, 8, "Optimal Tip-to-Tip Efficiency", "2014-06-01"), + Episode(2, 2, 1, "Sand Hill Shuffle", "2015-04-12"), + Episode(2, 2, 2, "Runaway Devaluation", "2015-04-19"), + Episode(2, 2, 3, "Bad Money", "2015-04-26"), + Episode(2, 2, 4, "The Lady", "2015-05-03"), + Episode(2, 2, 5, "Server Space", "2015-05-10"), + Episode(2, 2, 6, "Homicide", "2015-05-17"), + Episode(2, 2, 7, "Adult Content", "2015-05-24"), + Episode(2, 2, 8, "White Hat/Black Hat", "2015-05-31"), + Episode(2, 2, 9, "Binding Arbitration", "2015-06-07"), + Episode(2, 2, 10, "Two Days of the Condor", "2015-06-14"), + Episode(2, 3, 1, "Founder Friendly", "2016-04-24"), + Episode(2, 3, 2, "Two in the Box", "2016-05-01"), + Episode(2, 3, 3, "Meinertzhagen's Haversack", "2016-05-08"), + Episode(2, 3, 4, "Maleant Data Systems Solutions", "2016-05-15"), + Episode(2, 3, 5, "The Empty Chair", "2016-05-22"), + Episode(2, 3, 6, "Bachmanity Insanity", "2016-05-29"), + Episode(2, 3, 7, "To Build a Better Beta", "2016-06-05"), + Episode(2, 3, 8, "Bachman's Earnings Over-Ride", "2016-06-12"), + Episode(2, 3, 9, "Daily Active Users", "2016-06-19"), + Episode(2, 3, 10, "The Uptick", "2016-06-26"), + Episode(2, 4, 1, "Success Failure", "2017-04-23"), + Episode(2, 4, 2, "Terms of Service", "2017-04-30"), + Episode(2, 4, 3, "Intellectual Property", "2017-05-07"), + Episode(2, 4, 4, "Teambuilding Exercise", "2017-05-14"), + Episode(2, 4, 5, "The Blood Boy", "2017-05-21"), + Episode(2, 4, 6, "Customer Service", "2017-05-28"), + Episode(2, 4, 7, "The Patent Troll", "2017-06-04"), + Episode(2, 4, 8, "The Keenan Vortex", "2017-06-11"), + Episode(2, 4, 9, "Hooli-Con", "2017-06-18"), + Episode(2, 4, 10, "Server Error", "2017-06-25"), + Episode(2, 5, 1, "Grow Fast or Die Slow", "2018-03-25"), + Episode(2, 5, 2, "Reorientation", "2018-04-01"), + Episode(2, 5, 3, "Chief Operating Officer", "2018-04-08"), + Episode(2, 5, 4, "Tech Evangelist", "2018-04-15"), + Episode(2, 5, 5, "Facial Recognition", "2018-04-22"), + Episode(2, 5, 6, "Artificial Emotional Intelligence", "2018-04-29"), + Episode(2, 5, 7, "Initial Coin Offering", "2018-05-06"), + Episode(2, 5, 8, "Fifty-One Percent", "2018-05-13"), + ] + + +def get_episodes_data_type(): + struct_type = ydb.StructType() + struct_type.add_member("series_id", ydb.PrimitiveType.Int64) + struct_type.add_member("season_id", ydb.PrimitiveType.Int64) + struct_type.add_member("episode_id", ydb.PrimitiveType.Int64) + struct_type.add_member("title", ydb.PrimitiveType.Utf8) + struct_type.add_member("air_date", ydb.PrimitiveType.Date) + return ydb.ListType(struct_type) diff --git a/ydb/query/base.py b/ydb/query/base.py index e08d9f52..eef51ee6 100644 --- a/ydb/query/base.py +++ b/ydb/query/base.py @@ -53,7 +53,32 @@ def __exit__(self, exc_type, exc_val, exc_tb): class QueryClientSettings: - pass + def __init__(self): + self._native_datetime_in_result_sets = True + self._native_date_in_result_sets = True + self._native_json_in_result_sets = True + self._native_interval_in_result_sets = True + self._native_timestamp_in_result_sets = True + + def with_native_timestamp_in_result_sets(self, enabled: bool) -> "QueryClientSettings": + self._native_timestamp_in_result_sets = enabled + return self + + def with_native_interval_in_result_sets(self, enabled: bool) -> "QueryClientSettings": + self._native_interval_in_result_sets = enabled + return self + + def with_native_json_in_result_sets(self, enabled: bool) -> "QueryClientSettings": + self._native_json_in_result_sets = enabled + return self + + def with_native_date_in_result_sets(self, enabled: bool) -> "QueryClientSettings": + self._native_date_in_result_sets = enabled + return self + + def with_native_datetime_in_result_sets(self, enabled: bool) -> "QueryClientSettings": + self._native_datetime_in_result_sets = enabled + return self class IQuerySessionState(abc.ABC): @@ -141,9 +166,9 @@ def transaction(self, tx_mode: Optional[BaseQueryTxMode] = None) -> "IQueryTxCon def execute( self, query: str, + parameters: Optional[dict] = None, syntax: Optional[QuerySyntax] = None, exec_mode: Optional[QueryExecMode] = None, - parameters: Optional[dict] = None, concurrent_result_sets: Optional[bool] = False, ) -> Iterator: """WARNING: This API is experimental and could be changed. @@ -238,14 +263,14 @@ def tx_id(self) -> Optional[str]: pass @abc.abstractmethod - def begin(self, settings: Optional[QueryClientSettings] = None) -> None: + def begin(self, settings: Optional[QueryClientSettings] = None) -> "IQueryTxContext": """WARNING: This API is experimental and could be changed. Explicitly begins a transaction :param settings: A request settings - :return: None or exception if begin is failed + :return: Transaction object or exception if begin is failed """ pass @@ -284,6 +309,7 @@ def execute( exec_mode: Optional[QueryExecMode] = None, parameters: Optional[dict] = None, concurrent_result_sets: Optional[bool] = False, + settings: Optional[QueryClientSettings] = None, ) -> Iterator: """WARNING: This API is experimental and could be changed. @@ -300,6 +326,7 @@ def execute( 4) QueryExecMode.PARSE. :param parameters: dict with parameters and YDB types; :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + :param settings: An additional request settings QueryClientSettings; :return: Iterator with result sets """ @@ -367,13 +394,14 @@ def wrap_execute_query_response( response_pb: _apis.ydb_query.ExecuteQueryResponsePart, tx: Optional[IQueryTxContext] = None, commit_tx: Optional[bool] = False, + settings: Optional[QueryClientSettings] = None, ) -> convert.ResultSet: issues._process_response(response_pb) if tx and response_pb.tx_meta and not tx.tx_id: tx._move_to_beginned(response_pb.tx_meta.id) if tx and commit_tx: tx._move_to_commited() - return convert.ResultSet.from_message(response_pb.result_set) + return convert.ResultSet.from_message(response_pb.result_set, settings) def bad_session_handler(func): diff --git a/ydb/query/pool.py b/ydb/query/pool.py index e7514cdf..bc214ecb 100644 --- a/ydb/query/pool.py +++ b/ydb/query/pool.py @@ -55,7 +55,12 @@ def wrapped_callee(): return retry_operation_sync(wrapped_callee, retry_settings) def execute_with_retries( - self, query: str, retry_settings: Optional[RetrySettings] = None, *args, **kwargs + self, + query: str, + parameters: Optional[dict] = None, + retry_settings: Optional[RetrySettings] = None, + *args, + **kwargs, ) -> List[convert.ResultSet]: """WARNING: This API is experimental and could be changed. Special interface to execute a one-shot queries in a safe, retriable way. @@ -72,11 +77,20 @@ def execute_with_retries( def wrapped_callee(): with self.checkout() as session: - it = session.execute(query, *args, **kwargs) + it = session.execute(query, parameters, *args, **kwargs) return [result_set for result_set in it] return retry_operation_sync(wrapped_callee, retry_settings) + def stop(self, timeout=None): + pass # TODO: implement + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + class SimpleQuerySessionCheckout: def __init__(self, pool: QuerySessionPool): diff --git a/ydb/query/session.py b/ydb/query/session.py index d6034d34..1fa3025d 100644 --- a/ydb/query/session.py +++ b/ydb/query/session.py @@ -283,9 +283,9 @@ def transaction(self, tx_mode: Optional[base.BaseQueryTxMode] = None) -> base.IQ def execute( self, query: str, + parameters: dict = None, syntax: base.QuerySyntax = None, exec_mode: base.QueryExecMode = None, - parameters: dict = None, concurrent_result_sets: bool = False, ) -> base.SyncResponseContextIterator: """WARNING: This API is experimental and could be changed. @@ -313,5 +313,9 @@ def execute( return base.SyncResponseContextIterator( stream_it, - lambda resp: base.wrap_execute_query_response(rpc_state=None, response_pb=resp), + lambda resp: base.wrap_execute_query_response( + rpc_state=None, + response_pb=resp, + settings=self._settings, + ), ) diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py index 0a493202..f42571c2 100644 --- a/ydb/query/transaction.py +++ b/ydb/query/transaction.py @@ -313,19 +313,21 @@ def _move_to_commited(self) -> None: return self._tx_state._change_state(QueryTxStateEnum.COMMITTED) - def begin(self, settings: Optional[base.QueryClientSettings] = None) -> None: + def begin(self, settings: Optional[base.QueryClientSettings] = None) -> "BaseQueryTxContext": """WARNING: This API is experimental and could be changed. Explicitly begins a transaction :param settings: A request settings - :return: None or exception if begin is failed + :return: Transaction object or exception if begin is failed """ self._tx_state._check_invalid_transition(QueryTxStateEnum.BEGINED) self._begin_call(settings) + return self + def commit(self, settings: Optional[base.QueryClientSettings] = None) -> None: """WARNING: This API is experimental and could be changed. @@ -365,11 +367,12 @@ def rollback(self, settings: Optional[base.QueryClientSettings] = None) -> None: def execute( self, query: str, + parameters: Optional[dict] = None, commit_tx: Optional[bool] = False, syntax: Optional[base.QuerySyntax] = None, exec_mode: Optional[base.QueryExecMode] = None, - parameters: Optional[dict] = None, concurrent_result_sets: Optional[bool] = False, + settings: Optional[base.QueryClientSettings] = None, ) -> base.SyncResponseContextIterator: """WARNING: This API is experimental and could be changed. @@ -386,6 +389,7 @@ def execute( 4) QueryExecMode.PARSE. :param parameters: dict with parameters and YDB types; :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; + :param settings: An additional request settings QueryClientSettings; :return: Iterator with result sets """ @@ -400,6 +404,8 @@ def execute( parameters=parameters, concurrent_result_sets=concurrent_result_sets, ) + + settings = settings if settings is not None else self.session._settings self._prev_stream = base.SyncResponseContextIterator( stream_it, lambda resp: base.wrap_execute_query_response( @@ -407,6 +413,7 @@ def execute( response_pb=resp, tx=self, commit_tx=commit_tx, + settings=settings, ), ) return self._prev_stream