Skip to content

Add missing ability to configure QueryClientSettings #512

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions tests/query/test_query_client_settings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import pytest

import ydb

from datetime import date, datetime, timedelta, timezone


@pytest.fixture
def settings_on():
settings = (
ydb.QueryClientSettings()
.with_native_date_in_result_sets(True)
.with_native_datetime_in_result_sets(True)
.with_native_timestamp_in_result_sets(True)
.with_native_interval_in_result_sets(True)
.with_native_json_in_result_sets(True)
)
return settings


@pytest.fixture
def settings_off():
settings = (
ydb.QueryClientSettings()
.with_native_date_in_result_sets(False)
.with_native_datetime_in_result_sets(False)
.with_native_timestamp_in_result_sets(False)
.with_native_interval_in_result_sets(False)
.with_native_json_in_result_sets(False)
)
return settings


test_td = timedelta(microseconds=-100)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need 100 microseconds?
can you use one second or one millisecond?

test_now = datetime.utcnow()
test_today = test_now.date()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about use fixed date/time instead of now?

test_dt_today = datetime.today()
tz4h = timezone(timedelta(hours=4))


params = pytest.mark.parametrize(
"value,ydb_type,casted_result,uncasted_type",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

casted_result and uncasted_type are difficult to underdstand.

what about: raw result and native_result?
and always compare values

[
(test_today, "Date", test_today, int),
(365, "Date", date(1971, 1, 1), int),
(3600 * 24 * 365, "Datetime", datetime(1971, 1, 1, 0, 0), int),
(datetime(1970, 1, 1, 4, 0, tzinfo=tz4h), "Timestamp", datetime(1970, 1, 1, 0, 0), int),
(test_td, "Interval", test_td, int),
(test_now, "Timestamp", test_now, int),
(
1511789040123456,
"Timestamp",
datetime.fromisoformat("2017-11-27 13:24:00.123456"),
int,
),
('{"foo": "bar"}', "Json", {"foo": "bar"}, str),
('{"foo": "bar"}', "JsonDocument", {"foo": "bar"}, str),
],
)


class TestQueryClientSettings:
@params
def test_driver_turn_on(self, driver_sync, settings_on, value, ydb_type, casted_result, uncasted_type):
driver_sync._driver_config.query_client_settings = settings_on
pool = ydb.QuerySessionPool(driver_sync)
result = pool.execute_with_retries(
f"DECLARE $param as {ydb_type}; SELECT $param as value",
{"$param": (value, getattr(ydb.PrimitiveType, ydb_type))},
)
assert result[0].rows[0].value == casted_result
pool.stop()

@params
def test_driver_turn_off(self, driver_sync, settings_off, value, ydb_type, casted_result, uncasted_type):
driver_sync._driver_config.query_client_settings = settings_off
pool = ydb.QuerySessionPool(driver_sync)
result = pool.execute_with_retries(
f"DECLARE $param as {ydb_type}; SELECT $param as value",
{"$param": (value, getattr(ydb.PrimitiveType, ydb_type))},
)
assert type(result[0].rows[0].value) == uncasted_type
pool.stop()

@params
def test_session_pool_turn_on(self, driver_sync, settings_on, value, ydb_type, casted_result, uncasted_type):
pool = ydb.QuerySessionPool(driver_sync, query_client_settings=settings_on)
result = pool.execute_with_retries(
f"DECLARE $param as {ydb_type}; SELECT $param as value",
{"$param": (value, getattr(ydb.PrimitiveType, ydb_type))},
)
assert result[0].rows[0].value == casted_result
pool.stop()

@params
def test_session_pool_turn_off(self, driver_sync, settings_off, value, ydb_type, casted_result, uncasted_type):
pool = ydb.QuerySessionPool(driver_sync, query_client_settings=settings_off)
result = pool.execute_with_retries(
f"DECLARE $param as {ydb_type}; SELECT $param as value",
{"$param": (value, getattr(ydb.PrimitiveType, ydb_type))},
)
assert type(result[0].rows[0].value) == uncasted_type
pool.stop()

@pytest.mark.asyncio
@params
async def test_driver_async_turn_on(self, driver, settings_on, value, ydb_type, casted_result, uncasted_type):
driver._driver_config.query_client_settings = settings_on
pool = ydb.aio.QuerySessionPool(driver)
result = await pool.execute_with_retries(
f"DECLARE $param as {ydb_type}; SELECT $param as value",
{"$param": (value, getattr(ydb.PrimitiveType, ydb_type))},
)
assert result[0].rows[0].value == casted_result
await pool.stop()

@pytest.mark.asyncio
@params
async def test_driver_async_turn_off(self, driver, settings_off, value, ydb_type, casted_result, uncasted_type):
driver._driver_config.query_client_settings = settings_off
pool = ydb.aio.QuerySessionPool(driver)
result = await pool.execute_with_retries(
f"DECLARE $param as {ydb_type}; SELECT $param as value",
{"$param": (value, getattr(ydb.PrimitiveType, ydb_type))},
)
assert type(result[0].rows[0].value) == uncasted_type
await pool.stop()

