Skip to content

Commit 329b747

Browse files
Eivind Siqveland Larsenbrainlag
authored andcommitted
Limit max in flight
1 parent bf4e041 commit 329b747

File tree

2 files changed

+13
-3
lines changed

2 files changed

+13
-3
lines changed

src/main/java/com/github/brainlag/nsq/NSQConfig.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import java.net.InetAddress;
99
import java.net.UnknownHostException;
10+
import java.util.Optional;
1011

1112
public class NSQConfig {
1213

@@ -23,6 +24,7 @@ public enum Compression {NO_COMPRESSION, DEFLATE, SNAPPY}
2324
private Compression compression = Compression.NO_COMPRESSION;
2425
private Integer deflateLevel = null;
2526
private Integer sampleRate = null;
27+
private Optional<Integer> maxInFlight = Optional.empty();
2628
private String userAgent = null;
2729
private Integer msgTimeout = null;
2830
private SslContext sslContext = null;
@@ -62,6 +64,15 @@ public Integer getOutputBufferSize() {
6264
return outputBufferSize;
6365
}
6466

67+
public NSQConfig setMaxInFlight(final int maxInFlight) {
68+
this.maxInFlight = Optional.of(maxInFlight);
69+
return this;
70+
}
71+
72+
public Optional<Integer> getMaxInFlight() {
73+
return maxInFlight;
74+
}
75+
6576
public void setOutputBufferSize(final Integer outputBufferSize) {
6677
this.outputBufferSize = outputBufferSize;
6778
}
@@ -118,7 +129,6 @@ public void setMsgTimeout(final Integer msgTimeout) {
118129
this.msgTimeout = msgTimeout;
119130
}
120131

121-
122132
public SslContext getSslContext() {
123133
return sslContext;
124134
}

src/main/java/com/github/brainlag/nsq/NSQConsumer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class NSQConsumer implements Closeable {
3939
private final AtomicLong totalMessages = new AtomicLong(0l);
4040

4141
private boolean started = false;
42-
private int messagesPerBatch = 200;
42+
private int messagesPerBatch;
4343
private long lookupPeriod = 60 * 1000; // how often to recheck for new nodes (and clean up non responsive nodes)
4444
private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
4545
private Executor executor = Executors.newCachedThreadPool();
@@ -62,7 +62,7 @@ public NSQConsumer(final NSQLookup lookup, final String topic, final String chan
6262
this.config = config;
6363
this.callback = callback;
6464
this.errorCallback = errCallback;
65-
65+
this.messagesPerBatch = config.getMaxInFlight().orElse(200);
6666
}
6767

6868
public NSQConsumer start() {

0 commit comments

Comments
 (0)