Skip to content

Commit b193e00

Browse files
author
lucaijun
committed
AJ-542: add symbol connection pool
1 parent 021d7fd commit b193e00

File tree

2 files changed

+315
-0
lines changed

2 files changed

+315
-0
lines changed
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package com.xxdb;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
import java.util.ArrayList;
7+
import java.util.Objects;
8+
import java.util.concurrent.CopyOnWriteArrayList;
9+
import java.util.concurrent.atomic.AtomicBoolean;
10+
11+
public class SimpleDBConnectionPool {
12+
private SimpleDBConnectionPoolImpl connectionPool;
13+
private String hostName;
14+
private int port;
15+
private String userId;
16+
private String password;
17+
private int initialPoolSize;
18+
private String initialScript;
19+
private boolean compress;
20+
private boolean useSSL;
21+
private boolean usePython;
22+
private boolean loadBalance;
23+
private boolean enableHighAvailability;
24+
private String[] highAvailabilitySites;
25+
private boolean reconnect = true;
26+
private static final Logger log = LoggerFactory.getLogger(DBConnection.class);
27+
28+
public SimpleDBConnectionPool(SimpleDBConnectionPoolConfig simpleDBConnectionPoolConfig) {
29+
if (Objects.isNull(simpleDBConnectionPoolConfig.getHostName()))
30+
throw new RuntimeException("The host name can not be null.");
31+
this.hostName = simpleDBConnectionPoolConfig.getHostName();
32+
if (simpleDBConnectionPoolConfig.getPort() <= 0)
33+
throw new RuntimeException("The port needs to be greater than 0.");
34+
this.port = simpleDBConnectionPoolConfig.getPort();
35+
if (Objects.isNull(simpleDBConnectionPoolConfig.getUserId()))
36+
throw new RuntimeException("The user Id can not be null.");
37+
this.userId = simpleDBConnectionPoolConfig.getUserId();
38+
if (Objects.isNull(simpleDBConnectionPoolConfig.getPassword()))
39+
throw new RuntimeException("The password can not be null.");
40+
this.password = simpleDBConnectionPoolConfig.getPassword();
41+
if (simpleDBConnectionPoolConfig.getInitialPoolSize() <= 0)
42+
throw new RuntimeException("The number of connection pools needs to be greater than 0.");
43+
this.initialPoolSize = simpleDBConnectionPoolConfig.getInitialPoolSize();
44+
this.initialScript = simpleDBConnectionPoolConfig.getInitialScript();
45+
this.compress = simpleDBConnectionPoolConfig.isCompress();
46+
this.useSSL = simpleDBConnectionPoolConfig.isUseSSL();
47+
this.usePython = simpleDBConnectionPoolConfig.isUsePython();
48+
this.loadBalance = simpleDBConnectionPoolConfig.isLoadBalance();
49+
this.enableHighAvailability = simpleDBConnectionPoolConfig.isEnableHighAvailability();
50+
this.highAvailabilitySites = simpleDBConnectionPoolConfig.getHighAvailabilitySites();
51+
}
52+
53+
public DBConnection getConnection() {
54+
if (isClosed())
55+
throw new RuntimeException("The connection pool has been closed.");
56+
else if (Objects.nonNull(connectionPool)) {
57+
return connectionPool.getConnection();
58+
} else {
59+
synchronized (this) {
60+
if (Objects.isNull(connectionPool)) {
61+
connectionPool = new SimpleDBConnectionPoolImpl();
62+
}
63+
}
64+
return connectionPool.getConnection();
65+
}
66+
}
67+
68+
public int getActiveConnections() {
69+
return connectionPool.getCount(false);
70+
}
71+
72+
public int getIdleConnections() {
73+
return connectionPool.getCount(true);
74+
}
75+
76+
public void close() {
77+
if (Objects.nonNull(connectionPool))
78+
connectionPool.close();
79+
else
80+
log.info("The connection pool is closed.");
81+
}
82+
83+
public boolean isClosed() {
84+
if (Objects.nonNull(connectionPool))
85+
return connectionPool.isClosed();
86+
else
87+
return false;
88+
}
89+
90+
protected class SimpleDBConnectionPoolImpl {
91+
private CopyOnWriteArrayList<PoolEntry> poolEntries;
92+
private AtomicBoolean isShutdown = new AtomicBoolean();
93+
94+
SimpleDBConnectionPoolImpl() {
95+
ArrayList<PoolEntry> poolEntryArrayList = new ArrayList<>(initialPoolSize);
96+
try {
97+
for (int i = 0; i < initialPoolSize; i++) {
98+
PoolEntry poolEntry = new PoolEntry(useSSL, compress, usePython, String.format("DolphinDBConnection_%d", i + 1));
99+
if (!poolEntry.connect(hostName, port, userId, password, initialScript, enableHighAvailability, highAvailabilitySites, reconnect, loadBalance)){
100+
log.error(String.format("Connection %s connect failure.", poolEntry.connectionName));
101+
};
102+
poolEntryArrayList.add(poolEntry);
103+
}
104+
poolEntries = new CopyOnWriteArrayList<>(poolEntryArrayList);
105+
} catch (Exception e) {
106+
log.error("Create connection pool failure, because " + e.getMessage());
107+
}
108+
}
109+
110+
DBConnection getConnection() {
111+
for (PoolEntry poolEntry : poolEntries) {
112+
if (poolEntry.inUse.compareAndSet(false, true)) {
113+
return poolEntry;
114+
}
115+
}
116+
log.error("All connections in the connection pool are currently in use.");
117+
return null;
118+
}
119+
120+
int getCount(boolean isIdle) {
121+
int count = 0;
122+
for (PoolEntry poolEntry : this.poolEntries) {
123+
if (poolEntry.isIdle() == isIdle)
124+
count++;
125+
}
126+
return count;
127+
}
128+
129+
void close() {
130+
if (!this.isShutdown.getAndSet(true)) {
131+
log.info("Closing the connection pool......");
132+
for (PoolEntry poolEntry : poolEntries) {
133+
poolEntry.release();
134+
}
135+
log.info("Closing the connection pool finished.");
136+
} else {
137+
log.info("The connection pool is closed.");
138+
}
139+
}
140+
141+
boolean isClosed() {
142+
return this.isShutdown.get();
143+
}
144+
145+
}
146+
147+
public class PoolEntry extends DBConnection {
148+
AtomicBoolean inUse = new AtomicBoolean(false);
149+
String connectionName;
150+
151+
public PoolEntry(boolean useSSL, boolean compress, boolean usePython, String connectionName) {
152+
super(false, useSSL, compress, usePython);
153+
this.connectionName = connectionName;
154+
}
155+
156+
public String getConnectionName() {
157+
return connectionName;
158+
}
159+
160+
public boolean isIdle() {
161+
return !this.inUse.get();
162+
}
163+
164+
@Override
165+
public void close() {
166+
if (isBusy())
167+
log.error("Cannot release the connection,is running now.");
168+
else
169+
inUse.compareAndSet(true, false);
170+
}
171+
172+
private void release() {
173+
super.close();
174+
}
175+
}
176+
}
177+
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package com.xxdb;
2+
3+
public class SimpleDBConnectionPoolConfig {
4+
private String hostName;
5+
private int port;
6+
private String userId;
7+
private String password;
8+
private int initialPoolSize;
9+
private String initialScript = null;
10+
private boolean compress = false;
11+
private boolean useSSL = false;
12+
private boolean usePython = false;
13+
private boolean loadBalance;
14+
private boolean enableHighAvailability = false;
15+
private String[] highAvailabilitySites = null;
16+
17+
public SimpleDBConnectionPoolConfig(String hostName, int port, String userId, String password, int initialPoolSize, String initialScript, boolean compress, boolean useSSL, boolean usePython, boolean loadBalance, boolean enableHighAvailability, String[] highAvailabilitySites) {
18+
this.hostName = hostName;
19+
this.port = port;
20+
this.userId = userId;
21+
this.password = password;
22+
this.initialPoolSize = initialPoolSize;
23+
this.initialScript = initialScript;
24+
this.compress = compress;
25+
this.useSSL = useSSL;
26+
this.usePython = usePython;
27+
this.loadBalance = loadBalance;
28+
this.enableHighAvailability = enableHighAvailability;
29+
this.highAvailabilitySites = highAvailabilitySites;
30+
}
31+
32+
public SimpleDBConnectionPoolConfig(String hostName, int port, String userId, String password, int initialPoolSize, boolean loadBalance) {
33+
this.hostName = hostName;
34+
this.port = port;
35+
this.userId = userId;
36+
this.password = password;
37+
this.initialPoolSize = initialPoolSize;
38+
this.loadBalance = loadBalance;
39+
}
40+
41+
public String getHostName() {
42+
return hostName;
43+
}
44+
45+
public void setHostName(String hostName) {
46+
this.hostName = hostName;
47+
}
48+
49+
public int getPort() {
50+
return port;
51+
}
52+
53+
public void setPort(int port) {
54+
this.port = port;
55+
}
56+
57+
public String getUserId() {
58+
return userId;
59+
}
60+
61+
public void setUserId(String userId) {
62+
this.userId = userId;
63+
}
64+
65+
public String getPassword() {
66+
return password;
67+
}
68+
69+
public void setPassword(String password) {
70+
this.password = password;
71+
}
72+
73+
public int getInitialPoolSize() {
74+
return initialPoolSize;
75+
}
76+
77+
public void setInitialPoolSize(int initialPoolSize) {
78+
this.initialPoolSize = initialPoolSize;
79+
}
80+
81+
public String getInitialScript() {
82+
return initialScript;
83+
}
84+
85+
public void setInitialScript(String initialScript) {
86+
this.initialScript = initialScript;
87+
}
88+
89+
public boolean isCompress() {
90+
return compress;
91+
}
92+
93+
public void setCompress(boolean compress) {
94+
this.compress = compress;
95+
}
96+
97+
public boolean isUseSSL() {
98+
return useSSL;
99+
}
100+
101+
public void setUseSSL(boolean useSSL) {
102+
this.useSSL = useSSL;
103+
}
104+
105+
public boolean isUsePython() {
106+
return usePython;
107+
}
108+
109+
public void setUsePython(boolean usePython) {
110+
this.usePython = usePython;
111+
}
112+
113+
public boolean isLoadBalance() {
114+
return loadBalance;
115+
}
116+
117+
public void setLoadBalance(boolean loadBalance) {
118+
this.loadBalance = loadBalance;
119+
}
120+
121+
public boolean isEnableHighAvailability() {
122+
return enableHighAvailability;
123+
}
124+
125+
public void setEnableHighAvailability(boolean enableHighAvailability) {
126+
this.enableHighAvailability = enableHighAvailability;
127+
}
128+
129+
public String[] getHighAvailabilitySites() {
130+
return highAvailabilitySites;
131+
}
132+
133+
public void setHighAvailabilitySites(String[] highAvailabilitySites) {
134+
this.highAvailabilitySites = highAvailabilitySites;
135+
}
136+
}
137+
138+

0 commit comments

Comments
 (0)