@@ -99,7 +99,7 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
99
99
def __del__ (self ):
100
100
if not self ._closed :
101
101
try :
102
- logger .warning ("Topic reader was not closed properly. Consider using method close()." )
102
+ logger .debug ("Topic reader was not closed properly. Consider using method close()." )
103
103
task = self ._loop .create_task (self .close (flush = False ))
104
104
topic_common .wrap_set_name_for_asyncio_task (task , task_name = "close reader" )
105
105
except BaseException :
@@ -121,6 +121,7 @@ async def receive_batch(
121
121
122
122
use asyncio.wait_for for wait with timeout.
123
123
"""
124
+ logger .debug ("receive_batch max_messages=%s" , max_messages )
124
125
await self ._reconnector .wait_message ()
125
126
return self ._reconnector .receive_batch_nowait (
126
127
max_messages = max_messages ,
@@ -137,6 +138,7 @@ async def receive_batch_with_tx(
137
138
138
139
use asyncio.wait_for for wait with timeout.
139
140
"""
141
+ logger .debug ("receive_batch_with_tx tx=%s max_messages=%s" , tx , max_messages )
140
142
await self ._reconnector .wait_message ()
141
143
return self ._reconnector .receive_batch_with_tx_nowait (
142
144
tx = tx ,
@@ -149,6 +151,7 @@ async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]:
149
151
150
152
use asyncio.wait_for for wait with timeout.
151
153
"""
154
+ logger .debug ("receive_message" )
152
155
await self ._reconnector .wait_message ()
153
156
return self ._reconnector .receive_message_nowait ()
154
157
@@ -159,6 +162,7 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa
159
162
For the method no way check the commit result
160
163
(for example if lost connection - commits will not re-send and committed messages will receive again).
161
164
"""
165
+ logger .debug ("commit message or batch" )
162
166
if self ._settings .consumer is None :
163
167
raise issues .Error ("Commit operations are not supported for topic reader without consumer." )
164
168
@@ -177,6 +181,7 @@ async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, dat
177
181
before receive commit ack. Message may be acked or not (if not - it will send in other read session,
178
182
to this or other reader).
179
183
"""
184
+ logger .debug ("commit_with_ack message or batch" )
180
185
if self ._settings .consumer is None :
181
186
raise issues .Error ("Commit operations are not supported for topic reader without consumer." )
182
187
@@ -187,8 +192,10 @@ async def close(self, flush: bool = True):
187
192
if self ._closed :
188
193
raise TopicReaderClosedError ()
189
194
195
+ logger .debug ("Close topic reader" )
190
196
self ._closed = True
191
197
await self ._reconnector .close (flush )
198
+ logger .debug ("Topic reader was closed" )
192
199
193
200
@property
194
201
def read_session_id (self ) -> Optional [str ]:
@@ -214,11 +221,12 @@ def __init__(
214
221
settings : topic_reader .PublicReaderSettings ,
215
222
loop : Optional [asyncio .AbstractEventLoop ] = None ,
216
223
):
217
- self ._id = self ._static_reader_reconnector_counter .inc_and_get ()
224
+ self ._id = ReaderReconnector ._static_reader_reconnector_counter .inc_and_get ()
218
225
self ._settings = settings
219
226
self ._driver = driver
220
227
self ._loop = loop if loop is not None else asyncio .get_running_loop ()
221
228
self ._background_tasks = set ()
229
+ logger .debug ("init reader reconnector id=%s" , self ._id )
222
230
223
231
self ._state_changed = asyncio .Event ()
224
232
self ._stream_reader = None
@@ -231,13 +239,16 @@ async def _connection_loop(self):
231
239
attempt = 0
232
240
while True :
233
241
try :
242
+ logger .debug ("reader %s connect attempt %s" , self ._id , attempt )
234
243
self ._stream_reader = await ReaderStream .create (self ._id , self ._driver , self ._settings )
244
+ logger .debug ("reader %s connected stream %s" , self ._id , self ._stream_reader ._id )
235
245
attempt = 0
236
246
self ._state_changed .set ()
237
247
await self ._stream_reader .wait_error ()
238
248
except BaseException as err :
239
249
retry_info = check_retriable_error (err , self ._settings ._retry_settings (), attempt )
240
250
if not retry_info .is_retriable :
251
+ logger .debug ("reader %s stop connection loop due to %s" , self ._id , err )
241
252
self ._set_first_error (err )
242
253
return
243
254
@@ -358,6 +369,7 @@ def commit(self, batch: datatypes.ICommittable) -> datatypes.PartitionSession.Co
358
369
return self ._stream_reader .commit (batch )
359
370
360
371
async def close (self , flush : bool ):
372
+ logger .debug ("reader reconnector %s close" , self ._id )
361
373
if self ._stream_reader :
362
374
await self ._stream_reader .close (flush )
363
375
for task in self ._background_tasks :
@@ -447,6 +459,8 @@ def __init__(
447
459
448
460
self ._settings = settings
449
461
462
+ logger .debug ("created ReaderStream id=%s reconnector=%s" , self ._id , self ._reader_reconnector_id )
463
+
450
464
@staticmethod
451
465
async def create (
452
466
reader_reconnector_id : int ,
@@ -464,6 +478,7 @@ async def create(
464
478
get_token_function = creds .get_auth_token if creds else None ,
465
479
)
466
480
await reader ._start (stream , settings ._init_message ())
481
+ logger .debug ("reader stream %s started session=%s" , reader ._id , reader ._session_id )
467
482
return reader
468
483
469
484
async def _start (self , stream : IGrpcWrapperAsyncIO , init_message : StreamReadMessage .InitRequest ):
@@ -472,11 +487,13 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess
472
487
473
488
self ._started = True
474
489
self ._stream = stream
490
+ logger .debug ("reader stream %s send init request" , self ._id )
475
491
476
492
stream .write (StreamReadMessage .FromClient (client_message = init_message ))
477
493
init_response = await stream .receive () # type: StreamReadMessage.FromServer
478
494
if isinstance (init_response .server_message , StreamReadMessage .InitResponse ):
479
495
self ._session_id = init_response .server_message .session_id
496
+ logger .debug ("reader stream %s initialized session=%s" , self ._id , self ._session_id )
480
497
else :
481
498
raise TopicReaderError ("Unexpected message after InitRequest: %s" , init_response )
482
499
@@ -615,6 +632,7 @@ async def _handle_background_errors(self):
615
632
616
633
async def _read_messages_loop (self ):
617
634
try :
635
+ logger .debug ("reader stream %s start read loop" , self ._id )
618
636
self ._stream .write (
619
637
StreamReadMessage .FromClient (
620
638
client_message = StreamReadMessage .ReadRequest (
@@ -628,6 +646,7 @@ async def _read_messages_loop(self):
628
646
_process_response (message .server_status )
629
647
630
648
if isinstance (message .server_message , StreamReadMessage .ReadResponse ):
649
+ logger .debug ("reader stream %s read %s bytes" , self ._id , message .server_message .bytes_size )
631
650
self ._on_read_response (message .server_message )
632
651
633
652
elif isinstance (message .server_message , StreamReadMessage .CommitOffsetResponse ):
@@ -637,18 +656,33 @@ async def _read_messages_loop(self):
637
656
message .server_message ,
638
657
StreamReadMessage .StartPartitionSessionRequest ,
639
658
):
659
+ logger .debug (
660
+ "reader stream %s start partition %s" ,
661
+ self ._id ,
662
+ message .server_message .partition_session .partition_session_id ,
663
+ )
640
664
await self ._on_start_partition_session (message .server_message )
641
665
642
666
elif isinstance (
643
667
message .server_message ,
644
668
StreamReadMessage .StopPartitionSessionRequest ,
645
669
):
670
+ logger .debug (
671
+ "reader stream %s stop partition %s" ,
672
+ self ._id ,
673
+ message .server_message .partition_session_id ,
674
+ )
646
675
self ._on_partition_session_stop (message .server_message )
647
676
648
677
elif isinstance (
649
678
message .server_message ,
650
679
StreamReadMessage .EndPartitionSession ,
651
680
):
681
+ logger .debug (
682
+ "reader stream %s end partition %s" ,
683
+ self ._id ,
684
+ message .server_message .partition_session_id ,
685
+ )
652
686
self ._on_end_partition_session (message .server_message )
653
687
654
688
elif isinstance (message .server_message , UpdateTokenResponse ):
@@ -663,6 +697,7 @@ async def _read_messages_loop(self):
663
697
664
698
self ._state_changed .set ()
665
699
except Exception as e :
700
+ logger .debug ("reader stream %s error: %s" , self ._id , e )
666
701
self ._set_first_error (e )
667
702
return
668
703
@@ -825,6 +860,7 @@ def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) ->
825
860
async def _decode_batches_loop (self ):
826
861
while True :
827
862
batch = await self ._batches_to_decode .get ()
863
+ logger .debug ("reader stream %s decode batch %s messages" , self ._id , len (batch .messages ))
828
864
await self ._decode_batch_inplace (batch )
829
865
self ._add_batch_to_queue (batch )
830
866
self ._state_changed .set ()
@@ -833,9 +869,21 @@ def _add_batch_to_queue(self, batch: datatypes.PublicBatch):
833
869
part_sess_id = batch ._partition_session .id
834
870
if part_sess_id in self ._message_batches :
835
871
self ._message_batches [part_sess_id ]._extend (batch )
872
+ logger .debug (
873
+ "reader stream %s extend batch partition=%s size=%s" ,
874
+ self ._id ,
875
+ part_sess_id ,
876
+ len (batch .messages ),
877
+ )
836
878
return
837
879
838
880
self ._message_batches [part_sess_id ] = batch
881
+ logger .debug (
882
+ "reader stream %s new batch partition=%s size=%s" ,
883
+ self ._id ,
884
+ part_sess_id ,
885
+ len (batch .messages ),
886
+ )
839
887
840
888
async def _decode_batch_inplace (self , batch ):
841
889
if batch ._codec == Codec .CODEC_RAW :
@@ -882,6 +930,7 @@ async def close(self, flush: bool):
882
930
return
883
931
884
932
self ._closed = True
933
+ logger .debug ("reader stream %s close" , self ._id )
885
934
886
935
if flush :
887
936
await self .flush ()
@@ -899,3 +948,5 @@ async def close(self, flush: bool):
899
948
900
949
if self ._background_tasks :
901
950
await asyncio .wait (self ._background_tasks )
951
+
952
+ logger .debug ("reader stream %s was closed" , self ._id )
0 commit comments