Skip to content

Commit 803e71a

Browse files
author
chengyitian
committed
Merge remote-tracking branch 'origin/dev' into dev
2 parents c3f2794 + f9eaacb commit 803e71a

38 files changed

+2535
-363
lines changed

src/com/xxdb/streaming/client/AbstractClient.java

Lines changed: 457 additions & 139 deletions
Large diffs are not rendered by default.

src/com/xxdb/streaming/client/Daemon.java

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ public void setRunningThread(Thread runningThread){
118118
@Override
119119
public void run() {
120120
while (!pThread.isInterrupted()) {
121+
if (!AbstractClient.ifUseBackupSite) {
122+
// original logic:
121123
for (String site : this.dispatcher.getAllReconnectSites()) {
122124
if (dispatcher.getNeedReconnect(site) == 1) {
123125
Site s = dispatcher.getSiteByName(site);
@@ -150,10 +152,47 @@ public void run() {
150152
}
151153
}
152154

153-
try {
154-
Thread.sleep(1000);
155-
} catch (InterruptedException e) {
156-
break;
155+
try {
156+
Thread.sleep(1000);
157+
} catch (InterruptedException e) {
158+
break;
159+
}
160+
} else {
161+
// if set backupSite
162+
for (String site : this.dispatcher.getAllReconnectSites()) {
163+
if (dispatcher.getNeedReconnect(site) == 1) {
164+
Site s = dispatcher.getCurrentSiteByName(site);
165+
dispatcher.activeCloseConnection(s);
166+
for (String topic : dispatcher.getAllTopicsBySite(site)) {
167+
System.out.println("Daemon need reconnect: " + topic);
168+
// reconnect every info.resubTimeout ms
169+
if (System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic) <= AbstractClient.resubTimeout)
170+
continue;
171+
172+
dispatcher.tryReconnect(topic);
173+
}
174+
} else {
175+
Site s = dispatcher.getSiteByName(site);
176+
dispatcher.activeCloseConnection(s);
177+
for (String topic : dispatcher.getAllTopicsBySite(site)) {
178+
// reconnect every info.resubTimeout ms
179+
if (System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic) <= AbstractClient.resubTimeout)
180+
continue;
181+
182+
dispatcher.tryReconnect(topic);
183+
dispatcher.setReconnectTimestamp(site, System.currentTimeMillis());
184+
}
185+
}
186+
}
187+
188+
// not need to put and get from waitReconnectTopic.
189+
190+
try {
191+
// check reconnected interval time
192+
Thread.sleep(10);
193+
} catch (InterruptedException e) {
194+
break;
195+
}
157196
}
158197
}
159198
}

src/com/xxdb/streaming/client/MessageDispatcher.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ interface MessageDispatcher {
3232

3333
Site getSiteByName(String site);
3434

35+
Site getCurrentSiteByName(String site);
36+
3537
void activeCloseConnection(Site site);
3638

3739
List<String> getAllTopicsBySite(String site);

src/com/xxdb/streaming/client/MessageParser.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ public void run() {
107107
body = BasicEntityFactory.instance().createEntity(df, dt, in, extended);
108108
if (body.isTable() && body.rows() == 0) {
109109
for (String t : topic.split(",")) {
110+
AbstractClient.lastExceptionTopicTimeMap.put(topic, System.currentTimeMillis());
110111
dispatcher.setNeedReconnect(t, 0);
111112
}
112113
assert (body.rows() == 0);
@@ -161,13 +162,16 @@ public void run() {
161162
} catch (Exception e) {
162163
e.printStackTrace();
163164
if (dispatcher.isClosed(topic)) {
164-
log.error("check " + topic + " is unsubscribed");
165+
if (!AbstractClient.ifUseBackupSite)
166+
log.error("check " + topic + " is unsubscribed");
165167
return;
166168
} else {
169+
AbstractClient.lastExceptionTopicTimeMap.put(topic, System.currentTimeMillis());
167170
dispatcher.setNeedReconnect(topic, 1);
168171
}
169172
} catch (Throwable t) {
170173
t.printStackTrace();
174+
AbstractClient.lastExceptionTopicTimeMap.put(topic, System.currentTimeMillis());
171175
dispatcher.setNeedReconnect(topic, 1);
172176
} finally {
173177
try {

src/com/xxdb/streaming/client/PollingClient.java

Lines changed: 100 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
public class PollingClient extends AbstractClient {
2020
TopicPoller topicPoller = null;
21-
private HashMap<List<String>, List<String>> users = new HashMap<>();
21+
// private HashMap<List<String>, List<String>> users = new HashMap<>();
2222

2323
private static final Logger log = LoggerFactory.getLogger(PollingClient.class);
2424

@@ -36,19 +36,46 @@ public PollingClient(String subscribeHost, int subscribePort) throws SocketExcep
3636

3737
@Override
3838
protected boolean doReconnect(Site site) {
39-
try {
40-
Thread.sleep(1000);
41-
BlockingQueue<List<IMessage>> queue = subscribeInternal(site.host, site.port, site.tableName, site.actionName, (MessageHandler) null, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord, site.msgAstable);
42-
log.info("Successfully reconnected and subscribed " + site.host + ":" + site.port + ":" + site.tableName);
43-
topicPoller.setQueue(queue);
44-
return true;
45-
} catch (Exception ex) {
46-
log.error("Unable to subscribe table. Will try again after 1 seconds.");
47-
ex.printStackTrace();
48-
return false;
39+
if (!AbstractClient.ifUseBackupSite) {
40+
// not enable backupSite, use original logic.
41+
try {
42+
log.info("PollingClient doReconnect: " + site.host + ":" + site.port);
43+
Thread.sleep(1000);
44+
BlockingQueue<List<IMessage>> queue = subscribeInternal(site.host, site.port, site.tableName, site.actionName, (MessageHandler) null, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord, site.msgAstable);
45+
log.info("Successfully reconnected and subscribed " + site.host + ":" + site.port + ":" + site.tableName);
46+
topicPoller.setQueue(queue);
47+
return true;
48+
} catch (Exception ex) {
49+
log.error("Unable to subscribe table. Will try again after 1 seconds.");
50+
ex.printStackTrace();
51+
return false;
52+
}
53+
} else {
54+
// enable backupSite, try to switch site and subscribe.
55+
try {
56+
log.info("PollingClient doReconnect: " + site.host + ":" + site.port);
57+
Thread.sleep(1000);
58+
subscribe(site.host, site.port, site.tableName, site.actionName, (MessageHandler) null, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord, site.msgAstable, false);
59+
String topicStr = site.host + ":" + site.port + "/" + site.tableName + "/" + site.actionName;
60+
String curTopic = tableNameToTrueTopic.get(topicStr);
61+
BlockingQueue<List<IMessage>> queue = queueManager.addQueue(curTopic);
62+
queueManager.changeQueue(curTopic, lastQueue);
63+
64+
log.info("Successfully reconnected and subscribed " + site.host + ":" + site.port + ":" + site.tableName);
65+
topicPoller.setQueue(lastQueue);
66+
return true;
67+
} catch (Exception ex) {
68+
log.error("Unable to subscribe table. Will try again after 1 seconds.");
69+
ex.printStackTrace();
70+
return false;
71+
}
4972
}
5073
}
5174

75+
protected void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String password, boolean msgAsTable, boolean createSubInfo) throws IOException {
76+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, password, false, createSubInfo);
77+
}
78+
5279
public TopicPoller subscribe(String host, int port, String tableName, String actionName, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, String userName, String passWord) throws IOException {
5380
return subscribe(host, port, tableName, actionName, offset, reconnect, filter, deserializer, userName, passWord, false);
5481
}
@@ -57,7 +84,23 @@ public TopicPoller subscribe(String host, int port, String tableName, String act
5784
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, (MessageHandler) null, offset, reconnect, filter, deserializer, false, userName, passWord, msgAsTable);
5885
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
5986
List<String> usr = Arrays.asList(userName, passWord);
60-
users.put(tp, usr);
87+
// users.put(tp, usr);
88+
topicPoller = new TopicPoller(queue);
89+
return topicPoller;
90+
}
91+
92+
public TopicPoller subscribe(String host, int port, String tableName, String actionName, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, String userName, String passWord, boolean msgAsTable, List<String> backupSites, int resubTimeout, boolean subOnce) throws IOException {
93+
if (resubTimeout < 0)
94+
// resubTimeout default: 100ms
95+
resubTimeout = 100;
96+
97+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, (MessageHandler) null, offset, reconnect, filter, deserializer, false, userName, passWord, msgAsTable, backupSites, resubTimeout, subOnce);
98+
topicPoller = new TopicPoller(queue);
99+
return topicPoller;
100+
}
101+
102+
public TopicPoller subscribe(String host, int port, String tableName, String actionName, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, String userName, String passWord, boolean msgAsTable, List<String> backupSites) throws IOException {
103+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, (MessageHandler) null, offset, reconnect, filter, deserializer, false, userName, passWord, msgAsTable, backupSites, 100, false);
61104
topicPoller = new TopicPoller(queue);
62105
return topicPoller;
63106
}
@@ -120,47 +163,59 @@ public void unsubscribe(String host, int port, String tableName) throws IOExcept
120163

121164
@Override
122165
protected void unsubscribeInternal(String host, int port, String tableName, String actionName) throws IOException {
123-
DBConnection dbConn = new DBConnection();
124-
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
125-
List<String> usr = users.get(tp);
126-
String user = usr.get(0);
127-
String pwd = usr.get(1);
128-
if (!user.equals(""))
129-
dbConn.connect(host, port, user, pwd);
130-
else
131-
dbConn.connect(host, port);
132-
try {
133-
String localIP = this.listeningHost;
134-
if(localIP.equals(""))
135-
localIP = dbConn.getLocalAddress().getHostAddress();
136-
List<Entity> params = new ArrayList<Entity>();
137-
params.add(new BasicString(localIP));
138-
params.add(new BasicInt(this.listeningPort));
139-
params.add(new BasicString(tableName));
140-
params.add(new BasicString(actionName));
141-
142-
dbConn.run("stopPublishTable", params);
143-
String topic = null;
144-
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
145-
synchronized (tableNameToTrueTopic) {
146-
topic = tableNameToTrueTopic.get(fullTableName);
166+
synchronized (this) {
167+
DBConnection dbConn = new DBConnection();
168+
if (!currentSiteIndexMap.isEmpty()) {
169+
String topic = tableNameToTrueTopic.get( host + ":" + port + "/" + tableName + "/" + actionName);
170+
Integer currentSiteIndex = currentSiteIndexMap.get(topic);
171+
Site[] sites = trueTopicToSites.get(topic);
172+
host = sites[currentSiteIndex].host;
173+
port = sites[currentSiteIndex].port;
147174
}
148-
synchronized (trueTopicToSites) {
175+
176+
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
177+
List<String> usr = users.get(tp);
178+
String user = usr.get(0);
179+
String pwd = usr.get(1);
180+
if (!user.equals(""))
181+
dbConn.connect(host, port, user, pwd);
182+
else
183+
dbConn.connect(host, port);
184+
try {
185+
String localIP = this.listeningHost;
186+
if(localIP.equals(""))
187+
localIP = dbConn.getLocalAddress().getHostAddress();
188+
List<Entity> params = new ArrayList<Entity>();
189+
params.add(new BasicString(localIP));
190+
params.add(new BasicInt(this.listeningPort));
191+
params.add(new BasicString(tableName));
192+
params.add(new BasicString(actionName));
193+
194+
dbConn.run("stopPublishTable", params);
195+
String topic = null;
196+
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
197+
topic = tableNameToTrueTopic.get(fullTableName);
198+
149199
Site[] sites = trueTopicToSites.get(topic);
150200
if (sites == null || sites.length == 0)
151201
;
152202
for (int i = 0; i < sites.length; i++)
153203
sites[i].closed = true;
154-
}
155-
synchronized (queueManager) {
204+
156205
queueManager.removeQueue(topic);
206+
207+
// init backupSites related params.
208+
if (AbstractClient.ifUseBackupSite) {
209+
AbstractClient.ifUseBackupSite = false;
210+
AbstractClient.subOnce = false;
211+
AbstractClient.resubTimeout = 100;
212+
}
213+
log.info("Successfully unsubscribed table " + fullTableName);
214+
} catch (Exception ex) {
215+
throw ex;
216+
} finally {
217+
dbConn.close();
157218
}
158-
log.info("Successfully unsubscribed table " + fullTableName);
159-
} catch (Exception ex) {
160-
throw ex;
161-
} finally {
162-
dbConn.close();
163219
}
164-
return;
165220
}
166221
}

src/com/xxdb/streaming/client/QueueManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ public synchronized BlockingQueue<List<IMessage>> addQueue(String topic) {
1616
throw new RuntimeException("Topic " + topic + " already subscribed");
1717
}
1818

19+
public synchronized void changeQueue(String topic, BlockingQueue<List<IMessage>> q) {
20+
if (queueMap.containsKey(topic)) {
21+
queueMap.put(topic, q);
22+
} else {
23+
throw new RuntimeException("Topic " + topic + " doesn't subscribed");
24+
}
25+
}
26+
1927
public synchronized BlockingQueue<List<IMessage>> getQueue(String topic) {
2028
BlockingQueue<List<IMessage>> q = queueMap.get(topic);
2129
return q;

0 commit comments

Comments
 (0)