Skip to content

ReactorContextInjector does not have contextCapture #4480

@olga-larina

Description

@olga-larina

Hello!
There are similar issues in other projects made by another author:
reactor/reactor-kotlin-extensions#59
reactor/reactor-core#3406

Maybe these changes should be made in https://github.com/Kotlin/kotlinx.coroutines/blob/f4f519b36734238ec686dfaec1e174086691781e/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt
Please, take a look.

Use case

For users using kotlin coroutine, ThreadContextElement is used to propagate MDC, etc.

The block operator is supported by reactor/reactor-core#3420, but not the await function. Therefore, we are forced to write it this way.

// e.g. WebClient
webClient.post()
    .uri("...")
    // omitted
    .contextCapture()
    .awaitSingle()

It would be great if the await function is also supported, like the block operator.

The Shape of the API

This option can be achieved by adding contextCapture() in ContextInjector. It looks like we could add it to existing ReactorContextInjector instead of creating new ContextInjector.
Possible solution may look like:

package kotlinx.coroutines.reactor

import kotlinx.coroutines.reactive.*
import org.reactivestreams.*
import reactor.core.publisher.*
import reactor.util.context.*
import kotlin.coroutines.*

internal class ReactorContextInjector : ContextInjector {
    
    override fun <T> injectCoroutineContext(publisher: Publisher<T>, coroutineContext: CoroutineContext): Publisher<T> {
        val reactorContext = coroutineContext[ReactorContext]?.context
        return when(publisher) {
            is Mono -> publisher.contextWriteNullable(reactorContext).contextCapture()
            is Flux -> publisher.contextWriteNullable(reactorContext).contextCapture()
            else -> publisher
        }
    }

    private fun <T> Mono<T>.contextWriteNullable(reactorContext: Context?) = if (reactorContext == null) {
        this
    } else {
        this.contextWrite(reactorContext)
    }

    private fun <T> Flux<T>.contextWriteNullable(reactorContext: Context?) = if (reactorContext == null) {
        this
    } else {
        this.contextWrite(reactorContext)
    }
}

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions