Skip to content

Commit 3fa8d23

Browse files
authored
Merge pull request #28 from icanfly/master
add heartbeat interval exceeded check
2 parents 6f7c065 + 269fafe commit 3fa8d23

File tree

2 files changed

+16
-1
lines changed

2 files changed

+16
-1
lines changed

src/main/java/com/github/brainlag/nsq/Connection.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.LinkedBlockingQueue;
2525
import java.util.concurrent.TimeUnit;
2626
import java.util.concurrent.TimeoutException;
27+
import java.util.concurrent.atomic.AtomicReference;
2728

2829
public class Connection {
2930
public static final byte[] MAGIC_PROTOCOL_VERSION = " V2".getBytes();
@@ -39,6 +40,9 @@ public class Connection {
3940
private final EventLoopGroup eventLoopGroup;
4041
private final NSQConfig config;
4142

43+
public static final long HEARTBEAT_MAX_INTERVAL = 1L*60L*1000L;//default one minute
44+
private volatile AtomicReference<Long> lastHeartbeatSuccess = new AtomicReference<Long>(System.currentTimeMillis());
45+
4246

4347
public Connection(final ServerAddress serverAddress, final NSQConfig config) throws NoConnectionsException {
4448
this.address = serverAddress;
@@ -92,6 +96,13 @@ public boolean isRequestInProgress() {
9296
return requests.size() > 0;
9397
}
9498

99+
public boolean isHeartbeatStatusOK(){
100+
if(System.currentTimeMillis() - lastHeartbeatSuccess.get() > HEARTBEAT_MAX_INTERVAL){
101+
return false;
102+
}
103+
return true;
104+
}
105+
95106
public void incoming(final NSQFrame frame) {
96107
if (frame instanceof ResponseFrame) {
97108
if ("_heartbeat_".equals(((ResponseFrame) frame).getMessage())) {
@@ -137,6 +148,7 @@ public void incoming(final NSQFrame frame) {
137148
private void heartbeat() {
138149
LogManager.getLogger(this).trace("HEARTBEAT!");
139150
command(NSQCommand.instance("NOP"));
151+
lastHeartbeatSuccess.getAndSet(System.currentTimeMillis());
140152
}
141153

142154
public void setErrorCallback(final NSQErrorCallback callback) {

src/main/java/com/github/brainlag/nsq/NSQConsumer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,10 @@ public NSQConsumer setLookupPeriod(final long periodMillis) {
183183

184184
private void connect() {
185185
for (final Iterator<Map.Entry<ServerAddress, Connection>> it = connections.entrySet().iterator(); it.hasNext(); ) {
186-
if (!it.next().getValue().isConnected()) {
186+
Connection cnn = it.next().getValue();
187+
if(!cnn.isConnected() || !cnn.isHeartbeatStatusOK()){
188+
//force close
189+
cnn.close();
187190
it.remove();
188191
}
189192
}

0 commit comments

Comments
 (0)