@@ -121,6 +121,7 @@ async def receive_batch(

         use asyncio.wait_for for wait with timeout.
         """
+        logger.debug("receive_batch max_messages=%s", max_messages)
         await self._reconnector.wait_message()
         return self._reconnector.receive_batch_nowait(
             max_messages=max_messages,
@@ -137,6 +138,7 @@ async def receive_batch_with_tx(

         use asyncio.wait_for for wait with timeout.
         """
+        logger.debug("receive_batch_with_tx tx=%s max_messages=%s", tx, max_messages)
         await self._reconnector.wait_message()
         return self._reconnector.receive_batch_with_tx_nowait(
             tx=tx,
@@ -149,6 +151,7 @@ async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]:

         use asyncio.wait_for for wait with timeout.
         """
+        logger.debug("receive_message")
         await self._reconnector.wait_message()
         return self._reconnector.receive_message_nowait()

@@ -159,6 +162,7 @@ def commit(self, batch: typing.Union[datatypes.PublicMessage, datatypes.PublicBa
         For the method no way check the commit result
         (for example if lost connection - commits will not re-send and committed messages will receive again).
         """
+        logger.debug("commit message or batch")
         if self._settings.consumer is None:
             raise issues.Error("Commit operations are not supported for topic reader without consumer.")

@@ -177,6 +181,7 @@ async def commit_with_ack(self, batch: typing.Union[datatypes.PublicMessage, dat
         before receive commit ack. Message may be acked or not (if not - it will send in other read session,
         to this or other reader).
         """
+        logger.debug("commit_with_ack message or batch")
         if self._settings.consumer is None:
             raise issues.Error("Commit operations are not supported for topic reader without consumer.")

@@ -187,8 +192,10 @@ async def close(self, flush: bool = True):
         if self._closed:
             raise TopicReaderClosedError()

+        logger.debug("Close topic reader")
         self._closed = True
         await self._reconnector.close(flush)
+        logger.debug("Topic reader was closed")

     @property
     def read_session_id(self) -> Optional[str]:
@@ -219,6 +226,7 @@ def __init__(
         self._driver = driver
         self._loop = loop if loop is not None else asyncio.get_running_loop()
         self._background_tasks = set()
+        logger.debug("init reader reconnector id=%s", self._id)

         self._state_changed = asyncio.Event()
         self._stream_reader = None
@@ -231,13 +239,18 @@ async def _connection_loop(self):
         attempt = 0
         while True:
             try:
+                logger.debug("reader %s connect attempt %s", self._id, attempt)
                 self._stream_reader = await ReaderStream.create(self._id, self._driver, self._settings)
+                logger.debug(
+                    "reader %s connected stream %s", self._id, self._stream_reader._id
+                )
                 attempt = 0
                 self._state_changed.set()
                 await self._stream_reader.wait_error()
             except BaseException as err:
                 retry_info = check_retriable_error(err, self._settings._retry_settings(), attempt)
                 if not retry_info.is_retriable:
+                    logger.debug("reader %s stop connection loop due to %s", self._id, err)
                     self._set_first_error(err)
                     return

@@ -358,6 +371,7 @@ def commit(self, batch: datatypes.ICommittable) -> datatypes.PartitionSession.Co
         return self._stream_reader.commit(batch)

     async def close(self, flush: bool):
+        logger.debug("reader reconnector %s close", self._id)
         if self._stream_reader:
             await self._stream_reader.close(flush)
         for task in self._background_tasks:
@@ -447,6 +461,10 @@ def __init__(

         self._settings = settings

+        logger.debug(
+            "created ReaderStream id=%s reconnector=%s", self._id, self._reader_reconnector_id
+        )
+
     @staticmethod
     async def create(
         reader_reconnector_id: int,
@@ -464,6 +482,9 @@ async def create(
             get_token_function=creds.get_auth_token if creds else None,
         )
         await reader._start(stream, settings._init_message())
+        logger.debug(
+            "reader stream %s started session=%s", reader._id, reader._session_id
+        )
         return reader

     async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMessage.InitRequest):
@@ -472,11 +493,17 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess

         self._started = True
         self._stream = stream
+        logger.debug(
+            "reader stream %s send init request", self._id
+        )

         stream.write(StreamReadMessage.FromClient(client_message=init_message))
         init_response = await stream.receive()  # type: StreamReadMessage.FromServer
         if isinstance(init_response.server_message, StreamReadMessage.InitResponse):
             self._session_id = init_response.server_message.session_id
+            logger.debug(
+                "reader stream %s initialized session=%s", self._id, self._session_id
+            )
         else:
             raise TopicReaderError("Unexpected message after InitRequest: %s", init_response)

@@ -615,6 +642,7 @@ async def _handle_background_errors(self):

     async def _read_messages_loop(self):
         try:
+            logger.debug("reader stream %s start read loop", self._id)
             self._stream.write(
                 StreamReadMessage.FromClient(
                     client_message=StreamReadMessage.ReadRequest(
@@ -628,6 +656,9 @@ async def _read_messages_loop(self):
                 _process_response(message.server_status)

                 if isinstance(message.server_message, StreamReadMessage.ReadResponse):
+                    logger.debug(
+                        "reader stream %s read %s bytes", self._id, message.server_message.bytes_size
+                    )
                     self._on_read_response(message.server_message)

                 elif isinstance(message.server_message, StreamReadMessage.CommitOffsetResponse):
@@ -637,18 +668,33 @@ async def _read_messages_loop(self):
                     message.server_message,
                     StreamReadMessage.StartPartitionSessionRequest,
                 ):
+                    logger.debug(
+                        "reader stream %s start partition %s",
+                        self._id,
+                        message.server_message.partition_session.partition_session_id,
+                    )
                     await self._on_start_partition_session(message.server_message)

                 elif isinstance(
                     message.server_message,
                     StreamReadMessage.StopPartitionSessionRequest,
                 ):
+                    logger.debug(
+                        "reader stream %s stop partition %s",
+                        self._id,
+                        message.server_message.partition_session_id,
+                    )
                     self._on_partition_session_stop(message.server_message)

                 elif isinstance(
                     message.server_message,
                     StreamReadMessage.EndPartitionSession,
                 ):
+                    logger.debug(
+                        "reader stream %s end partition %s",
+                        self._id,
+                        message.server_message.partition_session_id,
+                    )
                     self._on_end_partition_session(message.server_message)

                 elif isinstance(message.server_message, UpdateTokenResponse):
@@ -663,6 +709,7 @@ async def _read_messages_loop(self):

                 self._state_changed.set()
         except Exception as e:
+            logger.debug("reader stream %s error: %s", self._id, e)
             self._set_first_error(e)
             return

@@ -825,6 +872,9 @@ def _read_response_to_batches(self, message: StreamReadMessage.ReadResponse) ->
     async def _decode_batches_loop(self):
         while True:
             batch = await self._batches_to_decode.get()
+            logger.debug(
+                "reader stream %s decode batch %s messages", self._id, len(batch.messages)
+            )
             await self._decode_batch_inplace(batch)
             self._add_batch_to_queue(batch)
             self._state_changed.set()
@@ -833,9 +883,21 @@ def _add_batch_to_queue(self, batch: datatypes.PublicBatch):
         part_sess_id = batch._partition_session.id
         if part_sess_id in self._message_batches:
             self._message_batches[part_sess_id]._extend(batch)
+            logger.debug(
+                "reader stream %s extend batch partition=%s size=%s",
+                self._id,
+                part_sess_id,
+                len(batch.messages),
+            )
             return

         self._message_batches[part_sess_id] = batch
+        logger.debug(
+            "reader stream %s new batch partition=%s size=%s",
+            self._id,
+            part_sess_id,
+            len(batch.messages),
+        )

     async def _decode_batch_inplace(self, batch):
         if batch._codec == Codec.CODEC_RAW:
@@ -882,6 +944,7 @@ async def close(self, flush: bool):
             return

         self._closed = True
+        logger.debug("reader stream %s close", self._id)

         if flush:
             await self.flush()
@@ -899,3 +962,5 @@ async def close(self, flush: bool):

         if self._background_tasks:
             await asyncio.wait(self._background_tasks)
+
+        logger.debug("reader stream %s was closed", self._id)
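
The docstrings touched in the `receive_batch`, `receive_batch_with_tx`, and `receive_message` hunks above point callers at `asyncio.wait_for` for timeouts. A minimal sketch of that pattern, assuming only that `reader` is an already-created async topic reader exposing the `receive_batch()` coroutine shown in this diff (the helper name and timeout value are illustrative, not part of the change):

```python
import asyncio


async def receive_batch_with_timeout(reader, timeout: float = 5.0):
    # receive_batch() itself waits until a batch is available; the timeout is
    # applied externally with asyncio.wait_for, as the docstring suggests.
    try:
        return await asyncio.wait_for(reader.receive_batch(), timeout=timeout)
    except asyncio.TimeoutError:
        # No batch arrived within the timeout window.
        return None
```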
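All of the calls added here log at DEBUG level through the module logger, so they stay silent under a default logging configuration. A sketch of how an application might surface them; the `"ydb"` logger name is an assumption about the package's logger hierarchy, not something stated in this diff:

```python
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
# Raise only the SDK loggers to DEBUG so the new reader/reconnector/stream
# messages become visible without flooding the rest of the application.
logging.getLogger("ydb").setLevel(logging.DEBUG)
```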