Skip to content

"Couldn't find process instance" while using async communication #51

@ashulinskiy

Description

@ashulinskiy

Hi folks,

Another issue I could use some assistance with please.

I have a BPMN which uses the Camunda-Camel library to communicate to a third party service through Kafka, i.e.:

  1. A BPMN Send Task of a BPMN process sends a message to a Camel route.
    That task is the fist one in the BPMN process which may be important.
  2. The Camel route forwards the message to the "FromCamunda"Kafka topic.
  3. A third party service reads from the "FromCamunda" Kafka topic.
  4. The third party service does some processing and sends the reply to the "ToCamunda" Kafka topic.
  5. Another camel route configured on the same Camunda server as the one in steps 1-2 reads the message from the "ToCamunda" Kafka topic.
  6. The route forwards the message to the same Camunda BPMN process which sent the outgoing message in steps 1-2.
    The BPMN Process Instance ID is used for the correlation.

The routing sometime breaks with the following error:

2019-04-25 11:56:33,696 ERROR o.a.c.p.DefaultErrorHandler [-1) thread #1 - KafkaConsumer[toCamunda]] -> Failed delivery for (MessageId: ID-LAPTOP-9OG51EMC-1556205751527-0-4 on ExchangeId: ID-LAPTOP-9OG51EMC-1556205751527-0-3). Exhausted after delivery attempt: 1 caught: java.lang.RuntimeException: Couldn't find waiting process instance with id 'b2c37f57-6772-11e9-9616-e86a64530ab3' for message 'camel.answer'

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[asyncResponse     ] [asyncResponse     ] [kafka://localhost:9092?autoOffsetReset=earliest&brokers=localhost%3A9092&consu] [       100]
[asyncResponse     ] [unmarshal1        ] [unmarshal[org.apache.camel.model.dataformat.JsonDataFormat@473c9784]          ] [        22]
[asyncResponse     ] [process2          ] [Processor@0x3a92a57c                                                          ] [         0]
[asyncResponse     ] [to2               ] [camunda-bpm:message?messageName=camel.answer&copyBodyAsVariable=aggregateData ] [        75]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.RuntimeException: Couldn't find waiting process instance with id 'b2c37f57-6772-11e9-9616-e86a64530ab3' for message 'camel.answer'
	at org.camunda.bpm.camel.component.producer.MessageProducer.process(MessageProducer.java:116)
	at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
	at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:326)
	at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:215)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
2019-04-25 11:56:33,698 WARN  o.a.c.c.k.KafkaConsumer [-1) thread #1 - KafkaConsumer[toCamunda]] -> Error during processing. Exchange[ID-LAPTOP-9OG51EMC-1556205751527-0-3]. Caused by: [java.lang.RuntimeException - Couldn't find waiting process instance with id 'b2c37f57-6772-11e9-9616-e86a64530ab3' for message 'camel.answer']
java.lang.RuntimeException: Couldn't find waiting process instance with id 'b2c37f57-6772-11e9-9616-e86a64530ab3' for message 'camel.answer'
	at org.camunda.bpm.camel.component.producer.MessageProducer.process(MessageProducer.java:116)
	at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
	at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.doRun(KafkaConsumer.java:326)
	at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:215)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

The BPMN:

