diff --git a/tests/aio/test_table_client.py b/tests/aio/test_table_client.py new file mode 100644 index 00000000..89c0ba75 --- /dev/null +++ b/tests/aio/test_table_client.py @@ -0,0 +1,91 @@ +import pytest +import ydb + + +class TestTableClient: + @pytest.mark.asyncio + async def test_create_table(self, driver: ydb.aio.Driver): + client = driver.table_client + table_name = "/local/testtableclient" + try: + await client.drop_table(table_name) + except ydb.SchemeError: + pass + + with pytest.raises(ydb.SchemeError): + await client.describe_table(table_name) + + description = ( + ydb.TableDescription() + .with_primary_keys("key1", "key2") + .with_columns( + ydb.Column("key1", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("key2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8)), + ) + ) + + await client.create_table(table_name, description) + + actual_description = await client.describe_table(table_name) + + assert actual_description.columns == description.columns + + @pytest.mark.asyncio + async def test_alter_table(self, driver: ydb.aio.Driver): + client = driver.table_client + + table_name = "/local/testtableclient" + try: + await client.drop_table(table_name) + except ydb.SchemeError: + pass + + description = ( + ydb.TableDescription() + .with_primary_keys("key1", "key2") + .with_columns( + ydb.Column("key1", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("key2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8)), + ) + ) + + await client.create_table(table_name, description) + + await client.alter_table( + table_name, + add_columns=[ + ydb.Column("value2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ], + ) + + description = await client.describe_table(table_name) + assert len(description.columns) == 4 + + @pytest.mark.asyncio + async def test_copy_table(self, driver: ydb.aio.Driver): + client = driver.table_client + table_name = "/local/testtableclient" + try: + await client.drop_table(table_name) + except ydb.SchemeError: + pass + + description = ( + ydb.TableDescription() + .with_primary_keys("key1", "key2") + .with_columns( + ydb.Column("key1", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("key2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8)), + ) + ) + + await client.create_table(table_name, description) + + await client.copy_table(table_name, table_name + "_copy") + + copied_description = await client.describe_table(table_name + "_copy") + + assert description.columns == copied_description.columns diff --git a/tests/table/test_table_client.py b/tests/table/test_table_client.py new file mode 100644 index 00000000..f4e121c8 --- /dev/null +++ b/tests/table/test_table_client.py @@ -0,0 +1,88 @@ +import pytest +import ydb + + +class TestTableClient: + def test_create_table(self, driver_sync: ydb.Driver): + client = driver_sync.table_client + table_name = "/local/testtableclient" + try: + client.drop_table(table_name) + except ydb.SchemeError: + pass + + with pytest.raises(ydb.SchemeError): + client.describe_table(table_name) + + description = ( + ydb.TableDescription() + .with_primary_keys("key1", "key2") + .with_columns( + ydb.Column("key1", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("key2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8)), + ) + ) + + client.create_table(table_name, description) + + actual_description = client.describe_table(table_name) + + assert actual_description.columns == description.columns + + def test_alter_table(self, driver_sync: ydb.Driver): + client = driver_sync.table_client + + table_name = "/local/testtableclient" + try: + client.drop_table(table_name) + except ydb.SchemeError: + pass + + description = ( + ydb.TableDescription() + .with_primary_keys("key1", "key2") + .with_columns( + ydb.Column("key1", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("key2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8)), + ) + ) + + client.create_table(table_name, description) + + client.alter_table( + table_name, + add_columns=[ + ydb.Column("value2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ], + ) + + description = client.describe_table(table_name) + assert len(description.columns) == 4 + + def test_copy_table(self, driver_sync: ydb.Driver): + client = driver_sync.table_client + table_name = "/local/testtableclient" + try: + client.drop_table(table_name) + except ydb.SchemeError: + pass + + description = ( + ydb.TableDescription() + .with_primary_keys("key1", "key2") + .with_columns( + ydb.Column("key1", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("key2", ydb.OptionalType(ydb.PrimitiveType.Uint64)), + ydb.Column("value", ydb.OptionalType(ydb.PrimitiveType.Utf8)), + ) + ) + + client.create_table(table_name, description) + + client.copy_table(table_name, table_name + "_copy") + + copied_description = client.describe_table(table_name + "_copy") + + assert description.columns == copied_description.columns diff --git a/ydb/aio/driver.py b/ydb/aio/driver.py index 0f4f3630..9cd6fd2b 100644 --- a/ydb/aio/driver.py +++ b/ydb/aio/driver.py @@ -59,3 +59,7 @@ def __init__( self.scheme_client = scheme.SchemeClient(self) self.table_client = table.TableClient(self, config.table_client_settings) self.topic_client = topic.TopicClientAsyncIO(self, config.topic_client_settings) + + async def stop(self, timeout=10): + await self.table_client._stop_pool_if_needed(timeout=timeout) + await super().stop(timeout=timeout) diff --git a/ydb/aio/table.py b/ydb/aio/table.py index 3c25f7d2..274716ed 100644 --- a/ydb/aio/table.py +++ b/ydb/aio/table.py @@ -3,6 +3,14 @@ import time import typing +from typing import ( + Any, + Dict, + List, + Optional, + Tuple, +) + import ydb from ydb import issues, settings as settings_impl, table @@ -13,6 +21,7 @@ _scan_query_request_factory, _wrap_scan_query_response, BaseTxContext, + TableDescription, ) from . import _utilities from ydb import _apis, _session_impl @@ -139,6 +148,14 @@ async def rename_tables(self, rename_items, settings=None): # pylint: disable=W class TableClient(BaseTableClient): + def __init__(self, driver, table_client_settings=None): + # type:(ydb.Driver, ydb.TableClientSettings) -> None + super().__init__(driver=driver, table_client_settings=table_client_settings) + self._pool: Optional[SessionPool] = None + + def __del__(self): + asyncio.get_running_loop.call_soon(self._stop_pool_if_needed) + def session(self): return Session(self._driver, self._table_client_settings) @@ -158,6 +175,213 @@ async def scan_query(self, query, parameters=None, settings=None): # pylint: di lambda resp: _wrap_scan_query_response(resp, self._table_client_settings), ) + def _init_pool_if_needed(self): + if self._pool is None: + self._pool = SessionPool(self._driver, 10) + + async def _stop_pool_if_needed(self, timeout=10): + if self._pool is not None and not self._pool._terminating: + await self._pool.stop(timeout=timeout) + + async def create_table( + self, + path: str, + table_description: "TableDescription", + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Create a YDB table. + + :param path: A table path + :param table_description: TableDescription instanse. + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + async def callee(session: Session): + return await session.create_table(path=path, table_description=table_description, settings=settings) + + return await self._pool.retry_operation(callee) + + async def drop_table( + self, + path: str, + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Drop a YDB table. + + :param path: A table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + async def callee(session: Session): + return await session.drop_table(path=path, settings=settings) + + return await self._pool.retry_operation(callee) + + async def alter_table( + self, + path: str, + add_columns: Optional[List["ydb.Column"]] = None, + drop_columns: Optional[List[str]] = None, + settings: Optional["settings_impl.BaseRequestSettings"] = None, + alter_attributes: Optional[Optional[Dict[str, str]]] = None, + add_indexes: Optional[List["ydb.TableIndex"]] = None, + drop_indexes: Optional[List[str]] = None, + set_ttl_settings: Optional["ydb.TtlSettings"] = None, + drop_ttl_settings: Optional[Any] = None, + add_column_families: Optional[List["ydb.ColumnFamily"]] = None, + alter_column_families: Optional[List["ydb.ColumnFamily"]] = None, + alter_storage_settings: Optional["ydb.StorageSettings"] = None, + set_compaction_policy: Optional[str] = None, + alter_partitioning_settings: Optional["ydb.PartitioningSettings"] = None, + set_key_bloom_filter: Optional["ydb.FeatureFlag"] = None, + set_read_replicas_settings: Optional["ydb.ReadReplicasSettings"] = None, + ) -> "ydb.Operation": + """ + Alter a YDB table. + + :param path: A table path + :param add_columns: List of ydb.Column to add + :param drop_columns: List of column names to drop + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + :param alter_attributes: Dict of attributes to alter + :param add_indexes: List of ydb.TableIndex to add + :param drop_indexes: List of index names to drop + :param set_ttl_settings: ydb.TtlSettings to set + :param drop_ttl_settings: Any to drop + :param add_column_families: List of ydb.ColumnFamily to add + :param alter_column_families: List of ydb.ColumnFamily to alter + :param alter_storage_settings: ydb.StorageSettings to alter + :param set_compaction_policy: Compaction policy + :param alter_partitioning_settings: ydb.PartitioningSettings to alter + :param set_key_bloom_filter: ydb.FeatureFlag to set key bloom filter + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + async def callee(session: Session): + return await session.alter_table( + path=path, + add_columns=add_columns, + drop_columns=drop_columns, + settings=settings, + alter_attributes=alter_attributes, + add_indexes=add_indexes, + drop_indexes=drop_indexes, + set_ttl_settings=set_ttl_settings, + drop_ttl_settings=drop_ttl_settings, + add_column_families=add_column_families, + alter_column_families=alter_column_families, + alter_storage_settings=alter_storage_settings, + set_compaction_policy=set_compaction_policy, + alter_partitioning_settings=alter_partitioning_settings, + set_key_bloom_filter=set_key_bloom_filter, + set_read_replicas_settings=set_read_replicas_settings, + ) + + return await self._pool.retry_operation(callee) + + async def describe_table( + self, + path: str, + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.TableSchemeEntry": + """ + Describe a YDB table. + + :param path: A table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: TableSchemeEntry or YDB error otherwise. + """ + + self._init_pool_if_needed() + + async def callee(session: Session): + return await session.describe_table(path=path, settings=settings) + + return await self._pool.retry_operation(callee) + + async def copy_table( + self, + source_path: str, + destination_path: str, + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Copy a YDB table. + + :param source_path: A table path + :param destination_path: Destination table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + async def callee(session: Session): + return await session.copy_table( + source_path=source_path, + destination_path=destination_path, + settings=settings, + ) + + return await self._pool.retry_operation(callee) + + async def copy_tables( + self, + source_destination_pairs: List[Tuple[str, str]], + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Copy a YDB tables. + + :param source_destination_pairs: List of tuples (source_path, destination_path) + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + async def callee(session: Session): + return await session.copy_tables(source_destination_pairs=source_destination_pairs, settings=settings) + + return await self._pool.retry_operation(callee) + + async def rename_tables( + self, + rename_items: List[Tuple[str, str]], + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Rename a YDB tables. + + :param rename_items: List of tuples (current_name, desired_name) + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + async def callee(session: Session): + return await session.rename_tables(rename_items=rename_items, settings=settings) + + return await self._pool.retry_operation(callee) + class TxContext(BaseTxContext): async def __aenter__(self): diff --git a/ydb/driver.py b/ydb/driver.py index 1559b0d0..6b35cd56 100644 --- a/ydb/driver.py +++ b/ydb/driver.py @@ -282,3 +282,7 @@ def __init__( self.scheme_client = scheme.SchemeClient(self) self.table_client = table.TableClient(self, driver_config.table_client_settings) self.topic_client = topic.TopicClient(self, driver_config.topic_client_settings) + + def stop(self, timeout=10): + self.table_client._stop_pool_if_needed(timeout=timeout) + super().stop(timeout=timeout) diff --git a/ydb/table.py b/ydb/table.py index 01f5e52b..945e9187 100644 --- a/ydb/table.py +++ b/ydb/table.py @@ -4,6 +4,15 @@ from abc import abstractmethod import logging import enum +import typing + +from typing import ( + Any, + Dict, + List, + Optional, + Tuple, +) from . import ( issues, @@ -1193,6 +1202,14 @@ def bulk_upsert(self, table_path, rows, column_types, settings=None): class TableClient(BaseTableClient): + def __init__(self, driver, table_client_settings=None): + # type:(ydb.Driver, ydb.TableClientSettings) -> None + super().__init__(driver=driver, table_client_settings=table_client_settings) + self._pool: Optional[SessionPool] = None + + def __del__(self): + self._stop_pool_if_needed() + def async_scan_query(self, query, parameters=None, settings=None): # type: (ydb.ScanQuery, tuple, ydb.BaseRequestSettings) -> ydb.AsyncResponseIterator request = _scan_query_request_factory(query, parameters, settings) @@ -1219,6 +1236,213 @@ def async_bulk_upsert(self, table_path, rows, column_types, settings=None): (), ) + def _init_pool_if_needed(self): + if self._pool is None: + self._pool = SessionPool(self._driver, 10) + + def _stop_pool_if_needed(self, timeout=10): + if self._pool is not None: + self._pool.stop(timeout=timeout) + + def create_table( + self, + path: str, + table_description: "TableDescription", + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Create a YDB table. + + :param path: A table path + :param table_description: TableDescription instanse. + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + def callee(session: Session): + return session.create_table(path=path, table_description=table_description, settings=settings) + + return self._pool.retry_operation_sync(callee) + + def drop_table( + self, + path: str, + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Drop a YDB table. + + :param path: A table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + def callee(session: Session): + return session.drop_table(path=path, settings=settings) + + return self._pool.retry_operation_sync(callee) + + def alter_table( + self, + path: str, + add_columns: Optional[List["ydb.Column"]] = None, + drop_columns: Optional[List[str]] = None, + settings: Optional["settings_impl.BaseRequestSettings"] = None, + alter_attributes: Optional[Optional[Dict[str, str]]] = None, + add_indexes: Optional[List["ydb.TableIndex"]] = None, + drop_indexes: Optional[List[str]] = None, + set_ttl_settings: Optional["ydb.TtlSettings"] = None, + drop_ttl_settings: Optional[Any] = None, + add_column_families: Optional[List["ydb.ColumnFamily"]] = None, + alter_column_families: Optional[List["ydb.ColumnFamily"]] = None, + alter_storage_settings: Optional["ydb.StorageSettings"] = None, + set_compaction_policy: Optional[str] = None, + alter_partitioning_settings: Optional["ydb.PartitioningSettings"] = None, + set_key_bloom_filter: Optional["ydb.FeatureFlag"] = None, + set_read_replicas_settings: Optional["ydb.ReadReplicasSettings"] = None, + ) -> "ydb.Operation": + """ + Alter a YDB table. + + :param path: A table path + :param add_columns: List of ydb.Column to add + :param drop_columns: List of column names to drop + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + :param alter_attributes: Dict of attributes to alter + :param add_indexes: List of ydb.TableIndex to add + :param drop_indexes: List of index names to drop + :param set_ttl_settings: ydb.TtlSettings to set + :param drop_ttl_settings: Any to drop + :param add_column_families: List of ydb.ColumnFamily to add + :param alter_column_families: List of ydb.ColumnFamily to alter + :param alter_storage_settings: ydb.StorageSettings to alter + :param set_compaction_policy: Compaction policy + :param alter_partitioning_settings: ydb.PartitioningSettings to alter + :param set_key_bloom_filter: ydb.FeatureFlag to set key bloom filter + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + def callee(session: Session): + return session.alter_table( + path=path, + add_columns=add_columns, + drop_columns=drop_columns, + settings=settings, + alter_attributes=alter_attributes, + add_indexes=add_indexes, + drop_indexes=drop_indexes, + set_ttl_settings=set_ttl_settings, + drop_ttl_settings=drop_ttl_settings, + add_column_families=add_column_families, + alter_column_families=alter_column_families, + alter_storage_settings=alter_storage_settings, + set_compaction_policy=set_compaction_policy, + alter_partitioning_settings=alter_partitioning_settings, + set_key_bloom_filter=set_key_bloom_filter, + set_read_replicas_settings=set_read_replicas_settings, + ) + + return self._pool.retry_operation_sync(callee) + + def describe_table( + self, + path: str, + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.TableSchemeEntry": + """ + Describe a YDB table. + + :param path: A table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: TableSchemeEntry or YDB error otherwise. + """ + + self._init_pool_if_needed() + + def callee(session: Session): + return session.describe_table(path=path, settings=settings) + + return self._pool.retry_operation_sync(callee) + + def copy_table( + self, + source_path: str, + destination_path: str, + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Copy a YDB table. + + :param source_path: A table path + :param destination_path: Destination table path + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + def callee(session: Session): + return session.copy_table( + source_path=source_path, + destination_path=destination_path, + settings=settings, + ) + + return self._pool.retry_operation_sync(callee) + + def copy_tables( + self, + source_destination_pairs: List[Tuple[str, str]], + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Copy a YDB tables. + + :param source_destination_pairs: List of tuples (source_path, destination_path) + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + def callee(session: Session): + return session.copy_tables(source_destination_pairs=source_destination_pairs, settings=settings) + + return self._pool.retry_operation_sync(callee) + + def rename_tables( + self, + rename_items: List[Tuple[str, str]], + settings: Optional["settings_impl.BaseRequestSettings"] = None, + ) -> "ydb.Operation": + """ + Rename a YDB tables. + + :param rename_items: List of tuples (current_name, desired_name) + :param settings: An instance of BaseRequestSettings that describes how rpc should be invoked. + + :return: Operation or YDB error otherwise. + """ + + self._init_pool_if_needed() + + def callee(session: Session): + return session.rename_tables(rename_items=rename_items, settings=settings) + + return self._pool.retry_operation_sync(callee) + def _make_index_description(index): result = TableIndex(index.name).with_index_columns(*tuple(col for col in index.index_columns))