Consuming application/x-ndjson asynchronously #46637
-
Greetings Quarkus community, I'm trying to create a REST API that wraps around a generic Ollama model. Ollama provides a default endpoint (http://localhost:11434/api/generate) to interact with its running model, which responds with Content-Type `application/x-ndjson`:

```
{"model":"llava","created_at":"2025-03-05T16:58:38.273513751Z","response":" D","done":false}
{"model":"llava","created_at":"2025-03-05T16:58:38.375692251Z","response":"ocker","done":false}
...
{"model":"llava","created_at":"2025-03-05T16:58:56.19444126Z","response":"","done":true,"done_reason":"stop","context":[...],"total_duration":20191370718,"load_duration":1819885626,"prompt_eval_count":17,"prompt_eval_duration":447000000,"eval_count":261,"eval_duration":17921000000}
```

I'm using the Quarkus REST Client with Kotlin, so I tried to represent this with an interface as usual:

```kotlin
@RegisterRestClient(configKey = "llm")
@Path("/api/generate")
interface LlmClient {
    @POST
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces("application/x-ndjson")
    suspend fun generate(request: LlmRequest): Flow<String>
}
```

Here's how I'm calling it (in this simple example I'm basically trying to stream what I get from the model to my API clients):

```kotlin
@Path("/hello")
class ExampleResource {
    @RestClient
    private lateinit var client: LlmClient

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    suspend fun hello(): Flow<String> {
        val request = LlmRequest("llava", "What is Docker?", listOf())
        return client.generate(request)
    }
}
```

However, when I call the endpoint from Insomnia I get the following error: {
"details": "Error id b1baae48-207d-4342-a5b1-d9f9b96235c4-1, jakarta.ws.rs.ProcessingException: Response could not be mapped to type kotlinx.coroutines.flow.Flow<java.lang.String> for response with media type application/x-ndjson",
"stack": "jakarta.ws.rs.ProcessingException: Response could not be mapped to type kotlinx.coroutines.flow.Flow<java.lang.String> for response with media type application/x-ndjson\n\tat org.jboss.resteasy.reactive.client.impl.ClientReaderInterceptorContextImpl.proceed(ClientReaderInterceptorContextImpl.java:132)\n\tat org.jboss.resteasy.reactive.client.impl.ClientSerialisers.invokeClientReader(ClientSerialisers.java:160)\n\tat org.jboss.resteasy.reactive.client.impl.RestClientRequestContext.readEntity(RestClientRequestContext.java:219)\n\tat org.jboss.resteasy.reactive.client.handlers.ClientResponseCompleteRestHandler.mapToResponse(ClientResponseCompleteRestHandler.java:113)\n\tat org.jboss.resteasy.reactive.client.handlers.ClientResponseCompleteRestHandler.handle(ClientResponseCompleteRestHandler.java:35)\n\tat org.jboss.resteasy.reactive.client.handlers.ClientResponseCompleteRestHandler.handle(ClientResponseCompleteRestHandler.java:31)\n\tat org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.invokeHandler(AbstractResteasyReactiveContext.java:231)\n\tat org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:147)\n\tat org.jboss.resteasy.reactive.client.impl.RestClientRequestContext$1.lambda$execute$0(RestClientRequestContext.java:324)\n\tat io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:270)\n\tat io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:252)\n\tat io.vertx.core.impl.ContextInternal.lambda$runOnContext$0(ContextInternal.java:50)\n\tat io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)\n\tat io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566)\n\tat 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)"
} The only way I was able to solve this was changing the REST client endpoint's return type from Flow/Multi (which also produces this error) to InputStream. However, this means I can only send data to my users AFTER reading the entire InputStream: if my model takes 5 minutes to produce an answer to a prompt, instead of streaming its answer to my users as it is being produced, my API can only send data once the model's response is complete.

After looking around the docs, searching some forums and asking some AIs about it, I didn't find a solution, so I'm asking you: do you know any way of doing this asynchronously, allowing me to pretty much "redirect" the model's stream to my API clients instead of having to wait for the model's entire answer?

Thank you for taking a while to think about this, and I apologize if I created this in the wrong place; please guide me so I can improve it if necessary!
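For anyone landing here before reading the rest of the thread: even with the InputStream return type, the NDJSON body itself can be parsed lazily, line by line. Below is a minimal, dependency-free sketch of that parsing side only; the `ndjsonLines` and `responseField` helpers are hypothetical names, the field extraction is deliberately naive (a real implementation would use a JSON parser and would not handle escaped quotes), and whether bytes actually arrive incrementally still depends on the HTTP client.

```kotlin
import java.io.ByteArrayInputStream
import java.io.InputStream

// Hypothetical helper: expose an NDJSON stream as a lazy sequence of lines,
// so each chunk can be handled as soon as its line is read.
fun ndjsonLines(stream: InputStream): Sequence<String> =
    stream.bufferedReader().lineSequence().filter { it.isNotBlank() }

// Naive extraction of the "response" field from one NDJSON line.
// Sketch only: breaks on escaped quotes; use a real JSON parser in practice.
fun responseField(line: String): String {
    val marker = "\"response\":\""
    val start = line.indexOf(marker)
    if (start < 0) return ""
    val end = line.indexOf('"', start + marker.length)
    return line.substring(start + marker.length, end)
}

fun main() {
    // Simulated Ollama NDJSON body, shaped like the example above.
    val body = """
        {"model":"llava","response":" D","done":false}
        {"model":"llava","response":"ocker","done":false}
        {"model":"llava","response":"","done":true}
    """.trimIndent()
    val answer = ndjsonLines(ByteArrayInputStream(body.toByteArray()))
        .joinToString("") { responseField(it) }
    println(answer) // prints " Docker"
}
```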
Replies: 4 comments 5 replies
-
/cc @geoand (kotlin)
-
Hi, The
-
If you add the example to GitHub, I can check it out next week
…On Thu, Mar 6, 2025, 12:21 Pedro Oliveira ***@***.***> wrote:
Thank you @geoand <https://github.com/geoand>, I wasn't aware of the
Langchain4j ecosystem. I started using it and immediately ran into a
problem:
I'm creating a demo where I feed an image to a model and it tells me if
the image is too blurry or not:
```properties
quarkus.langchain4j.chat-model.provider=ollama
quarkus.langchain4j.ollama.llava.chat-model.model-id=llava
```

```kotlin
@RegisterAiService(
    modelName = "llava",
    chatMemoryProviderSupplier = RegisterAiService.NoChatMemoryProviderSupplier::class
)
interface LlavaService {
    @SystemMessage("You are responsible for detecting if an image is blurry or not.")
    @UserMessage("Is this image blurry?")
    fun isBlurry(image: Image): Boolean
}

@Path("/hello")
class ExampleResource @Inject constructor(val llavaService: LlavaService) {
    @GET
    @Consumes(MediaType.MULTIPART_FORM_DATA)
    fun hello(@RestForm image: File): Boolean { // parameter annotation was redacted in the email quote; @RestForm assumed
        val base64Image: String = Base64.getEncoder().encodeToString(image.readBytes())
        val langchain4jImage: Image = Image.builder()
            .base64Data(base64Image)
            .mimeType("image/jpeg")
            .build()
        return llavaService.isBlurry(langchain4jImage)
    }
}
```
Everything compiles just fine. However, when I call this endpoint, I get
the following error:
{
"details": "Error id 71e549f8-c768-4c7f-a92f-ade8401ee0f3-1, java.lang.NullPointerException: ",
"stack": "java.lang.NullPointerException\n\tat java.base/java.util.Objects.requireNonNull(Objects.java:233)\n\tat io.quarkus.qute.TemplateInstanceBase.data(TemplateInstanceBase.java:45)\n\tat io.quarkiverse.langchain4j.QuarkusPromptTemplateFactory$QuteTemplate.render(QuarkusPromptTemplateFactory.java:67)\n\tat dev.langchain4j.model.input.PromptTemplate.apply(PromptTemplate.java:102)\n\tat io.quarkiverse.langchain4j.runtime.aiservice.AiServiceMethodImplementationSupport.prepareSystemMessage(AiServiceMethodImplementationSupport.java:619)\n\tat io.quarkiverse.langchain4j.runtime.aiservice.AiServiceMethodImplementationSupport.doImplement(AiServiceMethodImplementationSupport.java:161)\n\tat io.quarkiverse.langchain4j.runtime.aiservice.AiServiceMethodImplementationSupport.implement(AiServiceMethodImplementationSupport.java:143)\n\tat com.ctw.LlavaService$$QuarkusImpl.isBlurry(Unknown Source)\n\tat com.ctw.LlavaService$$QuarkusImpl_ClientProxy.isBlurry(Unknown Source)\n\tat com.ctw.ExampleResource.hello(ExampleResource.kt:26)\n\tat com.ctw.ExampleResource$quarkusrestinvoker$hello_4cd34cdf90b01c30dcaa152645cbb5afe56fd70c.invoke(Unknown Source)\n\tat org.jboss.resteasy.reactive.server.handlers.InvocationHandler.handle(InvocationHandler.java:29)\n\tat io.quarkus.resteasy.reactive.server.runtime.QuarkusResteasyReactiveRequestContext.invokeHandler(QuarkusResteasyReactiveRequestContext.java:141)\n\tat org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:147)\n\tat io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:635)\n\tat org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2516)\n\tat org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2495)\n\tat org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1521)\n\tat org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:11)\n\tat 
org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:11)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)"
}
I'm not sure if this will help, but following the stack trace we can see that the Input passed to AiServiceMethodImplementationSupport's implement method contains an AiServiceMethodCreateInfo with a systemMessageInfo. This is an Optional<TemplateInfo> whose nameToParamPosition has a null key that breaks the program further along the code.
image.png
<https://github.com/user-attachments/assets/b71df8c6-ab9a-4a0e-8a04-d501b27baeb9>
I'm guessing this has something to do with quarkus-langchain4j-ollama not
supporting image models or multimodal models like Llava (I might be wrong
here). Is there any way of doing this with this extension, or should I pick
another model/provider to achieve this use case?
Thanks again for your time and let me know if any more details are
required to investigate this! :)
-
Very glad to hear that!
…On Mon, Mar 10, 2025, 12:43 Pedro Oliveira ***@***.***> wrote:
It's fixed!
After messing around with standalone langchain4j, I also really appreciate how easy it is to stream here: change the return type to Multi and annotate the endpoint with @Produces(MediaType.SERVER_SENT_EVENTS). And that's not even going into WebSockets, which I already saw this extension also supports.
This is a great developer experience, congratulations and thank you very
much!
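For readers wondering what that setup looks like in practice, here is a rough sketch of a Multi-based streaming AI service plus an SSE endpoint. This is not the project's actual code: the service and resource names are invented, and the annotations are assumed from the quarkus-langchain4j and Quarkus REST documentation (the snippet requires those extensions on the classpath and is not runnable standalone).

```kotlin
// Sketch only — assumes quarkus-langchain4j-ollama and quarkus-rest dependencies.
@RegisterAiService
interface StreamingLlmService {
    // A single String parameter is used as the user message;
    // Multi<String> emits tokens as the model produces them.
    fun chat(prompt: String): Multi<String>
}

@Path("/stream")
class StreamingResource @Inject constructor(private val service: StreamingLlmService) {

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    fun stream(@QueryParam("prompt") prompt: String): Multi<String> =
        service.chat(prompt) // each emitted token is forwarded to the client as an SSE event
}
```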
The problem is that you are using `javaParameters = false` in `build.gradle.kts` (there is also a warning about that). If you change that to `true`, everything works.
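For context, that flag tells the Kotlin compiler to keep method parameter names in the bytecode, which Quarkus needs at runtime to map them. A minimal `build.gradle.kts` fragment might look like this (this syntax assumes a recent Kotlin Gradle plugin; older versions configure it via `kotlinOptions.javaParameters` on the compile tasks instead):

```kotlin
// build.gradle.kts — keep parameter names in the generated bytecode
kotlin {
    compilerOptions {
        javaParameters = true
    }
}
```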