@pytest.mark.asyncio
@params
async def test_session_pool_async_turn_on(self, driver, settings_on, value, ydb_type, casted_result, uncasted_type):
pool = ydb.aio.QuerySessionPool(driver, query_client_settings=settings_on)
result = await pool.execute_with_retries(
f"DECLARE $param as {ydb_type}; SELECT $param as value",
{"$param": (value, getattr(ydb.PrimitiveType, ydb_type))},
)
assert result[0].rows[0].value == casted_result
await pool.stop()

@pytest.mark.asyncio
@params
async def test_session_pool_async_turn_off(
self, driver, settings_off, value, ydb_type, casted_result, uncasted_type
):
pool = ydb.aio.QuerySessionPool(driver, query_client_settings=settings_off)
result = await pool.execute_with_retries(
f"DECLARE $param as {ydb_type}; SELECT $param as value",
{"$param": (value, getattr(ydb.PrimitiveType, ydb_type))},
)
assert type(result[0].rows[0].value) == uncasted_type
await pool.stop()
12 changes: 10 additions & 2 deletions ydb/aio/query/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
RetrySettings,
retry_operation_async,
)
from ...query.base import QueryClientSettings
from ... import convert
from ..._grpc.grpcwrapper import common_utils

Expand All @@ -22,10 +23,16 @@
class QuerySessionPool:
"""QuerySessionPool is an object to simplify operations with sessions of Query Service."""

def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
def __init__(
self,
driver: common_utils.SupportedDriverType,
size: int = 100,
query_client_settings: Optional[QueryClientSettings] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about explicit named paremeter for the settings?

):
"""
:param driver: A driver instance
:param size: Size of session pool
:param query_client_settings: ydb.QueryClientSettings object to configure QueryService behavior
"""

self._driver = driver
Expand All @@ -35,9 +42,10 @@ def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
self._current_size = 0
self._waiters = 0
self._loop = asyncio.get_running_loop()
self._query_client_settings = query_client_settings

async def _create_new_session(self):
session = QuerySession(self._driver)
session = QuerySession(self._driver, settings=self._query_client_settings)
await session.create()
logger.debug(f"New session was created for pool. Session id: {session._state.session_id}")
return session
Expand Down
3 changes: 3 additions & 0 deletions ydb/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class DriverConfig(object):
"secure_channel",
"table_client_settings",
"topic_client_settings",
"query_client_settings",
"endpoints",
"primary_user_agent",
"tracer",
Expand All @@ -112,6 +113,7 @@ def __init__(
grpc_keep_alive_timeout=None,
table_client_settings=None,
topic_client_settings=None,
query_client_settings=None,
endpoints=None,
primary_user_agent="python-library",
tracer=None,
Expand Down Expand Up @@ -159,6 +161,7 @@ def __init__(
self.grpc_keep_alive_timeout = grpc_keep_alive_timeout
self.table_client_settings = table_client_settings
self.topic_client_settings = topic_client_settings
self.query_client_settings = query_client_settings
self.primary_user_agent = primary_user_agent
self.tracer = tracer if tracer is not None else tracing.Tracer(None)
self.grpc_lb_policy_name = grpc_lb_policy_name
Expand Down
12 changes: 10 additions & 2 deletions ydb/query/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import threading
import queue

from .base import QueryClientSettings
from .session import (
QuerySession,
)
Expand All @@ -27,10 +28,16 @@
class QuerySessionPool:
"""QuerySessionPool is an object to simplify operations with sessions of Query Service."""

def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
def __init__(
self,
driver: common_utils.SupportedDriverType,
size: int = 100,
query_client_settings: Optional[QueryClientSettings] = None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and here (named parameter)

):
"""
:param driver: A driver instance.
:param size: Max size of Session Pool.
:param query_client_settings: ydb.QueryClientSettings object to configure QueryService behavior
"""

self._driver = driver
Expand All @@ -39,9 +46,10 @@ def __init__(self, driver: common_utils.SupportedDriverType, size: int = 100):
self._size = size
self._should_stop = threading.Event()
self._lock = threading.RLock()
self._query_client_settings = query_client_settings

def _create_new_session(self, timeout: Optional[float]):
session = QuerySession(self._driver)
session = QuerySession(self._driver, settings=self._query_client_settings)
session.create(settings=BaseRequestSettings().with_timeout(timeout))
logger.debug(f"New session was created for pool. Session id: {session._state.session_id}")
return session
Expand Down
13 changes: 12 additions & 1 deletion ydb/query/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,20 @@ class BaseQuerySession:

def __init__(self, driver: common_utils.SupportedDriverType, settings: Optional[base.QueryClientSettings] = None):
self._driver = driver
self._settings = settings if settings is not None else base.QueryClientSettings()
self._settings = self._get_client_settings(driver, settings)
self._state = QuerySessionState(settings)

def _get_client_settings(
self,
driver: common_utils.SupportedDriverType,
settings: Optional[base.QueryClientSettings] = None,
) -> base.QueryClientSettings:
if settings is not None:
return settings
if driver._driver_config.query_client_settings is not None:
return driver._driver_config.query_client_settings
return base.QueryClientSettings()

def _create_call(self, settings: Optional[BaseRequestSettings] = None) -> "BaseQuerySession":
return self._driver(
_apis.ydb_query.CreateSessionRequest(),
Expand Down
Loading