Skip to content

Commit 53cc3a1

Browse files
authored
fix(#3727): Implement failure and recovery strategy in ContinuousPlcRequestReader (#3728)
1 parent f49a66f commit 53cc3a1

File tree

1 file changed

+65
-13
lines changed
  • streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection

1 file changed

+65
-13
lines changed

streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import org.apache.plc4x.java.api.PlcConnection;
2828
import org.apache.plc4x.java.api.PlcConnectionManager;
29+
import org.apache.plc4x.java.api.messages.PlcReadResponse;
2930
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
@@ -36,32 +37,83 @@ public class ContinuousPlcRequestReader
3637
extends OneTimePlcRequestReader implements IPullAdapter {
3738

3839
private static final Logger LOG = LoggerFactory.getLogger(ContinuousPlcRequestReader.class);
40+
private static final int MAX_IDLE_PULLS = 300;
3941

4042
private final IEventCollector collector;
43+
private int idlePullsBeforeNextAttempt = 0;
44+
private int currentIdlePulls = 0;
4145

42-
public ContinuousPlcRequestReader(PlcConnectionManager connectionManager,
43-
Plc4xConnectionSettings settings,
44-
PlcRequestProvider requestProvider,
45-
IEventCollector collector) {
46+
/**
47+
* Failure and recovery strategy:
48+
* - If a read fails, the number of idle pulls before the next attempt is doubled, up to a maximum of 300.
49+
* - If the read is successful, the idle pull counter is reset.
50+
*/
51+
public ContinuousPlcRequestReader(
52+
PlcConnectionManager connectionManager,
53+
Plc4xConnectionSettings settings,
54+
PlcRequestProvider requestProvider,
55+
IEventCollector collector
56+
) {
4657
super(connectionManager, settings, requestProvider);
4758
this.collector = collector;
4859
}
4960

5061
@Override
5162
public void pullData() throws RuntimeException {
63+
if (currentIdlePulls < idlePullsBeforeNextAttempt) {
64+
idleRead();
65+
} else {
66+
connectAndReadPlcData();
67+
}
68+
}
69+
70+
private void connectAndReadPlcData() {
5271
try (PlcConnection plcConnection = connectionManager.getConnection(settings.connectionString())) {
5372
var readRequest = requestProvider.makeReadRequest(plcConnection, settings.nodes());
54-
var readResponse = readRequest.execute().get(5000, TimeUnit.MILLISECONDS);
55-
var event = eventGenerator.makeEvent(readResponse);
56-
collector.collect(event);
73+
var readResponse = readRequest.execute()
74+
.get(5000, TimeUnit.MILLISECONDS);
75+
processPlcReadResponse(readResponse);
5776
} catch (Exception e) {
58-
// ensure that the cached connection manager removes the broken connection
59-
if (connectionManager instanceof CachedPlcConnectionManager) {
60-
((CachedPlcConnectionManager) connectionManager).removeCachedConnection(settings.connectionString());
61-
}
62-
LOG.error("Error while reading from PLC with connection string {}: {} ",
63-
settings.connectionString(), e.getMessage());
77+
handleFailingPlcRead(e);
78+
}
79+
}
80+
81+
private void processPlcReadResponse(PlcReadResponse readResponse) {
82+
var event = eventGenerator.makeEvent(readResponse);
83+
collector.collect(event);
84+
this.resetIdlePulls();
85+
}
86+
87+
private void handleFailingPlcRead(Exception e) {
88+
// ensure that the cached connection manager removes the broken connection
89+
if (connectionManager instanceof CachedPlcConnectionManager) {
90+
((CachedPlcConnectionManager) connectionManager).removeCachedConnection(settings.connectionString());
6491
}
92+
93+
// Increase backoff counter on failure
94+
if (idlePullsBeforeNextAttempt == 0) {
95+
idlePullsBeforeNextAttempt = 1;
96+
} else {
97+
idlePullsBeforeNextAttempt = Math.min(idlePullsBeforeNextAttempt * 2, MAX_IDLE_PULLS);
98+
}
99+
100+
LOG.error(
101+
"Error while reading from PLC with connection string {}. Setting adapter to idle for {} attemtps. {} ",
102+
settings.connectionString(), idlePullsBeforeNextAttempt, e.getMessage()
103+
);
104+
105+
currentIdlePulls = 0;
106+
}
107+
108+
private void idleRead() {
109+
LOG.debug("Skipping pullData call for {}. Idle pulls left: {}",
110+
settings.connectionString(), idlePullsBeforeNextAttempt - currentIdlePulls);
111+
currentIdlePulls++;
112+
}
113+
114+
private void resetIdlePulls() {
115+
idlePullsBeforeNextAttempt = 0;
116+
currentIdlePulls = 0;
65117
}
66118

67119
@Override

0 commit comments

Comments
 (0)