Skip to content

Query Service AIO #467

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions examples/query-service/basic_example_asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import asyncio
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to _v2_asyncio?

import ydb


async def main():
driver_config = ydb.DriverConfig(
endpoint="grpc://localhost:2136",
database="/local",
# credentials=ydb.credentials_from_env_variables(),
# root_certificates=ydb.load_ydb_root_certificate(),
)
try:
driver = ydb.aio.Driver(driver_config)
await driver.wait(timeout=5)
except TimeoutError:
raise RuntimeError("Connect failed to YDB")

pool = ydb.aio.QuerySessionPoolAsync(driver)

print("=" * 50)
print("DELETE TABLE IF EXISTS")
await pool.execute_with_retries("DROP TABLE IF EXISTS example")

print("=" * 50)
print("CREATE TABLE")
await pool.execute_with_retries("CREATE TABLE example(key UInt64, value String, PRIMARY KEY (key))")

await pool.execute_with_retries("INSERT INTO example (key, value) VALUES (1, 'onepieceisreal')")

async def callee(session):
print("=" * 50)
async with await session.execute("DELETE FROM example"):
pass

print("BEFORE ACTION")
async with await session.execute("SELECT COUNT(*) AS rows_count FROM example") as results:
async for result_set in results:
print(f"rows: {str(result_set.rows)}")

print("=" * 50)
print("INSERT WITH COMMIT TX")

async with session.transaction() as tx:
await tx.begin()

async with await tx.execute("INSERT INTO example (key, value) VALUES (1, 'onepieceisreal')"):
pass

async with await tx.execute("SELECT COUNT(*) AS rows_count FROM example") as results:
async for result_set in results:
print(f"rows: {str(result_set.rows)}")

await tx.commit()

print("=" * 50)
print("AFTER COMMIT TX")

async with await session.execute("SELECT COUNT(*) AS rows_count FROM example") as results:
async for result_set in results:
print(f"rows: {str(result_set.rows)}")

print("=" * 50)
print("INSERT WITH ROLLBACK TX")

async with session.transaction() as tx:
await tx.begin()

async with await tx.execute("INSERT INTO example (key, value) VALUES (2, 'onepieceisreal')"):
pass

async with await tx.execute("SELECT COUNT(*) AS rows_count FROM example") as results:
async for result_set in results:
print(f"rows: {str(result_set.rows)}")

await tx.rollback()

print("=" * 50)
print("AFTER ROLLBACK TX")

async with await session.execute("SELECT COUNT(*) AS rows_count FROM example") as results:
async for result_set in results:
print(f"rows: {str(result_set.rows)}")

await pool.retry_operation_async(callee)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

execute_with_retries?


async def callee(session: ydb.aio.QuerySessionAsync):
query_print = """select $a"""

print("=" * 50)
print("Check implicit typed parameters")

values = [
1,
1.0,
True,
"text",
{"4": 8, "15": 16, "23": 42},
[{"name": "Michael"}, {"surname": "Scott"}],
]

for value in values:
print(f"value: {value}")
async with await session.transaction().execute(
query=query_print,
parameters={"$a": value},
commit_tx=True,
) as results:
async for result_set in results:
print(f"rows: {str(result_set.rows)}")

print("=" * 50)
print("Check typed parameters as tuple pair")

typed_value = ([1, 2, 3], ydb.ListType(ydb.PrimitiveType.Int64))
print(f"value: {typed_value}")

async with await session.transaction().execute(
query=query_print,
parameters={"$a": typed_value},
commit_tx=True,
) as results:
async for result_set in results:
print(f"rows: {str(result_set.rows)}")

print("=" * 50)
print("Check typed parameters as ydb.TypedValue")

typed_value = ydb.TypedValue(111, ydb.PrimitiveType.Int64)
print(f"value: {typed_value}")

async with await session.transaction().execute(
query=query_print,
parameters={"$a": typed_value},
commit_tx=True,
) as results:
async for result_set in results:
print(f"rows: {str(result_set.rows)}")

await pool.retry_operation_async(callee)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use execute_with_retries?



if __name__ == "__main__":
asyncio.run(main())
Empty file added tests/aio/query/__init__.py
Empty file.
34 changes: 34 additions & 0 deletions tests/aio/query/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import pytest
from ydb.aio.query.session import QuerySessionAsync
from ydb.aio.query.pool import QuerySessionPoolAsync


@pytest.fixture
async def session(driver):
session = QuerySessionAsync(driver)

yield session

try:
await session.delete()
except BaseException:
pass


@pytest.fixture
async def tx(session):
await session.create()
transaction = session.transaction()

yield transaction

try:
await transaction.rollback()
except BaseException:
pass


@pytest.fixture
def pool(driver):
pool = QuerySessionPoolAsync(driver)
yield pool
112 changes: 112 additions & 0 deletions tests/aio/query/test_query_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import pytest
from ydb.aio.query.session import QuerySessionAsync


def _check_session_state_empty(session: QuerySessionAsync):
assert session._state.session_id is None
assert session._state.node_id is None
assert not session._state.attached


