Skip to content

Commit eba9489

Browse files
authored
Merge pull request #185 from nomisRev/beta5
2.0.0-Beta5 build failure
2 parents 2850735 + b54a9b5 commit eba9489

File tree

5 files changed

+40
-34
lines changed

5 files changed

+40
-34
lines changed

build.gradle.kts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,18 @@
1-
import com.bnorm.power.PowerAssertGradleExtension
21
import kotlinx.knit.KnitPluginExtension
3-
import org.gradle.api.tasks.testing.logging.TestExceptionFormat
42
import org.gradle.api.tasks.testing.logging.TestExceptionFormat.*
5-
import org.gradle.api.tasks.testing.logging.TestLogEvent
63
import org.gradle.api.tasks.testing.logging.TestLogEvent.FAILED
7-
import org.gradle.api.tasks.testing.logging.TestLogEvent.PASSED
84
import org.gradle.api.tasks.testing.logging.TestLogEvent.SKIPPED
95
import org.gradle.api.tasks.testing.logging.TestLogEvent.STANDARD_ERROR
10-
import org.gradle.api.tasks.testing.logging.TestLogEvent.STANDARD_OUT
116
import org.jetbrains.dokka.gradle.DokkaTask
7+
import org.jetbrains.kotlin.gradle.dsl.KotlinVersion.KOTLIN_2_0
128

139
plugins {
1410
alias(libs.plugins.kotlin.jvm)
11+
alias(libs.plugins.kotlin.assert)
1512
alias(libs.plugins.dokka)
1613
alias(libs.plugins.spotless)
1714
alias(libs.plugins.knit)
1815
alias(libs.plugins.publish)
19-
alias(libs.plugins.power.assert)
2016
}
2117

