Skip to content

Commit af52197

Browse files
committed
AJ-817: rename param from resubTimeout to resubscribeTimeout
1 parent 3467b86 commit af52197

File tree

5 files changed

+27
-27
lines changed

5 files changed

+27
-27
lines changed

src/com/xxdb/streaming/client/AbstractClient.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public abstract class AbstractClient implements MessageDispatcher {
4141
protected String lastBackupSiteTopic = "";
4242
protected Map<String, Integer> currentSiteIndexMap = new ConcurrentHashMap<>();
4343
protected static Map<String, Long> lastExceptionTopicTimeMap = new ConcurrentHashMap<>();
44-
protected static Integer resubTimeout;
44+
protected static Integer resubscribeInterval;
4545
protected static boolean subOnce;
4646
protected BlockingQueue<List<IMessage>> lastQueue;
4747
protected String lastSuccessSubscribeTopic = "";
@@ -521,7 +521,7 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
521521
String tableName, String actionName, MessageHandler handler,
522522
long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer,
523523
boolean allowExistTopic, String userName, String passWord, boolean msgAsTable,
524-
List<String> backupSites, int resubTimeout, boolean subOnce, boolean createSubInfo) throws IOException, RuntimeException {
524+
List<String> backupSites, int resubscribeInterval, boolean subOnce, boolean createSubInfo) throws IOException, RuntimeException {
525525
Entity re;
526526
String topic = "";
527527
DBConnection dbConn = null;
@@ -543,7 +543,7 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
543543
}
544544
}
545545

546-
AbstractClient.resubTimeout = resubTimeout;
546+
AbstractClient.resubscribeInterval = resubscribeInterval;
547547
AbstractClient.subOnce = subOnce;
548548
AbstractClient.ifUseBackupSite = true;
549549

@@ -771,8 +771,8 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
771771
String tableName, String actionName, MessageHandler handler,
772772
long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer,
773773
boolean allowExistTopic, String userName, String passWord, boolean msgAsTable,
774-
List<String> backupSites, int resubTimeout, boolean subOnce) throws IOException, RuntimeException {
775-
return subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, backupSites, resubTimeout, subOnce, true);
774+
List<String> backupSites, int resubscribeInterval, boolean subOnce) throws IOException, RuntimeException {
775+
return subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, backupSites, resubscribeInterval, subOnce, true);
776776
}
777777

778778
protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,

src/com/xxdb/streaming/client/Daemon.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ public void run() {
165165
dispatcher.activeCloseConnection(s);
166166
for (String topic : dispatcher.getAllTopicsBySite(site)) {
167167
System.out.println("Daemon need reconnect: " + topic);
168-
// reconnect every info.resubTimeout ms
169-
if (System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic) <= AbstractClient.resubTimeout)
168+
// reconnect every info.resubscribeTimeout ms
169+
if (System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic) <= AbstractClient.resubscribeInterval)
170170
continue;
171171

172172
dispatcher.tryReconnect(topic);
@@ -175,8 +175,8 @@ public void run() {
175175
Site s = dispatcher.getSiteByName(site);
176176
dispatcher.activeCloseConnection(s);
177177
for (String topic : dispatcher.getAllTopicsBySite(site)) {
178-
// reconnect every info.resubTimeout ms
179-
if (System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic) <= AbstractClient.resubTimeout)
178+
// reconnect every info.resubscribeTimeout ms
179+
if (System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic) <= AbstractClient.resubscribeInterval)
180180
continue;
181181

182182
dispatcher.tryReconnect(topic);

src/com/xxdb/streaming/client/PollingClient.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,12 @@ public TopicPoller subscribe(String host, int port, String tableName, String act
8989
return topicPoller;
9090
}
9191

