-
Notifications
You must be signed in to change notification settings - Fork 58
Query Service AIO #467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Query Service AIO #467
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
7d4af03
aio query session
vgvoleg 09cdcec
async transactions
vgvoleg 00b138e
style fixes
vgvoleg 73c9ad0
fix async transactions
vgvoleg 4208d64
style fixes
vgvoleg d0675cf
query session pool async
vgvoleg bf79ab0
style fixes
vgvoleg 3b66b2a
query service asyncio example
vgvoleg 52a6bfd
temp
vgvoleg fe65ddd
refactor useless interfaces
vgvoleg af4d5d9
style fixes
vgvoleg 145054f
review fixes
vgvoleg File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
import asyncio | ||
import ydb | ||
|
||
|
||
async def main(): | ||
driver_config = ydb.DriverConfig( | ||
endpoint="grpc://localhost:2136", | ||
database="/local", | ||
# credentials=ydb.credentials_from_env_variables(), | ||
# root_certificates=ydb.load_ydb_root_certificate(), | ||
) | ||
try: | ||
driver = ydb.aio.Driver(driver_config) | ||
await driver.wait(timeout=5) | ||
except TimeoutError: | ||
raise RuntimeError("Connect failed to YDB") | ||
|
||
pool = ydb.aio.QuerySessionPoolAsync(driver) | ||
|
||
print("=" * 50) | ||
print("DELETE TABLE IF EXISTS") | ||
await pool.execute_with_retries("DROP TABLE IF EXISTS example") | ||
|
||
print("=" * 50) | ||
print("CREATE TABLE") | ||
await pool.execute_with_retries("CREATE TABLE example(key UInt64, value String, PRIMARY KEY (key))") | ||
|
||
await pool.execute_with_retries("INSERT INTO example (key, value) VALUES (1, 'onepieceisreal')") | ||
|
||
async def callee(session): | ||
print("=" * 50) | ||
async with await session.execute("DELETE FROM example"): | ||
pass | ||
|
||
print("BEFORE ACTION") | ||
async with await session.execute("SELECT COUNT(*) AS rows_count FROM example") as results: | ||
async for result_set in results: | ||
print(f"rows: {str(result_set.rows)}") | ||
|
||
print("=" * 50) | ||
print("INSERT WITH COMMIT TX") | ||
|
||
async with session.transaction() as tx: | ||
await tx.begin() | ||
|
||
async with await tx.execute("INSERT INTO example (key, value) VALUES (1, 'onepieceisreal')"): | ||
pass | ||
|
||
async with await tx.execute("SELECT COUNT(*) AS rows_count FROM example") as results: | ||
async for result_set in results: | ||
print(f"rows: {str(result_set.rows)}") | ||
|
||
await tx.commit() | ||
|
||
print("=" * 50) | ||
print("AFTER COMMIT TX") | ||
|
||
async with await session.execute("SELECT COUNT(*) AS rows_count FROM example") as results: | ||
async for result_set in results: | ||
print(f"rows: {str(result_set.rows)}") | ||
|
||
print("=" * 50) | ||
print("INSERT WITH ROLLBACK TX") | ||
|
||
async with session.transaction() as tx: | ||
await tx.begin() | ||
|
||
async with await tx.execute("INSERT INTO example (key, value) VALUES (2, 'onepieceisreal')"): | ||
pass | ||
|
||
async with await tx.execute("SELECT COUNT(*) AS rows_count FROM example") as results: | ||
async for result_set in results: | ||
print(f"rows: {str(result_set.rows)}") | ||
|
||
await tx.rollback() | ||
|
||
print("=" * 50) | ||
print("AFTER ROLLBACK TX") | ||
|
||
async with await session.execute("SELECT COUNT(*) AS rows_count FROM example") as results: | ||
async for result_set in results: | ||
print(f"rows: {str(result_set.rows)}") | ||
|
||
await pool.retry_operation_async(callee) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. execute_with_retries? |
||
|
||
async def callee(session: ydb.aio.QuerySessionAsync): | ||
query_print = """select $a""" | ||
|
||
print("=" * 50) | ||
print("Check implicit typed parameters") | ||
|
||
values = [ | ||
1, | ||
1.0, | ||
True, | ||
"text", | ||
{"4": 8, "15": 16, "23": 42}, | ||
[{"name": "Michael"}, {"surname": "Scott"}], | ||
] | ||
|
||
for value in values: | ||
print(f"value: {value}") | ||
async with await session.transaction().execute( | ||
query=query_print, | ||
parameters={"$a": value}, | ||
commit_tx=True, | ||
) as results: | ||
async for result_set in results: | ||
print(f"rows: {str(result_set.rows)}") | ||
|
||
print("=" * 50) | ||
print("Check typed parameters as tuple pair") | ||
|
||
typed_value = ([1, 2, 3], ydb.ListType(ydb.PrimitiveType.Int64)) | ||
print(f"value: {typed_value}") | ||
|
||
async with await session.transaction().execute( | ||
query=query_print, | ||
parameters={"$a": typed_value}, | ||
commit_tx=True, | ||
) as results: | ||
async for result_set in results: | ||
print(f"rows: {str(result_set.rows)}") | ||
|
||
print("=" * 50) | ||
print("Check typed parameters as ydb.TypedValue") | ||
|
||
typed_value = ydb.TypedValue(111, ydb.PrimitiveType.Int64) | ||
print(f"value: {typed_value}") | ||
|
||
async with await session.transaction().execute( | ||
query=query_print, | ||
parameters={"$a": typed_value}, | ||
commit_tx=True, | ||
) as results: | ||
async for result_set in results: | ||
print(f"rows: {str(result_set.rows)}") | ||
|
||
await pool.retry_operation_async(callee) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible to use |
||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
import pytest | ||
from ydb.aio.query.session import QuerySessionAsync | ||
from ydb.aio.query.pool import QuerySessionPoolAsync | ||
|
||
|
||
@pytest.fixture | ||
async def session(driver): | ||
session = QuerySessionAsync(driver) | ||
|
||
yield session | ||
|
||
try: | ||
await session.delete() | ||
except BaseException: | ||
pass | ||
|
||
|
||
@pytest.fixture | ||
async def tx(session): | ||
await session.create() | ||
transaction = session.transaction() | ||
|
||
yield transaction | ||
|
||
try: | ||
await transaction.rollback() | ||
except BaseException: | ||
pass | ||
|
||
|
||
@pytest.fixture | ||
def pool(driver): | ||
pool = QuerySessionPoolAsync(driver) | ||
yield pool |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
import pytest | ||
from ydb.aio.query.session import QuerySessionAsync | ||
|
||
|
||
def _check_session_state_empty(session: QuerySessionAsync): | ||
assert session._state.session_id is None | ||
assert session._state.node_id is None | ||
assert not session._state.attached | ||
|
||
|
||
def _check_session_state_full(session: QuerySessionAsync): | ||
assert session._state.session_id is not None | ||
assert session._state.node_id is not None | ||
assert session._state.attached | ||
|
||
|
||
class TestAsyncQuerySession: | ||
@pytest.mark.asyncio | ||
async def test_session_normal_lifecycle(self, session: QuerySessionAsync): | ||
_check_session_state_empty(session) | ||
|
||
await session.create() | ||
_check_session_state_full(session) | ||
|
||
await session.delete() | ||
_check_session_state_empty(session) | ||
|
||
@pytest.mark.asyncio | ||
async def test_second_create_do_nothing(self, session: QuerySessionAsync): | ||
await session.create() | ||
_check_session_state_full(session) | ||
|
||
session_id_before = session._state.session_id | ||
node_id_before = session._state.node_id | ||
|
||
await session.create() | ||
_check_session_state_full(session) | ||
|
||
assert session._state.session_id == session_id_before | ||
assert session._state.node_id == node_id_before | ||
|
||
@pytest.mark.asyncio | ||
async def test_second_delete_do_nothing(self, session: QuerySessionAsync): | ||
await session.create() | ||
|
||
await session.delete() | ||
await session.delete() | ||
|
||
@pytest.mark.asyncio | ||
async def test_delete_before_create_not_possible(self, session: QuerySessionAsync): | ||
with pytest.raises(RuntimeError): | ||
await session.delete() | ||
|
||
@pytest.mark.asyncio | ||
async def test_create_after_delete_not_possible(self, session: QuerySessionAsync): | ||
await session.create() | ||
await session.delete() | ||
with pytest.raises(RuntimeError): | ||
await session.create() | ||
|
||
def test_transaction_before_create_raises(self, session: QuerySessionAsync): | ||
with pytest.raises(RuntimeError): | ||
session.transaction() | ||
|
||
@pytest.mark.asyncio | ||
async def test_transaction_after_delete_raises(self, session: QuerySessionAsync): | ||
await session.create() | ||
|
||
await session.delete() | ||
|
||
with pytest.raises(RuntimeError): | ||
session.transaction() | ||
|
||
@pytest.mark.asyncio | ||
async def test_transaction_after_create_not_raises(self, session: QuerySessionAsync): | ||
await session.create() | ||
session.transaction() | ||
|
||
@pytest.mark.asyncio | ||
async def test_execute_before_create_raises(self, session: QuerySessionAsync): | ||
with pytest.raises(RuntimeError): | ||
await session.execute("select 1;") | ||
|
||
@pytest.mark.asyncio | ||
async def test_execute_after_delete_raises(self, session: QuerySessionAsync): | ||
await session.create() | ||
await session.delete() | ||
with pytest.raises(RuntimeError): | ||
await session.execute("select 1;") | ||
|
||
@pytest.mark.asyncio | ||
async def test_basic_execute(self, session: QuerySessionAsync): | ||
await session.create() | ||
it = await session.execute("select 1;") | ||
result_sets = [result_set async for result_set in it] | ||
|
||
assert len(result_sets) == 1 | ||
assert len(result_sets[0].rows) == 1 | ||
assert len(result_sets[0].columns) == 1 | ||
assert list(result_sets[0].rows[0].values()) == [1] | ||
|
||
@pytest.mark.asyncio | ||
async def test_two_results(self, session: QuerySessionAsync): | ||
await session.create() | ||
res = [] | ||
|
||
async with await session.execute("select 1; select 2") as results: | ||
async for result_set in results: | ||
if len(result_set.rows) > 0: | ||
res.append(list(result_set.rows[0].values())) | ||
|
||
assert res == [[1], [2]] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
import pytest | ||
import ydb | ||
from ydb.aio.query.pool import QuerySessionPoolAsync | ||
from ydb.aio.query.session import QuerySessionAsync, QuerySessionStateEnum | ||
|
||
|
||
class TestQuerySessionPoolAsync: | ||
@pytest.mark.asyncio | ||
async def test_checkout_provides_created_session(self, pool: QuerySessionPoolAsync): | ||
async with pool.checkout() as session: | ||
assert session._state._state == QuerySessionStateEnum.CREATED | ||
|
||
assert session._state._state == QuerySessionStateEnum.CLOSED | ||
|
||
@pytest.mark.asyncio | ||
async def test_oneshot_query_normal(self, pool: QuerySessionPoolAsync): | ||
res = await pool.execute_with_retries("select 1;") | ||
assert len(res) == 1 | ||
|
||
@pytest.mark.asyncio | ||
async def test_oneshot_ddl_query(self, pool: QuerySessionPoolAsync): | ||
await pool.execute_with_retries("create table Queen(key UInt64, PRIMARY KEY (key));") | ||
await pool.execute_with_retries("drop table Queen;") | ||
|
||
@pytest.mark.asyncio | ||
async def test_oneshot_query_raises(self, pool: QuerySessionPoolAsync): | ||
with pytest.raises(ydb.GenericError): | ||
await pool.execute_with_retries("Is this the real life? Is this just fantasy?") | ||
|
||
@pytest.mark.asyncio | ||
async def test_retry_op_uses_created_session(self, pool: QuerySessionPoolAsync): | ||
async def callee(session: QuerySessionAsync): | ||
assert session._state._state == QuerySessionStateEnum.CREATED | ||
|
||
await pool.retry_operation_async(callee) | ||
|
||
@pytest.mark.asyncio | ||
async def test_retry_op_normal(self, pool: QuerySessionPoolAsync): | ||
async def callee(session: QuerySessionAsync): | ||
async with session.transaction() as tx: | ||
iterator = await tx.execute("select 1;", commit_tx=True) | ||
return [result_set async for result_set in iterator] | ||
|
||
res = await pool.retry_operation_async(callee) | ||
assert len(res) == 1 | ||
|
||
@pytest.mark.asyncio | ||
async def test_retry_op_raises(self, pool: QuerySessionPoolAsync): | ||
class CustomException(Exception): | ||
pass | ||
|
||
async def callee(session: QuerySessionAsync): | ||
raise CustomException() | ||
|
||
with pytest.raises(CustomException): | ||
await pool.retry_operation_async(callee) |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to _v2_asyncio?