Skip to content

Commit 5e63718

Browse files
author
chengyitian
committed
AJ-647: override EventClient unsubscribeInternal;
1 parent 13c7592 commit 5e63718

File tree

3 files changed

+51
-2
lines changed

3 files changed

+51
-2
lines changed

src/com/xxdb/streaming/cep/EventClient.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import com.xxdb.DBConnection;
44
import com.xxdb.comm.ErrorCodeInfo;
5+
import com.xxdb.data.BasicInt;
6+
import com.xxdb.data.BasicString;
57
import com.xxdb.data.Entity;
68
import com.xxdb.data.Utils;
79
import com.xxdb.streaming.client.AbstractClient;
@@ -12,6 +14,7 @@
1214
import java.io.IOException;
1315
import java.net.SocketException;
1416
import java.util.ArrayList;
17+
import java.util.Arrays;
1518
import java.util.List;
1619
import java.util.concurrent.BlockingQueue;
1720

@@ -83,6 +86,52 @@ public void unsubscribe(String host, int port, String tableName, String actionNa
8386
unsubscribeInternal(host, port, tableName, actionName);
8487
}
8588

89+
@Override
90+
protected void unsubscribeInternal(String host, int port, String tableName, String actionName) throws IOException {
91+
DBConnection dbConn = new DBConnection();
92+
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
93+
List<String> usr = users.get(tp);
94+
String user = usr.get(0);
95+
String pwd = usr.get(1);
96+
if (!user.equals(""))
97+
dbConn.connect(host, port, user, pwd);
98+
else
99+
dbConn.connect(host, port);
100+
try {
101+
String localIP = this.listeningHost;
102+
if(localIP.equals(""))
103+
localIP = dbConn.getLocalAddress().getHostAddress();
104+
List<Entity> params = new ArrayList<Entity>();
105+
params.add(new BasicString(localIP));
106+
params.add(new BasicInt(this.listeningPort));
107+
params.add(new BasicString(tableName));
108+
params.add(new BasicString(actionName));
109+
110+
dbConn.run("stopPublishTable", params);
111+
String topic = null;
112+
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
113+
synchronized (tableNameToTrueTopic) {
114+
topic = tableNameToTrueTopic.get(fullTableName);
115+
}
116+
synchronized (trueTopicToSites) {
117+
Site[] sites = trueTopicToSites.get(topic);
118+
if (sites == null || sites.length == 0)
119+
;
120+
for (int i = 0; i < sites.length; i++)
121+
sites[i].closed = true;
122+
}
123+
synchronized (queueManager) {
124+
queueManager.removeQueue(topic);
125+
}
126+
log.info("Successfully unsubscribed table " + fullTableName);
127+
} catch (Exception ex) {
128+
throw ex;
129+
} finally {
130+
dbConn.close();
131+
}
132+
return;
133+
}
134+
86135

87136
@Override
88137
protected boolean doReconnect(Site site) {

src/com/xxdb/streaming/client/AbstractClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ public class Site {
178178
long msgId;
179179
boolean reconnect;
180180
Vector filter = null;
181-
boolean closed = false;
181+
public boolean closed = false;
182182
boolean allowExistTopic = false;
183183
StreamDeserializer deserializer;
184184
String userName = "";

src/com/xxdb/streaming/client/QueueManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import java.util.concurrent.ArrayBlockingQueue;
55
import java.util.concurrent.BlockingQueue;
66

7-
class QueueManager {
7+
public class QueueManager {
88
private HashMap<String, BlockingQueue<List<IMessage>>> queueMap = new HashMap<String, BlockingQueue<List<IMessage>>>();
99

1010
public synchronized BlockingQueue<List<IMessage>> addQueue(String topic) {

0 commit comments

Comments
 (0)