Skip to content

Commit 2e72263

Browse files
author
chengyitian
committed
Merge branch 'dev' of dolphindb.net:dolphindb/api-java
# Conflicts: # README.md
2 parents 380285c + 7c300c7 commit 2e72263

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1607
-438
lines changed

README.md

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -111,15 +111,15 @@ boolean success = conn.connect("localhost", 8848, "admin", "123456");
111111

112112
If the connection is established without a username and password, you only have guest privileges. To be granted with more privileges, we can log in by executing `conn.login('admin', '123456', true)`.
113113

114-
To define and use user-defined functions in a Java program, you can pass in the user-defined scripts to the parameter initialScript. The advantages are:
115-
(1) These functions don't need to be defined repeatedly every time `run` is called;
114+
To define and use user-defined functions in a Java program, you can pass in the user-defined scripts to the parameter initialScript. The advantages are:
115+
(1) These functions don't need to be defined repeatedly every time `run` is called;
116116
(2) The API client can automatically connect to the server after disconnection. If the parameter *initialScript* is specified, the Java API will automatically execute the script and register the functions. The parameter can be very useful for scenarios where the network is not stable but the program needs to run continuously.
117117

118118
```java
119119
boolean success = conn.connect("localhost", 8848, "admin", "123456", "");
120120
```
121121

122-
To enable high availability, set the parameter *enableHighAvailability* to true.
122+
To enable high availability, set the parameter *enableHighAvailability* to true.
123123

124124
As of version 1.30.22.2, load balancing is automatically enabled for HA mode. Since 2.00.11.0, the `connect` method supports a new parameter *enableLoadBalance* which allows users to enable/disable load balancing in HA mode. Load balancing is only supported in HA mode and it is disabled by default.
125125

@@ -233,7 +233,7 @@ The Java API provides connection pool `ExclusiveDBConnectionPool`. Users can exe
233233
| getConnectionCount() | Get the number of connections. |
234234
| shutdown() | Shut down the connection pool. |
235235

236-
**Note**: If the current `ExclusiveDBConnectionPool` is no longer in use, Java API will automatically close the connection after a while. To release the connection resources, call `shutdown()`upon the completion of thread tasks.
236+
**Note**: If the current `ExclusiveDBConnectionPool` is no longer in use, Java API will automatically close the connection after a while. To release the connection resources, call `shutdown()`upon the completion of thread tasks.
237237

238238
`BasicDBTask` wraps the functions and arguments to be executed.
239239

