Skip to content

Commit dcece13

Browse files
author
chengyitian
committed
AJ-686: optimize 'threadPool management' for ThreadPooledClient;
1 parent 3776ed8 commit dcece13

File tree

1 file changed

+59
-11
lines changed

1 file changed

+59
-11
lines changed

src/com/xxdb/streaming/client/ThreadPooledClient.java

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@
1515
import java.util.concurrent.BlockingQueue;
1616
import java.util.concurrent.ExecutorService;
1717
import java.util.concurrent.Executors;
18+
import java.util.concurrent.TimeUnit;
1819

1920

2021
public class ThreadPooledClient extends AbstractClient {
2122
private static int CORES = Runtime.getRuntime().availableProcessors();
2223
private ExecutorService threadPool;
2324
// private HashMap<String, List<String>> users = new HashMap<>();
2425
private Object lock = new Object();
26+
private int threadCount = -1;
2527

2628
private static final Logger log = LoggerFactory.getLogger(ThreadPooledClient.class);
2729

@@ -54,6 +56,7 @@ public ThreadPooledClient(String subscribeHost, int subscribePort, int threadCou
5456
if (threadCount <= 0)
5557
throw new RuntimeException("The 'threadCount' parameter cannot be less than or equal to zero.");
5658

59+
this.threadCount = threadCount;
5760
threadPool = Executors.newFixedThreadPool(threadCount);
5861
new Thread() {
5962
private LinkedList<IMessage> backlog = new LinkedList<>();
@@ -119,20 +122,65 @@ public void run() {
119122
}
120123

121124
protected boolean doReconnect(Site site) {
122-
log.info("ThreadPooledClient doReconnect: " + site.host + ":" + site.port);
123-
threadPool.shutdownNow();
124-
try {
125-
Thread.sleep(1000);
126-
subscribe(site.host, site.port, site.tableName, site.actionName, site.handler, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord);
127-
log.info("Successfully reconnected and subscribed " + site.host + ":" + site.port + "/" + site.tableName + site.actionName);
128-
return true;
129-
} catch (Exception ex) {
130-
log.error("Unable to subscribe table. Will try again after 1 seconds.");
131-
ex.printStackTrace();
132-
return false;
125+
if (!AbstractClient.ifUseBackupSite) {
126+
// not enable backupSite, use original logic
127+
threadPool.shutdownNow();
128+
try {
129+
Thread.sleep(1000);
130+
subscribe(site.host, site.port, site.tableName, site.actionName, site.handler, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord);
131+
log.info("Successfully reconnected and subscribed " + site.host + ":" + site.port + "/" + site.tableName + site.actionName);
132+
return true;
133+
} catch (Exception ex) {
134+
log.error("Unable to subscribe table. Will try again after 1 seconds.");
135+
ex.printStackTrace();
136+
return false;
137+
}
138+
} else {
139+
// enable backupSite, try to switch site and subscribe.
140+
log.info("ThreadPooledClient doReconnect: " + site.host + ":" + site.port);
141+
try {
142+
Thread.sleep(1000);
143+
backupSitesSubscribeInternal(site.host, site.port, site.tableName, site.actionName, site.handler, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord, false);
144+
String topicStr = site.host + ":" + site.port + "/" + site.tableName + "/" + site.actionName;
145+
String curTopic = tableNameToTrueTopic.get(topicStr);
146+
BlockingQueue<List<IMessage>> queue = queueManager.addQueue(curTopic);
147+
queueManager.changeQueue(curTopic, lastQueue);
148+
149+
QueueHandlerBinder queueHandlerBinder = queueHandlers.get(lastBackupSiteTopic);
150+
queueHandlers.put(curTopic, queueHandlerBinder);
151+
152+
// shutdown last threadPool, and wait all tasks to finish.
153+
threadPool.shutdown();
154+
try {
155+
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
156+
threadPool.shutdownNow();
157+
if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
158+
log.error("last threadPool did not terminate.");
159+
}
160+
}
161+
} catch (InterruptedException ie) {
162+
threadPool.shutdownNow();
163+
Thread.currentThread().interrupt();
164+
}
165+
166+
// after last threadPool completely shutdown,create a new threadPool(because ExecutorService lifecycle is unidirectional).
167+
if (threadPool.isTerminated())
168+
threadPool = Executors.newFixedThreadPool(threadCount);
169+
170+
log.info("Successfully reconnected and subscribed " + site.host + ":" + site.port + "/" + site.tableName + site.actionName);
171+
return true;
172+
} catch (Exception ex) {
173+
log.error("Unable to subscribe table. Will try again after 1 seconds.");
174+
ex.printStackTrace();
175+
return false;
176+
}
133177
}
134178
}
135179

180+
protected void backupSitesSubscribeInternal(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String password, boolean createSubInfo) throws IOException {
181+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, password, false, createSubInfo);
182+
}
183+
136184
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String passWord) throws IOException {
137185
subscribe(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, false);
138186
}

0 commit comments

Comments
 (0)