From 4890ce51a6dfb1f9414aa1f5c56043f36e6ff446 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 15 Oct 2024 13:33:17 +0300 Subject: [PATCH 01/14] Hide session management for table client methods --- ydb/aio/table.py | 105 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/ydb/aio/table.py b/ydb/aio/table.py index 3c25f7d2..df9b5eb9 100644 --- a/ydb/aio/table.py +++ b/ydb/aio/table.py @@ -13,6 +13,7 @@ _scan_query_request_factory, _wrap_scan_query_response, BaseTxContext, + TableDescription, ) from . import _utilities from ydb import _apis, _session_impl @@ -139,6 +140,11 @@ async def rename_tables(self, rename_items, settings=None): # pylint: disable=W class TableClient(BaseTableClient): + def __init__(self, driver, table_client_settings=None): + # type:(ydb.Driver, ydb.TableClientSettings) -> None + super().__init__(driver=driver, table_client_settings=table_client_settings) + self._pool: SessionPool = SessionPool(self._driver, 10) + def session(self): return Session(self._driver, self._table_client_settings) @@ -158,6 +164,105 @@ async def scan_query(self, query, parameters=None, settings=None): # pylint: di lambda resp: _wrap_scan_query_response(resp, self._table_client_settings), ) + async def create_table( + self, + path: str, + table_description: TableDescription, + settings: typing.Optional[settings_impl.BaseRequestSettings] = None, + ): + """ + Create a YDB table. + + :param path: A table path + :param table_description: A description of table to create. An instance TableDescription + :param settings: An instance of BaseRequestSettings that describes how rpc should invoked. + + :return: A description of created scheme entry or error otherwise. + """ + async def callee(session: Session): + return await session.create_table(path=path, table_description=table_description, settings=settings) + + return await self._pool.retry_operation(callee) + + async def drop_table( + self, + path: str, + settings: typing.Optional[settings_impl.BaseRequestSettings] = None, + ): + async def callee(session: Session): + return await session.drop_table(path=path, settings=settings) + + return await self._pool.retry_operation(callee) + + async def alter_table( + self, + path, + add_columns=None, + drop_columns=None, + settings=None, + alter_attributes=None, + add_indexes=None, + drop_indexes=None, + set_ttl_settings=None, + drop_ttl_settings=None, + add_column_families=None, + alter_column_families=None, + alter_storage_settings=None, + set_compaction_policy=None, + alter_partitioning_settings=None, + set_key_bloom_filter=None, + set_read_replicas_settings=None, + ): + async def callee(session: Session): + return await session.alter_table( + path=path, + add_columns=add_columns, + drop_columns=drop_columns, + settings=settings, + alter_attributes=alter_attributes, + add_indexes=add_indexes, + drop_indexes=drop_indexes, + set_ttl_settings=set_ttl_settings, + drop_ttl_settings=drop_ttl_settings, + add_column_families=add_column_families, + alter_column_families=alter_column_families, + alter_storage_settings=alter_storage_settings, + set_compaction_policy=set_compaction_policy, + alter_partitioning_settings=alter_partitioning_settings, + set_key_bloom_filter=set_key_bloom_filter, + set_read_replicas_settings=set_read_replicas_settings, + ) + + return await self._pool.retry_operation(callee) + + async def describe_table(self, path, settings=None): + async def callee(session: Session): + return await session.describe_table(path=path, settings=settings) + + return await self._pool.retry_operation(callee) + + async def copy_table(self, source_path, destination_path, settings=None): + async def callee(session: Session): + return await session.copy_table( + source_path=source_path, + destination_path=destination_path, + settings=settings, + ) + + return await self._pool.retry_operation(callee) + + async def copy_tables(self, source_destination_pairs, settings=None): + async def callee(session: Session): + return await session.copy_tables(source_destination_pairs=source_destination_pairs, settings=settings) + + return await self._pool.retry_operation(callee) + + async def rename_tables(self, rename_items, settings=None): + async def callee(session: Session): + return await session.rename_tables(rename_items=rename_items, settings=settings) + + return await self._pool.retry_operation(callee) + class TxContext(BaseTxContext): async def __aenter__(self): From e7104d376651cc1f15b0c105ac2437bff793e1c6 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 15 Oct 2024 14:59:40 +0300 Subject: [PATCH 02/14] sync methods --- ydb/table.py | 105 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/ydb/table.py b/ydb/table.py index 01f5e52b..9bf91556 100644 --- a/ydb/table.py +++ b/ydb/table.py @@ -4,6 +4,7 @@ from abc import abstractmethod import logging import enum +import typing from . import ( issues, @@ -1193,6 +1194,11 @@ def bulk_upsert(self, table_path, rows, column_types, settings=None): class TableClient(BaseTableClient): + def __init__(self, driver, table_client_settings=None): + # type:(ydb.Driver, ydb.TableClientSettings) -> None + super().__init__(driver=driver, table_client_settings=table_client_settings) + self._pool: SessionPool = SessionPool(self._driver, 10) + def async_scan_query(self, query, parameters=None, settings=None): # type: (ydb.ScanQuery, tuple, ydb.BaseRequestSettings) -> ydb.AsyncResponseIterator request = _scan_query_request_factory(query, parameters, settings) @@ -1219,6 +1225,105 @@ def async_bulk_upsert(self, table_path, rows, column_types, settings=None): (), ) + def create_table( + self, + path: str, + table_description: TableDescription, + settings: typing.Optional[settings_impl.BaseRequestSettings] = None, + ): + """ + Create a YDB table. + + :param path: A table path + :param table_description: A description of table to create. An instance TableDescription + :param settings: An instance of BaseRequestSettings that describes how rpc should invoked. + + :return: A description of created scheme entry or error otherwise. + """ + def callee(session: Session): + return session.create_table(path=path, table_description=table_description, settings=settings) + + return self._pool.retry_operation_sync(callee) + + def drop_table( + self, + path: str, + settings: typing.Optional[settings_impl.BaseRequestSettings] = None, + ): + def callee(session: Session): + return session.drop_table(path=path, settings=settings) + + return self._pool.retry_operation_sync(callee) + + def alter_table( + self, + path, + add_columns=None, + drop_columns=None, + settings=None, + alter_attributes=None, + add_indexes=None, + drop_indexes=None, + set_ttl_settings=None, + drop_ttl_settings=None, + add_column_families=None, + alter_column_families=None, + alter_storage_settings=None, + set_compaction_policy=None, + alter_partitioning_settings=None, + set_key_bloom_filter=None, + set_read_replicas_settings=None, + ): + def callee(session: Session): + return session.alter_table( + path=path, + add_columns=add_columns, + drop_columns=drop_columns, + settings=settings, + alter_attributes=alter_attributes, + add_indexes=add_indexes, + drop_indexes=drop_indexes, + set_ttl_settings=set_ttl_settings, + drop_ttl_settings=drop_ttl_settings, + add_column_families=add_column_families, + alter_column_families=alter_column_families, + alter_storage_settings=alter_storage_settings, + set_compaction_policy=set_compaction_policy, + alter_partitioning_settings=alter_partitioning_settings, + set_key_bloom_filter=set_key_bloom_filter, + set_read_replicas_settings=set_read_replicas_settings, + ) + + return self._pool.retry_operation_sync(callee) + + def describe_table(self, path, settings=None): + def callee(session: Session): + return session.describe_table(path=path, settings=settings) + + return self._pool.retry_operation_sync(callee) + + def copy_table(self, source_path, destination_path, settings=None): + def callee(session: Session): + return session.copy_table( + source_path=source_path, + destination_path=destination_path, + settings=settings, + ) + + return self._pool.retry_operation_sync(callee) + + def copy_tables(self, source_destination_pairs, settings=None): + def callee(session: Session): + return session.copy_tables(source_destination_pairs=source_destination_pairs, settings=settings) + + return self._pool.retry_operation_sync(callee) + + def rename_tables(self, rename_items, settings=None): + def callee(session: Session): + return session.rename_tables(rename_items=rename_items, settings=settings) + + return self._pool.retry_operation_sync(callee) + def _make_index_description(index): result = TableIndex(index.name).with_index_columns(*tuple(col for col in index.index_columns)) From 2480312a124c4ac2668a31e1fe7438fad13faf68 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 15 Oct 2024 17:32:37 +0300 Subject: [PATCH 03/14] add tests --- tests/aio/test_table_client.py | 88 ++++++++++++++++++++++++++++++++ tests/table/test_table_client.py | 85 ++++++++++++++++++++++++++++++ 2 files changed, 173 insertions(+) create mode 100644 tests/aio/test_table_client.py create mode 100644 tests/table/test_table_client.py diff --git a/tests/aio/test_table_client.py b/tests/aio/test_table_client.py new file mode 100644 index 00000000..b4f3a5ea --- /dev/null +++ b/tests/aio/test_table_client.py @@ -0,0 +1,88 @@ +import pytest +import ydb + + +class TestTableClient: + @pytest.mark.asyncio + async def test_create_table(self, driver: ydb.aio.Driver): + client = driver.table_client + table_name = "/local/testtableclient" + try: + await client.drop_table(table_name) + except ydb.SchemeError: + pass + + with pytest.raises(ydb.SchemeError): + await client.describe_table(table_name) + + description = ( + ydb.TableDescription() + .with_primary_keys("key1", "key2") + .with_columns( + ydb.Column("key1", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("key2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8)), + ) + ) + + await client.create_table(table_name, description) + + actual_description = await client.describe_table(table_name) + + assert actual_description.columns == description.columns + + @pytest.mark.asyncio + async def test_alter_table(self, driver: ydb.aio.Driver): + client = driver.table_client + + table_name = "/local/testtableclient" + try: + await client.drop_table(table_name) + except ydb.SchemeError: + pass + + description = ( + ydb.TableDescription() + .with_primary_keys("key1", "key2") + .with_columns( + ydb.Column("key1", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("key2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8)), + ) + ) + + await client.create_table(table_name, description) + + await client.alter_table(table_name, add_columns=[ + ydb.Column("value2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ]) + + description = await client.describe_table(table_name) + assert len(description.columns) == 4 + + @pytest.mark.asyncio + async def test_copy_table(self, driver: ydb.aio.Driver): + client = driver.table_client + table_name = "/local/testtableclient" + try: + await client.drop_table(table_name) + except ydb.SchemeError: + pass + + description = ( + ydb.TableDescription() + .with_primary_keys("key1", "key2") + .with_columns( + ydb.Column("key1", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("key2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8)), + ) + ) + + await client.create_table(table_name, description) + + await client.copy_table(table_name, table_name + "_copy") + + copied_description = await client.describe_table(table_name + "_copy") + + assert description.columns == copied_description.columns diff --git a/tests/table/test_table_client.py b/tests/table/test_table_client.py new file mode 100644 index 00000000..615c1828 --- /dev/null +++ b/tests/table/test_table_client.py @@ -0,0 +1,85 @@ +import pytest +import ydb + + +class TestTableClient: + def test_create_table(self, driver_sync: ydb.Driver): + client = driver_sync.table_client + table_name = "/local/testtableclient" + try: + client.drop_table(table_name) + except ydb.SchemeError: + pass + + with pytest.raises(ydb.SchemeError): + client.describe_table(table_name) + + description = ( + ydb.TableDescription() + .with_primary_keys("key1", "key2") + .with_columns( + ydb.Column("key1", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("key2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8)), + ) + ) + + client.create_table(table_name, description) + + actual_description = client.describe_table(table_name) + + assert actual_description.columns == description.columns + + def test_alter_table(self, driver_sync: ydb.Driver): + client = driver_sync.table_client + + table_name = "/local/testtableclient" + try: + client.drop_table(table_name) + except ydb.SchemeError: + pass + + description = ( + ydb.TableDescription() + .with_primary_keys("key1", "key2") + .with_columns( + ydb.Column("key1", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("key2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8)), + ) + ) + + client.create_table(table_name, description) + + client.alter_table(table_name, add_columns=[ + ydb.Column("value2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ]) + + description = client.describe_table(table_name) + assert len(description.columns) == 4 + + def test_copy_table(self, driver_sync: ydb.Driver): + client = driver_sync.table_client + table_name = "/local/testtableclient" + try: + client.drop_table(table_name) + except ydb.SchemeError: + pass + + description = ( + ydb.TableDescription() + .with_primary_keys("key1", "key2") + .with_columns( + ydb.Column("key1", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("key2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8)), + ) + ) + + client.create_table(table_name, description) + + client.copy_table(table_name, table_name + "_copy") + + copied_description = client.describe_table(table_name + "_copy") + + assert description.columns == copied_description.columns From f5e568eff5dfbf5b79168f60aa1f9d85808ef73d Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 15 Oct 2024 17:38:16 +0300 Subject: [PATCH 04/14] close pool on driver close --- ydb/aio/driver.py | 4 ++++ ydb/driver.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/ydb/aio/driver.py b/ydb/aio/driver.py index 0f4f3630..ad03b207 100644 --- a/ydb/aio/driver.py +++ b/ydb/aio/driver.py @@ -59,3 +59,7 @@ def __init__( self.scheme_client = scheme.SchemeClient(self) self.table_client = table.TableClient(self, config.table_client_settings) self.topic_client = topic.TopicClientAsyncIO(self, config.topic_client_settings) + + async def stop(self, timeout=10): + await self.table_client._pool.stop(timeout) + await super().stop(timeout=timeout) diff --git a/ydb/driver.py b/ydb/driver.py index 1559b0d0..8c8b9ecd 100644 --- a/ydb/driver.py +++ b/ydb/driver.py @@ -282,3 +282,7 @@ def __init__( self.scheme_client = scheme.SchemeClient(self) self.table_client = table.TableClient(self, driver_config.table_client_settings) self.topic_client = topic.TopicClient(self, driver_config.topic_client_settings) + + def stop(self): + self.table_client._pool.stop() + super().stop() From e538b8d8ef7dae22b7c22fc2f1e416fbbdaffb79 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 15 Oct 2024 17:58:37 +0300 Subject: [PATCH 05/14] add tableclient destructors --- ydb/aio/table.py | 4 ++++ ydb/table.py | 3 +++ 2 files changed, 7 insertions(+) diff --git a/ydb/aio/table.py b/ydb/aio/table.py index df9b5eb9..2dc5a8bf 100644 --- a/ydb/aio/table.py +++ b/ydb/aio/table.py @@ -145,6 +145,10 @@ def __init__(self, driver, table_client_settings=None): super().__init__(driver=driver, table_client_settings=table_client_settings) self._pool: SessionPool = SessionPool(self._driver, 10) + def __del__(self): + if not self._pool._terminating: + asyncio.get_running_loop.call_soon(self._pool.stop) + def session(self): return Session(self._driver, self._table_client_settings) diff --git a/ydb/table.py b/ydb/table.py index 9bf91556..1d998e95 100644 --- a/ydb/table.py +++ b/ydb/table.py @@ -1199,6 +1199,9 @@ def __init__(self, driver, table_client_settings=None): super().__init__(driver=driver, table_client_settings=table_client_settings) self._pool: SessionPool = SessionPool(self._driver, 10) + def __del__(self): + self._pool.close() + def async_scan_query(self, query, parameters=None, settings=None): # type: (ydb.ScanQuery, tuple, ydb.BaseRequestSettings) -> ydb.AsyncResponseIterator request = _scan_query_request_factory(query, parameters, settings) From b09f33f825ca7864884d7ca424f321d5fda53ac6 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 15 Oct 2024 18:03:05 +0300 Subject: [PATCH 06/14] style fixes --- tests/aio/test_table_client.py | 9 ++++++--- tests/table/test_table_client.py | 9 ++++++--- ydb/aio/table.py | 1 + ydb/table.py | 1 + 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/tests/aio/test_table_client.py b/tests/aio/test_table_client.py index b4f3a5ea..6eb93dbb 100644 --- a/tests/aio/test_table_client.py +++ b/tests/aio/test_table_client.py @@ -53,9 +53,12 @@ async def test_alter_table(self, driver: ydb.aio.Driver): await client.create_table(table_name, description) - await client.alter_table(table_name, add_columns=[ - ydb.Column("value2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), - ]) + await client.alter_table( + table_name, + add_columns=[ + ydb.Column("value2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ] + ) description = await client.describe_table(table_name) assert len(description.columns) == 4 diff --git a/tests/table/test_table_client.py b/tests/table/test_table_client.py index 615c1828..7d7c82c8 100644 --- a/tests/table/test_table_client.py +++ b/tests/table/test_table_client.py @@ -51,9 +51,12 @@ def test_alter_table(self, driver_sync: ydb.Driver): client.create_table(table_name, description) - client.alter_table(table_name, add_columns=[ - ydb.Column("value2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), - ]) + client.alter_table( + table_name, + add_columns=[ + ydb.Column("value2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ] + ) description = client.describe_table(table_name) assert len(description.columns) == 4 diff --git a/ydb/aio/table.py b/ydb/aio/table.py index 2dc5a8bf..046bc01e 100644 --- a/ydb/aio/table.py +++ b/ydb/aio/table.py @@ -183,6 +183,7 @@ async def create_table( :return: A description of created scheme entry or error otherwise. """ + async def callee(session: Session): return await session.create_table(path=path, table_description=table_description, settings=settings) diff --git a/ydb/table.py b/ydb/table.py index 1d998e95..224cbf9a 100644 --- a/ydb/table.py +++ b/ydb/table.py @@ -1243,6 +1243,7 @@ def create_table( :return: A description of created scheme entry or error otherwise. """ + def callee(session: Session): return session.create_table(path=path, table_description=table_description, settings=settings) From b57e9474e9a046dc6248364aee83d40225f6e2fa Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 15 Oct 2024 18:06:37 +0300 Subject: [PATCH 07/14] style fixes --- tests/aio/test_table_client.py | 2 +- tests/table/test_table_client.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/aio/test_table_client.py b/tests/aio/test_table_client.py index 6eb93dbb..89c0ba75 100644 --- a/tests/aio/test_table_client.py +++ b/tests/aio/test_table_client.py @@ -57,7 +57,7 @@ async def test_alter_table(self, driver: ydb.aio.Driver): table_name, add_columns=[ ydb.Column("value2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), - ] + ], ) description = await client.describe_table(table_name) diff --git a/tests/table/test_table_client.py b/tests/table/test_table_client.py index 7d7c82c8..f4e121c8 100644 --- a/tests/table/test_table_client.py +++ b/tests/table/test_table_client.py @@ -55,7 +55,7 @@ def test_alter_table(self, driver_sync: ydb.Driver): table_name, add_columns=[ ydb.Column("value2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), - ] + ], ) description = client.describe_table(table_name) From 1a06f3e2aa44c63e18df33d1461837d28a5d59f0 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 15 Oct 2024 18:48:37 +0300 Subject: [PATCH 08/14] add type hints & docstrings to TableClient --- ydb/aio/table.py | 149 +++++++++++++++++++++++++++++++++++++--------- ydb/table.py | 151 ++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 243 insertions(+), 57 deletions(-) diff --git a/ydb/aio/table.py b/ydb/aio/table.py index 046bc01e..96ecda94 100644 --- a/ydb/aio/table.py +++ b/ydb/aio/table.py @@ -3,6 +3,14 @@ import time import typing +from typing import ( + Any, + Dict, + List, + Optional, + Tuple, +) + import ydb from ydb import issues, settings as settings_impl, table @@ -172,16 +180,16 @@ async def create_table( self, path: str, table_description: TableDescription, - settings: typing.Optional[settings_impl.BaseRequestSettings] = None, - ): + settings: Optional[settings_impl.BaseRequestSettings] = None, + ) -> ydb.Operation: """ Create a YDB table. :param path: A table path - :param table_description: A description of table to create. An instance TableDescription - :param settings: An instance of BaseRequestSettings that describes how rpc should invoked. + :param table_description: TableDescription instanse. + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. - :return: A description of created scheme entry or error otherwise. + :return: Operation or YDB error otherwise. """ async def callee(session: Session): @@ -192,8 +200,17 @@ async def callee(session: Session): async def drop_table( self, path: str, - settings: typing.Optional[settings_impl.BaseRequestSettings] = None, - ): + settings: Optional[settings_impl.BaseRequestSettings] = None, + ) -> ydb.Operation: + """ + Drop a YDB table. + + :param path: A table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + async def callee(session: Session): return await session.drop_table(path=path, settings=settings) @@ -201,23 +218,45 @@ async def callee(session: Session): async def alter_table( self, - path, - add_columns=None, - drop_columns=None, - settings=None, - alter_attributes=None, - add_indexes=None, - drop_indexes=None, - set_ttl_settings=None, - drop_ttl_settings=None, - add_column_families=None, - alter_column_families=None, - alter_storage_settings=None, - set_compaction_policy=None, - alter_partitioning_settings=None, - set_key_bloom_filter=None, - set_read_replicas_settings=None, - ): + path: str, + add_columns: Optional[List[ydb.Column]] = None, + drop_columns: Optional[List[str]] = None, + settings: Optional[settings_impl.BaseRequestSettings] = None, + alter_attributes: Optional[Optional[Dict[str, str]]] = None, + add_indexes: Optional[List[ydb.TableIndex]] = None, + drop_indexes: Optional[List[str]] = None, + set_ttl_settings: Optional[ydb.TtlSettings] = None, + drop_ttl_settings: Optional[Any] = None, + add_column_families: Optional[List[ydb.ColumnFamily]] = None, + alter_column_families: Optional[List[ydb.ColumnFamily]] = None, + alter_storage_settings: Optional[ydb.StorageSettings] = None, + set_compaction_policy: Optional[str] = None, + alter_partitioning_settings: Optional[ydb.PartitioningSettings] = None, + set_key_bloom_filter: Optional[ydb.FeatureFlag] = None, + set_read_replicas_settings: Optional[ydb.ReadReplicasSettings] = None, + ) -> ydb.Operation: + """ + Alter a YDB table. + + :param path: A table path + :param add_columns: List of ydb.Column to add + :param drop_columns: List of column names to drop + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + :param alter_attributes: Dict of attributes to alter + :param add_indexes: List of ydb.TableIndex to add + :param drop_indexes: List of index names to drop + :param set_ttl_settings: ydb.TtlSettings to set + :param drop_ttl_settings: Any to drop + :param add_column_families: List of ydb.ColumnFamily to add + :param alter_column_families: List of ydb.ColumnFamily to alter + :param alter_storage_settings: ydb.StorageSettings to alter + :param set_compaction_policy: Compaction policy + :param alter_partitioning_settings: ydb.PartitioningSettings to alter + :param set_key_bloom_filter: ydb.FeatureFlag to set key bloom filter + + :return: Operation or YDB error otherwise. + """ + async def callee(session: Session): return await session.alter_table( path=path, @@ -240,13 +279,41 @@ async def callee(session: Session): return await self._pool.retry_operation(callee) - async def describe_table(self, path, settings=None): + async def describe_table( + self, + path: str, + settings: Optional[settings_impl.BaseRequestSettings] = None, + ) -> ydb.TableSchemeEntry: + """ + Describe a YDB table. + + :param path: A table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: TableSchemeEntry or YDB error otherwise. + """ + async def callee(session: Session): return await session.describe_table(path=path, settings=settings) return await self._pool.retry_operation(callee) - async def copy_table(self, source_path, destination_path, settings=None): + async def copy_table( + self, + source_path: str, + destination_path: str, + settings: Optional[settings_impl.BaseRequestSettings] = None, + ) -> ydb.Operation: + """ + Copy a YDB table. + + :param source_path: A table path + :param destination_path: Destination table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + async def callee(session: Session): return await session.copy_table( source_path=source_path, @@ -256,13 +323,39 @@ async def callee(session: Session): return await self._pool.retry_operation(callee) - async def copy_tables(self, source_destination_pairs, settings=None): + async def copy_tables( + self, + source_destination_pairs: List[Tuple[str, str]], + settings: Optional[settings_impl.BaseRequestSettings] = None + ) -> ydb.Operation: + """ + Copy a YDB tables. + + :param source_destination_pairs: List of tuples (source_path, destination_path) + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + async def callee(session: Session): return await session.copy_tables(source_destination_pairs=source_destination_pairs, settings=settings) return await self._pool.retry_operation(callee) - async def rename_tables(self, rename_items, settings=None): + async def rename_tables( + self, + rename_items: List[Tuple[str, str]], + settings=None + ) -> ydb.Operation: + """ + Rename a YDB tables. + + :param rename_items: List of tuples (current_name, desired_name) + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + async def callee(session: Session): return await session.rename_tables(rename_items=rename_items, settings=settings) diff --git a/ydb/table.py b/ydb/table.py index 224cbf9a..87d5476b 100644 --- a/ydb/table.py +++ b/ydb/table.py @@ -6,6 +6,14 @@ import enum import typing +from typing import ( + Any, + Dict, + List, + Optional, + Tuple, +) + from . import ( issues, convert, @@ -1200,7 +1208,7 @@ def __init__(self, driver, table_client_settings=None): self._pool: SessionPool = SessionPool(self._driver, 10) def __del__(self): - self._pool.close() + self._pool.stop() def async_scan_query(self, query, parameters=None, settings=None): # type: (ydb.ScanQuery, tuple, ydb.BaseRequestSettings) -> ydb.AsyncResponseIterator @@ -1232,16 +1240,16 @@ def create_table( self, path: str, table_description: TableDescription, - settings: typing.Optional[settings_impl.BaseRequestSettings] = None, - ): + settings: Optional[settings_impl.BaseRequestSettings] = None, + ) -> ydb.Operation: """ Create a YDB table. :param path: A table path - :param table_description: A description of table to create. An instance TableDescription - :param settings: An instance of BaseRequestSettings that describes how rpc should invoked. + :param table_description: TableDescription instanse. + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. - :return: A description of created scheme entry or error otherwise. + :return: Operation or YDB error otherwise. """ def callee(session: Session): @@ -1252,8 +1260,17 @@ def callee(session: Session): def drop_table( self, path: str, - settings: typing.Optional[settings_impl.BaseRequestSettings] = None, - ): + settings: Optional[settings_impl.BaseRequestSettings] = None, + ) -> ydb.Operation: + """ + Drop a YDB table. + + :param path: A table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + def callee(session: Session): return session.drop_table(path=path, settings=settings) @@ -1261,23 +1278,45 @@ def callee(session: Session): def alter_table( self, - path, - add_columns=None, - drop_columns=None, - settings=None, - alter_attributes=None, - add_indexes=None, - drop_indexes=None, - set_ttl_settings=None, - drop_ttl_settings=None, - add_column_families=None, - alter_column_families=None, - alter_storage_settings=None, - set_compaction_policy=None, - alter_partitioning_settings=None, - set_key_bloom_filter=None, - set_read_replicas_settings=None, - ): + path: str, + add_columns: Optional[List[ydb.Column]] = None, + drop_columns: Optional[List[str]] = None, + settings: Optional[settings_impl.BaseRequestSettings] = None, + alter_attributes: Optional[Optional[Dict[str, str]]] = None, + add_indexes: Optional[List[ydb.TableIndex]] = None, + drop_indexes: Optional[List[str]] = None, + set_ttl_settings: Optional[ydb.TtlSettings] = None, + drop_ttl_settings: Optional[Any] = None, + add_column_families: Optional[List[ydb.ColumnFamily]] = None, + alter_column_families: Optional[List[ydb.ColumnFamily]] = None, + alter_storage_settings: Optional[ydb.StorageSettings] = None, + set_compaction_policy: Optional[str] = None, + alter_partitioning_settings: Optional[ydb.PartitioningSettings] = None, + set_key_bloom_filter: Optional[ydb.FeatureFlag] = None, + set_read_replicas_settings: Optional[ydb.ReadReplicasSettings] = None, + ) -> ydb.Operation: + """ + Alter a YDB table. + + :param path: A table path + :param add_columns: List of ydb.Column to add + :param drop_columns: List of column names to drop + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + :param alter_attributes: Dict of attributes to alter + :param add_indexes: List of ydb.TableIndex to add + :param drop_indexes: List of index names to drop + :param set_ttl_settings: ydb.TtlSettings to set + :param drop_ttl_settings: Any to drop + :param add_column_families: List of ydb.ColumnFamily to add + :param alter_column_families: List of ydb.ColumnFamily to alter + :param alter_storage_settings: ydb.StorageSettings to alter + :param set_compaction_policy: Compaction policy + :param alter_partitioning_settings: ydb.PartitioningSettings to alter + :param set_key_bloom_filter: ydb.FeatureFlag to set key bloom filter + + :return: Operation or YDB error otherwise. + """ + def callee(session: Session): return session.alter_table( path=path, @@ -1300,13 +1339,41 @@ def callee(session: Session): return self._pool.retry_operation_sync(callee) - def describe_table(self, path, settings=None): + def describe_table( + self, + path: str, + settings: Optional[settings_impl.BaseRequestSettings] = None, + ) -> ydb.TableSchemeEntry: + """ + Describe a YDB table. + + :param path: A table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: TableSchemeEntry or YDB error otherwise. + """ + def callee(session: Session): return session.describe_table(path=path, settings=settings) return self._pool.retry_operation_sync(callee) - def copy_table(self, source_path, destination_path, settings=None): + def copy_table( + self, + source_path: str, + destination_path: str, + settings: Optional[settings_impl.BaseRequestSettings] = None, + ) -> ydb.Operation: + """ + Copy a YDB table. + + :param source_path: A table path + :param destination_path: Destination table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + def callee(session: Session): return session.copy_table( source_path=source_path, @@ -1316,13 +1383,39 @@ def callee(session: Session): return self._pool.retry_operation_sync(callee) - def copy_tables(self, source_destination_pairs, settings=None): + def copy_tables( + self, + source_destination_pairs: List[Tuple[str, str]], + settings: Optional[settings_impl.BaseRequestSettings] = None + ) -> ydb.Operation: + """ + Copy a YDB tables. + + :param source_destination_pairs: List of tuples (source_path, destination_path) + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + def callee(session: Session): return session.copy_tables(source_destination_pairs=source_destination_pairs, settings=settings) return self._pool.retry_operation_sync(callee) - def rename_tables(self, rename_items, settings=None): + def rename_tables( + self, + rename_items: List[Tuple[str, str]], + settings=None + ) -> ydb.Operation: + """ + Rename a YDB tables. + + :param rename_items: List of tuples (current_name, desired_name) + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + def callee(session: Session): return session.rename_tables(rename_items=rename_items, settings=settings) From 771fc5cee0828cb7ffa9a21fac8701fdc2cfd46c Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 15 Oct 2024 18:51:05 +0300 Subject: [PATCH 09/14] style fixes --- ydb/aio/table.py | 12 ++++++------ ydb/table.py | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/ydb/aio/table.py b/ydb/aio/table.py index 96ecda94..f84a06df 100644 --- a/ydb/aio/table.py +++ b/ydb/aio/table.py @@ -181,7 +181,7 @@ async def create_table( path: str, table_description: TableDescription, settings: Optional[settings_impl.BaseRequestSettings] = None, - ) -> ydb.Operation: + ) -> "ydb.Operation": """ Create a YDB table. @@ -201,7 +201,7 @@ async def drop_table( self, path: str, settings: Optional[settings_impl.BaseRequestSettings] = None, - ) -> ydb.Operation: + ) -> "ydb.Operation": """ Drop a YDB table. @@ -234,7 +234,7 @@ async def alter_table( alter_partitioning_settings: Optional[ydb.PartitioningSettings] = None, set_key_bloom_filter: Optional[ydb.FeatureFlag] = None, set_read_replicas_settings: Optional[ydb.ReadReplicasSettings] = None, - ) -> ydb.Operation: + ) -> "ydb.Operation": """ Alter a YDB table. @@ -303,7 +303,7 @@ async def copy_table( source_path: str, destination_path: str, settings: Optional[settings_impl.BaseRequestSettings] = None, - ) -> ydb.Operation: + ) -> "ydb.Operation": """ Copy a YDB table. @@ -327,7 +327,7 @@ async def copy_tables( self, source_destination_pairs: List[Tuple[str, str]], settings: Optional[settings_impl.BaseRequestSettings] = None - ) -> ydb.Operation: + ) -> "ydb.Operation": """ Copy a YDB tables. @@ -346,7 +346,7 @@ async def rename_tables( self, rename_items: List[Tuple[str, str]], settings=None - ) -> ydb.Operation: + ) -> "ydb.Operation": """ Rename a YDB tables. diff --git a/ydb/table.py b/ydb/table.py index 87d5476b..f1d19ccd 100644 --- a/ydb/table.py +++ b/ydb/table.py @@ -1241,7 +1241,7 @@ def create_table( path: str, table_description: TableDescription, settings: Optional[settings_impl.BaseRequestSettings] = None, - ) -> ydb.Operation: + ) -> "ydb.Operation": """ Create a YDB table. @@ -1261,7 +1261,7 @@ def drop_table( self, path: str, settings: Optional[settings_impl.BaseRequestSettings] = None, - ) -> ydb.Operation: + ) -> "ydb.Operation": """ Drop a YDB table. @@ -1294,7 +1294,7 @@ def alter_table( alter_partitioning_settings: Optional[ydb.PartitioningSettings] = None, set_key_bloom_filter: Optional[ydb.FeatureFlag] = None, set_read_replicas_settings: Optional[ydb.ReadReplicasSettings] = None, - ) -> ydb.Operation: + ) -> "ydb.Operation": """ Alter a YDB table. @@ -1363,7 +1363,7 @@ def copy_table( source_path: str, destination_path: str, settings: Optional[settings_impl.BaseRequestSettings] = None, - ) -> ydb.Operation: + ) -> "ydb.Operation": """ Copy a YDB table. @@ -1387,7 +1387,7 @@ def copy_tables( self, source_destination_pairs: List[Tuple[str, str]], settings: Optional[settings_impl.BaseRequestSettings] = None - ) -> ydb.Operation: + ) -> "ydb.Operation": """ Copy a YDB tables. @@ -1406,7 +1406,7 @@ def rename_tables( self, rename_items: List[Tuple[str, str]], settings=None - ) -> ydb.Operation: + ) -> "ydb.Operation": """ Rename a YDB tables. From 98f7167740654b81ac9f4ae6e6f0e832fae90cb2 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 15 Oct 2024 18:57:58 +0300 Subject: [PATCH 10/14] fix cyclic import errors --- ydb/aio/table.py | 36 ++++++++++++++++++------------------ ydb/table.py | 36 ++++++++++++++++++------------------ 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/ydb/aio/table.py b/ydb/aio/table.py index f84a06df..eccd1fa6 100644 --- a/ydb/aio/table.py +++ b/ydb/aio/table.py @@ -179,8 +179,8 @@ async def scan_query(self, query, parameters=None, settings=None): # pylint: di async def create_table( self, path: str, - table_description: TableDescription, - settings: Optional[settings_impl.BaseRequestSettings] = None, + table_description: "TableDescription", + settings: Optional["settings_impl.BaseRequestSettings"] = None, ) -> "ydb.Operation": """ Create a YDB table. @@ -200,7 +200,7 @@ async def callee(session: Session): async def drop_table( self, path: str, - settings: Optional[settings_impl.BaseRequestSettings] = None, + settings: Optional["settings_impl.BaseRequestSettings"] = None, ) -> "ydb.Operation": """ Drop a YDB table. @@ -219,21 +219,21 @@ async def callee(session: Session): async def alter_table( self, path: str, - add_columns: Optional[List[ydb.Column]] = None, + add_columns: Optional[List["ydb.Column"]] = None, drop_columns: Optional[List[str]] = None, - settings: Optional[settings_impl.BaseRequestSettings] = None, + settings: Optional["settings_impl.BaseRequestSettings"] = None, alter_attributes: Optional[Optional[Dict[str, str]]] = None, - add_indexes: Optional[List[ydb.TableIndex]] = None, + add_indexes: Optional[List["ydb.TableIndex"]] = None, drop_indexes: Optional[List[str]] = None, - set_ttl_settings: Optional[ydb.TtlSettings] = None, + set_ttl_settings: Optional["ydb.TtlSettings"] = None, drop_ttl_settings: Optional[Any] = None, - add_column_families: Optional[List[ydb.ColumnFamily]] = None, - alter_column_families: Optional[List[ydb.ColumnFamily]] = None, - alter_storage_settings: Optional[ydb.StorageSettings] = None, + add_column_families: Optional[List["ydb.ColumnFamily"]] = None, + alter_column_families: Optional[List["ydb.ColumnFamily"]] = None, + alter_storage_settings: Optional["ydb.StorageSettings"] = None, set_compaction_policy: Optional[str] = None, - alter_partitioning_settings: Optional[ydb.PartitioningSettings] = None, - set_key_bloom_filter: Optional[ydb.FeatureFlag] = None, - set_read_replicas_settings: Optional[ydb.ReadReplicasSettings] = None, + alter_partitioning_settings: Optional["ydb.PartitioningSettings"] = None, + set_key_bloom_filter: Optional["ydb.FeatureFlag"] = None, + set_read_replicas_settings: Optional["ydb.ReadReplicasSettings"] = None, ) -> "ydb.Operation": """ Alter a YDB table. @@ -282,8 +282,8 @@ async def callee(session: Session): async def describe_table( self, path: str, - settings: Optional[settings_impl.BaseRequestSettings] = None, - ) -> ydb.TableSchemeEntry: + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.TableSchemeEntry": """ Describe a YDB table. @@ -302,7 +302,7 @@ async def copy_table( self, source_path: str, destination_path: str, - settings: Optional[settings_impl.BaseRequestSettings] = None, + settings: Optional["settings_impl.BaseRequestSettings"] = None, ) -> "ydb.Operation": """ Copy a YDB table. @@ -326,7 +326,7 @@ async def callee(session: Session): async def copy_tables( self, source_destination_pairs: List[Tuple[str, str]], - settings: Optional[settings_impl.BaseRequestSettings] = None + settings: Optional["settings_impl.BaseRequestSettings"] = None, ) -> "ydb.Operation": """ Copy a YDB tables. @@ -345,7 +345,7 @@ async def callee(session: Session): async def rename_tables( self, rename_items: List[Tuple[str, str]], - settings=None + settings: Optional["settings_impl.BaseRequestSettings"] = None, ) -> "ydb.Operation": """ Rename a YDB tables. diff --git a/ydb/table.py b/ydb/table.py index f1d19ccd..e88ae9b4 100644 --- a/ydb/table.py +++ b/ydb/table.py @@ -1239,8 +1239,8 @@ def async_bulk_upsert(self, table_path, rows, column_types, settings=None): def create_table( self, path: str, - table_description: TableDescription, - settings: Optional[settings_impl.BaseRequestSettings] = None, + table_description: "TableDescription", + settings: Optional["settings_impl.BaseRequestSettings"] = None, ) -> "ydb.Operation": """ Create a YDB table. @@ -1260,7 +1260,7 @@ def callee(session: Session): def drop_table( self, path: str, - settings: Optional[settings_impl.BaseRequestSettings] = None, + settings: Optional["settings_impl.BaseRequestSettings"] = None, ) -> "ydb.Operation": """ Drop a YDB table. @@ -1279,21 +1279,21 @@ def callee(session: Session): def alter_table( self, path: str, - add_columns: Optional[List[ydb.Column]] = None, + add_columns: Optional[List["ydb.Column"]] = None, drop_columns: Optional[List[str]] = None, - settings: Optional[settings_impl.BaseRequestSettings] = None, + settings: Optional["settings_impl.BaseRequestSettings"] = None, alter_attributes: Optional[Optional[Dict[str, str]]] = None, - add_indexes: Optional[List[ydb.TableIndex]] = None, + add_indexes: Optional[List["ydb.TableIndex"]] = None, drop_indexes: Optional[List[str]] = None, - set_ttl_settings: Optional[ydb.TtlSettings] = None, + set_ttl_settings: Optional["ydb.TtlSettings"] = None, drop_ttl_settings: Optional[Any] = None, - add_column_families: Optional[List[ydb.ColumnFamily]] = None, - alter_column_families: Optional[List[ydb.ColumnFamily]] = None, - alter_storage_settings: Optional[ydb.StorageSettings] = None, + add_column_families: Optional[List["ydb.ColumnFamily"]] = None, + alter_column_families: Optional[List["ydb.ColumnFamily"]] = None, + alter_storage_settings: Optional["ydb.StorageSettings"] = None, set_compaction_policy: Optional[str] = None, - alter_partitioning_settings: Optional[ydb.PartitioningSettings] = None, - set_key_bloom_filter: Optional[ydb.FeatureFlag] = None, - set_read_replicas_settings: Optional[ydb.ReadReplicasSettings] = None, + alter_partitioning_settings: Optional["ydb.PartitioningSettings"] = None, + set_key_bloom_filter: Optional["ydb.FeatureFlag"] = None, + set_read_replicas_settings: Optional["ydb.ReadReplicasSettings"] = None, ) -> "ydb.Operation": """ Alter a YDB table. @@ -1342,8 +1342,8 @@ def callee(session: Session): def describe_table( self, path: str, - settings: Optional[settings_impl.BaseRequestSettings] = None, - ) -> ydb.TableSchemeEntry: + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.TableSchemeEntry": """ Describe a YDB table. @@ -1362,7 +1362,7 @@ def copy_table( self, source_path: str, destination_path: str, - settings: Optional[settings_impl.BaseRequestSettings] = None, + settings: Optional["settings_impl.BaseRequestSettings"] = None, ) -> "ydb.Operation": """ Copy a YDB table. @@ -1386,7 +1386,7 @@ def callee(session: Session): def copy_tables( self, source_destination_pairs: List[Tuple[str, str]], - settings: Optional[settings_impl.BaseRequestSettings] = None + settings: Optional["settings_impl.BaseRequestSettings"] = None ) -> "ydb.Operation": """ Copy a YDB tables. @@ -1405,7 +1405,7 @@ def callee(session: Session): def rename_tables( self, rename_items: List[Tuple[str, str]], - settings=None + settings: Optional["settings_impl.BaseRequestSettings"] = None, ) -> "ydb.Operation": """ Rename a YDB tables. From b0f10ca6fba572b0fc01a93313f30586eb8f3b96 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 15 Oct 2024 19:00:07 +0300 Subject: [PATCH 11/14] style fixes --- ydb/aio/table.py | 2 +- ydb/table.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/aio/table.py b/ydb/aio/table.py index eccd1fa6..d1cdbd5d 100644 --- a/ydb/aio/table.py +++ b/ydb/aio/table.py @@ -345,7 +345,7 @@ async def callee(session: Session): async def rename_tables( self, rename_items: List[Tuple[str, str]], - settings: Optional["settings_impl.BaseRequestSettings"] = None, + settings: Optional["settings_impl.BaseRequestSettings"] = None, ) -> "ydb.Operation": """ Rename a YDB tables. diff --git a/ydb/table.py b/ydb/table.py index e88ae9b4..94aee3a8 100644 --- a/ydb/table.py +++ b/ydb/table.py @@ -1386,7 +1386,7 @@ def callee(session: Session): def copy_tables( self, source_destination_pairs: List[Tuple[str, str]], - settings: Optional["settings_impl.BaseRequestSettings"] = None + settings: Optional["settings_impl.BaseRequestSettings"] = None, ) -> "ydb.Operation": """ Copy a YDB tables. From b3669322c8a246e61a7a9b4e5487184e7eb1a8ac Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 15 Oct 2024 19:21:01 +0300 Subject: [PATCH 12/14] fix driver arg --- ydb/aio/driver.py | 2 +- ydb/driver.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ydb/aio/driver.py b/ydb/aio/driver.py index ad03b207..e7b970c0 100644 --- a/ydb/aio/driver.py +++ b/ydb/aio/driver.py @@ -61,5 +61,5 @@ def __init__( self.topic_client = topic.TopicClientAsyncIO(self, config.topic_client_settings) async def stop(self, timeout=10): - await self.table_client._pool.stop(timeout) + await self.table_client._pool.stop(timeout=timeout) await super().stop(timeout=timeout) diff --git a/ydb/driver.py b/ydb/driver.py index 8c8b9ecd..271e8b9e 100644 --- a/ydb/driver.py +++ b/ydb/driver.py @@ -283,6 +283,6 @@ def __init__( self.table_client = table.TableClient(self, driver_config.table_client_settings) self.topic_client = topic.TopicClient(self, driver_config.topic_client_settings) - def stop(self): - self.table_client._pool.stop() - super().stop() + def stop(self, timeout=10): + self.table_client._pool.stop(timeout=timeout) + super().stop(timeout=timeout) From a45ec48c69dc7a033d62f31452de6414d7366c78 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Wed, 16 Oct 2024 10:17:52 +0300 Subject: [PATCH 13/14] make pool init lazy --- ydb/aio/driver.py | 2 +- ydb/aio/table.py | 26 ++++++++++++++++++++++++-- ydb/driver.py | 2 +- ydb/table.py | 26 ++++++++++++++++++++++++-- 4 files changed, 50 insertions(+), 6 deletions(-) diff --git a/ydb/aio/driver.py b/ydb/aio/driver.py index e7b970c0..9cd6fd2b 100644 --- a/ydb/aio/driver.py +++ b/ydb/aio/driver.py @@ -61,5 +61,5 @@ def __init__( self.topic_client = topic.TopicClientAsyncIO(self, config.topic_client_settings) async def stop(self, timeout=10): - await self.table_client._pool.stop(timeout=timeout) + await self.table_client._stop_pool_if_needed(timeout=timeout) await super().stop(timeout=timeout) diff --git a/ydb/aio/table.py b/ydb/aio/table.py index d1cdbd5d..62e4b099 100644 --- a/ydb/aio/table.py +++ b/ydb/aio/table.py @@ -151,11 +151,11 @@ class TableClient(BaseTableClient): def __init__(self, driver, table_client_settings=None): # type:(ydb.Driver, ydb.TableClientSettings) -> None super().__init__(driver=driver, table_client_settings=table_client_settings) - self._pool: SessionPool = SessionPool(self._driver, 10) + self._pool: Optional[SessionPool] = None def __del__(self): if not self._pool._terminating: - asyncio.get_running_loop.call_soon(self._pool.stop) + asyncio.get_running_loop.call_soon(self._stop_pool_if_needed) def session(self): return Session(self._driver, self._table_client_settings) @@ -176,6 +176,14 @@ async def scan_query(self, query, parameters=None, settings=None): # pylint: di lambda resp: _wrap_scan_query_response(resp, self._table_client_settings), ) + def _init_pool_if_needed(self): + if self._pool is None: + self._pool = SessionPool(self._driver, 10) + + async def _stop_pool_if_needed(self, timeout=10): + if self._pool is not None: + await self._pool.stop(timeout=timeout) + async def create_table( self, path: str, @@ -192,6 +200,8 @@ async def create_table( :return: Operation or YDB error otherwise. """ + self._init_pool_if_needed() + async def callee(session: Session): return await session.create_table(path=path, table_description=table_description, settings=settings) @@ -211,6 +221,8 @@ async def drop_table( :return: Operation or YDB error otherwise. """ + self._init_pool_if_needed() + async def callee(session: Session): return await session.drop_table(path=path, settings=settings) @@ -257,6 +269,8 @@ async def alter_table( :return: Operation or YDB error otherwise. """ + self._init_pool_if_needed() + async def callee(session: Session): return await session.alter_table( path=path, @@ -293,6 +307,8 @@ async def describe_table( :return: TableSchemeEntry or YDB error otherwise. """ + self._init_pool_if_needed() + async def callee(session: Session): return await session.describe_table(path=path, settings=settings) @@ -314,6 +330,8 @@ async def copy_table( :return: Operation or YDB error otherwise. """ + self._init_pool_if_needed() + async def callee(session: Session): return await session.copy_table( source_path=source_path, @@ -337,6 +355,8 @@ async def copy_tables( :return: Operation or YDB error otherwise. """ + self._init_pool_if_needed() + async def callee(session: Session): return await session.copy_tables(source_destination_pairs=source_destination_pairs, settings=settings) @@ -356,6 +376,8 @@ async def rename_tables( :return: Operation or YDB error otherwise. """ + self._init_pool_if_needed() + async def callee(session: Session): return await session.rename_tables(rename_items=rename_items, settings=settings) diff --git a/ydb/driver.py b/ydb/driver.py index 271e8b9e..6b35cd56 100644 --- a/ydb/driver.py +++ b/ydb/driver.py @@ -284,5 +284,5 @@ def __init__( self.topic_client = topic.TopicClient(self, driver_config.topic_client_settings) def stop(self, timeout=10): - self.table_client._pool.stop(timeout=timeout) + self.table_client._stop_pool_if_needed(timeout=timeout) super().stop(timeout=timeout) diff --git a/ydb/table.py b/ydb/table.py index 94aee3a8..945e9187 100644 --- a/ydb/table.py +++ b/ydb/table.py @@ -1205,10 +1205,10 @@ class TableClient(BaseTableClient): def __init__(self, driver, table_client_settings=None): # type:(ydb.Driver, ydb.TableClientSettings) -> None super().__init__(driver=driver, table_client_settings=table_client_settings) - self._pool: SessionPool = SessionPool(self._driver, 10) + self._pool: Optional[SessionPool] = None def __del__(self): - self._pool.stop() + self._stop_pool_if_needed() def async_scan_query(self, query, parameters=None, settings=None): # type: (ydb.ScanQuery, tuple, ydb.BaseRequestSettings) -> ydb.AsyncResponseIterator @@ -1236,6 +1236,14 @@ def async_bulk_upsert(self, table_path, rows, column_types, settings=None): (), ) + def _init_pool_if_needed(self): + if self._pool is None: + self._pool = SessionPool(self._driver, 10) + + def _stop_pool_if_needed(self, timeout=10): + if self._pool is not None: + self._pool.stop(timeout=timeout) + def create_table( self, path: str, @@ -1252,6 +1260,8 @@ def create_table( :return: Operation or YDB error otherwise. """ + self._init_pool_if_needed() + def callee(session: Session): return session.create_table(path=path, table_description=table_description, settings=settings) @@ -1271,6 +1281,8 @@ def drop_table( :return: Operation or YDB error otherwise. """ + self._init_pool_if_needed() + def callee(session: Session): return session.drop_table(path=path, settings=settings) @@ -1317,6 +1329,8 @@ def alter_table( :return: Operation or YDB error otherwise. """ + self._init_pool_if_needed() + def callee(session: Session): return session.alter_table( path=path, @@ -1353,6 +1367,8 @@ def describe_table( :return: TableSchemeEntry or YDB error otherwise. """ + self._init_pool_if_needed() + def callee(session: Session): return session.describe_table(path=path, settings=settings) @@ -1374,6 +1390,8 @@ def copy_table( :return: Operation or YDB error otherwise. """ + self._init_pool_if_needed() + def callee(session: Session): return session.copy_table( source_path=source_path, @@ -1397,6 +1415,8 @@ def copy_tables( :return: Operation or YDB error otherwise. """ + self._init_pool_if_needed() + def callee(session: Session): return session.copy_tables(source_destination_pairs=source_destination_pairs, settings=settings) @@ -1416,6 +1436,8 @@ def rename_tables( :return: Operation or YDB error otherwise. """ + self._init_pool_if_needed() + def callee(session: Session): return session.rename_tables(rename_items=rename_items, settings=settings) From 7e51e81528ee888111eeee222749bac10ad05ae6 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Wed, 16 Oct 2024 10:20:32 +0300 Subject: [PATCH 14/14] update tableclient destructor --- ydb/aio/table.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ydb/aio/table.py b/ydb/aio/table.py index 62e4b099..274716ed 100644 --- a/ydb/aio/table.py +++ b/ydb/aio/table.py @@ -154,8 +154,7 @@ def __init__(self, driver, table_client_settings=None): self._pool: Optional[SessionPool] = None def __del__(self): - if not self._pool._terminating: - asyncio.get_running_loop.call_soon(self._stop_pool_if_needed) + asyncio.get_running_loop.call_soon(self._stop_pool_if_needed) def session(self): return Session(self._driver, self._table_client_settings) @@ -181,7 +180,7 @@ def _init_pool_if_needed(self): self._pool = SessionPool(self._driver, 10) async def _stop_pool_if_needed(self, timeout=10): - if self._pool is not None: + if self._pool is not None and not self._pool._terminating: await self._pool.stop(timeout=timeout) async def create_table(