Skip to content

Commit 3afd20c

Browse files
author
chengyitian
committed
AJ-636: delete no-use code or log;
1 parent 933b08a commit 3afd20c

File tree

6 files changed

+15
-91
lines changed

6 files changed

+15
-91
lines changed

src/com/xxdb/streaming/client/AbstractClient.java

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ public abstract class AbstractClient implements MessageDispatcher {
4343
protected static Map<String, Long> lastExceptionTopicTimeMap = new ConcurrentHashMap<>();
4444
protected static Integer resubTimeout;
4545
protected static boolean subOnce;
46-
// protected static boolean createSubInfo;
4746
protected BlockingQueue<List<IMessage>> lastQueue;
4847
protected String lastSuccessSubscribeTopic = "";
4948

@@ -305,9 +304,9 @@ public boolean tryReconnect(String topic) {
305304

306305
// put sites to new sub topic.
307306
for (String key : trueTopicToSites.keySet()) {
308-
// 重新为 key 赋值
307+
// reassign the value to key.
309308
if (key.contains(sites[successfulSiteIndex].host+":"+sites[successfulSiteIndex].port)) {
310-
trueTopicToSites.put(key, sites); // 更新操作是安全的
309+
trueTopicToSites.put(key, sites);
311310
}
312311
}
313312

@@ -321,10 +320,7 @@ public boolean tryReconnect(String topic) {
321320
}
322321
}
323322

324-
// log.info("Successfully switched to node: " + sites[currentSiteIndexMap.get(topic)].host + ":" + sites[currentSiteIndexMap.get(topic)].port);
325-
326323
if (!reconnected) {
327-
// waitReconnectTopic.add(topic);
328324
return false;
329325
} else {
330326
log.info("Successfully switched to node: " + sites[currentSiteIndexMap.get(topic)].host + ":" + sites[currentSiteIndexMap.get(topic)].port);
@@ -498,10 +494,6 @@ private String getTopic(String host, int port, String alias, String tableName, S
498494
return String.format("%s:%d:%s/%s/%s", host, port, alias, tableName, actionName);
499495
}
500496

501-
private String getTopic(String host, int port, String tableName, String actionName) {
502-
return String.format("%s:%d:%s/%s", host, port, tableName, actionName);
503-
}
504-
505497
protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
506498
String tableName, String actionName, MessageHandler handler,
507499
long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic)
@@ -614,16 +606,11 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
614606
}
615607
}
616608

617-
// Site curConnectedSite = parsedBackupSites.get(currentSiteIndexMap.get(topic));
618-
// checkServerVersion(curConnectedSite.host, curConnectedSite.port);
619-
620609
if (!isConnected)
621610
throw new IOException("All sites try connect failed.");
622611
}
623612

