17
17
import java .util .Map ;
18
18
import java .util .Optional ;
19
19
import java .util .Set ;
20
- import java .util .concurrent .ExecutorService ;
20
+ import java .util .concurrent .Executor ;
21
21
import java .util .concurrent .Executors ;
22
22
import java .util .concurrent .RejectedExecutionException ;
23
23
import java .util .concurrent .ScheduledExecutorService ;
@@ -42,7 +42,7 @@ public class NSQConsumer implements Closeable {
42
42
private int messagesPerBatch = 200 ;
43
43
private long lookupPeriod = 60 * 1000 ; // how often to recheck for new nodes (and clean up non responsive nodes)
44
44
private ScheduledExecutorService scheduler = Executors .newSingleThreadScheduledExecutor ();
45
- private ExecutorService executor = Executors .newCachedThreadPool ();
45
+ private Executor executor = Executors .newCachedThreadPool ();
46
46
private Optional <ScheduledFuture <?>> timeout = Optional .empty ();
47
47
48
48
public NSQConsumer (final NSQLookup lookup , final String topic , final String channel , final NSQMessageCallback callback ) {
@@ -184,7 +184,7 @@ public NSQConsumer setLookupPeriod(final long periodMillis) {
184
184
private void connect () {
185
185
for (final Iterator <Map .Entry <ServerAddress , Connection >> it = connections .entrySet ().iterator (); it .hasNext (); ) {
186
186
Connection cnn = it .next ().getValue ();
187
- if (!cnn .isConnected () || !cnn .isHeartbeatStatusOK ()){
187
+ if (!cnn .isConnected () || !cnn .isHeartbeatStatusOK ()) {
188
188
//force close
189
189
cnn .close ();
190
190
it .remove ();
@@ -227,7 +227,7 @@ public long getTotalMessages() {
227
227
* The executer can only changed before the client is started.
228
228
* Default is a cached threadpool.
229
229
*/
230
- public NSQConsumer setExecutor (final ExecutorService executor ) {
230
+ public NSQConsumer setExecutor (final Executor executor ) {
231
231
if (!started ) {
232
232
this .executor = executor ;
233
233
}
0 commit comments