From 6f5270ba34c9c9798fbd08e2db189a9b1846ba8a Mon Sep 17 00:00:00 2001 From: Gleb Nosov Date: Mon, 24 Jun 2024 17:37:21 +0300 Subject: [PATCH 1/2] fix typings in _topic_reader --- ydb/_topic_reader/datatypes.py | 2 +- ydb/_topic_reader/topic_reader_asyncio.py | 18 +++++++++--------- ydb/_topic_reader/topic_reader_sync.py | 20 +++++++++----------- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/ydb/_topic_reader/datatypes.py b/ydb/_topic_reader/datatypes.py index 28155ea7..a0578c19 100644 --- a/ydb/_topic_reader/datatypes.py +++ b/ydb/_topic_reader/datatypes.py @@ -7,7 +7,7 @@ from collections import deque from dataclasses import dataclass, field import datetime -from typing import Union, Any, List, Dict, Deque, Optional +from typing import Any, Deque, Dict, List, Optional, Union from ydb._grpc.grpcwrapper.ydb_topic import OffsetsRange, Codec from ydb._topic_reader import topic_reader_asyncio diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 7b3d1cfa..04fc453c 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -6,7 +6,7 @@ import typing from asyncio import Task from collections import deque -from typing import Optional, Set, Dict, Union, Callable +from typing import Any, Callable, Deque, Dict, Optional, Set, Union import ydb from .. import _apis, issues @@ -65,7 +65,7 @@ class PublicAsyncIOReader: _loop: asyncio.AbstractEventLoop _closed: bool _reconnector: ReaderReconnector - _parent: typing.Any # need for prevent close parent client by GC + _parent: Any # need for prevent close parent client by GC def __init__( self, @@ -97,7 +97,7 @@ async def wait_message(self): async def receive_batch( self, - ) -> typing.Union[datatypes.PublicBatch, None]: + ) -> Union[datatypes.PublicBatch, None]: """ Get one messages batch from reader. All messages in a batch from same partition. @@ -107,7 +107,7 @@ async def receive_batch( await self._reconnector.wait_message() return self._reconnector.receive_batch_nowait() - async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]: + async def receive_message(self) -> Optional[datatypes.PublicMessage]: """ Block until receive new message @@ -116,7 +116,7 @@ async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]: await self._reconnector.wait_message() return self._reconnector.receive_message_nowait() - def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]): + def commit(self, batch: Union[datatypes.PublicMessage, datatypes.PublicBatch]): """ Write commit message to a buffer. @@ -128,7 +128,7 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa except PublicTopicReaderPartitionExpiredError: pass - async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]): + async def commit_with_ack(self, batch: Union[datatypes.PublicMessage, datatypes.PublicBatch]): """ write commit message to a buffer and wait ack from the server. @@ -255,7 +255,7 @@ class ReaderStream: _partition_sessions: Dict[int, datatypes.PartitionSession] _buffer_size_bytes: int # use for init request, then for debug purposes only _decode_executor: concurrent.futures.Executor - _decoders: Dict[int, typing.Callable[[bytes], bytes]] # dict[codec_code] func(encoded_bytes)->decoded_bytes + _decoders: Dict[int, Callable[[bytes], bytes]] # dict[codec_code] func(encoded_bytes)->decoded_bytes if typing.TYPE_CHECKING: _batches_to_decode: asyncio.Queue[datatypes.PublicBatch] @@ -264,7 +264,7 @@ class ReaderStream: _state_changed: asyncio.Event _closed: bool - _message_batches: typing.Deque[datatypes.PublicBatch] + _message_batches: Deque[datatypes.PublicBatch] _first_error: asyncio.Future[YdbError] _update_token_interval: Union[int, float] @@ -558,7 +558,7 @@ def _buffer_release_bytes(self, bytes_size): ) ) - def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) -> typing.List[datatypes.PublicBatch]: + def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) -> List[datatypes.PublicBatch]: batches = [] batch_count = sum(len(p.batches) for p in message.partition_data) diff --git a/ydb/_topic_reader/topic_reader_sync.py b/ydb/_topic_reader/topic_reader_sync.py index c266de82..65d3890f 100644 --- a/ydb/_topic_reader/topic_reader_sync.py +++ b/ydb/_topic_reader/topic_reader_sync.py @@ -1,7 +1,6 @@ import asyncio import concurrent.futures -import typing -from typing import List, Union, Optional +from typing import Any, List, Optional, Union from ydb._grpc.grpcwrapper.common_utils import SupportedDriverType from ydb._topic_common.common import ( @@ -10,7 +9,6 @@ TimeoutType, ) from ydb._topic_reader import datatypes -from ydb._topic_reader.datatypes import PublicBatch from ydb._topic_reader.topic_reader import ( PublicReaderSettings, CommitResult, @@ -25,7 +23,7 @@ class TopicReaderSync: _caller: CallFromSyncToAsync _async_reader: PublicAsyncIOReader _closed: bool - _parent: typing.Any # need for prevent stop the client by GC + _parent: Any # need for prevent stop the client by GC def __init__( self, @@ -60,7 +58,7 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.close() - def receive_message(self, *, timeout: TimeoutType = None) -> datatypes.PublicMessage: + def receive_message(self, *, timeout: TimeoutType = None) -> Union[datatypes.PublicMessage, None]: """ Block until receive new message It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. @@ -89,10 +87,10 @@ def async_wait_message(self) -> concurrent.futures.Future: def receive_batch( self, *, - max_messages: typing.Union[int, None] = None, - max_bytes: typing.Union[int, None] = None, + max_messages: Union[int, None] = None, + max_bytes: Union[int, None] = None, timeout: Union[float, None] = None, - ) -> Union[PublicBatch, None]: + ) -> Union[datatypes.PublicBatch, None]: """ Get one messages batch from reader It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available. @@ -107,7 +105,7 @@ def receive_batch( timeout, ) - def commit(self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]): + def commit(self, mess: Union[datatypes.PublicMessage, datatypes.PublicBatch]): """ Put commit message to internal buffer. @@ -120,7 +118,7 @@ def commit(self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBat def commit_with_ack( self, - mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch], + mess: Union[datatypes.PublicMessage, datatypes.PublicBatch], timeout: TimeoutType = None, ) -> Union[CommitResult, List[CommitResult]]: """ @@ -133,7 +131,7 @@ def commit_with_ack( return self._caller.unsafe_call_with_result(self._async_reader.commit_with_ack(mess), timeout) def async_commit_with_ack( - self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch] + self, mess: Union[datatypes.PublicMessage, datatypes.PublicBatch] ) -> concurrent.futures.Future: """ write commit message to a buffer and return Future for wait result. From 6975fb030fb88bdf14ef6100c88242b41683deb4 Mon Sep 17 00:00:00 2001 From: Gleb Nosov Date: Mon, 24 Jun 2024 18:11:38 +0300 Subject: [PATCH 2/2] import List from typing in topic_reader_asyncio.py --- ydb/_topic_reader/topic_reader_asyncio.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 04fc453c..110e8d05 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -6,7 +6,7 @@ import typing from asyncio import Task from collections import deque -from typing import Any, Callable, Deque, Dict, Optional, Set, Union +from typing import Any, Callable, Deque, Dict, List, Optional, Set, Union import ydb from .. import _apis, issues