Hi folks,
Another issue I could use some assistance with please.
I have a BPMN process which uses the Camunda-Camel library to communicate with a third-party service through Kafka, i.e.:
1. A BPMN Send Task of a BPMN process sends a message to a Camel route. That task is the first one in the BPMN process, which may be important.
2. The Camel route forwards the message to the "fromCamunda" Kafka topic.
3. A third-party service reads from the "fromCamunda" Kafka topic.
4. The third-party service does some processing and sends the reply to the "toCamunda" Kafka topic.
5. Another Camel route, configured on the same Camunda server as the one in steps 1-2, reads the message from the "toCamunda" Kafka topic.
6. The route forwards the message to the same Camunda BPMN process instance which sent the outgoing message in steps 1-2.
The BPMN Process Instance ID is used for the correlation.
The routing sometimes breaks with the following error:
2019-04-25 11:56:33,696 ERROR o.a.c.p.DefaultErrorHandler [-1) thread #1 - KafkaConsumer[toCamunda]] -> Failed delivery for (MessageId: ID-LAPTOP-9OG51EMC-1556205751527-0-4 on ExchangeId: ID-LAPTOP-9OG51EMC-1556205751527-0-3). Exhausted after delivery attempt: 1 caught: java.lang.RuntimeException: Couldn't find waiting process instance with id 'b2c37f57-6772-11e9-9616-e86a64530ab3' for message 'camel.answer'
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[asyncResponse ] [asyncResponse ] [kafka://localhost:9092?autoOffsetReset=earliest&brokers=localhost%3A9092&consu] [ 100]
[asyncResponse ] [unmarshal1 ] [unmarshal[org.apache.camel.model.dataformat.JsonDataFormat@473c9784] ] [ 22]
[asyncResponse ] [process2 ] [Processor@0x3a92a57c ] [ 0]
[asyncResponse ] [to2 ] [camunda-bpm:message?messageName=camel.answer&copyBodyAsVariable=aggregateData ] [ 75]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.RuntimeException: Couldn't find waiting process instance with id 'b2c37f57-6772-11e9-9616-e86a64530ab3' for message 'camel.answer'
at org.camunda.bpm.camel.component.producer.MessageProducer.process(MessageProducer.java:116)
at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:326)
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:215)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
2019-04-25 11:56:33,698 WARN o.a.c.c.k.KafkaConsumer [-1) thread #1 - KafkaConsumer[toCamunda]] -> Error during processing. Exchange[ID-LAPTOP-9OG51EMC-1556205751527-0-3]. Caused by: [java.lang.RuntimeException - Couldn't find waiting process instance with id 'b2c37f57-6772-11e9-9616-e86a64530ab3' for message 'camel.answer']
java.lang.RuntimeException: Couldn't find waiting process instance with id 'b2c37f57-6772-11e9-9616-e86a64530ab3' for message 'camel.answer'
at org.camunda.bpm.camel.component.producer.MessageProducer.process(MessageProducer.java:116)
at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:326)
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:215)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
The BPMN:
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" id="Definitions_0wxf2ek" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="2.2.4">
  <bpmn:message id="Message_1buba0w" name="camel.answer" />
  <bpmn:message id="Message_19viyxu" name="camel.answer" />
  <bpmn:process id="camel_asynch" name="Camel async" isExecutable="true">
    <bpmn:endEvent id="EndEvent_1egevrf">
      <bpmn:incoming>SequenceFlow_0aodgcz</bpmn:incoming>
    </bpmn:endEvent>
    <bpmn:sendTask id="SendTask_1ouqvcf" name="Call async service" camunda:expression="#{camel.sendTo('direct:asyncService')}" camunda:resultVariable="aggregateData">
      <bpmn:incoming>SequenceFlow_1yw1ho6</bpmn:incoming>
      <bpmn:outgoing>SequenceFlow_1i1ujc8</bpmn:outgoing>
    </bpmn:sendTask>
    <bpmn:sequenceFlow id="SequenceFlow_0aodgcz" sourceRef="IntermediateCatchEvent_133vikc" targetRef="EndEvent_1egevrf" />
    <bpmn:sequenceFlow id="SequenceFlow_1yw1ho6" sourceRef="StartEvent_1cmiw6j" targetRef="SendTask_1ouqvcf" />
    <bpmn:intermediateCatchEvent id="IntermediateCatchEvent_133vikc" name="message 'answer' received">
      <bpmn:incoming>SequenceFlow_1i1ujc8</bpmn:incoming>
      <bpmn:outgoing>SequenceFlow_0aodgcz</bpmn:outgoing>
      <bpmn:messageEventDefinition id="MessageEventDefinition_0anjhqa" messageRef="Message_0u86xqd" />
    </bpmn:intermediateCatchEvent>
    <bpmn:sequenceFlow id="SequenceFlow_1i1ujc8" sourceRef="SendTask_1ouqvcf" targetRef="IntermediateCatchEvent_133vikc" />
    <bpmn:startEvent id="StartEvent_1cmiw6j">
      <bpmn:outgoing>SequenceFlow_1yw1ho6</bpmn:outgoing>
    </bpmn:startEvent>
  </bpmn:process>
  <bpmn:message id="Message_0u86xqd" name="camel.answer" />
  <bpmndi:BPMNDiagram id="BPMNDiagram_1">
    <bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="camel_asynch">
      <bpmndi:BPMNShape id="EndEvent_1egevrf_di" bpmnElement="EndEvent_1egevrf">
        <dc:Bounds x="848" y="88" width="36" height="36" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNShape id="SendTask_1ouqvcf_di" bpmnElement="SendTask_1ouqvcf">
        <dc:Bounds x="380" y="66" width="100" height="80" />
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge id="SequenceFlow_0aodgcz_di" bpmnElement="SequenceFlow_0aodgcz">
        <di:waypoint x="748" y="106" />
        <di:waypoint x="848" y="106" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNEdge id="SequenceFlow_1yw1ho6_di" bpmnElement="SequenceFlow_1yw1ho6">
        <di:waypoint x="101" y="106" />
        <di:waypoint x="380" y="106" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNShape id="IntermediateCatchEvent_133vikc_di" bpmnElement="IntermediateCatchEvent_133vikc">
        <dc:Bounds x="712" y="88" width="36" height="36" />
        <bpmndi:BPMNLabel>
          <dc:Bounds x="687" y="131" width="88" height="27" />
        </bpmndi:BPMNLabel>
      </bpmndi:BPMNShape>
      <bpmndi:BPMNEdge id="SequenceFlow_1i1ujc8_di" bpmnElement="SequenceFlow_1i1ujc8">
        <di:waypoint x="480" y="106" />
        <di:waypoint x="712" y="106" />
      </bpmndi:BPMNEdge>
      <bpmndi:BPMNShape id="StartEvent_1cmiw6j_di" bpmnElement="StartEvent_1cmiw6j">
        <dc:Bounds x="65" y="88" width="36" height="36" />
      </bpmndi:BPMNShape>
    </bpmndi:BPMNPlane>
  </bpmndi:BPMNDiagram>
</bpmn:definitions>
The Camel routes:
// Outgoing route: invoked by the Send Task, publishes the request to Kafka.
from("direct://asyncService")
    .routeId("asyncService")
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            Map inMap = (Map) exchange.getIn().getBody();
            Map<String, String> kafkaOutParams = new HashMap<>();
            kafkaOutParams.put("aggregateSchemaId", inMap.get("aggregateSchemaId?").toString());
            kafkaOutParams.put("aggregateInstanceId", inMap.get("aggregateInstanceId?").toString());
            exchange.getOut().setBody(kafkaOutParams, Map.class);
            // The process instance ID travels as the Kafka record key.
            exchange.getOut().setHeader(KafkaConstants.KEY,
                exchange.getProperties().get(CamundaBpmConstants.EXCHANGE_HEADER_PROCESS_INSTANCE_ID));
        }
    })
    .marshal().json(JsonLibrary.Jackson, Map.class)
    .convertBodyTo(String.class)
    .to("kafka:localhost:9092?topic=fromCamunda&brokers=localhost:9092");

// Incoming route: reads the reply from Kafka and correlates it to the waiting process instance.
from("kafka:localhost:9092?topic=toCamunda&groupId=group_id&autoOffsetReset=earliest&consumersCount=1&brokers=localhost:9092")
    .routeId("asyncResponse")
    .unmarshal().json(JsonLibrary.Jackson, Map.class)
    .process(new Processor() {
        @Override
        public void process(Exchange exchange) throws Exception {
            // Restore the process instance ID from the Kafka record key.
            exchange.setProperty(CamundaBpmConstants.EXCHANGE_HEADER_PROCESS_INSTANCE_ID,
                exchange.getIn().getHeader(KafkaConstants.KEY));
            Map<String, Object> result = new HashMap<>();
            result.put("aggregateData", exchange.getIn().getBody());
            exchange.getIn().setBody(result, Map.class);
        }
    })
    .to("camunda-bpm:message?messageName=camel.answer&copyBodyAsVariable=aggregateData");
As far as I understand, there's a race condition between the thread in the Camunda process that sends the outgoing Kafka message and the thread that reads the reply from Kafka and uses the Process Instance ID to find the process instance to forward the message to.
I.e. when the second thread attempts the correlation, the first thread hasn't yet committed the process instance to the DB.
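To make the theory concrete, here is a minimal plain-Java model of what I suspect is happening. It is only a sketch under my own assumptions: no Camunda or Camel classes are involved, and `CorrelationRaceSketch`, `commit` and `correlate` are hypothetical names I made up for the illustration. The set stands in for the subscriptions that are already visible in the DB.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of the suspected race; not the real engine behaviour. */
public class CorrelationRaceSketch {

    // Assumption: a waiting process instance is only visible to other
    // threads once the transaction that created it has been committed.
    private final Set<String> committedSubscriptions = ConcurrentHashMap.newKeySet();

    /** Models the first thread committing after the process reaches the catch event. */
    public void commit(String processInstanceId) {
        committedSubscriptions.add(processInstanceId);
    }

    /** Models the second thread correlating the reply by process instance ID. */
    public void correlate(String processInstanceId, String messageName) {
        // remove() returns false when nothing has been committed for this ID yet.
        if (!committedSubscriptions.remove(processInstanceId)) {
            throw new RuntimeException("Couldn't find waiting process instance with id '"
                    + processInstanceId + "' for message '" + messageName + "'");
        }
    }

    public static void main(String[] args) {
        CorrelationRaceSketch engine = new CorrelationRaceSketch();
        String id = "b2c37f57-6772-11e9-9616-e86a64530ab3";

        // The reply arrives before the commit: correlation fails as in the log.
        try {
            engine.correlate(id, "camel.answer");
        } catch (RuntimeException e) {
            System.out.println("before commit: " + e.getMessage());
        }

        // The reply arrives after the commit: correlation succeeds.
        engine.commit(id);
        engine.correlate(id, "camel.answer");
        System.out.println("after commit: correlated");
    }
}
```

If this model is right, the error would only show up when the third-party service replies faster than the first thread reaches its commit, which would explain why it happens only sometimes.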
- Does this theory sound right?
- If it doesn't, what else could be the problem?
- Any advice on how to fix the issue?
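One workaround I am considering is to retry the correlation a few times with a short delay on the consuming side. Below is a rough plain-Java sketch of that idea, not a tested fix: `RetryingCorrelator` and `correlateWithRetry` are hypothetical names, and the `Runnable` stands in for the real correlation call. The log above says "Exhausted after delivery attempt: 1", so a redelivery setting on the Camel error handler might achieve the same thing, but I have not verified that.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of retrying a failed correlation; names are illustrative only. */
public class RetryingCorrelator {

    /**
     * Runs the correlation until it succeeds or maxAttempts is reached.
     * Returns the number of attempts used; rethrows the last failure otherwise.
     */
    public static int correlateWithRetry(Runnable correlation, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                correlation.run();
                return attempt;
            } catch (RuntimeException e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    pause(delayMillis); // give the other thread time to commit
                }
            }
        }
        throw lastFailure;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting to retry", ie);
        }
    }

    public static void main(String[] args) {
        // Simulated correlation that fails twice before the "commit" becomes visible.
        AtomicInteger calls = new AtomicInteger();
        Runnable flaky = () -> {
            if (calls.incrementAndGet() < 3) {
                throw new RuntimeException("Couldn't find waiting process instance");
            }
        };
        int attempts = correlateWithRetry(flaky, 5, 100);
        System.out.println("correlated after " + attempts + " attempts");
    }
}
```

A retry only hides the race rather than removing it, so I would still prefer a way to make sure the process instance is committed before the Kafka message goes out.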
Any help is greatly appreciated!
Thanks,
Andrey.