<?xml version="1.0" encoding="UTF-8"?> <bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" id="Definitions_0wxf2ek" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="2.2.4"> <bpmn:message id="Message_1buba0w" name="camel.answer" /> <bpmn:message id="Message_19viyxu" name="camel.answer" /> <bpmn:process id="camel_asynch" name="Camel async" isExecutable="true"> <bpmn:endEvent id="EndEvent_1egevrf"> <bpmn:incoming>SequenceFlow_0aodgcz</bpmn:incoming> </bpmn:endEvent> <bpmn:sendTask id="SendTask_1ouqvcf" name="Call async service" camunda:expression="#{camel.sendTo(&#39;direct:asyncService&#39;)}" camunda:resultVariable="aggregateData"> <bpmn:incoming>SequenceFlow_1yw1ho6</bpmn:incoming> <bpmn:outgoing>SequenceFlow_1i1ujc8</bpmn:outgoing> </bpmn:sendTask> <bpmn:sequenceFlow id="SequenceFlow_0aodgcz" sourceRef="IntermediateCatchEvent_133vikc" targetRef="EndEvent_1egevrf" /> <bpmn:sequenceFlow id="SequenceFlow_1yw1ho6" sourceRef="StartEvent_1cmiw6j" targetRef="SendTask_1ouqvcf" /> <bpmn:intermediateCatchEvent id="IntermediateCatchEvent_133vikc" name="message &#39;answer&#39; received"> <bpmn:incoming>SequenceFlow_1i1ujc8</bpmn:incoming> <bpmn:outgoing>SequenceFlow_0aodgcz</bpmn:outgoing> <bpmn:messageEventDefinition id="MessageEventDefinition_0anjhqa" messageRef="Message_0u86xqd" /> </bpmn:intermediateCatchEvent> <bpmn:sequenceFlow id="SequenceFlow_1i1ujc8" sourceRef="SendTask_1ouqvcf" targetRef="IntermediateCatchEvent_133vikc" /> <bpmn:startEvent id="StartEvent_1cmiw6j"> <bpmn:outgoing>SequenceFlow_1yw1ho6</bpmn:outgoing> </bpmn:startEvent> </bpmn:process> <bpmn:message id="Message_0u86xqd" name="camel.answer" /> <bpmndi:BPMNDiagram id="BPMNDiagram_1"> <bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="camel_asynch"> <bpmndi:BPMNShape id="EndEvent_1egevrf_di" bpmnElement="EndEvent_1egevrf"> <dc:Bounds x="848" y="88" width="36" height="36" /> </bpmndi:BPMNShape> <bpmndi:BPMNShape id="SendTask_1ouqvcf_di" bpmnElement="SendTask_1ouqvcf"> <dc:Bounds x="380" y="66" width="100" height="80" /> </bpmndi:BPMNShape> <bpmndi:BPMNEdge id="SequenceFlow_0aodgcz_di" bpmnElement="SequenceFlow_0aodgcz"> <di:waypoint x="748" y="106" /> <di:waypoint x="848" y="106" /> </bpmndi:BPMNEdge> <bpmndi:BPMNEdge id="SequenceFlow_1yw1ho6_di" bpmnElement="SequenceFlow_1yw1ho6"> <di:waypoint x="101" y="106" /> <di:waypoint x="380" y="106" /> </bpmndi:BPMNEdge> <bpmndi:BPMNShape id="IntermediateCatchEvent_133vikc_di" bpmnElement="IntermediateCatchEvent_133vikc"> <dc:Bounds x="712" y="88" width="36" height="36" /> <bpmndi:BPMNLabel> <dc:Bounds x="687" y="131" width="88" height="27" /> </bpmndi:BPMNLabel> </bpmndi:BPMNShape> <bpmndi:BPMNEdge id="SequenceFlow_1i1ujc8_di" bpmnElement="SequenceFlow_1i1ujc8"> <di:waypoint x="480" y="106" /> <di:waypoint x="712" y="106" /> </bpmndi:BPMNEdge> <bpmndi:BPMNShape id="StartEvent_1cmiw6j_di" bpmnElement="StartEvent_1cmiw6j"> <dc:Bounds x="65" y="88" width="36" height="36" /> </bpmndi:BPMNShape> </bpmndi:BPMNPlane> </bpmndi:BPMNDiagram> </bpmn:definitions>

The Camel routes:

      from("direct://asyncService")
                .routeId("asyncService")
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        Map inMap = (Map)exchange.getIn().getBody();
                        Map<String, String> kafkaOutParams = new HashMap<>();
                        kafkaOutParams.put("aggregateSchemaId", inMap.get("aggregateSchemaId?").toString());
                        kafkaOutParams.put("aggregateInstanceId", inMap.get("aggregateInstanceId?").toString());
                        exchange.getOut().setBody(kafkaOutParams, Map.class);
                        exchange.getOut().setHeader(KafkaConstants.KEY,
                            exchange.getProperties().get(CamundaBpmConstants.EXCHANGE_HEADER_PROCESS_INSTANCE_ID));
                    }
                })
                .marshal().json(JsonLibrary.Jackson, Map.class)
                .convertBodyTo(String.class)
                .to("kafka:localhost:9092?topic=fromCamunda&brokers=localhost:9092");

        from("kafka:localhost:9092?topic=toCamunda&groupId=group_id&autoOffsetReset=earliest&consumersCount=1&brokers=localhost:9092")
            .routeId("asyncResponse")
            .unmarshal().json(JsonLibrary.Jackson, Map.class)
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    exchange.setProperty(CamundaBpmConstants.EXCHANGE_HEADER_PROCESS_INSTANCE_ID,
                        exchange.getIn().getHeader(KafkaConstants.KEY));
                    Map<String,Object> result = new HashMap<>();
                    result.put("aggregateData", exchange.getIn().getBody());
                    exchange.getIn().setBody(result, Map.class);
                }
            })
            .to("camunda-bpm:message?messageName=camel.answer&copyBodyAsVariable=aggregateData");

As far as I understand there's a race condition between the thread in the Camunda process that sends the outgoing Kafka message and the thread that reads the Kafka message and uses the Process Instance ID to find the process to forward the message to.
I.e. when the correlation is done in the second thread the process hasn'd been committed to the DB in the first thread.

  • Does this theory sound right?

  • If it doesn't what else could be the problem

  • Any advice how to fix the issue?

Any help is greatly appreciated!

Thanks,
Andrey.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions