Skip to content

Commit 28f527c

Browse files
committed
query session pool async
1 parent a5870d9 commit 28f527c

File tree

4 files changed

+179
-0
lines changed

4 files changed

+179
-0
lines changed

tests/aio/query/conftest.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import pytest
22
from ydb.aio.query.session import QuerySessionAsync
3+
from ydb.aio.query.pool import QuerySessionPoolAsync
34

45

56
@pytest.fixture
@@ -25,3 +26,9 @@ async def tx(session):
2526
await transaction.rollback()
2627
except BaseException:
2728
pass
29+
30+
31+
@pytest.fixture
32+
def pool(driver):
33+
pool = QuerySessionPoolAsync(driver)
34+
yield pool
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import pytest
2+
import ydb
3+
from ydb.aio.query.pool import QuerySessionPoolAsync
4+
from ydb.aio.query.session import QuerySessionAsync, QuerySessionStateEnum
5+
6+
7+
class TestQuerySessionPoolAsync:
8+
@pytest.mark.asyncio
9+
async def test_checkout_provides_created_session(self, pool: QuerySessionPoolAsync):
10+
async with pool.checkout() as session:
11+
assert session._state._state == QuerySessionStateEnum.CREATED
12+
13+
assert session._state._state == QuerySessionStateEnum.CLOSED
14+
15+
@pytest.mark.asyncio
16+
async def test_oneshot_query_normal(self, pool: QuerySessionPoolAsync):
17+
res = await pool.execute_with_retries("select 1;")
18+
assert len(res) == 1
19+
20+
@pytest.mark.asyncio
21+
async def test_oneshot_ddl_query(self, pool: QuerySessionPoolAsync):
22+
await pool.execute_with_retries("create table Queen(key UInt64, PRIMARY KEY (key));")
23+
await pool.execute_with_retries("drop table Queen;")
24+
25+
@pytest.mark.asyncio
26+
async def test_oneshot_query_raises(self, pool: QuerySessionPoolAsync):
27+
with pytest.raises(ydb.GenericError):
28+
await pool.execute_with_retries("Is this the real life? Is this just fantasy?")
29+
30+
@pytest.mark.asyncio
31+
async def test_retry_op_uses_created_session(self, pool: QuerySessionPoolAsync):
32+
async def callee(session: QuerySessionAsync):
33+
assert session._state._state == QuerySessionStateEnum.CREATED
34+
35+
await pool.retry_operation_async(callee)
36+
37+
@pytest.mark.asyncio
38+
async def test_retry_op_normal(self, pool: QuerySessionPoolAsync):
39+
async def callee(session: QuerySessionAsync):
40+
async with session.transaction() as tx:
41+
iterator = await tx.execute("select 1;", commit_tx=True)
42+
return [result_set async for result_set in iterator]
43+
44+
res = await pool.retry_operation_async(callee)
45+
assert len(res) == 1
46+
47+
@pytest.mark.asyncio
48+
async def test_retry_op_raises(self, pool: QuerySessionPoolAsync):
49+
class CustomException(Exception):
50+
pass
51+
52+
async def callee(session: QuerySessionAsync):
53+
raise CustomException()
54+
55+
with pytest.raises(CustomException):
56+
await pool.retry_operation_async(callee)

ydb/aio/query/pool.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
import logging
2+
from typing import (
3+
Callable,
4+
Optional,
5+
List,
6+
)
7+
8+
from ...query import base
9+
from .session import (
10+
QuerySessionAsync,
11+
)
12+
from ...retries import (
13+
RetrySettings,
14+
retry_operation_async,
15+
)
16+
from ... import convert
17+
18+
logger = logging.getLogger(__name__)
19+
20+
21+
class QuerySessionPoolAsync:
22+
"""QuerySessionPoolAsync is an object to simplify operations with sessions of Query Service."""
23+
24+
def __init__(self, driver: base.SupportedDriverType):
25+
"""
26+
:param driver: A driver instance
27+
"""
28+
29+
logger.warning("QuerySessionPoolAsync is an experimental API, which could be changed.")
30+
self._driver = driver
31+
32+
def checkout(self) -> "SimpleQuerySessionCheckoutAsync":
33+
"""WARNING: This API is experimental and could be changed.
34+
Return a Session context manager, that opens session on enter and closes session on exit.
35+
"""
36+
37+
return SimpleQuerySessionCheckoutAsync(self)
38+
39+
async def retry_operation_async(self, callee: Callable, retry_settings: Optional[RetrySettings] = None, *args, **kwargs):
40+
"""WARNING: This API is experimental and could be changed.
41+
Special interface to execute a bunch of commands with session in a safe, retriable way.
42+
43+
:param callee: A function, that works with session.
44+
:param retry_settings: RetrySettings object.
45+
46+
:return: Result sets or exception in case of execution errors.
47+
"""
48+
49+
retry_settings = RetrySettings() if retry_settings is None else retry_settings
50+
51+
async def wrapped_callee():
52+
async with self.checkout() as session:
53+
return await callee(session, *args, **kwargs)
54+
55+
return await retry_operation_async(wrapped_callee, retry_settings)
56+
57+
async def execute_with_retries(
58+
self, query: str, retry_settings: Optional[RetrySettings] = None, *args, **kwargs
59+
) -> List[convert.ResultSet]:
60+
"""WARNING: This API is experimental and could be changed.
61+
Special interface to execute a one-shot queries in a safe, retriable way.
62+
Note: this method loads all data from stream before return, do not use this
63+
method with huge read queries.
64+
65+
:param query: A query, yql or sql text.
66+
:param retry_settings: RetrySettings object.
67+
68+
:return: Result sets or exception in case of execution errors.
69+
"""
70+
71+
retry_settings = RetrySettings() if retry_settings is None else retry_settings
72+
73+
async def wrapped_callee():
74+
async with self.checkout() as session:
75+
it = await session.execute(query, *args, **kwargs)
76+
return [result_set async for result_set in it]
77+
78+
return await retry_operation_async(wrapped_callee, retry_settings)
79+
80+
81+
class SimpleQuerySessionCheckoutAsync:
82+
def __init__(self, pool: QuerySessionPoolAsync):
83+
self._pool = pool
84+
self._session = QuerySessionAsync(pool._driver)
85+
86+
async def __aenter__(self) -> base.IQuerySession:
87+
await self._session.create()
88+
return self._session
89+
90+
async def __aexit__(self, exc_type, exc_val, exc_tb):
91+
await self._session.delete()

ydb/retries.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import random
23
import time
34

@@ -134,3 +135,27 @@ def retry_operation_sync(callee, retry_settings=None, *args, **kwargs):
134135
time.sleep(next_opt.timeout)
135136
else:
136137
return next_opt.result
138+
139+
140+
async def retry_operation_async(callee, retry_settings=None, *args, **kwargs): # pylint: disable=W1113
141+
"""
142+
The retry operation helper can be used to retry a coroutine that raises YDB specific
143+
exceptions.
144+
145+
:param callee: A coroutine to retry.
146+
:param retry_settings: An instance of ydb.RetrySettings that describes how the coroutine
147+
should be retried. If None, default instance of retry settings will be used.
148+
:param args: A tuple with positional arguments to be passed into the coroutine.
149+
:param kwargs: A dictionary with keyword arguments to be passed into the coroutine.
150+
151+
Returns awaitable result of coroutine. If retries are not succussful exception is raised.
152+
"""
153+
opt_generator = retry_operation_impl(callee, retry_settings, *args, **kwargs)
154+
for next_opt in opt_generator:
155+
if isinstance(next_opt, YdbRetryOperationSleepOpt):
156+
await asyncio.sleep(next_opt.timeout)
157+
else:
158+
try:
159+
return await next_opt.result
160+
except BaseException as e: # pylint: disable=W0703
161+
next_opt.set_exception(e)

0 commit comments

Comments
 (0)