Skip to content

Commit 359d1ac

Browse files
author
chengyitian
committed
AJ-650、AJ-651: adjust package structure for cep; extract Site from AbstractClient; rewrite EvenClient doReconnect;
1 parent 36db3a8 commit 359d1ac

18 files changed

+142
-64
lines changed

src/com/xxdb/streaming/client/AbstractClient.java

Lines changed: 1 addition & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -169,41 +169,6 @@ public Map<String, StreamDeserializer> getSubInfos(){
169169
return subInfos_;
170170
}
171171

172-
public class Site {
173-
String host;
174-
int port;
175-
String tableName;
176-
String actionName;
177-
MessageHandler handler;
178-
long msgId;
179-
boolean reconnect;
180-
Vector filter = null;
181-
public boolean closed = false;
182-
boolean allowExistTopic = false;
183-
StreamDeserializer deserializer;
184-
String userName = "";
185-
String passWord = "";
186-
187-
boolean msgAstable = false;
188-
189-
Site(String host, int port, String tableName, String actionName,
190-
MessageHandler handler, long msgId, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String passWord, boolean msgAstable) {
191-
this.host = host;
192-
this.port = port;
193-
this.tableName = tableName;
194-
this.actionName = actionName;
195-
this.handler = handler;
196-
this.msgId = msgId;
197-
this.reconnect = reconnect;
198-
this.filter = filter;
199-
this.allowExistTopic = allowExistTopic;
200-
this.deserializer = deserializer;
201-
this.userName = userName;
202-
this.passWord = passWord;
203-
this.msgAstable = msgAstable;
204-
}
205-
}
206-
207172
abstract protected boolean doReconnect(Site site);
208173

209174
public void setMsgId(String topic, long msgId) {
@@ -636,7 +601,7 @@ void checkServerVersion(String host, int port) throws IOException {
636601
}
637602
}
638603

639-
public ConcurrentHashMap<String, AbstractClient.Site[]> getTopicToSites() {
604+
public ConcurrentHashMap<String, Site[]> getTopicToSites() {
640605
return trueTopicToSites;
641606
}
642607
}