624613
if (parsedBackupSites.size() != 0) {
625-
// connList.add(dbConn);
626-
627614
// prepare parsedBackupSites
628615
for (int i = 0; i < parsedBackupSites.size(); i++) {
629616
String backupIP = parsedBackupSites.get(i).host;
@@ -639,11 +626,13 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
639626
HATopicToTrueTopic.put(topic, topic);
640627
}
641628
}
642-
if (subInfos_.containsKey(topic)){
629+
630+
if (subInfos_.containsKey(topic)) {
643631
throw new RuntimeException("Subscription with topic " + topic + " exist. ");
644-
}else {
632+
} else {
645633
subInfos_.put(topic, deserializer);
646634
}
635+
647636
synchronized (trueTopicToSites) {
648637
Site[] sitesArray = new Site[parsedBackupSites.size()];
649638
parsedBackupSites.toArray(sitesArray);
@@ -676,10 +665,8 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
676665
List<Entity> params = new ArrayList<Entity>();
677666
params.add(new BasicString(tableName));
678667
params.add(new BasicString(actionName));
679-
// subscribeInternalConnect(dbConn, host, port, userName, passWord);
680668
re = dbConn.run("getSubscriptionTopic", params);
681669
topic = ((BasicAnyVector) re).getEntity(0).getString();
682-
// lastBackupSiteTopic = topic;
683670
lastSuccessSubscribeTopic = topic;
684671
params.clear();
685672

@@ -805,7 +792,6 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
805792
protected void unsubscribeInternal(String host, int port, String tableName, String actionName) throws IOException {
806793
synchronized (this) {
807794
DBConnection dbConn = new DBConnection();
808-
809795
if (!currentSiteIndexMap.isEmpty()) {
810796
String topic = tableNameToTrueTopic.get( host + ":" + port + "/" + tableName + "/" + actionName);
811797
Integer currentSiteIndex = currentSiteIndexMap.get(topic);
@@ -837,17 +823,13 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
837823
String topic = null;
838824
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
839825

840-
// synchronized (tableNameToTrueTopic) {
841826
topic = tableNameToTrueTopic.get(fullTableName);
842-
// }
843827

844-
// synchronized (trueTopicToSites) {
845828
Site[] sites = trueTopicToSites.get(topic);
846829
if (sites == null || sites.length == 0)
847830
;
848831
for (int i = 0; i < sites.length; i++)
849832
sites[i].closed = true;
850-
// }
851833

852834
log.info("Successfully unsubscribed table " + fullTableName);
853835
} catch (Exception ex) {

src/com/xxdb/streaming/client/Daemon.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -161,54 +161,31 @@ public void run() {
161161
// if set backupSite
162162
for (String site : this.dispatcher.getAllReconnectSites()) {
163163
if (dispatcher.getNeedReconnect(site) == 1) {
164-
System.out.println("flag site: " + site);
165164
Site s = dispatcher.getCurrentSiteByName(site);
166165
dispatcher.activeCloseConnection(s);
167-
String lastTopic = "";
168166
for (String topic : dispatcher.getAllTopicsBySite(site)) {
169167
System.out.println("Daemon need reconnect: " + topic);
170168
// reconnect every info.resubTimeout ms
171169
if (System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic) <= AbstractClient.resubTimeout)
172170
continue;
173171

174-
log.info("flag1 try to reconnect topic " + topic);
175172
dispatcher.tryReconnect(topic);
176-
// lastTopic = topic;
177173
}
178-
// dispatcher.setNeedReconnect(lastTopic, 2);
179174
} else {
180-
// try reconnect after 3 second when reconnecting stat
181-
// long ts = dispatcher.getReconnectTimestamp(site);
182-
// if (System.currentTimeMillis() >= ts + 3000) {
183175
Site s = dispatcher.getSiteByName(site);
184176
dispatcher.activeCloseConnection(s);
185177
for (String topic : dispatcher.getAllTopicsBySite(site)) {
186178
// reconnect every info.resubTimeout ms
187179
if (System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic) <= AbstractClient.resubTimeout)
188180
continue;
189181

190-
log.info("flag2 try to reconnect topic " + topic);
191182
dispatcher.tryReconnect(topic);
192-
// }
193183
dispatcher.setReconnectTimestamp(site, System.currentTimeMillis());
194184
}
195185
}
196186
}
197187

198188
// not need to put and get from waitReconnectTopic.
199-
// Set<String> waitReconnectTopic = dispatcher.getAllReconnectTopic();
200-
// synchronized (waitReconnectTopic) {
201-
// for (String topic : waitReconnectTopic) {
202-
// // reconnect every info.resubTimeout ms
203-
//// if (System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic) <= AbstractClient.resubTimeout)
204-
//// continue;
205-
// long subTime = System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic);
206-
// System.out.println("flag3 subTime: " + subTime);
207-
// if (subTime <= AbstractClient.resubTimeout)
208-
// continue;
209-
// dispatcher.tryReconnect(topic);
210-
// }
211-
// }
212189

213190
try {
214191
// check reconnected interval time

src/com/xxdb/streaming/client/PollingClient.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -95,18 +95,12 @@ public TopicPoller subscribe(String host, int port, String tableName, String act
9595
resubTimeout = 100;
9696

9797
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, (MessageHandler) null, offset, reconnect, filter, deserializer, false, userName, passWord, msgAsTable, backupSites, resubTimeout, subOnce);
98-
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
99-
List<String> usr = Arrays.asList(userName, passWord);
100-
// users.put(tp, usr);
10198
topicPoller = new TopicPoller(queue);
10299
return topicPoller;
103100
}
104101

105102
public TopicPoller subscribe(String host, int port, String tableName, String actionName, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, String userName, String passWord, boolean msgAsTable, List<String> backupSites) throws IOException {
106103
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, (MessageHandler) null, offset, reconnect, filter, deserializer, false, userName, passWord, msgAsTable, backupSites, 100, false);
107-
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
108-
List<String> usr = Arrays.asList(userName, passWord);
109-
// users.put(tp, usr);
110104
topicPoller = new TopicPoller(queue);
111105
return topicPoller;
112106
}
@@ -200,19 +194,15 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
200194
dbConn.run("stopPublishTable", params);
201195
String topic = null;
202196
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
203-
//synchronized (tableNameToTrueTopic) {
204197
topic = tableNameToTrueTopic.get(fullTableName);
205-
// }
206-
// synchronized (trueTopicToSites) {
198+
207199
Site[] sites = trueTopicToSites.get(topic);
208200
if (sites == null || sites.length == 0)
209201
;
210202
for (int i = 0; i < sites.length; i++)
211203
sites[i].closed = true;
212-
// }
213-
// synchronized (queueManager) {
204+
214205
queueManager.removeQueue(topic);
215-
// }
216206

217207
// init backupSites related params.
218208
if (AbstractClient.ifUseBackupSite) {

src/com/xxdb/streaming/client/QueueManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@ public synchronized BlockingQueue<List<IMessage>> addQueue(String topic) {
1616
throw new RuntimeException("Topic " + topic + " already subscribed");
1717
}
1818

19-
public synchronized BlockingQueue<List<IMessage>> changeQueue(String topic, BlockingQueue<List<IMessage>> q) {
19+
public synchronized void changeQueue(String topic, BlockingQueue<List<IMessage>> q) {
2020
if (queueMap.containsKey(topic)) {
2121
queueMap.put(topic, q);
22-
return q;
22+
} else {
23+
throw new RuntimeException("Topic " + topic + " doesn't subscribed");
2324
}
24-
throw new RuntimeException("Topic " + topic + " doesn't subscribed");
2525
}
2626

2727
public synchronized BlockingQueue<List<IMessage>> getQueue(String topic) {

src/com/xxdb/streaming/client/ThreadPooledClient.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -202,20 +202,16 @@ public void subscribe(String host, int port, String tableName, String actionName
202202

203203
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, backupSites, resubTimeout, subOnce);
204204
String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
205-
List<String> usr = Arrays.asList(userName, passWord);
206205
synchronized (queueHandlers) {
207206
queueHandlers.put(tableNameToTrueTopic.get(topicStr), new QueueHandlerBinder(queue, handler));
208-
// users.put(topicStr, usr);
209207
}
210208
}
211209

212210
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String passWord, boolean msgAsTable, List<String> backupSites) throws IOException {
213211
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, backupSites, 100, false);
214212
String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
215-
List<String> usr = Arrays.asList(userName, passWord);
216213
synchronized (queueHandlers) {
217214
queueHandlers.put(tableNameToTrueTopic.get(topicStr), new QueueHandlerBinder(queue, handler));
218-
// users.put(topicStr, usr);
219215
}
220216
}
221217

@@ -284,8 +280,6 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
284280
if (!ifUseBackupSite) {
285281
// original logic:
286282
DBConnection dbConn = new DBConnection();
287-
// String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
288-
// List<String> usr = users.get(fullTableName);
289283
List<String> tp = Arrays.asList(host, String.valueOf(port), tableName, actionName);
290284
List<String> usr = users.get(tp);
291285
String user = usr.get(0);
@@ -369,19 +363,15 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
369363
dbConn.run("stopPublishTable", params);
370364
String topic = null;
371365
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
372-
// synchronized (tableNameToTrueTopic) {
373366
topic = tableNameToTrueTopic.get(fullTableName);
374-
// }
375-
// synchronized (trueTopicToSites) {
367+
376368
Site[] sites = trueTopicToSites.get(topic);
377369
if (sites == null || sites.length == 0)
378370
;
379371
for (int i = 0; i < sites.length; i++)
380372
sites[i].closed = true;
381-
// }
382-
// synchronized (queueManager) {
373+
383374
queueManager.removeQueue(topic);
384-
// }
385375

386376
// init backupSites related params.
387377
if (AbstractClient.ifUseBackupSite) {

src/com/xxdb/streaming/client/ThreadedClient.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -182,16 +182,11 @@ protected boolean doReconnect(Site site) {
182182
synchronized (this) {
183183
log.info("ThreadedClient doReconnect: " + site.host + ":" + site.port);
184184
try {
185-
System.out.println("doReconnect 尝试切换节点:" + site.host + ":" + site.port);
186-
// System.out.println("site msg id: " +site.msgId);
187185
subscribe(site.host, site.port, site.tableName, site.actionName, site.handler, site.msgId + 1, true, site.filter, site.deserializer, site.allowExistTopic, site.userName, site.passWord, false);
188-
System.out.println("doReconnect 尝试切换节点成功:" + site.host + ":" + site.port);
189186
String topicStr = site.host + ":" + site.port + "/" + site.tableName + "/" + site.actionName;
190187
String curTopic = tableNameToTrueTopic.get(topicStr);
191188
BlockingQueue<List<IMessage>> queue = queueManager.addQueue(curTopic);
192189
queueManager.changeQueue(curTopic, lastQueue);
193-
// System.out.println("切换后 handlerLoppers: " + handlerLoppers.get(topicStr).getName());
194-
// System.out.println("切换成功后,handlerLoppers size: " + handlerLoppers.size());
195190
Date d = new Date();
196191
DateFormat df = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
197192
log.info(df.format(d) + " Successfully reconnected and subscribed " + site.host + ":" + site.port + "/" + site.tableName + "/" + site.actionName);
@@ -254,10 +249,8 @@ public void subscribe(String host, int port, String tableName, String actionName
254249
HandlerLopper handlerLopper = new HandlerLopper(queue, handler, batchSize, throttle == 0 ? -1 : throttle);
255250
handlerLopper.start();
256251
String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
257-
List<String> usr = Arrays.asList(userName, password);
258252
synchronized (handlerLoppers) {
259253
handlerLoppers.put(topicStr, handlerLopper);
260-
// users.put(topicStr, usr);
261254
}
262255
}
263256

@@ -270,10 +263,8 @@ public void subscribe(String host, int port, String tableName, String actionName
270263
HandlerLopper handlerLopper = new HandlerLopper(queue, handler, batchSize, throttle == 0 ? -1 : throttle);
271264
handlerLopper.start();
272265
String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
273-
List<String> usr = Arrays.asList(userName, password);
274266
synchronized (handlerLoppers) {
275267
handlerLoppers.put(topicStr, handlerLopper);
276-
// users.put(topicStr, usr);
277268
}
278269
}
279270

@@ -533,19 +524,15 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
533524
dbConn.run("stopPublishTable", params);
534525
String topic = null;
535526
String fullTableName = host + ":" + port + "/" + tableName + "/" + actionName;
536-
// synchronized (tableNameToTrueTopic) {
537527
topic = tableNameToTrueTopic.get(fullTableName);
538-
// }
539-
// synchronized (trueTopicToSites) {
528+
540529
Site[] sites = trueTopicToSites.get(topic);
541530
if (sites == null || sites.length == 0)
542531
;
543532
for (int i = 0; i < sites.length; i++)
544533
sites[i].closed = true;
545-
// }
546-
// synchronized (queueManager) {
534+
547535
queueManager.removeQueue(lastBackupSiteTopic);
548-
// }
549536

550537
// init backupSites related params.
551538
if (AbstractClient.ifUseBackupSite) {
@@ -560,11 +547,9 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
560547
dbConn.close();
561548
String topicStr = originHost + ":" + originPort + "/" + tableName + "/" + actionName;
562549
HandlerLopper handlerLopper = null;
563-
// synchronized (handlerLoppers) {
564550
handlerLopper = handlerLoppers.get(topicStr);
565551
handlerLoppers.remove(topicStr);
566552
handlerLopper.interrupt();
567-
// }
568553
}
569554
}
570555
}

0 commit comments

Comments
 (0)