From 036ea0b928826078440accf2624aa71c7144e7fc Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Mon, 23 Sep 2024 14:04:45 +0300 Subject: [PATCH 01/23] Update topic_writer_asyncio.py --- ydb/_topic_writer/topic_writer_asyncio.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index 585e88ab..3ea6196b 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -366,8 +366,8 @@ async def _connection_loop(self): self._stream_connected.set() - send_loop = asyncio.create_task(self._send_loop(stream_writer), name="writer send loop") - receive_loop = asyncio.create_task(self._read_loop(stream_writer), name="writer receive loop") + send_loop = asyncio.create_task(self._send_loop(stream_writer)) + receive_loop = asyncio.create_task(self._read_loop(stream_writer)) tasks = [send_loop, receive_loop] done, _ = await asyncio.wait([send_loop, receive_loop], return_when=asyncio.FIRST_COMPLETED) @@ -653,7 +653,7 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes if self._update_token_interval is not None: self._update_token_event.set() - self._update_token_task = asyncio.create_task(self._update_token_loop(), name="update_token_loop") + self._update_token_task = asyncio.create_task(self._update_token_loop()) @staticmethod def _ensure_ok(message: WriterMessagesFromServerToClient): From 66d972599aba2e0123e2afff5a0ac92a509ad3d3 Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Mon, 23 Sep 2024 14:06:07 +0300 Subject: [PATCH 02/23] Update topic_reader_asyncio.py --- ydb/_topic_reader/topic_reader_asyncio.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 81c6d9f4..d2ed55ef 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -87,7 +87,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): def __del__(self): if not self._closed: - self._loop.create_task(self.close(flush=False), name="close reader") + self._loop.create_task(self.close(flush=False)) async def wait_message(self): """ @@ -337,12 +337,12 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess self._update_token_event.set() - self._background_tasks.add(asyncio.create_task(self._read_messages_loop(), name="read_messages_loop")) - self._background_tasks.add(asyncio.create_task(self._decode_batches_loop(), name="decode_batches")) + self._background_tasks.add(asyncio.create_task(self._read_messages_loop())) + self._background_tasks.add(asyncio.create_task(self._decode_batches_loop())) if self._get_token_function: - self._background_tasks.add(asyncio.create_task(self._update_token_loop(), name="update_token_loop")) + self._background_tasks.add(asyncio.create_task(self._update_token_loop())) self._background_tasks.add( - asyncio.create_task(self._handle_background_errors(), name="handle_background_errors") + asyncio.create_task(self._handle_background_errors()) ) async def wait_error(self): From 711696538decc25888193597bb3f134bceea9033 Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Mon, 23 Sep 2024 14:09:34 +0300 Subject: [PATCH 03/23] Update topic_writer_asyncio.py --- ydb/_topic_writer/topic_writer_asyncio.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index 3ea6196b..22b6dfaf 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -231,8 +231,8 @@ def __init__(self, driver: SupportedDriverType, settings: WriterSettings): self._new_messages = asyncio.Queue() self._stop_reason = self._loop.create_future() self._background_tasks = [ - asyncio.create_task(self._connection_loop(), name="connection_loop"), - asyncio.create_task(self._encode_loop(), name="encode_loop"), + asyncio.create_task(self._connection_loop()), + asyncio.create_task(self._encode_loop()), ] self._state_changed = asyncio.Event() From 10974e0f5a9a3d99273c02a9748261e6933685fc Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Mon, 23 Sep 2024 14:14:08 +0300 Subject: [PATCH 04/23] Update topic_reader_asyncio.py fix linters --- ydb/_topic_reader/topic_reader_asyncio.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index d2ed55ef..e871e549 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -341,9 +341,7 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess self._background_tasks.add(asyncio.create_task(self._decode_batches_loop())) if self._get_token_function: self._background_tasks.add(asyncio.create_task(self._update_token_loop())) - self._background_tasks.add( - asyncio.create_task(self._handle_background_errors()) - ) + self._background_tasks.add(asyncio.create_task(self._handle_background_errors())) async def wait_error(self): raise await self._first_error From 8c187670c99eb082018850eb438703bd3313bbeb Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Mon, 23 Sep 2024 14:25:37 +0300 Subject: [PATCH 05/23] Update topic_reader_asyncio.py --- ydb/_topic_reader/topic_reader_asyncio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index e871e549..3b56b342 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -341,7 +341,7 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess self._background_tasks.add(asyncio.create_task(self._decode_batches_loop())) if self._get_token_function: self._background_tasks.add(asyncio.create_task(self._update_token_loop())) - self._background_tasks.add(asyncio.create_task(self._handle_background_errors())) + self._background_tasks.add(asyncio.create_task(self._handle_background_errors())) async def wait_error(self): raise await self._first_error From 2d5c288c119b38bc1068b46732e8d207ce5f81cc Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Mon, 23 Sep 2024 14:26:18 +0300 Subject: [PATCH 06/23] Update topic_reader_asyncio.py --- ydb/_topic_reader/topic_reader_asyncio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 3b56b342..e871e549 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -341,7 +341,7 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess self._background_tasks.add(asyncio.create_task(self._decode_batches_loop())) if self._get_token_function: self._background_tasks.add(asyncio.create_task(self._update_token_loop())) - self._background_tasks.add(asyncio.create_task(self._handle_background_errors())) + self._background_tasks.add(asyncio.create_task(self._handle_background_errors())) async def wait_error(self): raise await self._first_error From 8767bd3e85b5e360cff4bead7201a864e40779a9 Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Wed, 25 Sep 2024 11:29:02 +0300 Subject: [PATCH 07/23] Update topic_reader_asyncio.py --- ydb/_topic_reader/topic_reader_asyncio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index e871e549..a118deb0 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -169,7 +169,7 @@ def __init__(self, driver: Driver, settings: topic_reader.PublicReaderSettings): self._state_changed = asyncio.Event() self._stream_reader = None - self._background_tasks.add(asyncio.create_task(self._connection_loop())) + self._background_tasks.add(asyncio.create_task(self._connection_loop() )) self._first_error = asyncio.get_running_loop().create_future() async def _connection_loop(self): From c5e3a6408ed4e62189da9a2e0ee80f3d0781d5ef Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Wed, 25 Sep 2024 11:29:28 +0300 Subject: [PATCH 08/23] Update topic_reader_asyncio.py --- ydb/_topic_reader/topic_reader_asyncio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index a118deb0..e871e549 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -169,7 +169,7 @@ def __init__(self, driver: Driver, settings: topic_reader.PublicReaderSettings): self._state_changed = asyncio.Event() self._stream_reader = None - self._background_tasks.add(asyncio.create_task(self._connection_loop() )) + self._background_tasks.add(asyncio.create_task(self._connection_loop())) self._first_error = asyncio.get_running_loop().create_future() async def _connection_loop(self): From 34e2a0a64fa54837f2c2114962979b095973f4dd Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Wed, 25 Sep 2024 16:18:52 +0000 Subject: [PATCH 09/23] add wrapper for asyncio.create_task --- ydb/_topic_common/common.py | 7 +++++++ ydb/_topic_reader/topic_reader_asyncio.py | 25 ++++++++++++++++++----- ydb/_topic_writer/topic_writer_asyncio.py | 17 ++++++++++----- 3 files changed, 39 insertions(+), 10 deletions(-) diff --git a/ydb/_topic_common/common.py b/ydb/_topic_common/common.py index 7a97336e..c296116e 100644 --- a/ydb/_topic_common/common.py +++ b/ydb/_topic_common/common.py @@ -1,5 +1,6 @@ import asyncio import concurrent.futures +import sys import threading import typing from typing import Optional @@ -29,6 +30,12 @@ def wrapper(rpc_state, response_pb, driver=None): return wrapper +def wrap_create_asyncio_task(func: typing.Callable, *args, **kwargs, task_name: str): + if sys.hexversion < 0x03080000: + return asyncio.create_task(func(*args, **kwargs)) + return asyncio.create_task(func(*args, **kwargs), task_name=loop_name) + + _shared_event_loop_lock = threading.Lock() _shared_event_loop: Optional[asyncio.AbstractEventLoop] = None diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index e871e549..4226badb 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -3,6 +3,7 @@ import asyncio import concurrent.futures import gzip +import sys import typing from asyncio import Task from collections import deque @@ -10,6 +11,7 @@ import ydb from .. import _apis, issues +from .._topic_common import common as topic_common from .._utilities import AtomicCounter from ..aio import Driver from ..issues import Error as YdbError, _process_response @@ -87,7 +89,10 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): def __del__(self): if not self._closed: - self._loop.create_task(self.close(flush=False)) + if sys.hexversion < 0x03080000: + self._loop.create_task(self.close(flush=False)) + else: + self._loop.create_task(self.close(flush=False), name="close reader") async def wait_message(self): """ @@ -337,11 +342,21 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess self._update_token_event.set() - self._background_tasks.add(asyncio.create_task(self._read_messages_loop())) - self._background_tasks.add(asyncio.create_task(self._decode_batches_loop())) + self._background_tasks.add( + topic_common.wrap_create_asyncio_task(self._read_messages_loop, task_name="read_messages_loop"), + ) + self._background_tasks.add( + topic_common.wrap_create_asyncio_task(self._decode_batches_loop, task_name="decode_batches"), + ) if self._get_token_function: - self._background_tasks.add(asyncio.create_task(self._update_token_loop())) - self._background_tasks.add(asyncio.create_task(self._handle_background_errors())) + self._background_tasks.add( + topic_common.wrap_create_asyncio_task(self._update_token_loop, task_name="update_token_loop"), + ) + self._background_tasks.add( + topic_common.wrap_create_asyncio_task( + self._handle_background_errors, task_name="handle_background_errors", + ), + ) async def wait_error(self): raise await self._first_error diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index 22b6dfaf..08687f97 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -28,6 +28,7 @@ issues, ) from .._errors import check_retriable_error +from .._topic_common import common as topic_common from ..retries import RetrySettings from .._grpc.grpcwrapper.ydb_topic_public_types import PublicCodec from .._grpc.grpcwrapper.ydb_topic import ( @@ -231,8 +232,8 @@ def __init__(self, driver: SupportedDriverType, settings: WriterSettings): self._new_messages = asyncio.Queue() self._stop_reason = self._loop.create_future() self._background_tasks = [ - asyncio.create_task(self._connection_loop()), - asyncio.create_task(self._encode_loop()), + topic_common.wrap_create_asyncio_task(self._connection_loop, task_name="connection_loop"), + topic_common.wrap_create_asyncio_task(self._encode_loop, task_name="encode_loop"), ] self._state_changed = asyncio.Event() @@ -366,8 +367,12 @@ async def _connection_loop(self): self._stream_connected.set() - send_loop = asyncio.create_task(self._send_loop(stream_writer)) - receive_loop = asyncio.create_task(self._read_loop(stream_writer)) + send_loop = topic_common.wrap_create_asyncio_task( + self._send_loop, stream_writer, task_name="writer send loop", + ) + receive_loop = topic_common.wrap_create_asyncio_task( + self._read_loop, stream_writer, task_name="writer receive loop", + ) tasks = [send_loop, receive_loop] done, _ = await asyncio.wait([send_loop, receive_loop], return_when=asyncio.FIRST_COMPLETED) @@ -653,7 +658,9 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes if self._update_token_interval is not None: self._update_token_event.set() - self._update_token_task = asyncio.create_task(self._update_token_loop()) + self._update_token_task = topic_common.wrap_create_asyncio_task( + self._update_token_loop, task_name="update_token_loop", + ) @staticmethod def _ensure_ok(message: WriterMessagesFromServerToClient): From f4d3bcca8efc408877cd557bad06b9da82a91cab Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Wed, 25 Sep 2024 16:22:05 +0000 Subject: [PATCH 10/23] fix linters --- ydb/_topic_common/common.py | 4 ++-- ydb/_topic_writer/topic_writer_asyncio.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ydb/_topic_common/common.py b/ydb/_topic_common/common.py index c296116e..bb133143 100644 --- a/ydb/_topic_common/common.py +++ b/ydb/_topic_common/common.py @@ -30,10 +30,10 @@ def wrapper(rpc_state, response_pb, driver=None): return wrapper -def wrap_create_asyncio_task(func: typing.Callable, *args, **kwargs, task_name: str): +def wrap_create_asyncio_task(func: typing.Callable, task_name: str, *args, **kwargs): if sys.hexversion < 0x03080000: return asyncio.create_task(func(*args, **kwargs)) - return asyncio.create_task(func(*args, **kwargs), task_name=loop_name) + return asyncio.create_task(func(*args, **kwargs), name=task_name) _shared_event_loop_lock = threading.Lock() diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index 08687f97..637f14ea 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -368,10 +368,10 @@ async def _connection_loop(self): self._stream_connected.set() send_loop = topic_common.wrap_create_asyncio_task( - self._send_loop, stream_writer, task_name="writer send loop", + self._send_loop, task_name="writer send loop", stream_writer, ) receive_loop = topic_common.wrap_create_asyncio_task( - self._read_loop, stream_writer, task_name="writer receive loop", + self._read_loop, task_name="writer receive loop", stream_writer, ) tasks = [send_loop, receive_loop] From a779513a18d4fa9f33701b0bfebd1fad4be2c1e1 Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Wed, 25 Sep 2024 16:27:57 +0000 Subject: [PATCH 11/23] fix linters --- ydb/_topic_reader/topic_reader_asyncio.py | 16 +++++++--------- ydb/_topic_writer/topic_writer_asyncio.py | 12 ++++++------ 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 4226badb..7ef15a87 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -343,19 +343,17 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess self._update_token_event.set() self._background_tasks.add( - topic_common.wrap_create_asyncio_task(self._read_messages_loop, task_name="read_messages_loop"), - ) + topic_common.wrap_create_asyncio_task(self._read_messages_loop, "read_messages_loop"), + ) self._background_tasks.add( - topic_common.wrap_create_asyncio_task(self._decode_batches_loop, task_name="decode_batches"), - ) + topic_common.wrap_create_asyncio_task(self._decode_batches_loop, "decode_batches"), + ) if self._get_token_function: self._background_tasks.add( - topic_common.wrap_create_asyncio_task(self._update_token_loop, task_name="update_token_loop"), - ) + topic_common.wrap_create_asyncio_task(self._update_token_loop, "update_token_loop"), + ) self._background_tasks.add( - topic_common.wrap_create_asyncio_task( - self._handle_background_errors, task_name="handle_background_errors", - ), + topic_common.wrap_create_asyncio_task(self._handle_background_errors, "handle_background_errors"), ) async def wait_error(self): diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index 637f14ea..6608d464 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -232,8 +232,8 @@ def __init__(self, driver: SupportedDriverType, settings: WriterSettings): self._new_messages = asyncio.Queue() self._stop_reason = self._loop.create_future() self._background_tasks = [ - topic_common.wrap_create_asyncio_task(self._connection_loop, task_name="connection_loop"), - topic_common.wrap_create_asyncio_task(self._encode_loop, task_name="encode_loop"), + topic_common.wrap_create_asyncio_task(self._connection_loop, "connection_loop"), + topic_common.wrap_create_asyncio_task(self._encode_loop, "encode_loop"), ] self._state_changed = asyncio.Event() @@ -368,10 +368,10 @@ async def _connection_loop(self): self._stream_connected.set() send_loop = topic_common.wrap_create_asyncio_task( - self._send_loop, task_name="writer send loop", stream_writer, + self._send_loop, "writer send loop", stream_writer, ) receive_loop = topic_common.wrap_create_asyncio_task( - self._read_loop, task_name="writer receive loop", stream_writer, + self._read_loop, "writer receive loop", stream_writer, ) tasks = [send_loop, receive_loop] @@ -659,8 +659,8 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes if self._update_token_interval is not None: self._update_token_event.set() self._update_token_task = topic_common.wrap_create_asyncio_task( - self._update_token_loop, task_name="update_token_loop", - ) + self._update_token_loop, "update_token_loop", + ) @staticmethod def _ensure_ok(message: WriterMessagesFromServerToClient): From 1bcb7262f8572029907065d9ebb254f4d399f018 Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Wed, 25 Sep 2024 16:32:40 +0000 Subject: [PATCH 12/23] fix linters --- ydb/_topic_writer/topic_writer_asyncio.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index 6608d464..a6983178 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -368,11 +368,15 @@ async def _connection_loop(self): self._stream_connected.set() send_loop = topic_common.wrap_create_asyncio_task( - self._send_loop, "writer send loop", stream_writer, - ) + self._send_loop, + "writer send loop", + stream_writer, + ) receive_loop = topic_common.wrap_create_asyncio_task( - self._read_loop, "writer receive loop", stream_writer, - ) + self._read_loop, + "writer receive loop", + stream_writer, + ) tasks = [send_loop, receive_loop] done, _ = await asyncio.wait([send_loop, receive_loop], return_when=asyncio.FIRST_COMPLETED) @@ -659,7 +663,8 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes if self._update_token_interval is not None: self._update_token_event.set() self._update_token_task = topic_common.wrap_create_asyncio_task( - self._update_token_loop, "update_token_loop", + self._update_token_loop, + "update_token_loop", ) @staticmethod From e33b9c4923e5be6a9079e400784e55d3723174ff Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Wed, 25 Sep 2024 16:38:56 +0000 Subject: [PATCH 13/23] fix linters --- ydb/_topic_writer/topic_writer_asyncio.py | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index a6983178..25f7756b 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -367,14 +367,10 @@ async def _connection_loop(self): self._stream_connected.set() - send_loop = topic_common.wrap_create_asyncio_task( - self._send_loop, - "writer send loop", - stream_writer, - ) + send_loop = topic_common.wrap_create_asyncio_task(self._send_loop, "writer send loop", stream_writer) receive_loop = topic_common.wrap_create_asyncio_task( - self._read_loop, - "writer receive loop", + self._read_loop, + "writer receive loop", stream_writer, ) @@ -662,10 +658,7 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes if self._update_token_interval is not None: self._update_token_event.set() - self._update_token_task = topic_common.wrap_create_asyncio_task( - self._update_token_loop, - "update_token_loop", - ) + self._update_token_task = topic_common.wrap_create_asyncio_task(self._update_token_loop, "update_token_loop") @staticmethod def _ensure_ok(message: WriterMessagesFromServerToClient): From ef44568d20e99c0708397c9ea4fe5210021b840b Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Wed, 25 Sep 2024 16:41:56 +0000 Subject: [PATCH 14/23] fix linters --- ydb/_topic_writer/topic_writer_asyncio.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index 25f7756b..00c6490e 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -658,7 +658,9 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes if self._update_token_interval is not None: self._update_token_event.set() - self._update_token_task = topic_common.wrap_create_asyncio_task(self._update_token_loop, "update_token_loop") + self._update_token_task = topic_common.wrap_create_asyncio_task( + self._update_token_loop, "update_token_loop", + ) @staticmethod def _ensure_ok(message: WriterMessagesFromServerToClient): From 202169664890fea74a253203afb7f6fbbab0f176 Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Wed, 25 Sep 2024 16:43:34 +0000 Subject: [PATCH 15/23] fix linters --- ydb/_topic_writer/topic_writer_asyncio.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index 00c6490e..992ece61 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -659,7 +659,8 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes if self._update_token_interval is not None: self._update_token_event.set() self._update_token_task = topic_common.wrap_create_asyncio_task( - self._update_token_loop, "update_token_loop", + self._update_token_loop, + "update_token_loop", ) @staticmethod From 2082cb868ca7f4e8f7777b60f7cf325e713d7782 Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Wed, 25 Sep 2024 16:47:20 +0000 Subject: [PATCH 16/23] fix linters --- ydb/_topic_writer/topic_writer_asyncio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index 992ece61..79789193 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -659,7 +659,7 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes if self._update_token_interval is not None: self._update_token_event.set() self._update_token_task = topic_common.wrap_create_asyncio_task( - self._update_token_loop, + self._update_token_loop, "update_token_loop", ) From b6908b56e687b1f42722f77a04f8c807b4262ed8 Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Thu, 26 Sep 2024 09:08:04 +0000 Subject: [PATCH 17/23] split setting task name during function declaration stage --- ydb/_topic_common/common.py | 12 ++++++---- ydb/_topic_common/common_test.py | 18 ++++++++++++++- ydb/_topic_reader/topic_reader_asyncio.py | 27 ++++++++++++++-------- ydb/_topic_writer/topic_writer_asyncio.py | 28 +++++++++++++++-------- 4 files changed, 60 insertions(+), 25 deletions(-) diff --git a/ydb/_topic_common/common.py b/ydb/_topic_common/common.py index bb133143..0afe2c81 100644 --- a/ydb/_topic_common/common.py +++ b/ydb/_topic_common/common.py @@ -29,11 +29,13 @@ def wrapper(rpc_state, response_pb, driver=None): return wrapper - -def wrap_create_asyncio_task(func: typing.Callable, task_name: str, *args, **kwargs): - if sys.hexversion < 0x03080000: - return asyncio.create_task(func(*args, **kwargs)) - return asyncio.create_task(func(*args, **kwargs), name=task_name) +if sys.hexversion < 0x03080000: + def wrap_set_name_for_asyncio_task(task: asyncio.Task, task_name: str) -> asyncio.Task: + task.set_name(task_name) + return task +else: + def wrap_set_name_for_asyncio_task(task: asyncio.Task, task_name: str) -> asyncio.Task: + return task _shared_event_loop_lock = threading.Lock() diff --git a/ydb/_topic_common/common_test.py b/ydb/_topic_common/common_test.py index b31f9af9..188aaa41 100644 --- a/ydb/_topic_common/common_test.py +++ b/ydb/_topic_common/common_test.py @@ -1,4 +1,5 @@ import asyncio +import sys import threading import time import typing @@ -6,7 +7,7 @@ import grpc import pytest -from .common import CallFromSyncToAsync +from .common import CallFromSyncToAsync, wrap_set_name_for_asyncio_task from .._grpc.grpcwrapper.common_utils import ( GrpcWrapperAsyncIO, ServerStatus, @@ -75,6 +76,21 @@ async def async_failed(): with pytest.raises(TestError): await callback_from_asyncio(async_failed) + async def test_task_name_on_asyncio_task(self): + task_name = "asyncio task" + loop = asyncio.get_running_loop() + + async def some_async_task(): + await asyncio.sleep(0) + return 1 + + asyncio_task = loop.create_task(some_async_task()) + wrap_set_name_for_asyncio_task(asyncio_task, task_name=task_name) + + if sys.hexversion >= 0x03080000: + assert asyncio_task.get_name() == task_name + + @pytest.mark.asyncio class TestGrpcWrapperAsyncIO: diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 7ef15a87..752e0a1f 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -3,7 +3,6 @@ import asyncio import concurrent.futures import gzip -import sys import typing from asyncio import Task from collections import deque @@ -89,10 +88,8 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): def __del__(self): if not self._closed: - if sys.hexversion < 0x03080000: - self._loop.create_task(self.close(flush=False)) - else: - self._loop.create_task(self.close(flush=False), name="close reader") + task = self._loop.create_task(self.close(flush=False)) + topic_common.wrap_set_name_for_asyncio_task(task, task_name="close reader") async def wait_message(self): """ @@ -343,17 +340,29 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess self._update_token_event.set() self._background_tasks.add( - topic_common.wrap_create_asyncio_task(self._read_messages_loop, "read_messages_loop"), + topic_common.wrap_set_name_for_asyncio_task( + asyncio.create_task(self._read_messages_loop()), + task_name="read_messages_loop", + ), ) self._background_tasks.add( - topic_common.wrap_create_asyncio_task(self._decode_batches_loop, "decode_batches"), + topic_common.wrap_set_name_for_asyncio_task( + asyncio.create_task(self._decode_batches_loop()), + task_name="decode_batches", + ), ) if self._get_token_function: self._background_tasks.add( - topic_common.wrap_create_asyncio_task(self._update_token_loop, "update_token_loop"), + topic_common.wrap_set_name_for_asyncio_task( + asyncio.create_task(self._update_token_loop()), + task_name="update_token_loop", + ), ) self._background_tasks.add( - topic_common.wrap_create_asyncio_task(self._handle_background_errors, "handle_background_errors"), + topic_common.wrap_set_name_for_asyncio_task( + asyncio.create_task(self._handle_background_errors()), + task_name="handle_background_errors", + ), ) async def wait_error(self): diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index 79789193..c7f88a42 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -232,8 +232,14 @@ def __init__(self, driver: SupportedDriverType, settings: WriterSettings): self._new_messages = asyncio.Queue() self._stop_reason = self._loop.create_future() self._background_tasks = [ - topic_common.wrap_create_asyncio_task(self._connection_loop, "connection_loop"), - topic_common.wrap_create_asyncio_task(self._encode_loop, "encode_loop"), + topic_common.wrap_set_name_for_asyncio_task( + asyncio.create_task(self._connection_loop()), + task_name="connection_loop", + ), + topic_common.wrap_set_name_for_asyncio_task( + asyncio.create_task(self._encode_loop()), + task_name="encode_loop", + ), ] self._state_changed = asyncio.Event() @@ -367,11 +373,13 @@ async def _connection_loop(self): self._stream_connected.set() - send_loop = topic_common.wrap_create_asyncio_task(self._send_loop, "writer send loop", stream_writer) - receive_loop = topic_common.wrap_create_asyncio_task( - self._read_loop, - "writer receive loop", - stream_writer, + send_loop = topic_common.wrap_set_name_for_asyncio_task( + asyncio.create_task(self._send_loop(stream_writer)), + task_name="writer send loop", + ) + receive_loop = topic_common.wrap_set_name_for_asyncio_task( + asyncio.create_task(self._read_loop(stream_writer)), + task_name="writer receive loop", ) tasks = [send_loop, receive_loop] @@ -658,9 +666,9 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes if self._update_token_interval is not None: self._update_token_event.set() - self._update_token_task = topic_common.wrap_create_asyncio_task( - self._update_token_loop, - "update_token_loop", + self._update_token_task = topic_common.wrap_set_name_for_asyncio_task( + asyncio.create_task(self._update_token_loop()), + task_name="update_token_loop", ) @staticmethod From d489a62fdd46f6a04b99d507eac6ad59be9f53a3 Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Thu, 26 Sep 2024 09:11:28 +0000 Subject: [PATCH 18/23] fix tests --- ydb/_topic_common/common_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/_topic_common/common_test.py b/ydb/_topic_common/common_test.py index 188aaa41..a61b1ce3 100644 --- a/ydb/_topic_common/common_test.py +++ b/ydb/_topic_common/common_test.py @@ -81,7 +81,7 @@ async def test_task_name_on_asyncio_task(self): loop = asyncio.get_running_loop() async def some_async_task(): - await asyncio.sleep(0) + await asyncio.sleep(1) return 1 asyncio_task = loop.create_task(some_async_task()) From 2534b70342e362943fdd4c3236ea8f4a02816b2d Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Thu, 26 Sep 2024 09:16:25 +0000 Subject: [PATCH 19/23] fix tests --- ydb/_topic_common/common.py | 2 +- ydb/_topic_common/common_test.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/ydb/_topic_common/common.py b/ydb/_topic_common/common.py index 0afe2c81..3e1a19be 100644 --- a/ydb/_topic_common/common.py +++ b/ydb/_topic_common/common.py @@ -31,10 +31,10 @@ def wrapper(rpc_state, response_pb, driver=None): if sys.hexversion < 0x03080000: def wrap_set_name_for_asyncio_task(task: asyncio.Task, task_name: str) -> asyncio.Task: - task.set_name(task_name) return task else: def wrap_set_name_for_asyncio_task(task: asyncio.Task, task_name: str) -> asyncio.Task: + task.set_name(task_name) return task diff --git a/ydb/_topic_common/common_test.py b/ydb/_topic_common/common_test.py index a61b1ce3..2fd5dfc2 100644 --- a/ydb/_topic_common/common_test.py +++ b/ydb/_topic_common/common_test.py @@ -87,8 +87,7 @@ async def some_async_task(): asyncio_task = loop.create_task(some_async_task()) wrap_set_name_for_asyncio_task(asyncio_task, task_name=task_name) - if sys.hexversion >= 0x03080000: - assert asyncio_task.get_name() == task_name + assert asyncio_task.get_name() == task_name From 724f6a1721fc5ad9a8086f18db98479673f40358 Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Thu, 26 Sep 2024 09:19:41 +0000 Subject: [PATCH 20/23] fix linters --- ydb/_topic_common/common.py | 4 ++++ ydb/_topic_common/common_test.py | 1 - 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/ydb/_topic_common/common.py b/ydb/_topic_common/common.py index 3e1a19be..73e28781 100644 --- a/ydb/_topic_common/common.py +++ b/ydb/_topic_common/common.py @@ -29,10 +29,14 @@ def wrapper(rpc_state, response_pb, driver=None): return wrapper + if sys.hexversion < 0x03080000: + def wrap_set_name_for_asyncio_task(task: asyncio.Task, task_name: str) -> asyncio.Task: return task + else: + def wrap_set_name_for_asyncio_task(task: asyncio.Task, task_name: str) -> asyncio.Task: task.set_name(task_name) return task diff --git a/ydb/_topic_common/common_test.py b/ydb/_topic_common/common_test.py index 2fd5dfc2..a0de3105 100644 --- a/ydb/_topic_common/common_test.py +++ b/ydb/_topic_common/common_test.py @@ -90,7 +90,6 @@ async def some_async_task(): assert asyncio_task.get_name() == task_name - @pytest.mark.asyncio class TestGrpcWrapperAsyncIO: async def test_convert_grpc_errors_to_ydb(self): From 7c5bb8abb9b10cefd50e59faa263dbd0a32fc133 Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Thu, 26 Sep 2024 09:21:54 +0000 Subject: [PATCH 21/23] fix linters --- ydb/_topic_common/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/_topic_common/common.py b/ydb/_topic_common/common.py index 73e28781..8241dda4 100644 --- a/ydb/_topic_common/common.py +++ b/ydb/_topic_common/common.py @@ -36,7 +36,7 @@ def wrap_set_name_for_asyncio_task(task: asyncio.Task, task_name: str) -> asynci return task else: - + def wrap_set_name_for_asyncio_task(task: asyncio.Task, task_name: str) -> asyncio.Task: task.set_name(task_name) return task From 63abf1515d5b8d1470e8f81a7b4712cc5c331ae7 Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Thu, 26 Sep 2024 09:27:08 +0000 Subject: [PATCH 22/23] fix linters --- ydb/_topic_common/common_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/_topic_common/common_test.py b/ydb/_topic_common/common_test.py index a0de3105..f76c57da 100644 --- a/ydb/_topic_common/common_test.py +++ b/ydb/_topic_common/common_test.py @@ -1,5 +1,4 @@ import asyncio -import sys import threading import time import typing From fa5cc4209aaa78372970d1b7d2c5f9c90a6cf46d Mon Sep 17 00:00:00 2001 From: Alexander <56935749+alex2211-put@users.noreply.github.com> Date: Thu, 26 Sep 2024 10:07:57 +0000 Subject: [PATCH 23/23] fix tests --- ydb/_topic_common/common_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/_topic_common/common_test.py b/ydb/_topic_common/common_test.py index f76c57da..32261520 100644 --- a/ydb/_topic_common/common_test.py +++ b/ydb/_topic_common/common_test.py @@ -80,7 +80,7 @@ async def test_task_name_on_asyncio_task(self): loop = asyncio.get_running_loop() async def some_async_task(): - await asyncio.sleep(1) + await asyncio.sleep(0) return 1 asyncio_task = loop.create_task(some_async_task())