Skip to content

Commit d55ace3

Browse files
author
chengyitian
committed
Merge branch 'dev' of dolphindb.net:dolphindb/api-java
2 parents adda377 + d72e14f commit d55ace3

12 files changed

+1345
-1372
lines changed

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
<modelVersion>4.0.0</modelVersion>
33
<groupId>com.dolphindb</groupId>
44
<artifactId>dolphindb-javaapi</artifactId>
5-
<version>3.00.2.2</version>
5+
<version>3.00.2.3</version>
66
<packaging>jar</packaging>
77

88
<properties>
9-
<dolphindb.version>3.00.2.2</dolphindb.version>
9+
<dolphindb.version>3.00.2.3</dolphindb.version>
1010
</properties>
1111
<name>DolphinDB Java API</name>
1212
<description>The messaging and data conversion protocol between Java and DolphinDB server</description>
@@ -31,7 +31,7 @@
3131
<connection>scm:git:git@github.com:dolphindb/api-java.git</connection>
3232
<developerConnection>scm:git:git@github.com:dolphindb/api-java.git</developerConnection>
3333
<url>git@github.com:dolphindb/api-java.git</url>
34-
<tag>api-java-3.00.2.2</tag>
34+
<tag>api-java-3.00.2.3</tag>
3535
</scm>
3636
<dependencies>
3737
<dependency>

src/com/xxdb/data/Utils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
public class Utils {
1717

18-
public static final String JAVA_API_VERSION = "3.00.2.2";
18+
public static final String JAVA_API_VERSION = "3.00.2.3";
1919

2020
public static final int DISPLAY_ROWS = 20;
2121
public static final int DISPLAY_COLS = 100;

src/com/xxdb/streaming/client/AbstractClient.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public abstract class AbstractClient implements MessageDispatcher {
4141
protected String lastBackupSiteTopic = "";
4242
protected Map<String, Integer> currentSiteIndexMap = new ConcurrentHashMap<>();
4343
protected static Map<String, Long> lastExceptionTopicTimeMap = new ConcurrentHashMap<>();
44-
protected static Integer resubTimeout;
44+
protected static Integer resubscribeInterval;
4545
protected static boolean subOnce;
4646
protected BlockingQueue<List<IMessage>> lastQueue;
4747
protected String lastSuccessSubscribeTopic = "";
@@ -521,7 +521,7 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
521521
String tableName, String actionName, MessageHandler handler,
522522
long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer,
523523
boolean allowExistTopic, String userName, String passWord, boolean msgAsTable,
524-
List<String> backupSites, int resubTimeout, boolean subOnce, boolean createSubInfo) throws IOException, RuntimeException {
524+
List<String> backupSites, int resubscribeInterval, boolean subOnce, boolean createSubInfo) throws IOException, RuntimeException {
525525
Entity re;
526526
String topic = "";
527527
DBConnection dbConn = null;
@@ -543,7 +543,7 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
543543
}
544544
}
545545

546-
AbstractClient.resubTimeout = resubTimeout;
546+
AbstractClient.resubscribeInterval = resubscribeInterval;
547547
AbstractClient.subOnce = subOnce;
548548
AbstractClient.ifUseBackupSite = true;
549549

@@ -771,8 +771,8 @@ protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,
771771
String tableName, String actionName, MessageHandler handler,
772772
long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer,
773773
boolean allowExistTopic, String userName, String passWord, boolean msgAsTable,
774-
List<String> backupSites, int resubTimeout, boolean subOnce) throws IOException, RuntimeException {
775-
return subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, backupSites, resubTimeout, subOnce, true);
774+
List<String> backupSites, int resubscribeInterval, boolean subOnce) throws IOException, RuntimeException {
775+
return subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, backupSites, resubscribeInterval, subOnce, true);
776776
}
777777

