21
21
import com .linkedin .venice .utils .DaemonThreadFactory ;
22
22
import com .linkedin .venice .utils .RedundantExceptionFilter ;
23
23
import com .linkedin .venice .utils .SystemTime ;
24
+ import com .linkedin .venice .utils .Time ;
24
25
import com .linkedin .venice .utils .Utils ;
25
26
import com .linkedin .venice .utils .concurrent .VeniceConcurrentHashMap ;
26
27
import io .tehuti .metrics .MetricsRepository ;
@@ -78,6 +79,8 @@ public class AggKafkaConsumerService extends AbstractVeniceService {
78
79
79
80
private final static String STUCK_CONSUMER_MSG =
80
81
"Didn't find any suspicious ingestion task, and please contact developers to investigate it further" ;
82
+ private static final String CONSUMER_POLL_WARNING_MESSAGE_PREFIX =
83
+ "Consumer poll tracker found stale topic partitions for consumer service: " ;
81
84
82
85
private final VeniceJsonSerializer <Map <String , Map <String , TopicPartitionIngestionInfo >>> topicPartitionIngestionContextJsonSerializer =
83
86
new VeniceJsonSerializer <>(new TypeReference <Map <String , Map <String , TopicPartitionIngestionInfo >>>() {
@@ -117,11 +120,14 @@ public AggKafkaConsumerService(
117
120
int intervalInSeconds = serverConfig .getStuckConsumerRepairIntervalSecond ();
118
121
this .stuckConsumerRepairExecutorService .scheduleAtFixedRate (
119
122
getStuckConsumerDetectionAndRepairRunnable (
123
+ LOGGER ,
124
+ SystemTime .INSTANCE ,
120
125
kafkaServerToConsumerServiceMap ,
121
126
versionTopicStoreIngestionTaskMapping ,
122
127
TimeUnit .SECONDS .toMillis (serverConfig .getStuckConsumerDetectionRepairThresholdSecond ()),
123
128
TimeUnit .SECONDS .toMillis (serverConfig .getNonExistingTopicIngestionTaskKillThresholdSecond ()),
124
129
TimeUnit .SECONDS .toMillis (serverConfig .getNonExistingTopicCheckRetryIntervalSecond ()),
130
+ TimeUnit .SECONDS .toMillis (serverConfig .getConsumerPollTrackerStaleThresholdSeconds ()),
125
131
new StuckConsumerRepairStats (metricsRepository ),
126
132
killIngestionTaskRunnable ),
127
133
intervalInSeconds ,
@@ -154,11 +160,14 @@ public void stopInner() throws Exception {
154
160
}
155
161
156
162
protected static Runnable getStuckConsumerDetectionAndRepairRunnable (
163
+ Logger logger ,
164
+ Time time ,
157
165
Map <String , AbstractKafkaConsumerService > kafkaServerToConsumerServiceMap ,
158
166
Map <String , StoreIngestionTask > versionTopicStoreIngestionTaskMapping ,
159
167
long stuckConsumerRepairThresholdMs ,
160
168
long nonExistingTopicIngestionTaskKillThresholdMs ,
161
169
long nonExistingTopicRetryIntervalMs ,
170
+ long consumerPollTrackerStaleThresholdMs ,
162
171
StuckConsumerRepairStats stuckConsumerRepairStats ,
163
172
Consumer <String > killIngestionTaskRunnable ) {
164
173
return () -> {
@@ -177,82 +186,113 @@ protected static Runnable getStuckConsumerDetectionAndRepairRunnable(
177
186
long maxDelayMs = consumerService .getMaxElapsedTimeMSSinceLastPollInConsumerPool ();
178
187
if (maxDelayMs >= stuckConsumerRepairThresholdMs ) {
179
188
scanStoreIngestionTaskToFixStuckConsumer = true ;
180
- LOGGER .warn ("Found some consumer has stuck for {} ms, will start the repairing procedure" , maxDelayMs );
189
+ logger .warn ("Found some consumer has stuck for {} ms, will start the repairing procedure" , maxDelayMs );
181
190
break ;
182
191
}
183
192
}
184
- if (!scanStoreIngestionTaskToFixStuckConsumer ) {
185
- return ;
186
- }
187
- stuckConsumerRepairStats .recordStuckConsumerFound ();
188
-
189
- /**
190
- * Collect a list of SITs, whose version topic doesn't exist by checking {@link StoreIngestionTask#isProducingVersionTopicHealthy()},
191
- * and this function will continue to check the version topic healthiness for a period of {@link nonExistingTopicIngestionTaskKillThresholdMs}
192
- * to tolerate transient topic discovery issue.
193
- */
194
- Map <String , StoreIngestionTask > versionTopicIngestionTaskMappingForNonExistingTopic = new HashMap <>();
195
- versionTopicStoreIngestionTaskMapping .forEach ((vt , sit ) -> {
196
- try {
197
- if (!sit .isProducingVersionTopicHealthy ()) {
198
- versionTopicIngestionTaskMappingForNonExistingTopic .put (vt , sit );
199
- LOGGER .warn ("The producing version topic:{} is not healthy" , vt );
193
+ if (scanStoreIngestionTaskToFixStuckConsumer ) {
194
+ stuckConsumerRepairStats .recordStuckConsumerFound ();
195
+
196
+ /**
197
+ * Collect a list of SITs, whose version topic doesn't exist by checking {@link StoreIngestionTask#isProducingVersionTopicHealthy()},
198
+ * and this function will continue to check the version topic healthiness for a period of {@code nonExistingTopicIngestionTaskKillThresholdMs}
199
+ * to tolerate transient topic discovery issues.
200
+ */
201
+ Map <String , StoreIngestionTask > versionTopicIngestionTaskMappingForNonExistingTopic = new HashMap <>();
202
+ versionTopicStoreIngestionTaskMapping .forEach ((vt , sit ) -> {
203
+ try {
204
+ if (!sit .isProducingVersionTopicHealthy ()) {
205
+ versionTopicIngestionTaskMappingForNonExistingTopic .put (vt , sit );
206
+ logger .warn ("The producing version topic:{} is not healthy" , vt );
207
+ }
208
+ } catch (Exception e ) {
209
+ logger .error ("Got exception while checking topic existence for version topic: {}" , vt , e );
210
+ }
211
+ });
212
+ int maxAttempts =
213
+ (int ) Math .ceil ((double ) nonExistingTopicIngestionTaskKillThresholdMs / nonExistingTopicRetryIntervalMs );
214
+ for (int cnt = 0 ; cnt < maxAttempts ; ++cnt ) {
215
+ Iterator <Map .Entry <String , StoreIngestionTask >> iterator =
216
+ versionTopicIngestionTaskMappingForNonExistingTopic .entrySet ().iterator ();
217
+ while (iterator .hasNext ()) {
218
+ Map .Entry <String , StoreIngestionTask > entry = iterator .next ();
219
+ String versionTopic = entry .getKey ();
220
+ StoreIngestionTask sit = entry .getValue ();
221
+ try {
222
+ if (sit .isProducingVersionTopicHealthy ()) {
223
+ /**
224
+ * If the version topic becomes available after retries, remove it from the tracking map.
225
+ */
226
+ iterator .remove ();
227
+ logger .info ("The producing version topic:{} becomes healthy" , versionTopic );
228
+ }
229
+ } catch (Exception e ) {
230
+ logger .error ("Got exception while checking topic existence for version topic: {}" , versionTopic , e );
231
+ } finally {
232
+ Utils .sleep (nonExistingTopicRetryIntervalMs );
233
+ }
200
234
}
201
- } catch (Exception e ) {
202
- LOGGER .error ("Got exception while checking topic existence for version topic: {}" , vt , e );
203
235
}
204
- });
205
- int maxAttempts =
206
- (int ) Math .ceil ((double ) nonExistingTopicIngestionTaskKillThresholdMs / nonExistingTopicRetryIntervalMs );
207
- for (int cnt = 0 ; cnt < maxAttempts ; ++cnt ) {
208
- Iterator <Map .Entry <String , StoreIngestionTask >> iterator =
209
- versionTopicIngestionTaskMappingForNonExistingTopic .entrySet ().iterator ();
210
- while (iterator .hasNext ()) {
211
- Map .Entry <String , StoreIngestionTask > entry = iterator .next ();
212
- String versionTopic = entry .getKey ();
213
- StoreIngestionTask sit = entry .getValue ();
236
+
237
+ AtomicBoolean hasRepairedSomeIngestionTask = new AtomicBoolean (false );
238
+ versionTopicIngestionTaskMappingForNonExistingTopic .forEach ((vt , sit ) -> {
214
239
try {
215
- if (sit .isProducingVersionTopicHealthy ()) {
216
- /**
217
- * If the version topic becomes available after retries, remove it from the tracking map.
218
- */
219
- iterator .remove ();
220
- LOGGER .info ("The producing version topic:{} becomes healthy" , versionTopic );
221
- }
240
+ logger .warn (
241
+ "The ingestion topics (version topic) are not healthy for "
242
+ + "store version: {}, will kill the ingestion task to try to unblock shared consumer" ,
243
+ vt );
244
+ /**
245
+ * The following function call will interrupt all the stuck {@link org.apache.kafka.clients.producer.KafkaProducer#send} call
246
+ * to non-existing topics.
247
+ */
248
+ sit .closeVeniceWriters (false );
249
+ killIngestionTaskRunnable .accept (vt );
250
+ hasRepairedSomeIngestionTask .set (true );
251
+ stuckConsumerRepairStats .recordIngestionTaskRepair ();
222
252
} catch (Exception e ) {
223
- LOGGER .error ("Got exception while checking topic existence for version topic: {}" , versionTopic , e );
224
- } finally {
225
- Utils .sleep (nonExistingTopicRetryIntervalMs );
253
+ logger .error ("Hit exception while killing the stuck ingestion task for store version: {}" , vt , e );
254
+ }
255
+ });
256
+ if (!hasRepairedSomeIngestionTask .get ()) {
257
+ if (!REDUNDANT_LOGGING_FILTER .isRedundantException (STUCK_CONSUMER_MSG )) {
258
+ logger .warn (STUCK_CONSUMER_MSG );
226
259
}
260
+ stuckConsumerRepairStats .recordRepairFailure ();
227
261
}
228
262
}
229
263
230
- AtomicBoolean hasRepairedSomeIngestionTask = new AtomicBoolean (false );
231
- versionTopicIngestionTaskMappingForNonExistingTopic .forEach ((vt , sit ) -> {
232
- try {
233
- LOGGER .warn (
234
- "The ingestion topics (version topic) are not healthy for "
235
- + "store version: {}, will kill the ingestion task to try to unblock shared consumer" ,
236
- vt );
237
- /**
238
- * The following function call will interrupt all the stuck {@link org.apache.kafka.clients.producer.KafkaProducer#send} call
239
- * to non-existing topics.
240
- */
241
- sit .closeVeniceWriters (false );
242
- killIngestionTaskRunnable .accept (vt );
243
- hasRepairedSomeIngestionTask .set (true );
244
- stuckConsumerRepairStats .recordIngestionTaskRepair ();
245
- } catch (Exception e ) {
246
- LOGGER .error ("Hit exception while killing the stuck ingestion task for store version: {}" , vt , e );
247
- }
248
- });
249
- if (!hasRepairedSomeIngestionTask .get ()) {
250
- if (!REDUNDANT_LOGGING_FILTER .isRedundantException (STUCK_CONSUMER_MSG )) {
251
- LOGGER .warn (STUCK_CONSUMER_MSG );
264
+ reportStaleTopicPartitions (logger , time , kafkaServerToConsumerServiceMap , consumerPollTrackerStaleThresholdMs );
265
+ };
266
+ }
267
+
268
+ private static void reportStaleTopicPartitions (
269
+ Logger logger ,
270
+ Time time ,
271
+ Map <String , AbstractKafkaConsumerService > kafkaServerToConsumerServiceMap ,
272
+ long consumerPollTrackerStaleThresholdMs ) {
273
+ StringBuilder stringBuilder = new StringBuilder ();
274
+ long now = time .getMilliseconds ();
275
+ // Detect and log any subscribed topic partitions that are not polling records
276
+ for (Map .Entry <String , AbstractKafkaConsumerService > consumerService : kafkaServerToConsumerServiceMap .entrySet ()) {
277
+ Map <PubSubTopicPartition , Long > staleTopicPartitions =
278
+ consumerService .getValue ().getStaleTopicPartitions (now - consumerPollTrackerStaleThresholdMs );
279
+ if (!staleTopicPartitions .isEmpty ()) {
280
+ stringBuilder .append (CONSUMER_POLL_WARNING_MESSAGE_PREFIX );
281
+ stringBuilder .append (consumerService .getKey ());
282
+ for (Map .Entry <PubSubTopicPartition , Long > staleTopicPartition : staleTopicPartitions .entrySet ()) {
283
+ stringBuilder .append ("\n topic: " );
284
+ stringBuilder .append (staleTopicPartition .getKey ().getTopicName ());
285
+ stringBuilder .append (" partition: " );
286
+ stringBuilder .append (staleTopicPartition .getKey ().getPartitionNumber ());
287
+ stringBuilder .append (" stale for: " );
288
+ stringBuilder .append (now - staleTopicPartition .getValue ());
289
+ stringBuilder .append ("ms" );
252
290
}
253
- stuckConsumerRepairStats .recordRepairFailure ();
291
+ logger .warn (stringBuilder .toString ());
292
+ // clear the StringBuilder to be reused for next reporting cycle
293
+ stringBuilder .setLength (0 );
254
294
}
255
- };
295
+ }
256
296
}
257
297
258
298
/**
0 commit comments