src/com/xxdb/streaming/client/Daemon.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public void run() {
120120
while (!pThread.isInterrupted()) {
121121
for (String site : this.dispatcher.getAllReconnectSites()) {
122122
if (dispatcher.getNeedReconnect(site) == 1) {
123-
AbstractClient.Site s = dispatcher.getSiteByName(site);
123+
Site s = dispatcher.getSiteByName(site);
124124
dispatcher.activeCloseConnection(s);
125125
String lastTopic = "";
126126
for (String topic : dispatcher.getAllTopicsBySite(site)) {
@@ -133,7 +133,7 @@ public void run() {
133133
// try reconnect after 3 second when reconnecting stat
134134
long ts = dispatcher.getReconnectTimestamp(site);
135135
if (System.currentTimeMillis() >= ts + 3000) {
136-
AbstractClient.Site s = dispatcher.getSiteByName(site);
136+
Site s = dispatcher.getSiteByName(site);
137137
dispatcher.activeCloseConnection(s);
138138
for (String topic : dispatcher.getAllTopicsBySite(site)) {
139139
log.info("try to reconnect topic " + topic);

src/com/xxdb/streaming/client/MessageDispatcher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,17 @@ interface MessageDispatcher {
3030

3131
List<String> getAllReconnectSites();
3232

33-
AbstractClient.Site getSiteByName(String site);
33+
Site getSiteByName(String site);
3434

35-
void activeCloseConnection(AbstractClient.Site site);
35+
void activeCloseConnection(Site site);
3636

3737
List<String> getAllTopicsBySite(String site);
3838

3939
Set<String> getAllReconnectTopic();
4040

4141
Map<String, StreamDeserializer> getSubInfos();
4242

43-
ConcurrentHashMap<String, AbstractClient.Site[]> getTopicToSites();
43+
ConcurrentHashMap<String, Site[]> getTopicToSites();
4444
}
4545

4646

src/com/xxdb/streaming/client/MessageParser.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ private Boolean isListenMode(){
4545

4646
public void run() {
4747
Map<String, StreamDeserializer> subinfos = dispatcher.getSubInfos();
48-
ConcurrentHashMap<String, AbstractClient.Site[]> topicToSites = dispatcher.getTopicToSites();
48+
ConcurrentHashMap<String, Site[]> topicToSites = dispatcher.getTopicToSites();
4949
Socket socket = null;
5050
try {
5151
DBConnection conn;
@@ -121,7 +121,7 @@ public void run() {
121121
} else if (body.isVector()) {
122122
BasicAnyVector dTable = (BasicAnyVector) body;
123123

124-
AbstractClient.Site[] sites = topicToSites.get(topic);
124+
Site[] sites = topicToSites.get(topic);
125125
int colSize = dTable.rows();
126126
int rowSize = dTable.getEntity(0).rows();
127127
if (sites != null && sites[0].msgAstable) {
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package com.xxdb.streaming.client;
2+
3+
import com.xxdb.data.Vector;
4+
5+
public class Site {
6+
String host;
7+
int port;
8+
String tableName;
9+
String actionName;
10+
MessageHandler handler;
11+
long msgId;
12+
boolean reconnect;
13+
Vector filter = null;
14+
boolean closed = false;
15+
boolean allowExistTopic = false;
16+
StreamDeserializer deserializer;
17+
String userName = "";
18+
String passWord = "";
19+
20+
boolean msgAstable = false;
21+
22+
Site(String host, int port, String tableName, String actionName,
23+
MessageHandler handler, long msgId, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String passWord, boolean msgAstable) {
24+
this.host = host;
25+
this.port = port;
26+
this.tableName = tableName;
27+
this.actionName = actionName;
28+
this.handler = handler;
29+
this.msgId = msgId;
30+
this.reconnect = reconnect;
31+
this.filter = filter;
32+
this.allowExistTopic = allowExistTopic;
33+
this.deserializer = deserializer;
34+
this.userName = userName;
35+
this.passWord = passWord;
36+
this.msgAstable = msgAstable;
37+
}
38+
39+
public String getHost() {
40+
return host;
41+
}
42+
43+
public int getPort() {
44+
return port;
45+
}
46+
47+
public String getTableName() {
48+
return tableName;
49+
}
50+
51+
public String getActionName() {
52+
return actionName;
53+
}
54+
55+
public MessageHandler getHandler() {
56+
return handler;
57+
}
58+
59+
public long getMsgId() {
60+
return msgId;
61+
}
62+
63+
public boolean isReconnect() {
64+
return reconnect;
65+
}
66+
67+
public Vector getFilter() {
68+
return filter;
69+
}
70+
71+
public boolean isClosed() {
72+
return closed;
73+
}
74+
75+
public void setClosed(boolean closed) {
76+
this.closed = closed;
77+
}
78+
79+
public boolean isAllowExistTopic() {
80+
return allowExistTopic;
81+
}
82+
83+
public StreamDeserializer getDeserializer() {
84+
return deserializer;
85+
}
86+
87+
public String getUserName() {
88+
return userName;
89+
}
90+
91+
public String getPassWord() {
92+
return passWord;
93+
}
94+
95+
public boolean isMsgAstable() {
96+
return msgAstable;
97+
}
98+
}

src/com/xxdb/streaming/client/ThreadedClient.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@
1212
import java.net.SocketException;
1313
import java.text.DateFormat;
1414
import java.text.SimpleDateFormat;
15-
import java.time.Duration;
1615
import java.util.*;
17-
import java.time.LocalTime;
1816
import java.util.concurrent.BlockingQueue;
1917
import java.util.concurrent.TimeUnit;
2018

src/com/xxdb/streaming/client/TopicManager.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ public List<String> getAllTopic() {
8080
return re;
8181
}
8282

83-
public AbstractClient.Site[] getSites(String topic) {
83+
public Site[] getSites(String topic) {
8484
TopicInfo ti = getTopicInfo(topic);
85-
AbstractClient.Site[] q = ti.sites;
85+
Site[] q = ti.sites;
8686
return q;
8787
}
8888

@@ -91,7 +91,7 @@ public String getTopic(String HATopic) {
9191
return ti.originTopic;
9292
}
9393

94-
public void setSites(String topic, AbstractClient.Site[] sites) {
94+
public void setSites(String topic, Site[] sites) {
9595
TopicInfo ti = getTopicInfo(topic);
9696
ti.sites = sites;
9797
}
@@ -101,7 +101,7 @@ private class TopicInfo {
101101
private String originTopic; // originTopic is same to
102102
private BlockingQueue<List<IMessage>> messageQueue;
103103
private ConcurrentHashMap<String, Integer> nameToIndex;
104-
private AbstractClient.Site[] sites;
104+
private Site[] sites;
105105
private int reconnectStat;
106106

107107
public TopicInfo(String topic) {
@@ -133,11 +133,11 @@ public int getReconnectStat() {
133133
return this.reconnectStat;
134134
}
135135

136-
public void setSites(AbstractClient.Site[] sites) {
136+
public void setSites(Site[] sites) {
137137
this.sites = sites;
138138
}
139139

140-
public AbstractClient.Site[] getSites() {
140+
public Site[] getSites() {
141141
return this.sites;
142142
}
143143
}

src/com/xxdb/streaming/cep/AttributeSerializer.java renamed to src/com/xxdb/streaming/client/cep/AttributeSerializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.xxdb.streaming.cep;
1+
package com.xxdb.streaming.client.cep;
22

33
import com.xxdb.data.Entity;
44
import com.xxdb.io.ExtendedDataOutput;

src/com/xxdb/streaming/cep/EventClient.java renamed to src/com/xxdb/streaming/client/cep/EventClient.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.xxdb.streaming.cep;
1+
package com.xxdb.streaming.client.cep;
22

33
import com.xxdb.DBConnection;
44
import com.xxdb.comm.ErrorCodeInfo;
@@ -9,12 +9,16 @@
99
import com.xxdb.streaming.client.AbstractClient;
1010
import com.xxdb.streaming.client.IMessage;
1111
import com.xxdb.streaming.client.MessageHandler;
12+
import com.xxdb.streaming.client.Site;
1213
import org.slf4j.Logger;
1314
import org.slf4j.LoggerFactory;
1415
import java.io.IOException;
1516
import java.net.SocketException;
17+
import java.text.DateFormat;
18+
import java.text.SimpleDateFormat;
1619
import java.util.ArrayList;
1720
import java.util.Arrays;
21+
import java.util.Date;
1822
import java.util.List;
1923
import java.util.concurrent.BlockingQueue;
2024

@@ -118,7 +122,7 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
118122
if (sites == null || sites.length == 0)
119123
;
120124
for (int i = 0; i < sites.length; i++)
121-
sites[i].closed = true;
125+
sites[i].setClosed(true);
122126
}
123127
synchronized (queueManager) {
124128
queueManager.removeQueue(topic);
@@ -135,6 +139,19 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
135139

136140
@Override
137141
protected boolean doReconnect(Site site) {
138-
return false;
142+
try {
143+
site.getHost();
144+
subscribe(site.getHost(), site.getPort(), site.getTableName(), site.getActionName(), site.getHandler(), site.getMsgId() + 1, true, site.getUserName(), site.getPassWord());
145+
Date d = new Date();
146+
DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
147+
log.info(df.format(d) + " Successfully reconnected and subscribed " + site.getHost() + ":" + site.getPort() + "/" + site.getTableName() + "/" + site.getActionName());
148+
return true;
149+
} catch (Exception ex) {
150+
Date d = new Date();
151+
DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
152+
log.error(df.format(d) + " Unable to subscribe table. Will try again after 1 seconds." + site.getHost() + ":" + site.getPort() + "/" + site.getTableName() + "/" + site.getActionName());
153+
ex.printStackTrace();
154+
return false;
155+
}
139156
}
140157
}

src/com/xxdb/streaming/cep/EventHandler.java renamed to src/com/xxdb/streaming/client/cep/EventHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.xxdb.streaming.cep;
1+
package com.xxdb.streaming.client.cep;
22

33
import com.xxdb.comm.ErrorCodeInfo;
44
import com.xxdb.data.*;

0 commit comments

Comments
 (0)