@@ -563,7 +563,7 @@ public void testVoid() throws IOException{
563563
There are 2 types of DolphinDB tables:
564564

565565
- In-memory table: it has the fastest access speed, but if the node shuts down the data will be lost.
566-
- DFS table: data are distributed across disks of multiple nodes.
566+
- DFS table: data are distributed across disks of multiple nodes.
567567

568568
### 7.1. Write to an In-Memory Table
569569

@@ -610,7 +610,7 @@ The example above uses partial application in DolphinDB to embed a table in `tab
610610

611611
#### 7.1.3. Save BasicTable Objects With Function `tableInsert`
612612

613-
Function `tableInsert` can also accept a BasicTable object in Java as a parameter to append data to a table in batches.
613+
Function `tableInsert` can also accept a BasicTable object in Java as a parameter to append data to a table in batches.
614614

615615
```java
616616
public void test_save_table(BasicTable table1) throws IOException {
@@ -762,7 +762,7 @@ public void test_loop_basicTable(BasicTable table1) throws Exception{
762762

763763
### 7.4. Append Data Asynchronously
764764

765-
You can use methods of `MultithreadedTableWriter` class to asynchronously append data to a DolphinDB in-memory table, dimension table, or a DFS table. The class maintains a buffer queue. Even when the server is fully occupied with network I/O operations, the writing threads of the API client will not be blocked.
765+
You can use methods of `MultithreadedTableWriter` class to asynchronously append data to a DolphinDB in-memory table, dimension table, or a DFS table. The class maintains a buffer queue. Even when the server is fully occupied with network I/O operations, the writing threads of the API client will not be blocked.
766766

767767
For asynchronous writes:
768768

@@ -791,7 +791,7 @@ Parameters:
791791
- **port**: port number
792792
- **userId** / **password**: username and password
793793
- **dbPath**: a STRING indicating the DFS database path. Leave it unspecified for an in-memory table.
794-
- **tableName**: a STRING indicating the in-memory or DFS table name.
794+
- **tableName**: a STRING indicating the in-memory or DFS table name.
795795

796796
**Note:** For API 1.30.17 or lower versions, when writing to an in-memory table, please specify the in-memory table name for *dbPath* and leave *tableName* empty.
797797

@@ -857,7 +857,7 @@ List<List<Entity>> unwrittenData = multithreadedTableWriter_.getUnwrittenData();
857857
ErrorCodeInfo insertUnwrittenData(List<List<Entity>> records)
858858
```
859859

860-
**Details:**
860+
**Details:**
861861

862862
Insert unwritten data. The result is in the same format as `insert`. The difference is that `insertUnwrittenData` can insert multiple records at a time.
863863

@@ -877,7 +877,7 @@ ErrorCodeInfo ret = multithreadedTableWriter_.insertUnwrittenData(unwrittenData)
877877
Status getStatus()
878878
```
879879

880-
**Details:**
880+
**Details:**
881881

882882
Get the current status of the `MultithreadedTableWriter` object.
883883

@@ -911,14 +911,14 @@ writeStatus = multithreadedTableWriter_.getStatus();
911911
- `hasError()`: return true if an error occurred, false otherwise.
912912
- `succeed()`: return true if the data is written successfully, false otherwise.
913913

914-
914+
915915
(5) waitForThreadCompletion
916916

917917
```java
918918
waitForThreadCompletion()
919919
```
920920

921-
**Details:**
921+
**Details:**
922922

923923
After calling the method, `MultithreadedTableWriter` will wait until all working threads complete their tasks. If you call `insert` or `insertUnwrittenData` after the execution of `waitForThreadCompletion`, an error "thread is exiting" will be raised.
924924

@@ -1000,7 +1000,7 @@ Output:
10001000
The above example calls method `writer.insert()` to write data to writer, and obtains the status with `writer.getStatus()`. Please note that the method `writer.waitForThreadCompletion()` will wait for `MultithreadedTableWriter` to finish the data writes, and then terminate all working threads with the last status retained. A new MTW object must be created to write data again.
10011001

10021002
As shown in the above example, `MultithreadedTableWriter` applies multiple threads to data conversion and writes. The API client also uses multiple threads to call `MultithreadedTableWriter`, and the implementation is thread-safe.
1003-
1003+
10041004
#### 7.4.2. Exceptions Raised by MultithreadedTableWriter
10051005

10061006
When calling method `insert` of class `MultithreadedTableWriter`:

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
<modelVersion>4.0.0</modelVersion>
33
<groupId>com.dolphindb</groupId>
44
<artifactId>dolphindb-javaapi</artifactId>
5-
<version>3.00.1.0</version>
5+
<version>3.00.1.1</version>
66
<packaging>jar</packaging>
77

88
<properties>
9-
<dolphindb.version>3.00.1.0</dolphindb.version>
9+
<dolphindb.version>3.00.1.1</dolphindb.version>
1010
</properties>
1111
<name>DolphinDB Java API</name>
1212
<description>The messaging and data conversion protocol between Java and DolphinDB server</description>
@@ -31,7 +31,7 @@
3131
<connection>scm:git:git@github.com:dolphindb/api-java.git</connection>
3232
<developerConnection>scm:git:git@github.com:dolphindb/api-java.git</developerConnection>
3333
<url>git@github.com:dolphindb/api-java.git</url>
34-
<tag>api-java-3.00.1.0</tag>
34+
<tag>api-java-3.00.1.1</tag>
3535
</scm>
3636
<dependencies>
3737
<dependency>

src/com/xxdb/DBConnection.java

Lines changed: 85 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.net.InetAddress;
66
import java.net.InetSocketAddress;
77
import java.net.Socket;
8+
import java.rmi.RemoteException;
89
import java.security.PublicKey;
910
import java.security.cert.CertificateException;
1011
import java.security.cert.X509Certificate;
@@ -55,6 +56,7 @@ public class DBConnection {
5556
private long runSeqNo_ = 0;
5657
private int[] serverVersion_;
5758
private boolean isReverseStreaming_ = false;
59+
private int tryReconnectNums = -1;
5860

5961
private static final Logger log = LoggerFactory.getLogger(DBConnection.class);
6062

@@ -189,7 +191,6 @@ private boolean connect()throws IOException{
189191
socket_.connect(new InetSocketAddress(hostName_,port_), 3000);
190192
}
191193
} catch (ConnectException ex) {
192-
log.error("com.xxdb.DBConnection.DBConnectionImpl.connect() has exception. Current node hostName: " + this.hostName_ + ", port: " + this.port_);
193194
log.error("Connect to " + this.hostName_ + ":" + this.port_ + " failed.");
194195
throw ex;
195196
}
@@ -668,6 +669,11 @@ public boolean connect(String hostName, int port, int timeout, boolean reconnect
668669
return connect(hostName, port, "", "", null, false, null, reconnect);
669670
}
670671

672+
public boolean connect(String hostName, int port, int timeout, boolean reconnect, int tryReconnectNums) throws IOException {
673+
this.connTimeout_ = timeout;
674+
return connect(hostName, port, "", "", null, false, null, reconnect, tryReconnectNums);
675+
}
676+
671677
public boolean connect(String hostName, int port, String initialScript) throws IOException {
672678
return connect(hostName, port, "", "", initialScript, false, null);
673679
}
@@ -723,14 +729,30 @@ public boolean connect(String hostName, int port, String userId, String password
723729
return connect(hostName, port, userId, password, initialScript, enableHighAvailability, highAvailabilitySites, reconnect, false);
724730
}
725731

732+
public boolean connect(String hostName, int port, String userId, String password, String initialScript, boolean enableHighAvailability, String[] highAvailabilitySites, boolean reconnect, int tryReconnectNums) throws IOException {
733+
if (enableHighAvailability)
734+
return connect(hostName, port, userId, password, initialScript, enableHighAvailability, highAvailabilitySites, reconnect, true, tryReconnectNums);
735+
else
736+
return connect(hostName, port, userId, password, initialScript, enableHighAvailability, highAvailabilitySites, reconnect, false, tryReconnectNums);
737+
}
738+
726739
public boolean connect(String hostName, int port, String userId, String password, String initialScript, boolean enableHighAvailability, String[] highAvailabilitySites, boolean reconnect, boolean enableLoadBalance) throws IOException {
740+
return connect(hostName, port, userId, password, initialScript, enableHighAvailability, highAvailabilitySites, reconnect, enableLoadBalance, -1);
741+
}
742+
743+
public boolean connect(String hostName, int port, String userId, String password, String initialScript, boolean enableHighAvailability, String[] highAvailabilitySites, boolean reconnect, boolean enableLoadBalance, int tryReconnectNums) throws IOException {
727744
mutex_.lock();
728745
try {
729746
this.uid_ = userId;
730747
this.pwd_ = password;
731748
this.initialScript_ = initialScript;
732749
this.enableHighAvailability_ = enableHighAvailability;
733750
this.loadBalance_ = enableLoadBalance;
751+
if (tryReconnectNums <= 0)
752+
this.tryReconnectNums = -1;
753+
else
754+
this.tryReconnectNums = tryReconnectNums;
755+
734756
if (this.loadBalance_ && !this.enableHighAvailability_)
735757
throw new RuntimeException("Cannot only enable loadbalance but not enable highAvailablity.");
736758

@@ -746,20 +768,48 @@ public boolean connect(String hostName, int port, String userId, String password
746768

747769
Node connectedNode = new Node();
748770
BasicTable bt = null;
771+
749772
while (!closed_) {
773+
int totalConnectAttemptNums = this.tryReconnectNums * nodes_.size();
774+
int attempt = 0;
750775
while (!conn_.isConnected() && !closed_) {
751-
for (Node one : nodes_) {
752-
if (connectNode(one)) {
753-
connectedNode = one;
754-
break;
776+
if (this.tryReconnectNums > 0) {
777+
// finite try to connect.
778+
for (Node one : nodes_) {
779+
attempt ++;
780+
// System.out.println("Current init connect node: " + one.hostName + ":" + one.port);
781+
if (connectNode(one)) {
782+
connectedNode = one;
783+
break;
784+
}
785+
786+
try {
787+
Thread.sleep(100);
788+
} catch (Exception e){
789+
e.printStackTrace();
790+
return false;
791+
}
755792
}
756793

757-
try {
758-
Thread.sleep(100);
759-
} catch (Exception e){
760-
e.printStackTrace();
794+
if (attempt == totalConnectAttemptNums) {
795+
log.error("Connect failed after " + tryReconnectNums + " reconnect attemps for every node in high availability sites.");
761796
return false;
762797
}
798+
} else {
799+
// infinite try to connect.
800+
for (Node one : nodes_) {
801+
if (connectNode(one)) {
802+
connectedNode = one;
803+
break;
804+
}
805+
806+
try {
807+
Thread.sleep(100);
808+
} catch (Exception e){
809+
e.printStackTrace();
810+
return false;
811+
}
812+
}
763813
}
764814
}
765815

@@ -882,30 +932,43 @@ private void initConnection() throws IOException{
882932
}
883933

884934
public void switchDataNode(Node node) throws IOException{
935+
int attempt = 0;
936+
boolean isConnected = false;
885937
do {
886-
if (node.hostName != null && node.hostName.length() > 0){
887-
if (connectNode(node)){
938+
attempt ++;
939+
if (node.hostName != null && node.hostName.length() > 0) {
940+
if (connectNode(node)) {
888941
log.info("Switch to node: " + node.hostName + ":" + node.port + " successfully.");
942+
isConnected = true;
889943
break;
890944
}
891945
}
946+
892947
if (nodes_.isEmpty()){
893-
log.error("com.xxdb.DBConnection.switchDataNode nodes_ is empty. Current node hostName: " + node.hostName + ", port: " + node.port);
894948
log.error("Connect to " + node.hostName + ":" + node.port + " failed.");
895949
throw new RuntimeException("Connect to " + node.hostName + ":" + node.port + " failed.");
896950
}
897-
int index = nodeRandom_.nextInt(nodes_.size());
898-
if (connectNode(nodes_.get(index))){
899-
log.info("Switch to node: " + nodes_.get(index).hostName + ":" + nodes_.get(index).port + " successfully.");
900-
break;
951+
952+
if (nodes_.size() > 1) {
953+
int index = nodeRandom_.nextInt(nodes_.size());
954+
if (connectNode(nodes_.get(index))){
955+
log.info("Switch to node: " + nodes_.get(index).hostName + ":" + nodes_.get(index).port + " successfully.");
956+
isConnected = true;
957+
break;
958+
}
901959
}
960+
902961
try {
903962
Thread.sleep(1000);
904-
}catch (Exception e){
963+
} catch (Exception e){
905964
e.printStackTrace();
906965
return;
907966
}
908-
}while (!closed_);
967+
} while (!closed_ && (tryReconnectNums == -1 || attempt < tryReconnectNums));
968+
969+
if (!closed_ && !isConnected)
970+
throw new RuntimeException("Connect to " + node.hostName + ":" + node.port + " failed after " + attempt + " reconnect attemps.");
971+
909972
if (initialScript_!=null && initialScript_.length() > 0){
910973
run(initialScript_);
911974
}
@@ -918,7 +981,10 @@ public boolean connectNode(Node node) throws IOException{
918981
return conn_.connect(node.hostName, node.port, uid_, pwd_, connTimeout_);
919982
}catch (Exception e){
920983
if (isConnected()){
921-
ExceptionType type = parseException(e.getMessage(), node);
984+
Node tmpNode = new Node();
985+
tmpNode.hostName = node.hostName;
986+
tmpNode.port = node.port;
987+
ExceptionType type = parseException(e.getMessage(), tmpNode);
922988
if (type != ExceptionType.ET_NEWLEADER){
923989
if (type == ExceptionType.ET_IGNORE)
924990
return true;

0 commit comments

Comments
 (0)