2218
repositories {
@@ -38,10 +34,15 @@ dependencies {
3834
testImplementation(libs.kotlinx.coroutines.test)
3935
}
4036

41-
configure<PowerAssertGradleExtension> {
37+
@Suppress("OPT_IN_USAGE")
38+
powerAssert {
4239
functions = listOf("kotlin.test.assertEquals")
4340
}
4441

42+
//configure<PowerAssertGradleExtension> {
43+
// functions = listOf("kotlin.test.assertEquals")
44+
//}
45+
4546
configure<KnitPluginExtension> {
4647
siteRoot = "https://nomisrev.github.io/kotlin-kafka/"
4748
}
@@ -54,6 +55,10 @@ configure<JavaPluginExtension> {
5455

5556
kotlin {
5657
explicitApi()
58+
compilerOptions {
59+
languageVersion.set(KOTLIN_2_0)
60+
apiVersion.set(KOTLIN_2_0)
61+
}
5762
}
5863

5964
tasks {

gradle/libs.versions.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[versions]
22
kotest = "5.8.1"
33
kafka = "3.7.0"
4-
kotlin = "1.9.23"
4+
kotlin = "2.0.0-RC2"
55
kotlinx-coroutines = "1.8.0"
66
dokka = "1.9.20"
77
knit = "0.5.0"
@@ -10,7 +10,6 @@ testcontainers-kafka = "1.19.7"
1010
slf4j = "2.0.12"
1111
spotless="6.25.0"
1212
publish="0.28.0"
13-
power-assert="0.13.0"
1413

1514
[libraries]
1615
kotest-property = { module = "io.kotest:kotest-property", version.ref = "kotest" }
@@ -28,9 +27,9 @@ slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
2827

2928
[plugins]
3029
kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }
30+
kotlin-assert = { id = "org.jetbrains.kotlin.plugin.power-assert", version.ref="kotlin" }
3131
dokka = { id = "org.jetbrains.dokka", version.ref = "dokka" }
3232
kover = { id = "org.jetbrains.kotlinx.kover", version.ref = "kover" }
3333
spotless = { id = "com.diffplug.spotless", version.ref = "spotless" }
3434
publish = { id = "com.vanniktech.maven.publish", version.ref="publish" }
3535
knit = { id = "org.jetbrains.kotlinx.knit", version.ref="knit" }
36-
power-assert = { id = "com.bnorm.power.kotlin-power-assert", version.ref="power-assert" }

src/main/kotlin/io/github/nomisRev/kafka/internal/FlowTimeChunked.kt

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ import kotlinx.coroutines.channels.Channel
1010
import kotlinx.coroutines.channels.ReceiveChannel
1111
import kotlinx.coroutines.channels.getOrElse
1212
import kotlinx.coroutines.channels.produce
13+
import kotlinx.coroutines.coroutineScope
1314
import kotlinx.coroutines.flow.Flow
14-
import kotlinx.coroutines.flow.internal.scopedFlow
15+
import kotlinx.coroutines.flow.flow
1516
import kotlinx.coroutines.selects.whileSelect
1617
import kotlinx.coroutines.selects.onTimeout
1718
import kotlin.time.Duration
@@ -45,30 +46,32 @@ public fun <T> Flow<T>.chunked(
4546
): Flow<List<T>> {
4647
require(size > 0) { "Cannot create chunks smaller than 0 but found $size" }
4748
require(!duration.isNegative() && duration != Duration.ZERO) { "Chunk duration should be positive non-zero duration" }
48-
return scopedFlow { downstream ->
49-
val emitNowAndMaybeContinue = Channel<Boolean>(capacity = Channel.RENDEZVOUS)
50-
val elements = produce(capacity = size) {
51-
collect { element ->
52-
val hasCapacity = channel.trySend(element).isSuccess
53-
if (!hasCapacity) {
54-
emitNowAndMaybeContinue.send(true)
55-
channel.send(element)
49+
return flow {
50+
coroutineScope {
51+
val emitNowAndMaybeContinue = Channel<Boolean>(capacity = Channel.RENDEZVOUS)
52+
val elements = produce(capacity = size) {
53+
collect { element ->
54+
val hasCapacity = channel.trySend(element).isSuccess
55+
if (!hasCapacity) {
56+
emitNowAndMaybeContinue.send(true)
57+
channel.send(element)
58+
}
5659
}
60+
emitNowAndMaybeContinue.send(false)
5761
}
58-
emitNowAndMaybeContinue.send(false)
59-
}
6062

61-
whileSelect {
62-
emitNowAndMaybeContinue.onReceive { shouldContinue ->
63-
val chunk = elements.drain(maxElements = size)
64-
if (chunk.isNotEmpty()) downstream.emit(chunk)
65-
shouldContinue
66-
}
63+
whileSelect {
64+
emitNowAndMaybeContinue.onReceive { shouldContinue ->
65+
val chunk = elements.drain(maxElements = size)
66+
if (chunk.isNotEmpty()) emit(chunk)
67+
shouldContinue
68+
}
6769

68-
onTimeout(duration) {
69-
val chunk: List<T> = elements.drain(maxElements = size)
70-
if (chunk.isNotEmpty()) downstream.emit(chunk)
71-
true
70+
onTimeout(duration) {
71+
val chunk: List<T> = elements.drain(maxElements = size)
72+
if (chunk.isNotEmpty()) emit(chunk)
73+
true
74+
}
7275
}
7376
}
7477
}

src/main/kotlin/io/github/nomisRev/kafka/publisher/PublisherScope.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
@file:JvmMultifileClass @file:JvmName("PublisherScope.kt")
2-
1+
//@file:JvmName("PublisherScope")
32
package io.github.nomisRev.kafka.publisher
43

54
import kotlinx.coroutines.CoroutineScope

src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/CommittableBatch.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ internal class CommittableBatch {
9393
fun getAndClearOffsets(): CommitArgs {
9494
val offsetMap: MutableMap<TopicPartition, OffsetAndMetadata> = HashMap()
9595
if (outOfOrderCommits) {
96-
deferred.forEach { (tp: TopicPartition, offsets: List<Long>) ->
96+
deferred.forEach { (tp: TopicPartition, offsets: MutableList<Long>) ->
9797
if (offsets.size > 0) {
9898
offsets.sort()
9999
val uncomittedThisPart: MutableList<Long> = uncommitted[tp]!!

0 commit comments

Comments
 (0)