Skip to content

Add detailed debug logs to topic instances #687

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/topic/basic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async def main():
args = parser.parse_args()

if args.verbose:
logger = logging.getLogger("topicexample")
logger = logging.getLogger("ydb")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

Expand All @@ -79,6 +79,8 @@ async def main():
read_messages(driver, args.path, args.consumer),
)

await driver.stop()


if __name__ == "__main__":
asyncio.run(main())
2 changes: 1 addition & 1 deletion test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@ sqlalchemy==1.4.26
pylint-protobuf
cython
freezegun==1.2.2
pytest-cov
# pytest-cov
yandexcloud
-e .
55 changes: 53 additions & 2 deletions ydb/_topic_reader/topic_reader_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
def __del__(self):
if not self._closed:
try:
logger.warning("Topic reader was not closed properly. Consider using method close().")
logger.debug("Topic reader was not closed properly. Consider using method close().")
task = self._loop.create_task(self.close(flush=False))
topic_common.wrap_set_name_for_asyncio_task(task, task_name="close reader")
except BaseException:
Expand All @@ -121,6 +121,7 @@ async def receive_batch(

use asyncio.wait_for for wait with timeout.
"""
logger.debug("receive_batch max_messages=%s", max_messages)
await self._reconnector.wait_message()
return self._reconnector.receive_batch_nowait(
max_messages=max_messages,
Expand All @@ -137,6 +138,7 @@ async def receive_batch_with_tx(

use asyncio.wait_for for wait with timeout.
"""
logger.debug("receive_batch_with_tx tx=%s max_messages=%s", tx, max_messages)
await self._reconnector.wait_message()
return self._reconnector.receive_batch_with_tx_nowait(
tx=tx,
Expand All @@ -149,6 +151,7 @@ async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]:

use asyncio.wait_for for wait with timeout.
"""
logger.debug("receive_message")
await self._reconnector.wait_message()
return self._reconnector.receive_message_nowait()

Expand All @@ -159,6 +162,7 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa
For the method no way check the commit result
(for example if lost connection - commits will not re-send and committed messages will receive again).
"""
logger.debug("commit message or batch")
if self._settings.consumer is None:
raise issues.Error("Commit operations are not supported for topic reader without consumer.")

Expand All @@ -177,6 +181,7 @@ async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, dat
before receive commit ack. Message may be acked or not (if not - it will send in other read session,
to this or other reader).
"""
logger.debug("commit_with_ack message or batch")
if self._settings.consumer is None:
raise issues.Error("Commit operations are not supported for topic reader without consumer.")

Expand All @@ -187,8 +192,10 @@ async def close(self, flush: bool = True):
if self._closed:
raise TopicReaderClosedError()

logger.debug("Close topic reader")
self._closed = True
await self._reconnector.close(flush)
logger.debug("Topic reader was closed")

@property
def read_session_id(self) -> Optional[str]:
Expand All @@ -214,11 +221,12 @@ def __init__(
settings: topic_reader.PublicReaderSettings,
loop: Optional[asyncio.AbstractEventLoop] = None,
):
self._id = self._static_reader_reconnector_counter.inc_and_get()
self._id = ReaderReconnector._static_reader_reconnector_counter.inc_and_get()
self._settings = settings
self._driver = driver
self._loop = loop if loop is not None else asyncio.get_running_loop()
self._background_tasks = set()
logger.debug("init reader reconnector id=%s", self._id)

self._state_changed = asyncio.Event()
self._stream_reader = None
Expand All @@ -231,13 +239,16 @@ async def _connection_loop(self):
attempt = 0
while True:
try:
logger.debug("reader %s connect attempt %s", self._id, attempt)
self._stream_reader = await ReaderStream.create(self._id, self._driver, self._settings)
logger.debug("reader %s connected stream %s", self._id, self._stream_reader._id)
attempt = 0
self._state_changed.set()
await self._stream_reader.wait_error()
except BaseException as err:
retry_info = check_retriable_error(err, self._settings._retry_settings(), attempt)
if not retry_info.is_retriable:
logger.debug("reader %s stop connection loop due to %s", self._id, err)
self._set_first_error(err)
return

Expand Down Expand Up @@ -358,6 +369,7 @@ def commit(self, batch: datatypes.ICommittable) -> datatypes.PartitionSession.Co
return self._stream_reader.commit(batch)

async def close(self, flush: bool):
logger.debug("reader reconnector %s close", self._id)
if self._stream_reader:
await self._stream_reader.close(flush)
for task in self._background_tasks:
Expand Down Expand Up @@ -447,6 +459,8 @@ def __init__(

self._settings = settings

logger.debug("created ReaderStream id=%s reconnector=%s", self._id, self._reader_reconnector_id)

@staticmethod
async def create(
reader_reconnector_id: int,
Expand All @@ -464,6 +478,7 @@ async def create(
get_token_function=creds.get_auth_token if creds else None,
)
await reader._start(stream, settings._init_message())
logger.debug("reader stream %s started session=%s", reader._id, reader._session_id)
return reader

async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMessage.InitRequest):
Expand All @@ -472,11 +487,13 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess

self._started = True
self._stream = stream
logger.debug("reader stream %s send init request", self._id)

stream.write(StreamReadMessage.FromClient(client_message=init_message))
init_response = await stream.receive() # type: StreamReadMessage.FromServer
if isinstance(init_response.server_message, StreamReadMessage.InitResponse):
self._session_id = init_response.server_message.session_id
logger.debug("reader stream %s initialized session=%s", self._id, self._session_id)
else:
raise TopicReaderError("Unexpected message after InitRequest: %s", init_response)

Expand Down Expand Up @@ -615,6 +632,7 @@ async def _handle_background_errors(self):

async def _read_messages_loop(self):
try:
logger.debug("reader stream %s start read loop", self._id)
self._stream.write(
StreamReadMessage.FromClient(
client_message=StreamReadMessage.ReadRequest(
Expand All @@ -628,6 +646,7 @@ async def _read_messages_loop(self):
_process_response(message.server_status)

if isinstance(message.server_message, StreamReadMessage.ReadResponse):
logger.debug("reader stream %s read %s bytes", self._id, message.server_message.bytes_size)
self._on_read_response(message.server_message)

elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse):
Expand All @@ -637,18 +656,33 @@ async def _read_messages_loop(self):
message.server_message,
StreamReadMessage.StartPartitionSessionRequest,
):
logger.debug(
"reader stream %s start partition %s",
self._id,
message.server_message.partition_session.partition_session_id,
)
await self._on_start_partition_session(message.server_message)

elif isinstance(
message.server_message,
StreamReadMessage.StopPartitionSessionRequest,
):
logger.debug(
"reader stream %s stop partition %s",
self._id,
message.server_message.partition_session_id,
)
self._on_partition_session_stop(message.server_message)

elif isinstance(
message.server_message,
StreamReadMessage.EndPartitionSession,
):
logger.debug(
"reader stream %s end partition %s",
self._id,
message.server_message.partition_session_id,
)
self._on_end_partition_session(message.server_message)

elif isinstance(message.server_message, UpdateTokenResponse):
Expand All @@ -663,6 +697,7 @@ async def _read_messages_loop(self):

self._state_changed.set()
except Exception as e:
logger.debug("reader stream %s error: %s", self._id, e)
self._set_first_error(e)
return

Expand Down Expand Up @@ -825,6 +860,7 @@ def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) ->
async def _decode_batches_loop(self):
while True:
batch = await self._batches_to_decode.get()
logger.debug("reader stream %s decode batch %s messages", self._id, len(batch.messages))
await self._decode_batch_inplace(batch)
self._add_batch_to_queue(batch)
self._state_changed.set()
Expand All @@ -833,9 +869,21 @@ def _add_batch_to_queue(self, batch: datatypes.PublicBatch):
part_sess_id = batch._partition_session.id
if part_sess_id in self._message_batches:
self._message_batches[part_sess_id]._extend(batch)
logger.debug(
"reader stream %s extend batch partition=%s size=%s",
self._id,
part_sess_id,
len(batch.messages),
)
return

self._message_batches[part_sess_id] = batch
logger.debug(
"reader stream %s new batch partition=%s size=%s",
self._id,
part_sess_id,
len(batch.messages),
)

async def _decode_batch_inplace(self, batch):
if batch._codec == Codec.CODEC_RAW:
Expand Down Expand Up @@ -882,6 +930,7 @@ async def close(self, flush: bool):
return

self._closed = True
logger.debug("reader stream %s close", self._id)

if flush:
await self.flush()
Expand All @@ -899,3 +948,5 @@ async def close(self, flush: bool):

if self._background_tasks:
await asyncio.wait(self._background_tasks)

logger.debug("reader stream %s was closed", self._id)
2 changes: 2 additions & 0 deletions ydb/_topic_reader/topic_reader_asyncio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,7 @@ async def wait_error():
raise test_error

reader_stream_mock_with_error = mock.Mock(ReaderStream)
reader_stream_mock_with_error._id = 0
reader_stream_mock_with_error.wait_error = mock.AsyncMock(side_effect=wait_error)

async def wait_messages_with_error():
Expand All @@ -1497,6 +1498,7 @@ async def wait_forever():
await f

reader_stream_with_messages = mock.Mock(ReaderStream)
reader_stream_with_messages._id = 0
reader_stream_with_messages.wait_error = mock.AsyncMock(side_effect=wait_forever)
reader_stream_with_messages.wait_messages.return_value = None

Expand Down
2 changes: 1 addition & 1 deletion ydb/_topic_reader/topic_reader_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def create_reader():
def __del__(self):
if not self._closed:
try:
logger.warning("Topic reader was not closed properly. Consider using method close().")
logger.debug("Topic reader was not closed properly. Consider using method close().")
self.close(flush=False)
except BaseException:
logger.warning("Something went wrong during reader close in __del__")
Expand Down
Loading
Loading