Spring Cloud Stream Reactive trace/span info not populated in log #6074
Unanswered
huxun198933
asked this question in
Q&A
Replies: 0 comments
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
-
Hi, we create an application with spring cloud stream framework and using reactive programming, auto instrumented by otel java agent. The tracing part looks good but trace id and span id is not able to be injected into logs. Anyone could give some advice how to solve this, or it is exsitng flaw?
the simple function we used to process message:
@Bean public Function<Flux<Message<SampleData>>, Flux<Message<SampleData>>> demoFunction() { return flux -> flux.filter(message -> xxx).map(message -> xxx).log(); }
the java agent log showing tracing is good as trace id and span id generated:
[KafkaConsumerDestination{consumerDestinationName='test', partitions=3, dlqName='null'}.container-0-C-1] INFO io.opentelemetry.exporter.logging.LoggingSpanExporter - 'queue:///TESTQ send' : 558d11508f0c6d1a1a0f92b8e21892ef 840c3eb206393d2c PRODUCER [tracer: io.opentelemetry.jms-1.1:1.14.0-alpha] AttributesMap{data={messaging.destination=queue:///TESTQ, thread.id=71, messaging.message_id=ID:414d51204643544e414d514d31552020a6c180620a6cd323, thread.name=KafkaConsumerDestination{consumerDestinationName='test', partitions=3, dlqName='null'}.container-0-C-1, messaging.destination_kind=queue, messaging.system=jms}, capacity=128, totalAddedValues=6}
[KafkaConsumerDestination{consumerDestinationName='test', partitions=3, dlqName='null'}.container-0-C-1] INFO io.opentelemetry.exporter.logging.LoggingSpanExporter - 'demo-processor.demoFunction-out-0 process' : 558d11508f0c6d1a1a0f92b8e21892ef 66f9b0b2d5028003 CONSUMER [tracer: io.opentelemetry.spring-integration-4.1:1.14.0-alpha] AttributesMap{data={thread.id=71, messaging.operation=process, thread.name=KafkaConsumerDestination{consumerDestinationName='test', partitions=3, dlqName='null'}.container-0-C-1}, capacity=128, totalAddedValues=3}
[KafkaConsumerDestination{consumerDestinationName='test', partitions=3, dlqName='null'}.container-0-C-1] INFO io.opentelemetry.exporter.logging.LoggingSpanExporter - 'test process' : 558d11508f0c6d1a1a0f92b8e21892ef f4b22b34c4fbeedb CONSUMER [tracer: io.opentelemetry.spring-kafka-2.7:1.14.0-alpha] AttributesMap{data={messaging.destination=test, thread.id=71, messaging.message_payload_size_bytes=23, messaging.operation=process, messaging.kafka.partition=0, thread.name=KafkaConsumerDestination{consumerDestinationName='test', partitions=3, dlqName='null'}.container-0-C-1, messaging.destination_kind=topic, messaging.system=kafka}, capacity=128, totalAddedValues=8}
however the log doesnot show any trace info:
{"@timestamp":"2022-05-20T09:15:30.285Z","severity":"INFO","service":"demo-processor","trace":"","span":"","flag":"","pid":"9788","thread":"KafkaConsumerDestination{consumerDestinationName='test', partitions=3, dlqName='null'}.container-0-C-1","class":"reactor.Flux.Lift.1","rest":"onNext(GenericMessage [payload=SampleData(id=64, lastUpdatedDate=null, lastUpdatedBy=null), headers={sequenceNumber=1, sequenceSize=1, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=test, accept=application/x-ndjson, target-protocol=kafka, kafka_offset=18948, scst_nativeHeadersPresent=true, traceparent=00-558d11508f0c6d1a1a0f92b8e21892ef-12956c4e7ed5bc69-01, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@58e7668b, correlationId=10d2e21f-f8ff-10d8-bce4-97352630639a, id=5a296449-bb23-8af8-2a2a-7fc82b15e678, kafka_receivedPartitionId=2, kafka_receivedTimestamp=1653038104630, contentType=application/json, kafka_groupId=demo-processor, timestamp=1653038130285}])"}
the logback config is like this:
<appender
name="logstash_console" class="net.logstash.logback.appender.LoggingEventAsyncDisruptorAppender">UTC
{
"severity": "%level",
"service": "${APP_NAME:-}",
"trace": "%mdc{trace_id}",
"span": "%mdc{span_id}",
"flag": "%mdc{trace_flags}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"rest": "%message"
}
`
We also tried to remove "Flux" in the function, then everything is working well, like below:
@Bean public Function<Message<SampleData>, Message<SampleData>> demoFunction() { return message-> { log.info("demoFunction: {}", message.toString()); return message; }; }
Beta Was this translation helpful? Give feedback.
All reactions