Streaming CSV back to API client #37750
-
Hi, I have a case where I need to serve CSVs to my clients, which can be hundreds of megabytes in size. A pseudocode snippet would be nice. I was thinking of something like this, but Kotlin-friendly:

@Path("/csv")
@GET
@Produces("text/csv")
fun getCSV(): Multi<String> {
    return db.query("SELECT * FROM time_entry")
        .execute()
        .toMulti()
        .onItem()
        .transform { rowSet ->
            rowSet.joinToString("\n") { row ->
                """${row.getUUID("id")},${row.getUUID("client_id")},${row.getString("description")},${
                    row.getOffsetDateTime(
                        "started_on"
                    )
                },${row.getOffsetDateTime("completed_on")}"""
            }
        }
}
-
/cc @evanchooly (kotlin), @geoand (kotlin)
-
Yes, you can 😃. RESTEasy Reactive has built-in support for Kotlin. You have the following options:
-
@mschorsch Thank you for the prompt response! I really appreciate it. One quick question: does this mean that each single item will be pumped into the response as soon as I prepare it (
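On the Kotlin side, a `Flow` is cold and sequential, so each item is handed to the collector before the next one is produced. Below is a minimal sketch of that behaviour using only `kotlinx.coroutines`. The `rows` and `interleaving` functions are hypothetical names made up for this illustration, and the in-memory list stands in for both the database and the HTTP response. It shows only the `Flow` semantics; when the bytes are actually flushed to the client is decided by the server and is not demonstrated here.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a database-backed producer; `log` records the order of events.
fun rows(log: MutableList<String>): Flow<String> = flow {
    for (i in 1..3) {
        log += "produce $i"
        emit("row $i") // suspends until the collector has handled this item
    }
}

// Collects the flow and returns the recorded order of producer and consumer steps.
fun interleaving(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    rows(log).collect { item -> log += "consume $item" }
    log
}

fun main() {
    // Producer and consumer steps alternate: nothing is buffered up front.
    interleaving().forEach { println(it) }
}
```

The recorded order alternates between "produce" and "consume", which means the producer never runs ahead of the consumer unless a buffering operator is added to the flow.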
Beta Was this translation helpful? Give feedback.
-
I have not tested it, but something like this should do the trick:

import io.smallrye.mutiny.coroutines.awaitSuspending
import io.vertx.mutiny.pgclient.PgPool
import io.vertx.mutiny.sqlclient.*
import jakarta.enterprise.context.ApplicationScoped
import jakarta.ws.rs.GET
import jakarta.ws.rs.Path
import jakarta.ws.rs.Produces
import jakarta.ws.rs.core.MediaType
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.flow

@Path("/fruits")
@ApplicationScoped
class FruitsResource(
    private val pgPool: PgPool
) {

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    fun getFruits(): Flow<String> {
        // The flow is cold: the query only runs once the flow is collected.
        return flow { emitFruits() }
    }

    private suspend fun FlowCollector<String>.emitFruits() {
        // PostgreSQL cursors only live inside a transaction.
        pgPool.withTransactionAwait {
            val pq = prepare("SELECT name FROM fruits").awaitSuspending()
            val cursor = pq.cursor()
            // Read first, then check: hasMore() reports on the previous read.
            do {
                // Fetch at most 50 rows at a time so the full result set is never held in memory.
                val rows = cursor.read(50).awaitSuspending()
                for (row in rows) {
                    emit(row.getString("name"))
                }
            } while (cursor.hasMore())
        }
    }

    private suspend fun <T> PgPool.withTransactionAwait(block: suspend SqlConnection.() -> T): T {
        val sqlConnection = connection.awaitSuspending()
        try {
            val transaction = sqlConnection.begin().awaitSuspending()
            try {
                val result = block(sqlConnection)
                transaction.commit().awaitSuspending()
                return result
            } catch (ex: Exception) {
                transaction.rollback().awaitSuspending()
                // Rethrow so the failure is not silently swallowed.
                throw ex
            }
        } finally {
            sqlConnection.close().awaitSuspending()
        }
    }
}
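The snippet in the question builds each CSV line by interpolating column values directly, which breaks as soon as a free-text column such as `description` contains a comma, a quote, or a line break. A minimal sketch of a line formatter that could be used when emitting rows is shown below. `csvField` and `csvLine` are hypothetical helpers (not part of Quarkus or Vert.x), the quoting follows the usual RFC 4180 convention, and rendering `null` as an empty field and ending lines with `\n` are assumptions.

```kotlin
// Hypothetical helper: formats a single CSV field.
// A field containing a comma, a double quote, or a line break is wrapped in
// double quotes, and embedded double quotes are doubled.
fun csvField(value: Any?): String {
    val text = value?.toString() ?: "" // assumption: null becomes an empty field
    val needsQuoting = text.any { it == ',' || it == '"' || it == '\n' || it == '\r' }
    return if (needsQuoting) "\"" + text.replace("\"", "\"\"") + "\"" else text
}

// Hypothetical helper: joins the fields and appends a line terminator,
// so every emitted item is one complete line.
fun csvLine(vararg fields: Any?): String =
    fields.joinToString(separator = ",", postfix = "\n") { csvField(it) }

fun main() {
    // Prints: 42,"Acme, Inc.","said ""hi""",
    print(csvLine("42", "Acme, Inc.", "said \"hi\"", null))
}
```

With the columns from the question, the `emit` call in the cursor loop could then look like `emit(csvLine(row.getUUID("id"), row.getUUID("client_id"), row.getString("description"), row.getOffsetDateTime("started_on"), row.getOffsetDateTime("completed_on")))`, together with `@Produces("text/csv")` on the endpoint.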