Skip to content

Commit 2850735

Browse files
authored
Merge pull request #200 from tKe/fix/event-loop-poll
fix: event loop stops polling after empty record batch
2 parents 9d7604f + 2827eaa commit 2850735

File tree

2 files changed

+107
-18
lines changed

2 files changed

+107
-18
lines changed

src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/EventLoop.kt

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import kotlin.time.toJavaDuration
1919
import kotlinx.coroutines.CoroutineScope
2020
import kotlinx.coroutines.CoroutineStart.LAZY
2121
import kotlinx.coroutines.CoroutineStart.UNDISPATCHED
22-
import kotlinx.coroutines.Dispatchers.Default
22+
import kotlinx.coroutines.DelicateCoroutinesApi
2323
import kotlinx.coroutines.ExperimentalCoroutinesApi
2424
import kotlinx.coroutines.InternalCoroutinesApi
2525
import kotlinx.coroutines.cancelAndJoin
@@ -44,6 +44,7 @@ import org.slf4j.Logger
4444
import org.slf4j.LoggerFactory
4545
import kotlin.coroutines.CoroutineContext
4646
import kotlin.time.Duration
47+
import kotlin.time.Duration.Companion.milliseconds
4748

4849
private val logger: Logger =
4950
LoggerFactory.getLogger(EventLoop::class.java)
@@ -83,7 +84,7 @@ internal class EventLoop<K, V>(
8384
channel.consumeAsFlow()
8485
.onStart {
8586
if (topicNames.isNotEmpty()) subscribe(topicNames)
86-
withContext(scope.coroutineContext) { poll() }
87+
schedulePoll()
8788
commitManager.start()
8889
}.onCompletion {
8990
commitBatchSignal.close()
@@ -120,8 +121,20 @@ internal class EventLoop<K, V>(
120121
return pausedNow
121122
}
122123

124+
private val scheduled = AtomicBoolean(false)
125+
private fun schedulePoll() {
126+
if (scheduled.compareAndSet(false, true)) {
127+
scope.launch {
128+
scheduled.set(false)
129+
@OptIn(DelicateCoroutinesApi::class)
130+
if (!channel.isClosedForSend) poll()
131+
}
132+
}
133+
}
134+
123135
@ConsumerThread
124136
private fun poll() {
137+
checkConsumerThread("poll")
125138
try {
126139
runCommitIfRequired(false)
127140

@@ -164,32 +177,35 @@ internal class EventLoop<K, V>(
164177
ConsumerRecords.empty()
165178
}
166179

167-
if (!records.isEmpty) {
180+
if (records.isEmpty) {
181+
schedulePoll()
182+
} else {
168183
if (settings.maxDeferredCommits > 0) {
169184
commitBatch.addUncommitted(records)
170185
}
171186
logger.debug("Attempting to send ${records.count()} records to Channel")
172187
channel.trySend(records)
173-
.onSuccess { poll() }
188+
.onSuccess { schedulePoll() }
174189
.onClosed { error -> logger.error("Channel closed when trying to send records.", error) }
175190
.onFailure { error ->
176191
if (error != null) {
177192
logger.error("Channel send failed when trying to send records.", error)
178193
closeChannel(error)
179-
} else logger.debug("Back-pressuring kafka consumer. Might pause KafkaConsumer on next poll tick.")
180-
181-
isPolling.set(false)
182-
183-
scope.launch(outerContext) {
184-
/* Send the records down,
185-
* when send returns we attempt to send and empty set of records down to test the backpressure.
186-
* If our "backpressure test" returns we start requesting/polling again. */
187-
channel.send(records)
188-
if (isPaused.get()) {
189-
consumer.wakeup()
194+
} else {
195+
logger.debug("Back-pressuring kafka consumer. Might pause KafkaConsumer on next poll tick.")
196+
197+
isPolling.set(false)
198+
scope.launch(outerContext) {
199+
/* Send the records down,
200+
* when send returns we attempt to send and empty set of records down to test the backpressure.
201+
* If our "backpressure test" returns we start requesting/polling again. */
202+
channel.send(records)
203+
if (isPaused.get()) {
204+
consumer.wakeup()
205+
}
206+
isPolling.set(true)
207+
schedulePoll()
190208
}
191-
isPolling.set(true)
192-
poll()
193209
}
194210
}
195211
}
@@ -567,7 +583,7 @@ private annotation class ConsumerThread
567583
private const val DEBUG: Boolean = true
568584

569585
private fun checkConsumerThread(msg: String): Unit =
570-
if (DEBUG) require(
586+
if (DEBUG) check(
571587
Thread.currentThread().name.startsWith("kotlin-kafka-")
572588
) { "$msg => should run on kotlin-kafka thread, but found ${Thread.currentThread().name}" }
573589
else Unit

src/test/kotlin/io/github/nomisrev/kafka/receiver/KafakReceiverSpec.kt

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import io.github.nomisrev.kafka.mapIndexed
88
import kotlinx.coroutines.CompletableDeferred
99
import kotlinx.coroutines.ExperimentalCoroutinesApi
1010
import kotlinx.coroutines.cancelAndJoin
11+
import kotlinx.coroutines.flow.buffer
1112
import kotlinx.coroutines.flow.collect
1213
import kotlinx.coroutines.flow.collectIndexed
14+
import kotlinx.coroutines.flow.count
1315
import kotlinx.coroutines.flow.flatMapConcat
1416
import kotlinx.coroutines.flow.flattenMerge
1517
import kotlinx.coroutines.flow.flowOf
@@ -19,7 +21,9 @@ import kotlinx.coroutines.flow.onEach
1921
import kotlinx.coroutines.flow.take
2022
import kotlinx.coroutines.flow.toList
2123
import kotlinx.coroutines.flow.toSet
24+
import kotlinx.coroutines.launch
2225
import kotlinx.coroutines.yield
26+
import org.apache.kafka.clients.consumer.ConsumerConfig
2327
import org.apache.kafka.clients.producer.ProducerRecord
2428
import org.junit.jupiter.api.Test
2529
import kotlin.test.assertEquals
@@ -53,6 +57,75 @@ class KafakReceiverSpec : KafkaSpec() {
5357
)
5458
}
5559

60+
@Test
61+
fun `All records produced while consuming are received`() = withTopic {
62+
launch { publishToKafka(topic, produced) }
63+
assertEquals(
64+
KafkaReceiver()
65+
.receive(topic.name())
66+
.map { record ->
67+
Pair(record.key(), record.value())
68+
.also { record.offset.acknowledge() }
69+
}.take(count)
70+
.toSet(),
71+
produced.toSet()
72+
)
73+
}
74+
75+
@Test
76+
fun `Continuous receiving does not overflow stack`() = withTopic {
77+
val largeCount = 20_000 // when it _was_ overflowing stack, it only took ~4095 messages.
78+
publishScope {
79+
repeat(largeCount) {
80+
offer(ProducerRecord(topic.name(), "key-$it", "value-$it"))
81+
}
82+
}
83+
val settings = receiverSetting().copy(
84+
maxDeferredCommits = largeCount + 1,
85+
commitStrategy = CommitStrategy.BySize(largeCount + 1),
86+
properties = mapOf(ConsumerConfig.MAX_POLL_RECORDS_CONFIG to "1").toProperties()
87+
)
88+
assertEquals(
89+
KafkaReceiver(settings)
90+
.receive(topic.name())
91+
.buffer(largeCount)
92+
.take(largeCount)
93+
.count(),
94+
largeCount
95+
)
96+
}
97+
98+
@Test
99+
fun `Can consume after backpressure`() = withTopic {
100+
publishToKafka(topic, produced)
101+
assertEquals(
102+
KafkaReceiver()
103+
.receive(topic.name())
104+
.map { record ->
105+
yield()
106+
Pair(record.key(), record.value())
107+
.also { record.offset.acknowledge() }
108+
}.take(count)
109+
.toSet(),
110+
produced.toSet()
111+
)
112+
}
113+
114+
@Test
115+
fun `Concurrent commits while receiving`() = withTopic {
116+
publishToKafka(topic, produced)
117+
val receiver = KafkaReceiver(
118+
receiverSetting().copy(
119+
commitStrategy = CommitStrategy.BySize(count / 2)
120+
)
121+
)
122+
receiver.receive(topic.name())
123+
.take(count)
124+
.collect { it.offset.acknowledge() }
125+
126+
assertEquals(receiver.committedCount(topic.name()), count.toLong())
127+
}
128+
56129
@Test
57130
fun `All produced records with headers are received`() = withTopic(partitions = 1) {
58131
val producerRecords = produced.map { (key, value) ->

0 commit comments

Comments
 (0)