Skip to content

Commit 04ab4a6

Browse files
DaltomonNicholas Dalton
and
Nicholas Dalton
authored
feat(assistant): add streaming (#400)
* Added feature assistant-streaming * Fixed THREAD_MESSAGE_CREATED data type --------- Co-authored-by: Nicholas Dalton <nick.dalton@hq.bill.com>
1 parent 4517677 commit 04ab4a6

File tree

12 files changed

+593
-2
lines changed

12 files changed

+593
-2
lines changed

guides/GettingStarted.md

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -849,3 +849,57 @@ val runSteps = openAI.runSteps(
849849
runId = RunId("run_abc123")
850850
)
851851
```
852+
853+
### Event streaming
854+
855+
Create a thread and run it in one request and process streaming events.
856+
857+
```kotlin
858+
openAI.createStreamingThreadRun(
859+
request = ThreadRunRequest(
860+
assistantId = AssistantId("asst_abc123"),
861+
thread = ThreadRequest(
862+
messages = listOf(
863+
ThreadMessage(
864+
role = Role.User,
865+
content = "Explain deep learning to a 5 year old."
866+
)
867+
)
868+
),
869+
)
870+
.onEach { assistantStreamEvent: AssistantStreamEvent -> println(assistantStreamEvent) }
871+
.collect()
872+
)
873+
```
874+
875+
Get data object from AssistantStreamEvent.
876+
877+
```kotlin
878+
//Type of data for generic type can be found in AssistantStreamEventType
879+
when(assistantStreamEvent.type) {
880+
AssistantStreamEventType.THREAD_CREATED -> {
881+
val thread = assistantStreamEvent.getData<Thread>()
882+
...
883+
}
884+
AssistantStreamEventType.MESSAGE_CREATED -> {
885+
val message = assistantStreamEvent.getData<Message>()
886+
...
887+
}
888+
AssistantStreamEventType.UNKNOWN -> {
889+
//Data field is a string and can be used instead of calling getData
890+
val data = assistantStreamEvent.data
891+
//Handle unknown message type
892+
}
893+
}
894+
```
895+
896+
If a new event type is released before the library is updated, you can create and deserialize your own type by providing a KSerializer.
897+
898+
```kotlin
899+
when(assistantStreamEvent.type) {
900+
AssistantStreamEventType.UNKNOWN -> {
901+
val data = assistantStreamEvent.getDate<MyCustomType>(myCustomSerializer)
902+
...
903+
}
904+
}
905+
```

openai-client/src/commonMain/kotlin/com.aallam.openai.client/Runs.kt

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import com.aallam.openai.api.core.SortOrder
66
import com.aallam.openai.api.core.Status
77
import com.aallam.openai.api.run.*
88
import com.aallam.openai.api.thread.ThreadId
9+
import io.ktor.sse.ServerSentEvent
10+
import kotlinx.coroutines.flow.Flow
911

1012
/**
1113
* Represents an execution run on a thread.
@@ -23,6 +25,21 @@ public interface Runs {
2325
@BetaOpenAI
2426
public suspend fun createRun(threadId: ThreadId, request: RunRequest, requestOptions: RequestOptions? = null): Run
2527

28+
/**
29+
* Create a run with event streaming.
30+
*
31+
* @param threadId The ID of the thread to run
32+
* @param request request for a run
33+
* @param requestOptions request options.
34+
* @param block a lambda function that will be called for each event.
35+
*/
36+
@BetaOpenAI
37+
public suspend fun createStreamingRun(
38+
threadId: ThreadId,
39+
request: RunRequest,
40+
requestOptions: RequestOptions? = null
41+
) : Flow<AssistantStreamEvent>
42+
2643
/**
2744
* Retrieves a run.
2845
*
@@ -92,6 +109,25 @@ public interface Runs {
92109
requestOptions: RequestOptions? = null
93110
): Run
94111

112+
/**
113+
* When a run has the status: [Status.RequiresAction] and required action is [RequiredAction.SubmitToolOutputs],
114+
* this endpoint can be used to submit the outputs from the tool calls once they're all completed.
115+
* All outputs must be submitted in a single request using event streaming.
116+
*
117+
* @param threadId the ID of the thread to which this run belongs
118+
* @param runId the ID of the run to submit tool outputs for
119+
* @param output list of tool outputs to submit
120+
* @param requestOptions request options.
121+
* @param block a lambda function that will be called for each event.
122+
*/
123+
@BetaOpenAI
124+
public suspend fun submitStreamingToolOutput(
125+
threadId: ThreadId,
126+
runId: RunId,
127+
output: List<ToolOutput>,
128+
requestOptions: RequestOptions? = null
129+
) : Flow<AssistantStreamEvent>
130+
95131
/**
96132
* Cancels a run that is [Status.InProgress].
97133
*
@@ -111,6 +147,19 @@ public interface Runs {
111147
@BetaOpenAI
112148
public suspend fun createThreadRun(request: ThreadRunRequest, requestOptions: RequestOptions? = null): Run
113149

150+
/**
151+
* Create a thread and run it in one request with event streaming.
152+
*
153+
* @param request request for a thread run
154+
* @param requestOptions request options.
155+
* @param block a lambda function that will be called for each event.
156+
*/
157+
@BetaOpenAI
158+
public suspend fun createStreamingThreadRun(
159+
request: ThreadRunRequest,
160+
requestOptions: RequestOptions? = null
161+
) : Flow<AssistantStreamEvent>
162+
114163
/**
115164
* Retrieves a run step.
116165
*
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.aallam.openai.client.extension
2+
3+
import com.aallam.openai.api.run.AssistantStreamEvent
4+
import com.aallam.openai.client.internal.JsonLenient
5+
import kotlinx.serialization.KSerializer
6+
7+
/**
8+
* Get the data of the [AssistantStreamEvent] using the provided [serializer] from the corresponding event type.
9+
* @param <T> the type of the data.
10+
* @throws IllegalStateException if the [AssistantStreamEvent] data is null.
11+
* @throws ClassCastException if the [AssistantStreamEvent] data cannot be cast to the provided type.
12+
*/
13+
@Suppress("UNCHECKED_CAST")
14+
public fun <T> AssistantStreamEvent.getData(): T {
15+
return type
16+
.let { it.serializer as? KSerializer<T> }
17+
?.let(::getData)
18+
?: throw IllegalStateException("Failed to decode ServerSentEvent: $rawType")
19+
}
20+
21+
22+
/**
23+
* Get the data of the [AssistantStreamEvent] using the provided [serializer].
24+
* @throws IllegalStateException if the [AssistantStreamEvent] data is null.
25+
* @throws ClassCastException if the [AssistantStreamEvent] data cannot be cast to the provided type.
26+
*/
27+
public fun <T> AssistantStreamEvent.getData(serializer: KSerializer<T>): T =
28+
data
29+
?.let { JsonLenient.decodeFromString(serializer, it) }
30+
?: throw IllegalStateException("ServerSentEvent data was null: $rawType")
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.aallam.openai.client.extension
2+
3+
import com.aallam.openai.api.run.AssistantStreamEvent
4+
import com.aallam.openai.api.run.AssistantStreamEventType
5+
import com.aallam.openai.client.internal.JsonLenient
6+
import io.ktor.sse.ServerSentEvent
7+
import kotlinx.serialization.KSerializer
8+
9+
/**
10+
* Convert a [ServerSentEvent] to [AssistantStreamEvent]. Type will be [AssistantStreamEventType.UNKNOWN] if the event is null or unrecognized.
11+
*/
12+
internal fun ServerSentEvent.toAssistantStreamEvent() : AssistantStreamEvent =
13+
AssistantStreamEvent(
14+
event,
15+
event
16+
?.let(AssistantStreamEventType::fromEvent)
17+
?:AssistantStreamEventType.UNKNOWN,
18+
data
19+
)

openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/HttpClient.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import io.ktor.client.plugins.auth.*
1111
import io.ktor.client.plugins.auth.providers.*
1212
import io.ktor.client.plugins.contentnegotiation.*
1313
import io.ktor.client.plugins.logging.*
14+
import io.ktor.client.plugins.sse.SSE
1415
import io.ktor.http.*
1516
import io.ktor.serialization.kotlinx.*
1617
import io.ktor.util.*
@@ -71,6 +72,8 @@ internal fun createHttpClient(config: OpenAIConfig): HttpClient {
7172
exponentialDelay(config.retry.base, config.retry.maxDelay.inWholeMilliseconds)
7273
}
7374

75+
install(SSE)
76+
7477
defaultRequest {
7578
url(config.host.baseUrl)
7679
config.host.queryParams.onEach { (key, value) -> url.parameters.appendIfNameAbsent(key, value) }

openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/api/RunsApi.kt

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.aallam.openai.client.internal.api
22

3+
import com.aallam.openai.api.BetaOpenAI
34
import com.aallam.openai.api.core.PaginatedList
45
import com.aallam.openai.api.core.RequestOptions
56
import com.aallam.openai.api.core.SortOrder
@@ -13,20 +14,37 @@ import com.aallam.openai.client.internal.http.perform
1314
import io.ktor.client.call.*
1415
import io.ktor.client.request.*
1516
import io.ktor.http.*
17+
import kotlinx.coroutines.flow.Flow
18+
import kotlinx.coroutines.flow.collect
19+
import kotlinx.coroutines.flow.onEach
1620

1721
internal class RunsApi(val requester: HttpRequester) : Runs {
1822
override suspend fun createRun(threadId: ThreadId, request: RunRequest, requestOptions: RequestOptions?): Run {
1923
return requester.perform {
2024
it.post {
2125
url(path = "${ApiPath.Threads}/${threadId.id}/runs")
22-
setBody(request)
26+
setBody(request.copy(stream = false))
2327
contentType(ContentType.Application.Json)
2428
beta("assistants", 2)
2529
requestOptions(requestOptions)
2630
}.body()
2731
}
2832
}
2933

34+
@BetaOpenAI
35+
override suspend fun createStreamingRun(threadId: ThreadId, request: RunRequest, requestOptions: RequestOptions?) : Flow<AssistantStreamEvent> {
36+
return requester
37+
.performSse {
38+
url(path = "${ApiPath.Threads}/${threadId.id}/runs")
39+
setBody(request.copy(stream = true))
40+
contentType(ContentType.Application.Json)
41+
accept(ContentType.Text.EventStream)
42+
beta("assistants", 2)
43+
requestOptions(requestOptions)
44+
method = HttpMethod.Post
45+
}
46+
}
47+
3048
override suspend fun getRun(threadId: ThreadId, runId: RunId, requestOptions: RequestOptions?): Run {
3149
return requester.perform {
3250
it.get {
@@ -95,6 +113,25 @@ internal class RunsApi(val requester: HttpRequester) : Runs {
95113
}
96114
}
97115

116+
@BetaOpenAI
117+
override suspend fun submitStreamingToolOutput(
118+
threadId: ThreadId,
119+
runId: RunId,
120+
output: List<ToolOutput>,
121+
requestOptions: RequestOptions?
122+
) : Flow<AssistantStreamEvent> {
123+
return requester
124+
.performSse {
125+
url(path = "${ApiPath.Threads}/${threadId.id}/runs/${runId.id}/submit_tool_outputs")
126+
setBody(mapOf("tool_outputs" to output, "stream" to true))
127+
contentType(ContentType.Application.Json)
128+
accept(ContentType.Text.EventStream)
129+
beta("assistants", 2)
130+
requestOptions(requestOptions)
131+
method = HttpMethod.Post
132+
}
133+
}
134+
98135
override suspend fun cancel(threadId: ThreadId, runId: RunId, requestOptions: RequestOptions?): Run {
99136
return requester.perform {
100137
it.post {
@@ -109,14 +146,32 @@ internal class RunsApi(val requester: HttpRequester) : Runs {
109146
return requester.perform {
110147
it.post {
111148
url(path = "${ApiPath.Threads}/runs")
112-
setBody(request)
149+
setBody(request.copy(stream = false))
113150
contentType(ContentType.Application.Json)
114151
beta("assistants", 2)
115152
requestOptions(requestOptions)
116153
}.body()
117154
}
118155
}
119156

157+
@BetaOpenAI
158+
override suspend fun createStreamingThreadRun(
159+
request: ThreadRunRequest,
160+
requestOptions: RequestOptions?
161+
) : Flow<AssistantStreamEvent> {
162+
return requester
163+
.performSse {
164+
url(path = "${ApiPath.Threads}/runs")
165+
setBody(request.copy(stream = true))
166+
contentType(ContentType.Application.Json)
167+
accept(ContentType.Text.EventStream)
168+
beta("assistants", 2)
169+
requestOptions(requestOptions)
170+
method = HttpMethod.Post
171+
}
172+
}
173+
174+
120175
override suspend fun runStep(
121176
threadId: ThreadId,
122177
runId: RunId,

openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/http/HttpRequester.kt

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
package com.aallam.openai.client.internal.http
22

3+
import com.aallam.openai.api.run.AssistantStreamEvent
34
import io.ktor.client.*
5+
import io.ktor.client.plugins.sse.ClientSSESession
46
import io.ktor.client.request.*
57
import io.ktor.client.statement.*
8+
import io.ktor.sse.ServerSentEvent
69
import io.ktor.util.reflect.*
10+
import kotlinx.coroutines.flow.Flow
711

812
/**
913
* Http request performer.
@@ -15,6 +19,14 @@ internal interface HttpRequester : AutoCloseable {
1519
*/
1620
suspend fun <T : Any> perform(info: TypeInfo, block: suspend (HttpClient) -> HttpResponse): T
1721

22+
/**
23+
* Perform an HTTP request and process emitted server-side events.
24+
*
25+
*/
26+
suspend fun performSse(
27+
builderBlock: HttpRequestBuilder.() -> Unit
28+
): Flow<AssistantStreamEvent>
29+
1830
/**
1931
* Perform an HTTP request and get a result.
2032
*

openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/http/HttpTransport.kt

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,26 @@
11
package com.aallam.openai.client.internal.http
22

33
import com.aallam.openai.api.exception.*
4+
import com.aallam.openai.api.run.AssistantStreamEvent
5+
import com.aallam.openai.client.extension.toAssistantStreamEvent
6+
import com.aallam.openai.client.internal.api.ApiPath
47
import io.ktor.client.*
58
import io.ktor.client.call.*
69
import io.ktor.client.network.sockets.*
710
import io.ktor.client.plugins.*
11+
import io.ktor.client.plugins.sse.ClientSSESession
12+
import io.ktor.client.plugins.sse.sseSession
813
import io.ktor.client.request.*
914
import io.ktor.client.statement.*
15+
import io.ktor.http.ContentType
16+
import io.ktor.sse.ServerSentEvent
1017
import io.ktor.util.reflect.*
1118
import io.ktor.utils.io.errors.*
1219
import kotlinx.coroutines.CancellationException
20+
import kotlinx.coroutines.flow.Flow
21+
import kotlinx.coroutines.flow.collect
22+
import kotlinx.coroutines.flow.map
23+
import kotlinx.coroutines.flow.onEach
1324

1425
/** HTTP transport layer */
1526
internal class HttpTransport(private val httpClient: HttpClient) : HttpRequester {
@@ -35,6 +46,19 @@ internal class HttpTransport(private val httpClient: HttpClient) : HttpRequester
3546
}
3647
}
3748

49+
override suspend fun performSse(
50+
builderBlock: HttpRequestBuilder.() -> Unit
51+
): Flow<AssistantStreamEvent> {
52+
try {
53+
return httpClient
54+
.sseSession(block = builderBlock)
55+
.incoming
56+
.map(ServerSentEvent::toAssistantStreamEvent)
57+
} catch (e: Exception) {
58+
throw handleException(e)
59+
}
60+
}
61+
3862
override fun close() {
3963
httpClient.close()
4064
}

0 commit comments

Comments
 (0)