92-
public TopicPoller subscribe(String host, int port, String tableName, String actionName, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, String userName, String passWord, boolean msgAsTable, List<String> backupSites, int resubTimeout, boolean subOnce) throws IOException {
93-
if (resubTimeout < 0)
94-
// resubTimeout default: 100ms
95-
resubTimeout = 100;
92+
public TopicPoller subscribe(String host, int port, String tableName, String actionName, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, String userName, String passWord, boolean msgAsTable, List<String> backupSites, int resubscribeTimeout, boolean subOnce) throws IOException {
93+
if (resubscribeTimeout < 0)
94+
// resubscribeTimeout default: 100ms
95+
resubscribeTimeout = 100;
9696

97-
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, (MessageHandler) null, offset, reconnect, filter, deserializer, false, userName, passWord, msgAsTable, backupSites, resubTimeout, subOnce);
97+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, (MessageHandler) null, offset, reconnect, filter, deserializer, false, userName, passWord, msgAsTable, backupSites, resubscribeTimeout, subOnce);
9898
topicPoller = new TopicPoller(queue);
9999
return topicPoller;
100100
}
@@ -208,7 +208,7 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
208208
if (AbstractClient.ifUseBackupSite) {
209209
AbstractClient.ifUseBackupSite = false;
210210
AbstractClient.subOnce = false;
211-
AbstractClient.resubTimeout = 100;
211+
AbstractClient.resubscribeInterval = 100;
212212
}
213213
log.info("Successfully unsubscribed table " + fullTableName);
214214
} catch (Exception ex) {

src/com/xxdb/streaming/client/ThreadPooledClient.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -193,12 +193,12 @@ public void subscribe(String host, int port, String tableName, String actionName
193193
}
194194
}
195195

196-
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String passWord, boolean msgAsTable, List<String> backupSites, int resubTimeout, boolean subOnce) throws IOException {
197-
if (resubTimeout < 0)
198-
// resubTimeout default: 100ms
199-
resubTimeout = 100;
196+
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String passWord, boolean msgAsTable, List<String> backupSites, int resubscribeTimeout, boolean subOnce) throws IOException {
197+
if (resubscribeTimeout < 0)
198+
// resubscribeTimeout default: 100ms
199+
resubscribeTimeout = 100;
200200

201-
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, backupSites, resubTimeout, subOnce);
201+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, backupSites, resubscribeTimeout, subOnce);
202202
String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
203203
synchronized (queueHandlers) {
204204
queueHandlers.put(tableNameToTrueTopic.get(topicStr), new QueueHandlerBinder(queue, handler));
@@ -375,7 +375,7 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
375375
if (AbstractClient.ifUseBackupSite) {
376376
AbstractClient.ifUseBackupSite = false;
377377
AbstractClient.subOnce = false;
378-
AbstractClient.resubTimeout = 100;
378+
AbstractClient.resubscribeInterval = 100;
379379
}
380380
log.info("Successfully unsubscribed table " + fullTableName);
381381
} catch (Exception ex) {

src/com/xxdb/streaming/client/ThreadedClient.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -237,15 +237,15 @@ public void subscribe(String host, int port, String tableName, String actionName
237237
}
238238
}
239239

240-
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, int batchSize, int throttle, String userName, String password, List<String> backupSites, int resubTimeout, boolean subOnce) throws IOException {
240+
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, int batchSize, int throttle, String userName, String password, List<String> backupSites, int resubscribeTimeout, boolean subOnce) throws IOException {
241241
if(batchSize<=0)
242242
throw new IllegalArgumentException("BatchSize must be greater than zero");
243243
if(throttle<0)
244244
throw new IllegalArgumentException("Throttle must be greater than or equal to zero");
245-
if (resubTimeout < 0)
246-
// resubTimeout default: 100ms
247-
resubTimeout = 100;
248-
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, password, false, backupSites, resubTimeout, subOnce);
245+
if (resubscribeTimeout < 0)
246+
// resubscribeTimeout default: 100ms
247+
resubscribeTimeout = 100;
248+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, password, false, backupSites, resubscribeTimeout, subOnce);
249249
HandlerLopper handlerLopper = new HandlerLopper(queue, handler, batchSize, throttle == 0 ? -1 : throttle);
250250
handlerLopper.start();
251251
String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
@@ -538,7 +538,7 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
538538
if (AbstractClient.ifUseBackupSite) {
539539
AbstractClient.ifUseBackupSite = false;
540540
AbstractClient.subOnce = false;
541-
AbstractClient.resubTimeout = 100;
541+
AbstractClient.resubscribeInterval = 100;
542542
}
543543
log.info("Successfully unsubscribed table " + fullTableName);
544544
} catch (Exception ex) {

0 commit comments

Comments
 (0)