-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Description
Hello!
There are similar issues in other projects made by another author:
reactor/reactor-kotlin-extensions#59
reactor/reactor-core#3406
Maybe these changes should be made in https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt
Please, take a look.
Use case
For users using kotlin coroutine, ThreadContextElement is used to propagate MDC, etc.
The block operator is supported by reactor/reactor-core#3420, but not the await function. Therefore, we are forced to write it this way.
// e.g. WebClient
webClient.post()
.uri("...")
// omitted
.contextCapture()
.awaitSingle()
It would be great if the await function is also supported, like the block operator.
The Shape of the API
This option can be achieved by adding contextCapture()
in ContextInjector
. It looks like we could add it to existing ReactorContextInjector
instead of creating new ContextInjector
.
Possible solution may look like:
package kotlinx.coroutines.reactor
import kotlinx.coroutines.reactive.*
import org.reactivestreams.*
import reactor.core.publisher.*
import reactor.util.context.*
import kotlin.coroutines.*
internal class ReactorContextInjector : ContextInjector {
override fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T> {
val reactorContext = coroutineContext[ReactorContext]?.context
return when(publisher) {
is Mono -> publisher.contextWriteNullable(reactorContext).contextCapture()
is Flux -> publisher.contextWriteNullable(reactorContext).contextCapture()
else -> publisher
}
}
private fun <T> Mono<T>.contextWriteNullable(reactorContext: Context?) = if (reactorContext == null) {
this
} else {
this.contextWrite(reactorContext)
}
private fun <T> Flux<T>.contextWriteNullable(reactorContext: Context?) = if (reactorContext == null) {
this
} else {
this.contextWrite(reactorContext)
}
}