Skip to content

Commit 905e57a

Browse files
author
chengyitian
committed
AJ-677: fix incompatibility problem about ThreadPooledClient;
1 parent 4705b36 commit 905e57a

File tree

1 file changed

+81
-29
lines changed

1 file changed

+81
-29
lines changed

src/com/xxdb/streaming/client/ThreadPooledClient.java

Lines changed: 81 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -233,21 +233,11 @@ public void unsubscribe(String host, int port, String tableName) throws IOExcept
233233

234234
@Override
235235
protected void unsubscribeInternal(String host, int port, String tableName, String actionName) throws IOException {
236-
String originHost = host;
237-
int originPort = port;
238-
239-
synchronized (this) {
236+
if (!ifUseBackupSite) {
237+
// original logic:
240238
DBConnection dbConn = new DBConnection();
241-
if (!currentSiteIndexMap.isEmpty()) {
242-
String topic = tableNameToTrueTopic.get( host + ":" + port + "/" + tableName + "/" + actionName);
243-
Integer currentSiteIndex = currentSiteIndexMap.get(topic);
244-
Site[] sites = trueTopicToSites.get(topic);
245-
host = sites[currentSiteIndex].host;
246-
port = sites[currentSiteIndex].port;
247-
}
248-
249-
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
250-
List<String> usr = users.get(tp);
239+
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
240+
List<String> usr = users.get(fullTableName);
251241
String user = usr.get(0);
252242
String pwd = usr.get(1);
253243
if (!user.equals(""))
@@ -266,32 +256,94 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
266256

267257
dbConn.run("stopPublishTable", params);
268258
String topic = null;
269-
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
270-
// synchronized (tableNameToTrueTopic) {
271-
topic = tableNameToTrueTopic.get(fullTableName);
272-
// }
273-
// synchronized (trueTopicToSites) {
274-
Site[] sites = trueTopicToSites.get(topic);
275-
if (sites == null || sites.length == 0)
276-
;
277-
for (int i = 0; i < sites.length; i++)
278-
sites[i].closed = true;
279-
// }
280-
// synchronized (queueManager) {
281-
queueManager.removeQueue(topic);
282-
// }
259+
synchronized (tableNameToTrueTopic) {
260+
topic = tableNameToTrueTopic.get(fullTableName);
261+
}
262+
synchronized (trueTopicToSites) {
263+
Site[] sites = trueTopicToSites.get(topic);
264+
if (sites == null || sites.length == 0)
265+
;
266+
for (int i = 0; i < sites.length; i++)
267+
sites[i].closed = true;
268+
}
269+
synchronized (queueManager) {
270+
queueManager.removeQueue(topic);
271+
}
272+
283273
log.info("Successfully unsubscribed table " + fullTableName);
284274
} catch (Exception ex) {
285275
throw ex;
286276
} finally {
287277
dbConn.close();
288-
String topicStr = originHost + ":" + originPort + "/" + tableName + "/" + actionName;
278+
String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
289279
QueueHandlerBinder queueHandler =null;
290280
synchronized (queueHandlers){
291281
queueHandler = queueHandlers.get(topicStr);
292282
queueHandlers.remove(topicStr);
293283
}
294284
}
285+
} else {
286+
// use backBackSite
287+
String originHost = host;
288+
int originPort = port;
289+
290+
synchronized (this) {
291+
DBConnection dbConn = new DBConnection();
292+
if (!currentSiteIndexMap.isEmpty()) {
293+
String topic = tableNameToTrueTopic.get( host + ":" + port + "/" + tableName + "/" + actionName);
294+
Integer currentSiteIndex = currentSiteIndexMap.get(topic);
295+
Site[] sites = trueTopicToSites.get(topic);
296+
host = sites[currentSiteIndex].host;
297+
port = sites[currentSiteIndex].port;
298+
}
299+
300+
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
301+
List<String> usr = users.get(tp);
302+
String user = usr.get(0);
303+
String pwd = usr.get(1);
304+
if (!user.equals(""))
305+
dbConn.connect(host, port, user, pwd);
306+
else
307+
dbConn.connect(host, port);
308+
try {
309+
String localIP = this.listeningHost;
310+
if(localIP.equals(""))
311+
localIP = dbConn.getLocalAddress().getHostAddress();
312+
List<Entity> params = new ArrayList<Entity>();
313+
params.add(new BasicString(localIP));
314+
params.add(new BasicInt(this.listeningPort));
315+
params.add(new BasicString(tableName));
316+
params.add(new BasicString(actionName));
317+
318+
dbConn.run("stopPublishTable", params);
319+
String topic = null;
320+
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
321+
// synchronized (tableNameToTrueTopic) {
322+
topic = tableNameToTrueTopic.get(fullTableName);
323+
// }
324+
// synchronized (trueTopicToSites) {
325+
Site[] sites = trueTopicToSites.get(topic);
326+
if (sites == null || sites.length == 0)
327+
;
328+
for (int i = 0; i < sites.length; i++)
329+
sites[i].closed = true;
330+
// }
331+
// synchronized (queueManager) {
332+
queueManager.removeQueue(topic);
333+
// }
334+
log.info("Successfully unsubscribed table " + fullTableName);
335+
} catch (Exception ex) {
336+
throw ex;
337+
} finally {
338+
dbConn.close();
339+
String topicStr = originHost + ":" + originPort + "/" + tableName + "/" + actionName;
340+
QueueHandlerBinder queueHandler =null;
341+
synchronized (queueHandlers){
342+
queueHandler = queueHandlers.get(topicStr);
343+
queueHandlers.remove(topicStr);
344+
}
345+
}
346+
}
295347
}
296348
}
297349

0 commit comments

Comments
 (0)