Skip to content

Fix some typings in _topic_reader #444

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/_topic_reader/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from collections import deque
from dataclasses import dataclass, field
import datetime
from typing import Union, Any, List, Dict, Deque, Optional
from typing import Any, Deque, Dict, List, Optional, Union

from ydb._grpc.grpcwrapper.ydb_topic import OffsetsRange, Codec
from ydb._topic_reader import topic_reader_asyncio
Expand Down
18 changes: 9 additions & 9 deletions ydb/_topic_reader/topic_reader_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import typing
from asyncio import Task
from collections import deque
from typing import Optional, Set, Dict, Union, Callable
from typing import Any, Callable, Deque, Dict, List, Optional, Set, Union

import ydb
from .. import _apis, issues
Expand Down Expand Up @@ -65,7 +65,7 @@ class PublicAsyncIOReader:
_loop: asyncio.AbstractEventLoop
_closed: bool
_reconnector: ReaderReconnector
_parent: typing.Any # need for prevent close parent client by GC
_parent: Any # need for prevent close parent client by GC

def __init__(
self,
Expand Down Expand Up @@ -97,7 +97,7 @@ async def wait_message(self):

async def receive_batch(
self,
) -> typing.Union[datatypes.PublicBatch, None]:
) -> Union[datatypes.PublicBatch, None]:
"""
Get one messages batch from reader.
All messages in a batch from same partition.
Expand All @@ -107,7 +107,7 @@ async def receive_batch(
await self._reconnector.wait_message()
return self._reconnector.receive_batch_nowait()

async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]:
async def receive_message(self) -> Optional[datatypes.PublicMessage]:
"""
Block until receive new message

Expand All @@ -116,7 +116,7 @@ async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]:
await self._reconnector.wait_message()
return self._reconnector.receive_message_nowait()

def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]):
def commit(self, batch: Union[datatypes.PublicMessage, datatypes.PublicBatch]):
"""
Write commit message to a buffer.

Expand All @@ -128,7 +128,7 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa
except PublicTopicReaderPartitionExpiredError:
pass

async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]):
async def commit_with_ack(self, batch: Union[datatypes.PublicMessage, datatypes.PublicBatch]):
"""
write commit message to a buffer and wait ack from the server.

Expand Down Expand Up @@ -255,7 +255,7 @@ class ReaderStream:
_partition_sessions: Dict[int, datatypes.PartitionSession]
_buffer_size_bytes: int # use for init request, then for debug purposes only
_decode_executor: concurrent.futures.Executor
_decoders: Dict[int, typing.Callable[[bytes], bytes]] # dict[codec_code] func(encoded_bytes)->decoded_bytes
_decoders: Dict[int, Callable[[bytes], bytes]] # dict[codec_code] func(encoded_bytes)->decoded_bytes

if typing.TYPE_CHECKING:
_batches_to_decode: asyncio.Queue[datatypes.PublicBatch]
Expand All @@ -264,7 +264,7 @@ class ReaderStream:

_state_changed: asyncio.Event
_closed: bool
_message_batches: typing.Deque[datatypes.PublicBatch]
_message_batches: Deque[datatypes.PublicBatch]
_first_error: asyncio.Future[YdbError]

_update_token_interval: Union[int, float]
Expand Down Expand Up @@ -558,7 +558,7 @@ def _buffer_release_bytes(self, bytes_size):
)
)

def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) -> typing.List[datatypes.PublicBatch]:
def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) -> List[datatypes.PublicBatch]:
batches = []

batch_count = sum(len(p.batches) for p in message.partition_data)
Expand Down
20 changes: 9 additions & 11 deletions ydb/_topic_reader/topic_reader_sync.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import concurrent.futures
import typing
from typing import List, Union, Optional
from typing import Any, List, Optional, Union

from ydb._grpc.grpcwrapper.common_utils import SupportedDriverType
from ydb._topic_common.common import (
Expand All @@ -10,7 +9,6 @@
TimeoutType,
)
from ydb._topic_reader import datatypes
from ydb._topic_reader.datatypes import PublicBatch
from ydb._topic_reader.topic_reader import (
PublicReaderSettings,
CommitResult,
Expand All @@ -25,7 +23,7 @@ class TopicReaderSync:
_caller: CallFromSyncToAsync
_async_reader: PublicAsyncIOReader
_closed: bool
_parent: typing.Any # need for prevent stop the client by GC
_parent: Any # need for prevent stop the client by GC

def __init__(
self,
Expand Down Expand Up @@ -60,7 +58,7 @@ def __enter__(self):
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def receive_message(self, *, timeout: TimeoutType = None) -> datatypes.PublicMessage:
def receive_message(self, *, timeout: TimeoutType = None) -> Union[datatypes.PublicMessage, None]:
"""
Block until receive new message
It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available.
Expand Down Expand Up @@ -89,10 +87,10 @@ def async_wait_message(self) -> concurrent.futures.Future:
def receive_batch(
self,
*,
max_messages: typing.Union[int, None] = None,
max_bytes: typing.Union[int, None] = None,
max_messages: Union[int, None] = None,
max_bytes: Union[int, None] = None,
timeout: Union[float, None] = None,
) -> Union[PublicBatch, None]:
) -> Union[datatypes.PublicBatch, None]:
"""
Get one messages batch from reader
It has no async_ version for prevent lost messages, use async_wait_message as signal for new batches available.
Expand All @@ -107,7 +105,7 @@ def receive_batch(
timeout,
)

def commit(self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]):
def commit(self, mess: Union[datatypes.PublicMessage, datatypes.PublicBatch]):
"""
Put commit message to internal buffer.

Expand All @@ -120,7 +118,7 @@ def commit(self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBat

def commit_with_ack(
self,
mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch],
mess: Union[datatypes.PublicMessage, datatypes.PublicBatch],
timeout: TimeoutType = None,
) -> Union[CommitResult, List[CommitResult]]:
"""
Expand All @@ -133,7 +131,7 @@ def commit_with_ack(
return self._caller.unsafe_call_with_result(self._async_reader.commit_with_ack(mess), timeout)

def async_commit_with_ack(
self, mess: typing.Union[datatypes.PublicMessage, datatypes.PublicBatch]
self, mess: Union[datatypes.PublicMessage, datatypes.PublicBatch]
) -> concurrent.futures.Future:
"""
write commit message to a buffer and return Future for wait result.
Expand Down
Loading