778778
protected BlockingQueue<List<IMessage>> subscribeInternal(String host, int port,

src/com/xxdb/streaming/client/Daemon.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ public void run() {
165165
dispatcher.activeCloseConnection(s);
166166
for (String topic : dispatcher.getAllTopicsBySite(site)) {
167167
System.out.println("Daemon need reconnect: " + topic);
168-
// reconnect every info.resubTimeout ms
169-
if (System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic) <= AbstractClient.resubTimeout)
168+
// reconnect every info.resubscribeInterval ms
169+
if (System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic) <= AbstractClient.resubscribeInterval)
170170
continue;
171171

172172
dispatcher.tryReconnect(topic);
@@ -175,8 +175,8 @@ public void run() {
175175
Site s = dispatcher.getSiteByName(site);
176176
dispatcher.activeCloseConnection(s);
177177
for (String topic : dispatcher.getAllTopicsBySite(site)) {
178-
// reconnect every info.resubTimeout ms
179-
if (System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic) <= AbstractClient.resubTimeout)
178+
// reconnect every info.resubscribeInterval ms
179+
if (System.currentTimeMillis() - AbstractClient.lastExceptionTopicTimeMap.get(topic) <= AbstractClient.resubscribeInterval)
180180
continue;
181181

182182
dispatcher.tryReconnect(topic);

src/com/xxdb/streaming/client/PollingClient.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,12 @@ public TopicPoller subscribe(String host, int port, String tableName, String act
8989
return topicPoller;
9090
}
9191

92-
public TopicPoller subscribe(String host, int port, String tableName, String actionName, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, String userName, String passWord, boolean msgAsTable, List<String> backupSites, int resubTimeout, boolean subOnce) throws IOException {
93-
if (resubTimeout < 0)
94-
// resubTimeout default: 100ms
95-
resubTimeout = 100;
92+
public TopicPoller subscribe(String host, int port, String tableName, String actionName, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, String userName, String passWord, boolean msgAsTable, List<String> backupSites, int resubscribeInterval, boolean subOnce) throws IOException {
93+
if (resubscribeInterval < 0)
94+
// resubscribeInterval default: 100ms
95+
resubscribeInterval = 100;
9696

97-
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, (MessageHandler) null, offset, reconnect, filter, deserializer, false, userName, passWord, msgAsTable, backupSites, resubTimeout, subOnce);
97+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, (MessageHandler) null, offset, reconnect, filter, deserializer, false, userName, passWord, msgAsTable, backupSites, resubscribeInterval, subOnce);
9898
topicPoller = new TopicPoller(queue);
9999
return topicPoller;
100100
}
@@ -208,7 +208,7 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
208208
if (AbstractClient.ifUseBackupSite) {
209209
AbstractClient.ifUseBackupSite = false;
210210
AbstractClient.subOnce = false;
211-
AbstractClient.resubTimeout = 100;
211+
AbstractClient.resubscribeInterval = 100;
212212
}
213213
log.info("Successfully unsubscribed table " + fullTableName);
214214
} catch (Exception ex) {

src/com/xxdb/streaming/client/ThreadPooledClient.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -193,12 +193,12 @@ public void subscribe(String host, int port, String tableName, String actionName
193193
}
194194
}
195195

196-
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String passWord, boolean msgAsTable, List<String> backupSites, int resubTimeout, boolean subOnce) throws IOException {
197-
if (resubTimeout < 0)
198-
// resubTimeout default: 100ms
199-
resubTimeout = 100;
196+
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, String userName, String passWord, boolean msgAsTable, List<String> backupSites, int resubscribeInterval, boolean subOnce) throws IOException {
197+
if (resubscribeInterval < 0)
198+
// resubscribeInterval default: 100ms
199+
resubscribeInterval = 100;
200200

201-
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, backupSites, resubTimeout, subOnce);
201+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, passWord, msgAsTable, backupSites, resubscribeInterval, subOnce);
202202
String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
203203
synchronized (queueHandlers) {
204204
queueHandlers.put(tableNameToTrueTopic.get(topicStr), new QueueHandlerBinder(queue, handler));
@@ -375,7 +375,7 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
375375
if (AbstractClient.ifUseBackupSite) {
376376
AbstractClient.ifUseBackupSite = false;
377377
AbstractClient.subOnce = false;
378-
AbstractClient.resubTimeout = 100;
378+
AbstractClient.resubscribeInterval = 100;
379379
}
380380
log.info("Successfully unsubscribed table " + fullTableName);
381381
} catch (Exception ex) {

src/com/xxdb/streaming/client/ThreadedClient.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -237,15 +237,15 @@ public void subscribe(String host, int port, String tableName, String actionName
237237
}
238238
}
239239

240-
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, int batchSize, int throttle, String userName, String password, List<String> backupSites, int resubTimeout, boolean subOnce) throws IOException {
240+
public void subscribe(String host, int port, String tableName, String actionName, MessageHandler handler, long offset, boolean reconnect, Vector filter, StreamDeserializer deserializer, boolean allowExistTopic, int batchSize, int throttle, String userName, String password, List<String> backupSites, int resubscribeInterval, boolean subOnce) throws IOException {
241241
if(batchSize<=0)
242242
throw new IllegalArgumentException("BatchSize must be greater than zero");
243243
if(throttle<0)
244244
throw new IllegalArgumentException("Throttle must be greater than or equal to zero");
245-
if (resubTimeout < 0)
246-
// resubTimeout default: 100ms
247-
resubTimeout = 100;
248-
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, password, false, backupSites, resubTimeout, subOnce);
245+
if (resubscribeInterval < 0)
246+
// resubscribeInterval default: 100ms
247+
resubscribeInterval = 100;
248+
BlockingQueue<List<IMessage>> queue = subscribeInternal(host, port, tableName, actionName, handler, offset, reconnect, filter, deserializer, allowExistTopic, userName, password, false, backupSites, resubscribeInterval, subOnce);
249249
HandlerLopper handlerLopper = new HandlerLopper(queue, handler, batchSize, throttle == 0 ? -1 : throttle);
250250
handlerLopper.start();
251251
String topicStr = host + ":" + port + "/" + tableName + "/" + actionName;
@@ -538,7 +538,7 @@ protected void unsubscribeInternal(String host, int port, String tableName, Stri
538538
if (AbstractClient.ifUseBackupSite) {
539539
AbstractClient.ifUseBackupSite = false;
540540
AbstractClient.subOnce = false;
541-
AbstractClient.resubTimeout = 100;
541+
AbstractClient.resubscribeInterval = 100;
542542
}
543543
log.info("Successfully unsubscribed table " + fullTableName);
544544
} catch (Exception ex) {

0 commit comments

Comments
 (0)