Skip to content

Commit 3776ed8

Browse files
author
chengyitian
committed
AJ-685: fix issue about ThreadPooledClient unsubscribe;
1 parent 056c019 commit 3776ed8

File tree

1 file changed

+9
-6
lines changed

1 file changed

+9
-6
lines changed

src/com/xxdb/streaming/client/ThreadPooledClient.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
public class ThreadPooledClient extends AbstractClient {
2121
private static int CORES = Runtime.getRuntime().availableProcessors();
2222
private ExecutorService threadPool;
23-
private HashMap<String, List<String>> users = new HashMap<>();
23+
// private HashMap<String, List<String>> users = new HashMap<>();
2424
private Object lock = new Object();
2525

2626
private static final Logger log = LoggerFactory.getLogger(ThreadPooledClient.class);
@@ -143,7 +143,7 @@ public void subscribe(String host, int port, String tableName, String actionName
143143
List<String> usr = Arrays.asList(userName, passWord);
144144
synchronized (queueHandlers) {
145145
queueHandlers.put(tableNameToTrueTopic.get(topicStr), new QueueHandlerBinder(queue, handler));
146-
users.put(topicStr, usr);
146+
// users.put(topicStr, usr);
147147
}
148148
}
149149

@@ -157,7 +157,7 @@ public void subscribe(String host, int port, String tableName, String actionName
157157
List<String> usr = Arrays.asList(userName, passWord);
158158
synchronized (queueHandlers) {
159159
queueHandlers.put(tableNameToTrueTopic.get(topicStr), new QueueHandlerBinder(queue, handler));
160-
users.put(topicStr, usr);
160+
// users.put(topicStr, usr);
161161
}
162162
}
163163

@@ -167,7 +167,7 @@ public void subscribe(String host, int port, String tableName, String actionName
167167
List<String> usr = Arrays.asList(userName, passWord);
168168
synchronized (queueHandlers) {
169169
queueHandlers.put(tableNameToTrueTopic.get(topicStr), new QueueHandlerBinder(queue, handler));
170-
users.put(topicStr, usr);
170+
// users.put(topicStr, usr);
171171
}
172172
}
173173

@@ -236,8 +236,10 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
236236
if (!ifUseBackupSite) {
237237
// original logic:
238238
DBConnection dbConn = new DBConnection();
239-
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
240-
List<String> usr = users.get(fullTableName);
239+
// String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
240+
// List<String> usr = users.get(fullTableName);
241+
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
242+
List<String> usr = users.get(tp);
241243
String user = usr.get(0);
242244
String pwd = usr.get(1);
243245
if (!user.equals(""))
@@ -256,6 +258,7 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
256258

257259
dbConn.run("stopPublishTable", params);
258260
String topic = null;
261+
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
259262
synchronized (tableNameToTrueTopic) {
260263
topic = tableNameToTrueTopic.get(fullTableName);
261264
}

0 commit comments

Comments
 (0)