From 92643a9cb267a728d9f33a8cda8bacd28951dfb0 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Mon, 30 Jun 2025 15:12:58 +0300 Subject: [PATCH] Add detailed debug logs to topic instances --- examples/topic/basic_example.py | 4 +- test-requirements.txt | 2 +- ydb/_topic_reader/topic_reader_asyncio.py | 55 +++++++++++- .../topic_reader_asyncio_test.py | 2 + ydb/_topic_reader/topic_reader_sync.py | 2 +- ydb/_topic_writer/topic_writer_asyncio.py | 85 ++++++++++++++++++- .../topic_writer_asyncio_test.py | 1 + ydb/_topic_writer/topic_writer_sync.py | 19 ++++- ydb/topic.py | 35 +++++++- 9 files changed, 195 insertions(+), 10 deletions(-) diff --git a/examples/topic/basic_example.py b/examples/topic/basic_example.py index 18e9626f..4726ed79 100644 --- a/examples/topic/basic_example.py +++ b/examples/topic/basic_example.py @@ -66,7 +66,7 @@ async def main(): args = parser.parse_args() if args.verbose: - logger = logging.getLogger("topicexample") + logger = logging.getLogger("ydb") logger.setLevel(logging.DEBUG) logger.addHandler(logging.StreamHandler()) @@ -79,6 +79,8 @@ async def main(): read_messages(driver, args.path, args.consumer), ) + await driver.stop() + if __name__ == "__main__": asyncio.run(main()) diff --git a/test-requirements.txt b/test-requirements.txt index 012697ec..3cbcd696 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -46,6 +46,6 @@ sqlalchemy==1.4.26 pylint-protobuf cython freezegun==1.2.2 -pytest-cov +# pytest-cov yandexcloud -e . diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 24e8fa9e..7baadacb 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -99,7 +99,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb): def __del__(self): if not self._closed: try: - logger.warning("Topic reader was not closed properly. Consider using method close().") + logger.debug("Topic reader was not closed properly. Consider using method close().") task = self._loop.create_task(self.close(flush=False)) topic_common.wrap_set_name_for_asyncio_task(task, task_name="close reader") except BaseException: @@ -121,6 +121,7 @@ async def receive_batch( use asyncio.wait_for for wait with timeout. """ + logger.debug("receive_batch max_messages=%s", max_messages) await self._reconnector.wait_message() return self._reconnector.receive_batch_nowait( max_messages=max_messages, @@ -137,6 +138,7 @@ async def receive_batch_with_tx( use asyncio.wait_for for wait with timeout. """ + logger.debug("receive_batch_with_tx tx=%s max_messages=%s", tx, max_messages) await self._reconnector.wait_message() return self._reconnector.receive_batch_with_tx_nowait( tx=tx, @@ -149,6 +151,7 @@ async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]: use asyncio.wait_for for wait with timeout. """ + logger.debug("receive_message") await self._reconnector.wait_message() return self._reconnector.receive_message_nowait() @@ -159,6 +162,7 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa For the method no way check the commit result (for example if lost connection - commits will not re-send and committed messages will receive again). """ + logger.debug("commit message or batch") if self._settings.consumer is None: raise issues.Error("Commit operations are not supported for topic reader without consumer.") @@ -177,6 +181,7 @@ async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, dat before receive commit ack. Message may be acked or not (if not - it will send in other read session, to this or other reader). """ + logger.debug("commit_with_ack message or batch") if self._settings.consumer is None: raise issues.Error("Commit operations are not supported for topic reader without consumer.") @@ -187,8 +192,10 @@ async def close(self, flush: bool = True): if self._closed: raise TopicReaderClosedError() + logger.debug("Close topic reader") self._closed = True await self._reconnector.close(flush) + logger.debug("Topic reader was closed") @property def read_session_id(self) -> Optional[str]: @@ -214,11 +221,12 @@ def __init__( settings: topic_reader.PublicReaderSettings, loop: Optional[asyncio.AbstractEventLoop] = None, ): - self._id = self._static_reader_reconnector_counter.inc_and_get() + self._id = ReaderReconnector._static_reader_reconnector_counter.inc_and_get() self._settings = settings self._driver = driver self._loop = loop if loop is not None else asyncio.get_running_loop() self._background_tasks = set() + logger.debug("init reader reconnector id=%s", self._id) self._state_changed = asyncio.Event() self._stream_reader = None @@ -231,13 +239,16 @@ async def _connection_loop(self): attempt = 0 while True: try: + logger.debug("reader %s connect attempt %s", self._id, attempt) self._stream_reader = await ReaderStream.create(self._id, self._driver, self._settings) + logger.debug("reader %s connected stream %s", self._id, self._stream_reader._id) attempt = 0 self._state_changed.set() await self._stream_reader.wait_error() except BaseException as err: retry_info = check_retriable_error(err, self._settings._retry_settings(), attempt) if not retry_info.is_retriable: + logger.debug("reader %s stop connection loop due to %s", self._id, err) self._set_first_error(err) return @@ -358,6 +369,7 @@ def commit(self, batch: datatypes.ICommittable) -> datatypes.PartitionSession.Co return self._stream_reader.commit(batch) async def close(self, flush: bool): + logger.debug("reader reconnector %s close", self._id) if self._stream_reader: await self._stream_reader.close(flush) for task in self._background_tasks: @@ -447,6 +459,8 @@ def __init__( self._settings = settings + logger.debug("created ReaderStream id=%s reconnector=%s", self._id, self._reader_reconnector_id) + @staticmethod async def create( reader_reconnector_id: int, @@ -464,6 +478,7 @@ async def create( get_token_function=creds.get_auth_token if creds else None, ) await reader._start(stream, settings._init_message()) + logger.debug("reader stream %s started session=%s", reader._id, reader._session_id) return reader async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMessage.InitRequest): @@ -472,11 +487,13 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess self._started = True self._stream = stream + logger.debug("reader stream %s send init request", self._id) stream.write(StreamReadMessage.FromClient(client_message=init_message)) init_response = await stream.receive() # type: StreamReadMessage.FromServer if isinstance(init_response.server_message, StreamReadMessage.InitResponse): self._session_id = init_response.server_message.session_id + logger.debug("reader stream %s initialized session=%s", self._id, self._session_id) else: raise TopicReaderError("Unexpected message after InitRequest: %s", init_response) @@ -615,6 +632,7 @@ async def _handle_background_errors(self): async def _read_messages_loop(self): try: + logger.debug("reader stream %s start read loop", self._id) self._stream.write( StreamReadMessage.FromClient( client_message=StreamReadMessage.ReadRequest( @@ -628,6 +646,7 @@ async def _read_messages_loop(self): _process_response(message.server_status) if isinstance(message.server_message, StreamReadMessage.ReadResponse): + logger.debug("reader stream %s read %s bytes", self._id, message.server_message.bytes_size) self._on_read_response(message.server_message) elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse): @@ -637,18 +656,33 @@ async def _read_messages_loop(self): message.server_message, StreamReadMessage.StartPartitionSessionRequest, ): + logger.debug( + "reader stream %s start partition %s", + self._id, + message.server_message.partition_session.partition_session_id, + ) await self._on_start_partition_session(message.server_message) elif isinstance( message.server_message, StreamReadMessage.StopPartitionSessionRequest, ): + logger.debug( + "reader stream %s stop partition %s", + self._id, + message.server_message.partition_session_id, + ) self._on_partition_session_stop(message.server_message) elif isinstance( message.server_message, StreamReadMessage.EndPartitionSession, ): + logger.debug( + "reader stream %s end partition %s", + self._id, + message.server_message.partition_session_id, + ) self._on_end_partition_session(message.server_message) elif isinstance(message.server_message, UpdateTokenResponse): @@ -663,6 +697,7 @@ async def _read_messages_loop(self): self._state_changed.set() except Exception as e: + logger.debug("reader stream %s error: %s", self._id, e) self._set_first_error(e) return @@ -825,6 +860,7 @@ def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) -> async def _decode_batches_loop(self): while True: batch = await self._batches_to_decode.get() + logger.debug("reader stream %s decode batch %s messages", self._id, len(batch.messages)) await self._decode_batch_inplace(batch) self._add_batch_to_queue(batch) self._state_changed.set() @@ -833,9 +869,21 @@ def _add_batch_to_queue(self, batch: datatypes.PublicBatch): part_sess_id = batch._partition_session.id if part_sess_id in self._message_batches: self._message_batches[part_sess_id]._extend(batch) + logger.debug( + "reader stream %s extend batch partition=%s size=%s", + self._id, + part_sess_id, + len(batch.messages), + ) return self._message_batches[part_sess_id] = batch + logger.debug( + "reader stream %s new batch partition=%s size=%s", + self._id, + part_sess_id, + len(batch.messages), + ) async def _decode_batch_inplace(self, batch): if batch._codec == Codec.CODEC_RAW: @@ -882,6 +930,7 @@ async def close(self, flush: bool): return self._closed = True + logger.debug("reader stream %s close", self._id) if flush: await self.flush() @@ -899,3 +948,5 @@ async def close(self, flush: bool): if self._background_tasks: await asyncio.wait(self._background_tasks) + + logger.debug("reader stream %s was closed", self._id) diff --git a/ydb/_topic_reader/topic_reader_asyncio_test.py b/ydb/_topic_reader/topic_reader_asyncio_test.py index 7ad5077c..36f86177 100644 --- a/ydb/_topic_reader/topic_reader_asyncio_test.py +++ b/ydb/_topic_reader/topic_reader_asyncio_test.py @@ -1485,6 +1485,7 @@ async def wait_error(): raise test_error reader_stream_mock_with_error = mock.Mock(ReaderStream) + reader_stream_mock_with_error._id = 0 reader_stream_mock_with_error.wait_error = mock.AsyncMock(side_effect=wait_error) async def wait_messages_with_error(): @@ -1497,6 +1498,7 @@ async def wait_forever(): await f reader_stream_with_messages = mock.Mock(ReaderStream) + reader_stream_with_messages._id = 0 reader_stream_with_messages.wait_error = mock.AsyncMock(side_effect=wait_forever) reader_stream_with_messages.wait_messages.return_value = None diff --git a/ydb/_topic_reader/topic_reader_sync.py b/ydb/_topic_reader/topic_reader_sync.py index f7590a21..3eea0390 100644 --- a/ydb/_topic_reader/topic_reader_sync.py +++ b/ydb/_topic_reader/topic_reader_sync.py @@ -64,7 +64,7 @@ async def create_reader(): def __del__(self): if not self._closed: try: - logger.warning("Topic reader was not closed properly. Consider using method close().") + logger.debug("Topic reader was not closed properly. Consider using method close().") self.close(flush=False) except BaseException: logger.warning("Something went wrong during reader close in __del__") diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index ec5b2166..eeecbfd2 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -26,6 +26,7 @@ _apis, issues, ) +from .._utilities import AtomicCounter from .._errors import check_retriable_error from .._topic_common import common as topic_common from ..retries import RetrySettings @@ -82,7 +83,7 @@ def __del__(self): if self._closed or self._loop.is_closed(): return try: - logger.warning("Topic writer was not closed properly. Consider using method close().") + logger.debug("Topic writer was not closed properly. Consider using method close().") task = self._loop.create_task(self.close(flush=False)) topic_common.wrap_set_name_for_asyncio_task(task, task_name="close writer") except BaseException: @@ -92,9 +93,11 @@ async def close(self, *, flush: bool = True): if self._closed: return + logger.debug("Close topic writer") self._closed = True await self._reconnector.close(flush) + logger.debug("Topic writer was closed") async def write_with_ack( self, @@ -108,6 +111,10 @@ async def write_with_ack( For wait with timeout use asyncio.wait_for. """ + logger.debug( + "write_with_ack %s messages", + len(messages) if isinstance(messages, list) else 1, + ) futures = await self.write_with_ack_future(messages) if not isinstance(futures, list): futures = [futures] @@ -129,6 +136,10 @@ async def write_with_ack_future( For wait with timeout use asyncio.wait_for. """ + logger.debug( + "write_with_ack_future %s messages", + len(messages) if isinstance(messages, list) else 1, + ) input_single_message = not isinstance(messages, list) converted_messages = [] if isinstance(messages, list): @@ -153,6 +164,10 @@ async def write( For wait with timeout use asyncio.wait_for. """ + logger.debug( + "write %s messages", + len(messages) if isinstance(messages, list) else 1, + ) await self.write_with_ack_future(messages) async def flush(self): @@ -162,6 +177,7 @@ async def flush(self): For wait with timeout use asyncio.wait_for. """ + logger.debug("flush writer") return await self._reconnector.flush() async def wait_init(self) -> PublicWriterInitInfo: @@ -170,6 +186,7 @@ async def wait_init(self) -> PublicWriterInitInfo: For wait with timeout use asyncio.wait_for() """ + logger.debug("wait writer init") return await self._reconnector.wait_init() @@ -225,6 +242,8 @@ async def _on_before_rollback(self, tx: "BaseQueryTxContext"): class WriterAsyncIOReconnector: + _static_id_counter = AtomicCounter() + _closed: bool _loop: asyncio.AbstractEventLoop _credentials: Union[ydb.credentials.Credentials, None] @@ -260,6 +279,7 @@ def __init__( self, driver: SupportedDriverType, settings: WriterSettings, tx: Optional["BaseQueryTxContext"] = None ): self._closed = False + self._id = WriterAsyncIOReconnector._static_id_counter.inc_and_get() self._loop = asyncio.get_running_loop() self._driver = driver self._credentials = driver._credentials @@ -307,12 +327,13 @@ def __init__( ] self._state_changed = asyncio.Event() + logger.debug("init writer reconnector id=%s", self._id) async def close(self, flush: bool): if self._closed: return self._closed = True - logger.debug("Close writer reconnector") + logger.debug("Close writer reconnector id=%s", self._id) if flush: await self.flush() @@ -329,6 +350,8 @@ async def close(self, flush: bool): except TopicWriterStopped: pass + logger.debug("Writer reconnector id=%s was closed", self._id) + async def wait_init(self) -> PublicWriterInitInfo: while True: if self._stop_reason.done(): @@ -418,6 +441,7 @@ async def _connection_loop(self): # noinspection PyBroadException stream_writer = None try: + logger.debug("writer reconnector %s connect attempt %s", self._id, attempt) tx_identity = None if self._tx is None else self._tx._tx_identity() stream_writer = await WriterAsyncIOStream.create( self._driver, @@ -425,6 +449,11 @@ async def _connection_loop(self): self._settings.update_token_interval, tx_identity=tx_identity, ) + logger.debug( + "writer reconnector %s connected stream %s", + self._id, + stream_writer._id, + ) try: if self._init_info is None: self._last_known_seq_no = stream_writer.last_seqno @@ -458,6 +487,11 @@ async def _connection_loop(self): return await asyncio.sleep(err_info.sleep_timeout_seconds) + logger.debug( + "writer reconnector %s retry in %s seconds", + self._id, + err_info.sleep_timeout_seconds, + ) except (asyncio.CancelledError, Exception) as err: self._stop(err) @@ -477,6 +511,12 @@ async def _encode_loop(self): while not self._messages_for_encode.empty(): messages.extend(self._messages_for_encode.get_nowait()) + logger.debug( + "writer reconnector %s encode %s messages", + self._id, + len(messages), + ) + batch_codec = await self._codec_selector(messages) await self._encode_data_inplace(batch_codec, messages) self._add_messages_to_send_queue(messages) @@ -582,6 +622,8 @@ async def _read_loop(self, writer: "WriterAsyncIOStream"): while True: resp = await writer.receive() + logger.debug("writer reconnector %s received %s acks", self._id, len(resp.acks)) + for ack in resp.acks: self._handle_receive_ack(ack) @@ -604,20 +646,37 @@ def _handle_receive_ack(self, ack): else: raise TopicWriterError("internal error - receive unexpected ack message.") message_future.set_result(result) + logger.debug( + "writer reconnector %s ack seqno=%s result=%s", + self._id, + ack.seq_no, + type(result).__name__, + ) async def _send_loop(self, writer: "WriterAsyncIOStream"): try: + logger.debug("writer reconnector %s send loop start", self._id) messages = list(self._messages) last_seq_no = 0 for m in messages: writer.write([m]) + logger.debug( + "writer reconnector %s sent buffered message seqno=%s", + self._id, + m.seq_no, + ) last_seq_no = m.seq_no while True: m = await self._new_messages.get() # type: InternalMessage if m.seq_no > last_seq_no: writer.write([m]) + logger.debug( + "writer reconnector %s sent message seqno=%s", + self._id, + m.seq_no, + ) except asyncio.CancelledError: # the loop task cancelled be parent code, for example for reconnection # no need to stop all work. @@ -639,7 +698,7 @@ def _stop(self, reason: BaseException): f.set_exception(reason) self._state_changed.set() - logger.info("Stop topic writer: %s" % reason) + logger.info("Stop topic writer %s: %s" % (self._id, reason)) async def flush(self): if not self._messages_future: @@ -650,6 +709,8 @@ async def flush(self): class WriterAsyncIOStream: + _static_id_counter = AtomicCounter() + # todo slots _closed: bool @@ -674,6 +735,7 @@ def __init__( tx_identity: Optional[TransactionIdentity] = None, ): self._closed = False + self._id = WriterAsyncIOStream._static_id_counter.inc_and_get() self._update_token_interval = update_token_interval self._get_token_function = get_token_function @@ -686,12 +748,14 @@ async def close(self): if self._closed: return self._closed = True + logger.debug("writer stream %s close", self._id) if self._update_token_task: self._update_token_task.cancel() await asyncio.wait([self._update_token_task]) self._stream.close() + logger.debug("writer stream %s was closed", self._id) @staticmethod async def create( @@ -711,6 +775,11 @@ async def create( tx_identity=tx_identity, ) await writer._start(stream, init_request) + logger.debug( + "writer stream %s started seqno=%s", + writer._id, + writer.last_seqno, + ) return writer async def receive(self) -> StreamWriteMessage.WriteResponse: @@ -727,6 +796,7 @@ async def receive(self) -> StreamWriteMessage.WriteResponse: raise Exception("Unknown message while read writer answers: %s" % item) async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMessage.InitRequest): + logger.debug("writer stream %s send init request", self._id) stream.write(StreamWriteMessage.FromClient(init_message)) resp = await stream.receive() @@ -736,6 +806,11 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamWriteMes self.last_seqno = resp.last_seq_no self.supported_codecs = [PublicCodec(codec) for codec in resp.supported_codecs] + logger.debug( + "writer stream %s init done last_seqno=%s", + self._id, + self.last_seqno, + ) self._stream = stream @@ -755,6 +830,8 @@ def write(self, messages: List[InternalMessage]): if self._closed: raise RuntimeError("Can not write on closed stream.") + logger.debug("writer stream %s send %s messages", self._id, len(messages)) + for request in messages_to_proto_requests(messages, self._tx_identity): self._stream.write(request) @@ -764,6 +841,7 @@ async def _update_token_loop(self): token = self._get_token_function() if asyncio.iscoroutine(token): token = await token + logger.debug("writer stream %s update token", self._id) await self._update_token(token=token) async def _update_token(self, token: str): @@ -771,5 +849,6 @@ async def _update_token(self, token: str): try: msg = StreamWriteMessage.FromClient(UpdateTokenRequest(token)) self._stream.write(msg) + logger.debug("writer stream %s token sent", self._id) finally: self._update_token_event.clear() diff --git a/ydb/_topic_writer/topic_writer_asyncio_test.py b/ydb/_topic_writer/topic_writer_asyncio_test.py index cf88f797..3b67ba08 100644 --- a/ydb/_topic_writer/topic_writer_asyncio_test.py +++ b/ydb/_topic_writer/topic_writer_asyncio_test.py @@ -251,6 +251,7 @@ def __init__( update_token_interval: Optional[int, float] = None, get_token_function: Optional[Callable[[], str]] = None, ): + self._id = 0 self.last_seqno = 0 self.from_server = asyncio.Queue() self.from_client = asyncio.Queue() diff --git a/ydb/_topic_writer/topic_writer_sync.py b/ydb/_topic_writer/topic_writer_sync.py index 954864c9..7806d7fa 100644 --- a/ydb/_topic_writer/topic_writer_sync.py +++ b/ydb/_topic_writer/topic_writer_sync.py @@ -76,7 +76,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): def __del__(self): if not self._closed: try: - logger.warning("Topic writer was not closed properly. Consider using method close().") + logger.debug("Topic writer was not closed properly. Consider using method close().") self.close(flush=False) except BaseException: logger.warning("Something went wrong during writer close in __del__") @@ -85,6 +85,7 @@ def close(self, *, flush: bool = True, timeout: TimeoutType = None): if self._closed: return + logger.debug("Close topic writer") self._closed = True self._caller.safe_call_with_result(self._async_writer.close(flush=flush), timeout) @@ -101,16 +102,22 @@ def async_flush(self) -> Future: def flush(self, *, timeout=None): self._check_closed() + logger.debug("flush writer") + return self._caller.unsafe_call_with_result(self._async_writer.flush(), timeout) def async_wait_init(self) -> Future[PublicWriterInitInfo]: self._check_closed() + logger.debug("wait writer init") + return self._caller.unsafe_call_with_future(self._async_writer.wait_init()) def wait_init(self, *, timeout: TimeoutType = None) -> PublicWriterInitInfo: self._check_closed() + logger.debug("wait writer init") + return self._caller.unsafe_call_with_result(self._async_writer.wait_init(), timeout) def write( @@ -120,6 +127,11 @@ def write( ): self._check_closed() + logger.debug( + "write %s messages", + len(messages) if isinstance(messages, list) else 1, + ) + self._caller.safe_call_with_result(self._async_writer.write(messages), timeout) def async_write_with_ack( @@ -137,6 +149,11 @@ def write_with_ack( ) -> Union[PublicWriteResult, List[PublicWriteResult]]: self._check_closed() + logger.debug( + "write_with_ack %s messages", + len(messages) if isinstance(messages, list) else 1, + ) + return self._caller.unsafe_call_with_result(self._async_writer.write_with_ack(messages), timeout=timeout) diff --git a/ydb/topic.py b/ydb/topic.py index aa6c7eb4..5e86be68 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -122,7 +122,7 @@ def __init__(self, driver: aio.Driver, settings: Optional[TopicClientSettings] = def __del__(self): if not self._closed: try: - logger.warning("Topic client was not closed properly. Consider using method close().") + logger.debug("Topic client was not closed properly. Consider using method close().") self.close() except BaseException: logger.warning("Something went wrong during topic client close in __del__") @@ -161,6 +161,7 @@ async def create_topic( :param consumers: List of consumers for this topic :param metering_mode: Metering mode for the topic in a serverless database """ + logger.debug("Create topic request: path=%s", path) args = locals().copy() del args["self"] req = _ydb_topic_public_types.CreateTopicRequestParams(**args) @@ -210,6 +211,7 @@ async def alter_topic( :param set_supported_codecs: List of allowed codecs for writers. Writes with codec not from this list are forbidden. Empty list mean disable codec compatibility checks for the topic. """ + logger.debug("Alter topic request: path=%s", path) args = locals().copy() del args["self"] req = _ydb_topic_public_types.AlterTopicRequestParams(**args) @@ -222,6 +224,7 @@ async def alter_topic( ) async def describe_topic(self, path: str, include_stats: bool = False) -> TopicDescription: + logger.debug("Describe topic request: path=%s", path) args = locals().copy() del args["self"] req = _ydb_topic_public_types.DescribeTopicRequestParams(**args) @@ -234,6 +237,7 @@ async def describe_topic(self, path: str, include_stats: bool = False) -> TopicD return res.to_public() async def drop_topic(self, path: str): + logger.debug("Drop topic request: path=%s", path) req = _ydb_topic_public_types.DropTopicRequestParams(path=path) await self._driver( req.to_proto(), @@ -257,6 +261,8 @@ def reader( event_handler: Optional[TopicReaderEvents.EventHandler] = None, ) -> TopicReaderAsyncIO: + logger.debug("Create reader for topic=%s consumer=%s", topic, consumer) + if not decoder_executor: decoder_executor = self._executor @@ -301,6 +307,7 @@ def writer( # If max_worker in the executor is 1 - then encoders will be called from the thread without parallel. encoder_executor: Optional[concurrent.futures.Executor] = None, ) -> TopicWriterAsyncIO: + logger.debug("Create writer for topic=%s producer_id=%s", topic, producer_id) args = locals().copy() del args["self"] @@ -329,6 +336,7 @@ def tx_writer( # If max_worker in the executor is 1 - then encoders will be called from the thread without parallel. encoder_executor: Optional[concurrent.futures.Executor] = None, ) -> TopicTxWriterAsyncIO: + logger.debug("Create tx writer for topic=%s tx=%s", topic, tx) args = locals().copy() del args["self"] del args["tx"] @@ -343,6 +351,13 @@ def tx_writer( async def commit_offset( self, path: str, consumer: str, partition_id: int, offset: int, read_session_id: Optional[str] = None ) -> None: + logger.debug( + "Commit offset: path=%s partition_id=%s offset=%s consumer=%s", + path, + partition_id, + offset, + consumer, + ) req = _ydb_topic.CommitOffsetRequest( path=path, consumer=consumer, @@ -362,6 +377,7 @@ def close(self): if self._closed: return + logger.debug("Close topic client") self._closed = True self._executor.shutdown(wait=False) @@ -433,6 +449,7 @@ def create_topic( :param consumers: List of consumers for this topic :param metering_mode: Metering mode for the topic in a serverless database """ + logger.debug("Create topic request: path=%s", path) args = locals().copy() del args["self"] self._check_closed() @@ -484,6 +501,7 @@ def alter_topic( :param set_supported_codecs: List of allowed codecs for writers. Writes with codec not from this list are forbidden. Empty list mean disable codec compatibility checks for the topic. """ + logger.debug("Alter topic request: path=%s", path) args = locals().copy() del args["self"] self._check_closed() @@ -498,6 +516,7 @@ def alter_topic( ) def describe_topic(self, path: str, include_stats: bool = False) -> TopicDescription: + logger.debug("Describe topic request: path=%s", path) args = locals().copy() del args["self"] self._check_closed() @@ -514,6 +533,8 @@ def describe_topic(self, path: str, include_stats: bool = False) -> TopicDescrip def drop_topic(self, path: str): self._check_closed() + logger.debug("Drop topic request: path=%s", path) + req = _ydb_topic_public_types.DropTopicRequestParams(path=path) self._driver( req.to_proto(), @@ -536,6 +557,7 @@ def reader( auto_partitioning_support: Optional[bool] = True, # Auto partitioning feature flag. Default - True. event_handler: Optional[TopicReaderEvents.EventHandler] = None, ) -> TopicReader: + logger.debug("Create reader for topic=%s consumer=%s", topic, consumer) if not decoder_executor: decoder_executor = self._executor @@ -580,6 +602,7 @@ def writer( # If max_worker in the executor is 1 - then encoders will be called from the thread without parallel. encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool ) -> TopicWriter: + logger.debug("Create writer for topic=%s producer_id=%s", topic, producer_id) args = locals().copy() del args["self"] self._check_closed() @@ -609,6 +632,7 @@ def tx_writer( # If max_worker in the executor is 1 - then encoders will be called from the thread without parallel. encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool ) -> TopicWriter: + logger.debug("Create tx writer for topic=%s tx=%s", topic, tx) args = locals().copy() del args["self"] del args["tx"] @@ -624,6 +648,13 @@ def tx_writer( def commit_offset( self, path: str, consumer: str, partition_id: int, offset: int, read_session_id: Optional[str] = None ) -> None: + logger.debug( + "Commit offset: path=%s partition_id=%s offset=%s consumer=%s", + path, + partition_id, + offset, + consumer, + ) req = _ydb_topic.CommitOffsetRequest( path=path, consumer=consumer, @@ -643,8 +674,10 @@ def close(self): if self._closed: return + logger.debug("Close topic client") self._closed = True self._executor.shutdown(wait=False) + logger.debug("Topic client was closed") def _check_closed(self): if not self._closed: