Skip to content

Commit cfc9c95

Browse files
author
chengyitian
committed
Merge branch 'dev' of dolphindb.net:dolphindb/api-java
2 parents 770e175 + a6b846b commit cfc9c95

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1047
-1087
lines changed

README_CN.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# JAVA API 使用说明
22

3+
> 注意:该 Readme 不再进行维护。用户可移步至 DolphinDB 官方文档中心 [Java API 手册](https://docs.dolphindb.cn/zh/api/java/java.html)
4+
35
本教程主要介绍以下内容:
46
- [JAVA API 使用说明](#java-api-使用说明)
57
- [1. Java API 概述](#1-java-api-概述)

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
<modelVersion>4.0.0</modelVersion>
33
<groupId>com.dolphindb</groupId>
44
<artifactId>dolphindb-javaapi</artifactId>
5-
<version>3.00.0.0</version>
5+
<version>3.00.0.1</version>
66
<packaging>jar</packaging>
77

88
<properties>
9-
<dolphindb.version>3.00.0.0</dolphindb.version>
9+
<dolphindb.version>3.00.0.1</dolphindb.version>
1010
</properties>
1111
<name>DolphinDB Java API</name>
1212
<description>The messaging and data conversion protocol between Java and DolphinDB server</description>
@@ -31,7 +31,7 @@
3131
<connection>scm:git:git@github.com:dolphindb/api-java.git</connection>
3232
<developerConnection>scm:git:git@github.com:dolphindb/api-java.git</developerConnection>
3333
<url>git@github.com:dolphindb/api-java.git</url>
34-
<tag>api-java-3.00.0.0</tag>
34+
<tag>api-java-3.00.0.1</tag>
3535
</scm>
3636
<dependencies>
3737
<dependency>

src/com/xxdb/DBConnection.java

Lines changed: 52 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -788,51 +788,64 @@ public boolean connect(String hostName, int port, String userId, String password
788788
if ( bt!=null && bt.getDataForm() != Entity.DATA_FORM.DF_TABLE)
789789
throw new IOException("Run getClusterPerf() failed.");
790790

791-
if (bt!=null && loadBalance_) {
792-
//ignore very high load nodes, rand one in low load nodes
793-
List<Node> lowLoadNodes=new ArrayList<>();
794-
BasicStringVector colHost = (BasicStringVector) bt.getColumn("host");
795-
BasicIntVector colPort = (BasicIntVector) bt.getColumn("port");
796-
BasicDoubleVector memLoad = (BasicDoubleVector) bt.getColumn("memLoad");
797-
BasicDoubleVector connLoad = (BasicDoubleVector) bt.getColumn("connLoad");
798-
BasicDoubleVector avgLoad = (BasicDoubleVector) bt.getColumn("avgLoad");
799-
for (int i = 0; i < colHost.rows(); i++) {
800-
Node nodex = new Node(colHost.getString(i), colPort.getInt(i));
801-
Node pexistNode = null;
802-
if (highAvailabilitySites != null) {
803-
for (Node node : nodes_) {
804-
if ((node.hostName.equals(nodex.hostName) || nodex.hostName.equals("localhost")) && node.port == nodex.port){
805-
pexistNode = node;
806-
break;
791+
if (bt!=null) {
792+
if (!loadBalance_) {
793+
if (highAvailabilitySites == null) {
794+
BasicStringVector colHost = (BasicStringVector) bt.getColumn("host");
795+
BasicIntVector colPort = (BasicIntVector) bt.getColumn("port");
796+
for (int i = 0; i < colHost.rows(); i++) {
797+
Node curNode = new Node(colHost.getString(i), colPort.getInt(i));
798+
if (!(curNode.hostName.equals(hostName) && curNode.port == port))
799+
nodes_.add(curNode);
800+
}
801+
}
802+
} else {
803+
// enable loadBalance
804+
//ignore very high load nodes, rand one in low load nodes
805+
List<Node> lowLoadNodes=new ArrayList<>();
806+
BasicStringVector colHost = (BasicStringVector) bt.getColumn("host");
807+
BasicIntVector colPort = (BasicIntVector) bt.getColumn("port");
808+
BasicDoubleVector memLoad = (BasicDoubleVector) bt.getColumn("memLoad");
809+
BasicDoubleVector connLoad = (BasicDoubleVector) bt.getColumn("connLoad");
810+
BasicDoubleVector avgLoad = (BasicDoubleVector) bt.getColumn("avgLoad");
811+
for (int i = 0; i < colHost.rows(); i++) {
812+
Node nodex = new Node(colHost.getString(i), colPort.getInt(i));
813+
Node pexistNode = null;
814+
if (highAvailabilitySites != null) {
815+
for (Node node : nodes_) {
816+
if ((node.hostName.equals(nodex.hostName) || nodex.hostName.equals("localhost")) && node.port == nodex.port){
817+
pexistNode = node;
818+
break;
819+
}
807820
}
821+
//node is out of highAvailabilitySites
822+
if (pexistNode == null)
823+
continue;
808824
}
809-
//node is out of highAvailabilitySites
810-
if (pexistNode == null)
811-
continue;
825+
double load=(memLoad.getDouble(i)+connLoad.getDouble(i)+avgLoad.getDouble(i))/3.0;
826+
if (pexistNode != null) {
827+
pexistNode.load = load;
828+
} else {
829+
pexistNode=new Node(colHost.getString(i), colPort.getInt(i), load);
830+
nodes_.add(pexistNode);
831+
}
832+
// low load
833+
if (memLoad.getDouble(i)<0.8 && connLoad.getDouble(i)<0.9 && avgLoad.getDouble(i)<0.8)
834+
lowLoadNodes.add(pexistNode);
812835
}
813-
double load=(memLoad.getDouble(i)+connLoad.getDouble(i)+avgLoad.getDouble(i))/3.0;
814-
if (pexistNode != null) {
815-
pexistNode.load = load;
836+
837+
Node pMinNode;
838+
if (!lowLoadNodes.isEmpty()) {
839+
pMinNode=lowLoadNodes.get(nodeRandom_.nextInt(lowLoadNodes.size()));
816840
} else {
817-
pexistNode=new Node(colHost.getString(i), colPort.getInt(i), load);
818-
nodes_.add(pexistNode);
841+
pMinNode=nodes_.get(nodeRandom_.nextInt(nodes_.size()));
819842
}
820-
// low load
821-
if (memLoad.getDouble(i)<0.8 && connLoad.getDouble(i)<0.9 && avgLoad.getDouble(i)<0.8)
822-
lowLoadNodes.add(pexistNode);
823-
}
824843

825-
Node pMinNode;
826-
if (!lowLoadNodes.isEmpty()) {
827-
pMinNode=lowLoadNodes.get(nodeRandom_.nextInt(lowLoadNodes.size()));
828-
} else {
829-
pMinNode=nodes_.get(nodeRandom_.nextInt(nodes_.size()));
830-
}
831-
832-
if (pMinNode != null && !pMinNode.equals(connectedNode)){
833-
log.info("Switch to node: " + pMinNode.hostName + ":" + pMinNode.port);
834-
conn_.close();
835-
switchDataNode(pMinNode);
844+
if (pMinNode != null && !pMinNode.equals(connectedNode)){
845+
log.info("Switch to node: " + pMinNode.hostName + ":" + pMinNode.port);
846+
conn_.close();
847+
switchDataNode(pMinNode);
848+
}
836849
}
837850
}
838851
} else {

src/com/xxdb/ExclusiveDBConnectionPool.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,18 @@ public ExclusiveDBConnectionPool(String host, int port, String uid, String pwd,
7878
if (count <= 0)
7979
throw new RuntimeException("The thread count can not be less than 0");
8080
if (!loadBalance) {
81+
// not enable loadBalance
8182
for (int i=0; i<count; ++i) {
8283
DBConnection conn = new DBConnection(false, useSSL, compress, usePython);
83-
conn.setLoadBalance(false);
84-
if(!conn.connect(host, port, uid, pwd, initialScript, enableHighAvailability, highAvailabilitySites))
85-
throw new RuntimeException("Can't connect to the specified host.");
84+
try {
85+
boolean isConnected = conn.connect(host, port, uid, pwd, initialScript, enableHighAvailability, highAvailabilitySites, false, loadBalance);
86+
if (!isConnected) {
87+
throw new RuntimeException("Can't connect to the specified host.");
88+
}
89+
} catch (Exception e) {
90+
throw new RuntimeException("Can't connect to the specified host: ", e);
91+
}
92+
8693
workers_.add(new AsyncWorker(conn));
8794
}
8895
} else {
@@ -108,8 +115,7 @@ public ExclusiveDBConnectionPool(String host, int port, String uid, String pwd,
108115
}
109116
for (int i=0; i<count; ++i) {
110117
DBConnection conn = new DBConnection(false, useSSL, compress, usePython);
111-
conn.setLoadBalance(false);
112-
if(!conn.connect(hosts[i % nodeCount], ports[i % nodeCount], uid, pwd, initialScript, enableHighAvailability, highAvailabilitySites))
118+
if(!conn.connect(hosts[i % nodeCount], ports[i % nodeCount], uid, pwd, initialScript, enableHighAvailability, highAvailabilitySites,false,false))
113119
throw new RuntimeException("Can't connect to the host " + nodes.getString(i));
114120
workers_.add(new AsyncWorker(conn));
115121
}

src/com/xxdb/data/Utils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
public class Utils {
1717

18-
public static final String JAVA_API_VERSION = "3.00.0.0";
18+
public static final String JAVA_API_VERSION = "3.00.0.1";
1919

2020
public static final int DISPLAY_ROWS = 20;
2121
public static final int DISPLAY_COLS = 100;

test/com/xxdb/ConcurrentReadTest.java

Lines changed: 0 additions & 77 deletions
This file was deleted.

test/com/xxdb/ConcurrentTest.java

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package com.xxdb;
2+
3+
import com.xxdb.data.BasicLong;
4+
import com.xxdb.data.Entity;
5+
import org.junit.After;
6+
import org.junit.Before;
7+
import org.junit.Test;
8+
9+
import java.io.FileInputStream;
10+
import java.io.IOException;
11+
import java.util.Properties;
12+
import java.util.Random;
13+
import java.util.ResourceBundle;
14+
public class ConcurrentTest {
15+
private static DBConnection conn;
16+
static ResourceBundle bundle = ResourceBundle.getBundle("com/xxdb/setup/settings");
17+
static String HOST = bundle.getString("HOST");
18+
static int PORT = Integer.parseInt(bundle.getString("PORT"));
19+
@Before
20+
public void setUp() throws IOException {
21+
conn = new DBConnection();
22+
try {
23+
if (!conn.connect(HOST, PORT, "admin", "123456")) {
24+
throw new IOException("Failed to connect to 2xdb server");
25+
}
26+
} catch (IOException ex) {
27+
ex.printStackTrace();
28+
}
29+
}
30+
@After
31+
public void after() throws IOException, InterruptedException {
32+
conn.close();
33+
}
34+
35+
class ConcurrentRead extends Thread {
36+
@Override
37+
public void run() {
38+
DBConnection conn1= new DBConnection();
39+
StringBuilder sb = new StringBuilder();
40+
sb.append("exec count(*) from t where qty=");
41+
int i = new Random().nextInt(1000);
42+
sb.append(i);
43+
try {
44+
Thread.sleep(1000);
45+
if (!conn1.connect(HOST, PORT, "admin", "123456")) {
46+
throw new IOException("Failed to connect to 2xdb server");
47+
}
48+
} catch (IOException | InterruptedException ex) {
49+
ex.printStackTrace();
50+
}
51+
try {
52+
conn1.run("t= loadTable(\"dfs://db1\", `trades)");
53+
Entity res = conn1.run(sb.toString());
54+
System.out.println(res);
55+
} catch (IOException e) {
56+
e.printStackTrace();
57+
}
58+
conn1.close();
59+
}
60+
}
61+
@Test
62+
public void test_ConcurrentRead() throws IOException, InterruptedException {
63+
String script = "n=1000000;" +
64+
"date=rand(2018.08.01..2018.08.03,n);" +
65+
"sym=rand(`AAPL`MS`C`YHOO,n);" +
66+
"qty=rand(1..1000,n);" +
67+
"price=rand(100.0,n);" +
68+
"t=table(date,sym,qty,price);" +
69+
"if(existsDatabase(\"dfs://db1\")){\n" +
70+
"\tdropDatabase(\"dfs://db1\")\n" +
71+
"};" +
72+
"db=database(\"dfs://db1\",VALUE,2018.08.01..2018.08.03);" +
73+
"trades=db.createPartitionedTable(t,`trades,`date).append!(t);";
74+
conn.run(script);
75+
for (int i=0;i<100;i++){
76+
new Thread(new ConcurrentRead()).start();
77+
}
78+
Thread.sleep(1000);
79+
}
80+
81+
class ConcurrentWrite extends Thread {
82+
private final int id;
83+
84+
public ConcurrentWrite(int i) {
85+
this.id=i;
86+
}
87+
@Override
88+
public void run() {
89+
DBConnection conn1= new DBConnection();
90+
StringBuilder sb = new StringBuilder();
91+
sb.append("t=table(take("+this.id+",100) as id, take(`w,100) as val );");
92+
sb.append("tb= loadTable(\"dfs://writeJava\", `pt);");
93+
sb.append("tb.append!(t);exec count(*) from tb");
94+
try {
95+
if (!conn1.connect(HOST, PORT, "admin", "123456")) {
96+
throw new IOException("Failed to connect to 2xdb server");
97+
}
98+
BasicLong bit = (BasicLong) conn1.run(sb.toString());
99+
System.out.println(id+"="+bit.getLong());
100+
} catch (IOException ex) {
101+
ex.printStackTrace();
102+
}finally {
103+
conn1.close();
104+
}
105+
}
106+
}
107+
@Test
108+
public void test_ConcurrentWrite() throws IOException, InterruptedException {
109+
String script = "t=table(1 as id,`q as val);" +
110+
"if(existsDatabase(\"dfs://writeJava\")){\n" +
111+
"\tdropDatabase(\"dfs://writeJava\")\n" +
112+
"};" +
113+
"db=database(\"dfs://writeJava\",VALUE,1..1000);" +
114+
"pt=db.createPartitionedTable(t,`pt,`id).append!(t);";
115+
conn.run(script);
116+
for (int i=0;i<100;i++){
117+
new Thread(new ConcurrentWrite(i)).start();
118+
}
119+
Thread.sleep(5000);
120+
conn.run( "\tdropDatabase(\"dfs://writeJava\")\n" );
121+
}
122+
}

0 commit comments

Comments
 (0)