
Commit 3ecab2e

feat(chat): enhance flow cancel capability (#333)
1 parent: ad9bfe8

File tree

3 files changed: 57 additions, 12 deletions


CHANGELOG.md

Lines changed: 3 additions & 0 deletions
@@ -3,6 +3,9 @@
 ### Added
 - **vector-stores**: add vector stores APIs (#324)
 
+### Fixed
+- **chat**: enhance flow cancel capability (#333)
+
 ## 3.7.2
 > Published 28 Apr 2024
 
Lines changed: 16 additions & 12 deletions
@@ -1,12 +1,12 @@
 package com.aallam.openai.client.internal.extension
 
 import com.aallam.openai.client.internal.JsonLenient
-import io.ktor.client.call.body
-import io.ktor.client.statement.HttpResponse
-import io.ktor.utils.io.ByteReadChannel
-import io.ktor.utils.io.readUTF8Line
+import io.ktor.client.call.*
+import io.ktor.client.statement.*
+import io.ktor.utils.io.*
+import kotlinx.coroutines.currentCoroutineContext
 import kotlinx.coroutines.flow.FlowCollector
-import kotlinx.serialization.decodeFromString
+import kotlinx.coroutines.isActive
 
 private const val STREAM_PREFIX = "data:"
 private const val STREAM_END_TOKEN = "$STREAM_PREFIX [DONE]"
@@ -16,13 +16,17 @@ private const val STREAM_END_TOKEN = "$STREAM_PREFIX [DONE]"
  */
 internal suspend inline fun <reified T> FlowCollector<T>.streamEventsFrom(response: HttpResponse) {
     val channel: ByteReadChannel = response.body()
-    while (!channel.isClosedForRead) {
-        val line = channel.readUTF8Line() ?: continue
-        val value: T = when {
-            line.startsWith(STREAM_END_TOKEN) -> break
-            line.startsWith(STREAM_PREFIX) -> JsonLenient.decodeFromString(line.removePrefix(STREAM_PREFIX))
-            else -> continue
+    try {
+        while (currentCoroutineContext().isActive && !channel.isClosedForRead) {
+            val line = channel.readUTF8Line() ?: continue
+            val value: T = when {
+                line.startsWith(STREAM_END_TOKEN) -> break
+                line.startsWith(STREAM_PREFIX) -> JsonLenient.decodeFromString(line.removePrefix(STREAM_PREFIX))
+                else -> continue
+            }
+            emit(value)
         }
-        emit(value)
+    } finally {
+        channel.cancel()
     }
 }

openai-client/src/commonTest/kotlin/com/aallam/openai/client/TestChatCompletions.kt

Lines changed: 38 additions & 0 deletions
@@ -2,10 +2,14 @@ package com.aallam.openai.client
 
 import com.aallam.openai.api.chat.*
 import com.aallam.openai.api.model.ModelId
+import kotlinx.coroutines.flow.collect
 import kotlinx.coroutines.flow.launchIn
 import kotlinx.coroutines.flow.onEach
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.test.advanceTimeBy
 import kotlinx.serialization.Serializable
 import kotlinx.serialization.json.Json
+import kotlin.coroutines.cancellation.CancellationException
 import kotlin.test.*
 
 class TestChatCompletions : TestOpenAI() {
@@ -165,4 +169,38 @@ class TestChatCompletions : TestOpenAI() {
         assertEquals(response.usage!!.completionTokens, logprobs.content!!.size)
         assertEquals(logprobs.content!![0].topLogprobs?.size, expectedTopLogProbs)
     }
+
+    @Test
+    fun cancellable() = test {
+        val request = chatCompletionRequest {
+            model = ModelId("gpt-3.5-turbo")
+            messages {
+                message {
+                    role = ChatRole.System
+                    content = "You are a helpful assistant.!"
+                }
+                message {
+                    role = ChatRole.User
+                    content = "Who won the world series in 2020?"
+                }
+            }
+        }
+
+        val job = launch {
+            try {
+                openAI.chatCompletions(request).collect()
+            } catch (e: CancellationException) {
+                println("Flow was cancelled as expected.")
+            } catch (e: Exception) {
+                fail("Flow threw an unexpected exception: ${e.message}")
+            }
+        }
+
+        advanceTimeBy(1000)
+
+        job.cancel()
+        job.join()
+
+        assertTrue(job.isCancelled, "Job should be cancelled")
+    }
 }
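The new test drives cancellation from inside a test dispatcher; from application code the same capability looks roughly like the sketch below. It assumes an already configured OpenAI client named openAI, and the model, prompt, and streamThenCancel helper are illustrative choices, not taken from the commit.

import com.aallam.openai.api.chat.*
import com.aallam.openai.api.model.ModelId
import com.aallam.openai.client.OpenAI
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Start streaming a chat completion, then cancel the collector after two seconds.
fun streamThenCancel(openAI: OpenAI) = runBlocking {
    val request = chatCompletionRequest {
        model = ModelId("gpt-3.5-turbo")
        messages {
            message {
                role = ChatRole.User
                content = "Tell me a very long story."
            }
        }
    }

    // launchIn collects the streaming flow in its own job.
    val job = openAI.chatCompletions(request)
        .onEach { chunk -> println(chunk) }
        .launchIn(this)

    delay(2_000)

    // With this commit, cancelling the collector also cancels the underlying
    // ByteReadChannel inside streamEventsFrom, so the HTTP stream is released promptly.
    job.cancel()
    job.join()
}

The practical difference after this commit is that stopping collection explicitly cancels the response channel, rather than leaving cleanup to the HTTP engine.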