def _check_session_state_full(session: QuerySessionAsync):
assert session._state.session_id is not None
assert session._state.node_id is not None
assert session._state.attached


class TestAsyncQuerySession:
@pytest.mark.asyncio
async def test_session_normal_lifecycle(self, session: QuerySessionAsync):
_check_session_state_empty(session)

await session.create()
_check_session_state_full(session)

await session.delete()
_check_session_state_empty(session)

@pytest.mark.asyncio
async def test_second_create_do_nothing(self, session: QuerySessionAsync):
await session.create()
_check_session_state_full(session)

session_id_before = session._state.session_id
node_id_before = session._state.node_id

await session.create()
_check_session_state_full(session)

assert session._state.session_id == session_id_before
assert session._state.node_id == node_id_before

@pytest.mark.asyncio
async def test_second_delete_do_nothing(self, session: QuerySessionAsync):
await session.create()

await session.delete()
await session.delete()

@pytest.mark.asyncio
async def test_delete_before_create_not_possible(self, session: QuerySessionAsync):
with pytest.raises(RuntimeError):
await session.delete()

@pytest.mark.asyncio
async def test_create_after_delete_not_possible(self, session: QuerySessionAsync):
await session.create()
await session.delete()
with pytest.raises(RuntimeError):
await session.create()

def test_transaction_before_create_raises(self, session: QuerySessionAsync):
with pytest.raises(RuntimeError):
session.transaction()

@pytest.mark.asyncio
async def test_transaction_after_delete_raises(self, session: QuerySessionAsync):
await session.create()

await session.delete()

with pytest.raises(RuntimeError):
session.transaction()

@pytest.mark.asyncio
async def test_transaction_after_create_not_raises(self, session: QuerySessionAsync):
await session.create()
session.transaction()

@pytest.mark.asyncio
async def test_execute_before_create_raises(self, session: QuerySessionAsync):
with pytest.raises(RuntimeError):
await session.execute("select 1;")

@pytest.mark.asyncio
async def test_execute_after_delete_raises(self, session: QuerySessionAsync):
await session.create()
await session.delete()
with pytest.raises(RuntimeError):
await session.execute("select 1;")

@pytest.mark.asyncio
async def test_basic_execute(self, session: QuerySessionAsync):
await session.create()
it = await session.execute("select 1;")
result_sets = [result_set async for result_set in it]

assert len(result_sets) == 1
assert len(result_sets[0].rows) == 1
assert len(result_sets[0].columns) == 1
assert list(result_sets[0].rows[0].values()) == [1]

@pytest.mark.asyncio
async def test_two_results(self, session: QuerySessionAsync):
await session.create()
res = []

async with await session.execute("select 1; select 2") as results:
async for result_set in results:
if len(result_set.rows) > 0:
res.append(list(result_set.rows[0].values()))

assert res == [[1], [2]]
56 changes: 56 additions & 0 deletions tests/aio/query/test_query_session_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import pytest
import ydb
from ydb.aio.query.pool import QuerySessionPoolAsync
from ydb.aio.query.session import QuerySessionAsync, QuerySessionStateEnum


class TestQuerySessionPoolAsync:
@pytest.mark.asyncio
async def test_checkout_provides_created_session(self, pool: QuerySessionPoolAsync):
async with pool.checkout() as session:
assert session._state._state == QuerySessionStateEnum.CREATED

assert session._state._state == QuerySessionStateEnum.CLOSED

@pytest.mark.asyncio
async def test_oneshot_query_normal(self, pool: QuerySessionPoolAsync):
res = await pool.execute_with_retries("select 1;")
assert len(res) == 1

@pytest.mark.asyncio
async def test_oneshot_ddl_query(self, pool: QuerySessionPoolAsync):
await pool.execute_with_retries("create table Queen(key UInt64, PRIMARY KEY (key));")
await pool.execute_with_retries("drop table Queen;")

@pytest.mark.asyncio
async def test_oneshot_query_raises(self, pool: QuerySessionPoolAsync):
with pytest.raises(ydb.GenericError):
await pool.execute_with_retries("Is this the real life? Is this just fantasy?")

@pytest.mark.asyncio
async def test_retry_op_uses_created_session(self, pool: QuerySessionPoolAsync):
async def callee(session: QuerySessionAsync):
assert session._state._state == QuerySessionStateEnum.CREATED

await pool.retry_operation_async(callee)

@pytest.mark.asyncio
async def test_retry_op_normal(self, pool: QuerySessionPoolAsync):
async def callee(session: QuerySessionAsync):
async with session.transaction() as tx:
iterator = await tx.execute("select 1;", commit_tx=True)
return [result_set async for result_set in iterator]

res = await pool.retry_operation_async(callee)
assert len(res) == 1

@pytest.mark.asyncio
async def test_retry_op_raises(self, pool: QuerySessionPoolAsync):
class CustomException(Exception):
pass

async def callee(session: QuerySessionAsync):
raise CustomException()

with pytest.raises(CustomException):
await pool.retry_operation_async(callee)
Loading
Loading