Skip to content

Commit fe9da78

Browse files
JackShi148stuBirdFlymiyuan-ljrshenyunlongmaochongxin
authored
Namespace mapping to database in observer in one HBase client; set RPC connect timeout to Table Client (#78)
* finish connection function * hbase 1.x compatibility: filter relevant (#51) * filter * add scan tests * add scan lease test (#53) * support allowPatialResult (#56) * [Chore] use obkv-table-client 1.2.13-SNAPSHOT for testing (#57) * init bufferedMutator * finish validateFamily and asyncExecute * correct log in OHBufferedMutatorImpl * pass self-test * format code * get bugfix * add retry when batch fails * remove test print * format code * make interface more generalized * test (#59) * random row filter (#62) * format BufferedMutator test case * remove redundancy, add some comments * fix (#66) * fix type of a bufferedMutator. Optimize by review * OHBufferedMutator in OBKV Hbase 1_x_comp (#64) * init bufferedMutator * finish validateFamily and asyncExecute * correct log in OHBufferedMutatorImpl * pass self-test * format code * add retry when batch fails * remove test print * format code * make interface more generalized * format BufferedMutator test case * remove redundancy, add some comments * fix type of a bufferedMutator. Optimize by review * hbase_multi_column_family_dev (#67) * hbase_multi_column_family_dev * fix * fix revierw * OHBufferedMutator set and use runtimeBatchExecutor in ObTableClient * OHBufferedMutator set and add runtimeBatchExecutor in ObTableClient (#68) * init bufferedMutator * finish validateFamily and asyncExecute * correct log in OHBufferedMutatorImpl * pass self-test * format code * add retry when batch fails * remove test print * format code * make interface more generalized * format BufferedMutator test case * remove redundancy, add some comments * fix type of a bufferedMutator. Optimize by review * OHBufferedMutator set and use runtimeBatchExecutor in ObTableClient * Bugfix/sql and conflict fixes (#69) * add sql for multi-cf test; fix conflict error * fix buffered mutation family_violation check * Remove multi-family checks for buffered mutations * fix check family empty * [Chore] refresh code from master branch and upgrade table client version (#71) * [Fix] checkAndMutate and get/Scan with filter return -5006 when include special character (#58) * [Fix] checkAndMutate and get/Scan with filter return -5006 when include special character * [Chore] fix review * [Fix] fix review * [Fix] fix test * [Fix] fix test testScanSessionClean * multi-namespace in one HBase client * format code * set rpcConnectTimeout to tableClient * multi-namespace in ocp mode and odp mode * revert self-defined pom xml * use the test as initial testing case * format code * use 'default' database when param_url misses the database parameter * add new test case testing multi-namespace, remove some comments --------- Co-authored-by: stuBirdFly <1065492934@qq.com> Co-authored-by: stuBirdFly <84010733+stuBirdFly@users.noreply.github.com> Co-authored-by: miyuan-ljr <miyuan.ljr@antgroup.com> Co-authored-by: Shen Yunlong <44497386+shenyunlong@users.noreply.github.com> Co-authored-by: DingZhen <maochongxin@gmail.com>
1 parent 3c8c531 commit fe9da78

File tree

11 files changed

+292
-13
lines changed

11 files changed

+292
-13
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.*;
3636
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncRequest;
3737
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult;
38+
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
3839
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
3940
import com.alipay.oceanbase.rpc.table.ObKVParams;
4041
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
@@ -195,8 +196,9 @@ public OHTable(Configuration configuration, String tableName) throws IOException
195196
long keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME,
196197
DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME);
197198
this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime);
198-
this.obTableClient = ObTableClientManager
199-
.getOrCreateObTableClient(new OHConnectionConfiguration(configuration));
199+
OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration);
200+
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
201+
this.tableNameString, ohConnectionConf));
200202

201203
finishSetUp();
202204
}
@@ -242,8 +244,9 @@ public OHTable(Configuration configuration, final byte[] tableName,
242244
this.tableNameString = Bytes.toString(tableName);
243245
this.executePool = executePool;
244246
this.cleanupPoolOnClose = false;
245-
this.obTableClient = ObTableClientManager
246-
.getOrCreateObTableClient(new OHConnectionConfiguration(configuration));
247+
OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration);
248+
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
249+
this.tableNameString, ohConnectionConf));
247250

248251
finishSetUp();
249252
}
@@ -260,6 +263,7 @@ public OHTable(Configuration configuration, final byte[] tableName,
260263
* @param executePool ExecutorService to be used.
261264
* @throws IllegalArgumentException if the param error
262265
*/
266+
@InterfaceAudience.Private
263267
public OHTable(final byte[] tableName, final ObTableClient obTableClient,
264268
final ExecutorService executePool) {
265269
checkArgument(tableName != null, "tableNameString is blank.");
@@ -306,7 +310,8 @@ public OHTable(TableName tableName, Connection connection,
306310
DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK);
307311
this.writeBufferSize = connectionConfig.getWriteBufferSize();
308312
this.tableName = tableName.getName();
309-
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(connectionConfig);
313+
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
314+
this.tableNameString, connectionConfig));
310315
}
311316

312317
/**
@@ -364,6 +369,36 @@ private void finishSetUp() {
364369
WRITE_BUFFER_SIZE_DEFAULT);
365370
}
366371

372+
private OHConnectionConfiguration setUserDefinedNamespace(String tableNameString,
373+
OHConnectionConfiguration ohConnectionConf)
374+
throws IOException {
375+
if (tableNameString.indexOf(':') != -1) {
376+
String[] params = tableNameString.split(":");
377+
if (params.length != 2) {
378+
throw new IllegalArgumentException("Please check the format of self-defined "
379+
+ "namespace and qualifier: { "
380+
+ tableNameString + " }");
381+
}
382+
String database = params[0];
383+
checkArgument(isNotBlank(database), "self-defined namespace cannot be blank or null { "
384+
+ tableNameString + " }");
385+
if (ohConnectionConf.isOdpMode()) {
386+
ohConnectionConf.setDatabase(database);
387+
} else {
388+
String databaseSuffix = "database=" + database;
389+
String paramUrl = ohConnectionConf.getParamUrl();
390+
int databasePos = paramUrl.indexOf("database");
391+
if (databasePos == -1) {
392+
paramUrl += "&" + databaseSuffix;
393+
} else {
394+
paramUrl = paramUrl.substring(0, databasePos) + databaseSuffix;
395+
}
396+
ohConnectionConf.setParamUrl(paramUrl);
397+
}
398+
}
399+
return ohConnectionConf;
400+
}
401+
367402
@Override
368403
public byte[] getTableName() {
369404
return tableName;

src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949

5050
public class OHTablePool implements Closeable {
5151

52+
private String originTabelName = null;
5253
private final PoolMap<String, HTableInterface> tables;
5354
private final int maxSize;
5455
private final PoolMap.PoolType poolType;
@@ -314,6 +315,9 @@ int getCurrentPoolSize(String tableName) {
314315
* @param paramUrl the table root server http url
315316
*/
316317
public void setParamUrl(final String tableName, String paramUrl) {
318+
if (originTabelName == null) {
319+
originTabelName = tableName;
320+
}
317321
setTableAttribute(tableName, HBASE_OCEANBASE_PARAM_URL, Bytes.toBytes(paramUrl));
318322
}
319323

@@ -334,6 +338,9 @@ public String getParamUrl(final String tableName) {
334338
* @param fullUserName the table login username
335339
*/
336340
public void setFullUserName(final String tableName, final String fullUserName) {
341+
if (originTabelName == null) {
342+
originTabelName = tableName;
343+
}
337344
setTableAttribute(tableName, HBASE_OCEANBASE_FULL_USER_NAME, Bytes.toBytes(fullUserName));
338345
}
339346

@@ -354,6 +361,9 @@ public String getFullUserName(final String tableName) {
354361
* @param password the table login password
355362
*/
356363
public void setPassword(final String tableName, final String password) {
364+
if (originTabelName == null) {
365+
originTabelName = tableName;
366+
}
357367
setTableAttribute(tableName, HBASE_OCEANBASE_PASSWORD, Bytes.toBytes(password));
358368
}
359369

@@ -374,6 +384,9 @@ public String getPassword(final String tableName) {
374384
* @param sysUserName the sys username
375385
*/
376386
public void setSysUserName(final String tableName, final String sysUserName) {
387+
if (originTabelName == null) {
388+
originTabelName = tableName;
389+
}
377390
setTableAttribute(tableName, HBASE_OCEANBASE_SYS_USER_NAME, Bytes.toBytes(sysUserName));
378391
}
379392

@@ -394,6 +407,9 @@ public String getSysUserName(final String tableName) {
394407
* @param sysPassword the sys user password
395408
*/
396409
public void setSysPassword(final String tableName, final String sysPassword) {
410+
if (originTabelName == null) {
411+
originTabelName = tableName;
412+
}
397413
setTableAttribute(tableName, HBASE_OCEANBASE_SYS_PASSWORD, Bytes.toBytes(sysPassword));
398414
}
399415

@@ -481,6 +497,10 @@ public long getWriteBufferSize(String tableName) {
481497
.toLong(attr);
482498
}
483499

500+
public String getOriginTableName() {
501+
return this.originTabelName;
502+
}
503+
484504
/**
485505
* Sets the operation timeout for the specified tables in this pool.
486506
*
@@ -520,6 +540,9 @@ public int getOperationTimeout(String tableName) {
520540
* @param odpAddr ODP address
521541
*/
522542
public void setOdpAddr(final String tableName, String odpAddr) {
543+
if (originTabelName == null) {
544+
originTabelName = tableName;
545+
}
523546
setTableAttribute(tableName, HBASE_OCEANBASE_ODP_ADDR, Bytes.toBytes(odpAddr));
524547
}
525548

@@ -545,6 +568,9 @@ public String getOdpAddr(String tableName) {
545568
* @param odpPort ODP port
546569
*/
547570
public void setOdpPort(final String tableName, int odpPort) {
571+
if (originTabelName == null) {
572+
originTabelName = tableName;
573+
}
548574
setTableAttribute(tableName, HBASE_OCEANBASE_ODP_PORT, Bytes.toBytes(odpPort));
549575
}
550576

@@ -564,6 +590,9 @@ public int getOdpPort(String tableName) {
564590
* @param odpMode ODP mode
565591
*/
566592
public void setOdpMode(final String tableName, boolean odpMode) {
593+
if (originTabelName == null) {
594+
originTabelName = tableName;
595+
}
567596
setTableAttribute(tableName, HBASE_OCEANBASE_ODP_MODE, Bytes.toBytes(odpMode));
568597
}
569598

@@ -583,6 +612,9 @@ public boolean getOdpMode(String tableName) {
583612
* @param database ODP database name
584613
*/
585614
public void setDatabase(final String tableName, String database) {
615+
if (originTabelName == null) {
616+
originTabelName = tableName;
617+
}
586618
setTableAttribute(tableName, HBASE_OCEANBASE_DATABASE, Bytes.toBytes(database));
587619
}
588620

@@ -653,6 +685,9 @@ public Object getTableExtendAttribute(String tableName, String attributeName) {
653685
}
654686

655687
public void setRuntimeBatchExecutor(String tableName, ExecutorService runtimeBatchExecutor) {
688+
if (originTabelName == null) {
689+
originTabelName = tableName;
690+
}
656691
setTableExtendAttribute(tableName, HBASE_OCEANBASE_BATCH_EXECUTOR, runtimeBatchExecutor);
657692
}
658693

src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,4 +159,8 @@ public final class OHConstants {
159159

160160
public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1;
161161

162+
public static final String SOCKET_TIMEOUT = "ipc.socket.timeout";
163+
164+
public static final int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds
165+
162166
}

src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,25 +25,29 @@
2525
import java.util.Properties;
2626

2727
import static com.alipay.oceanbase.hbase.constants.OHConstants.*;
28+
import static org.apache.commons.lang.StringUtils.isBlank;
29+
import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_SOCKET_TIMEOUT_CONNECT;
30+
import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT;
2831

2932
@InterfaceAudience.Private
3033
public class OHConnectionConfiguration {
34+
private String paramUrl;
35+
private String database;
3136
private final Properties properties;
32-
private final String paramUrl;
3337
private final String fullUsername;
3438
private final String password;
3539
private final String sysUsername;
3640
private final String sysPassword;
3741
private final String odpAddr;
3842
private final int odpPort;
3943
private final boolean odpMode;
40-
private final String database;
4144
private final long writeBufferSize;
4245
private final int operationTimeout;
4346
private final int scannerCaching;
4447
private final long scannerMaxResultSize;
4548
private final int maxKeyValueSize;
4649
private final int rpcTimeout;
50+
private final int rpcConnectTimeout;
4751

4852
public OHConnectionConfiguration(Configuration conf) {
4953
this.paramUrl = conf.get(HBASE_OCEANBASE_PARAM_URL);
@@ -54,11 +58,27 @@ public OHConnectionConfiguration(Configuration conf) {
5458
this.odpAddr = conf.get(HBASE_OCEANBASE_ODP_ADDR);
5559
this.odpPort = conf.getInt(HBASE_OCEANBASE_ODP_PORT, -1);
5660
this.odpMode = conf.getBoolean(HBASE_OCEANBASE_ODP_MODE, false);
57-
this.database = conf.get(HBASE_OCEANBASE_DATABASE);
61+
String database = conf.get(HBASE_OCEANBASE_DATABASE);
62+
if (isBlank(database)) {
63+
database = "default";
64+
}
65+
this.database = database;
5866
this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
5967
this.operationTimeout = conf.getInt("hbase.client.operation.timeout", 1200000);
6068
this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
6169
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
70+
int rpcConnectTimeout = -1;
71+
if (conf.get(SOCKET_TIMEOUT_CONNECT) != null) {
72+
rpcConnectTimeout = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
73+
} else {
74+
if (conf.get(SOCKET_TIMEOUT) != null) {
75+
rpcConnectTimeout = conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
76+
} else {
77+
rpcConnectTimeout = conf.getInt(SOCKET_TIMEOUT_CONNECT,
78+
DEFAULT_SOCKET_TIMEOUT_CONNECT);
79+
}
80+
}
81+
this.rpcConnectTimeout = rpcConnectTimeout;
6282
this.scannerCaching = conf.getInt("hbase.client.scanner.caching", Integer.MAX_VALUE);
6383
this.scannerMaxResultSize = conf.getLong("hbase.client.scanner.max.result.size",
6484
WRITE_BUFFER_SIZE_DEFAULT);
@@ -72,6 +92,14 @@ public OHConnectionConfiguration(Configuration conf) {
7292
}
7393
}
7494

95+
public void setParamUrl(String paramUrl) {
96+
this.paramUrl = paramUrl;
97+
}
98+
99+
public void setDatabase(String database) {
100+
this.database = database;
101+
}
102+
75103
public long getWriteBufferSize() {
76104
return this.writeBufferSize;
77105
}
@@ -92,6 +120,10 @@ public int getRpcTimeout() {
92120
return this.rpcTimeout;
93121
}
94122

123+
public int getRpcConnectTimeout() {
124+
return this.rpcConnectTimeout;
125+
}
126+
95127
public long getScannerMaxResultSize() {
96128
return this.scannerMaxResultSize;
97129
}

src/main/java/com/alipay/oceanbase/hbase/util/OHTableFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ public OHTableFactory(Configuration conf, OHTablePool tablePool,
6565
public HTableInterface createHTableInterface(Configuration config, byte[] tableName) {
6666
try {
6767
String tableNameStr = Bytes.toString(tableName);
68+
tableNameStr = tableNameStr.equals(this.tablePool.getOriginTableName()) ? tableNameStr
69+
: this.tablePool.getOriginTableName();
6870

6971
OHTable ht = new OHTable(adjustConfiguration(copyConfiguration(config), tableNameStr),
7072
tableName, this.threadPool);

src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import com.alipay.oceanbase.rpc.constant.Constants;
2222
import com.google.common.base.Objects;
2323
import org.apache.hadoop.classification.InterfaceAudience;
24+
import org.apache.hadoop.conf.Configuration;
25+
import org.apache.hadoop.hbase.client.ConnectionConfiguration;
2426

2527
import java.io.IOException;
2628
import java.util.Map;
@@ -58,7 +60,11 @@ public static ObTableClient getOrCreateObTableClient(OHConnectionConfiguration c
5860
checkArgument(isNotBlank(connectionConfig.getParamUrl()), HBASE_OCEANBASE_PARAM_URL
5961
+ " is blank");
6062
obTableClientKey = new ObTableClientKey();
61-
obTableClientKey.setParamUrl(connectionConfig.getParamUrl());
63+
String paramUrl = connectionConfig.getParamUrl();
64+
if (!paramUrl.contains("database")) {
65+
paramUrl += "&database=default";
66+
}
67+
obTableClientKey.setParamUrl(paramUrl);
6268
obTableClientKey.setSysUserName(connectionConfig.getSysUsername());
6369
if (connectionConfig.getSysPassword() == null) {
6470
obTableClientKey.setSysPassword(Constants.EMPTY_STRING);
@@ -80,11 +86,11 @@ public static ObTableClient getOrCreateObTableClient(OHConnectionConfiguration c
8086
obTableClientKey.getProperties().put(property.getKey(), property.getValue());
8187
}
8288

83-
return getOrCreateObTableClient(obTableClientKey);
89+
return getOrCreateObTableClient(obTableClientKey, connectionConfig.getRpcConnectTimeout());
8490
}
8591

86-
public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableClientKey)
87-
throws IOException {
92+
public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableClientKey,
93+
int rpcConnectTimeout) throws IOException {
8894
if (OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey) == null) {
8995
ReentrantLock tmp = new ReentrantLock();
9096
ReentrantLock lock = OB_TABLE_CLIENT_LOCK.putIfAbsent(obTableClientKey, tmp);
@@ -109,6 +115,7 @@ public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableCli
109115
}
110116
obTableClient.setFullUserName(obTableClientKey.getFullUserName());
111117
obTableClient.setPassword(obTableClientKey.getPassword());
118+
obTableClient.setRpcConnectTimeout(rpcConnectTimeout);
112119
obTableClient.init();
113120
OB_TABLE_CLIENT_INSTANCE.put(obTableClientKey, obTableClient);
114121
}

src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,21 @@ public void testFilter() throws Exception {
509509
tryPut(hTable, putKey2Column2Value1);
510510
tryPut(hTable, putKey2Column2Value2);
511511

512+
// time may be different
513+
// +---------+-----+----------------+--------+
514+
// | K | Q | T | V |
515+
// +---------+-----+----------------+--------+
516+
// | getKey1 | abc | -1728834971469 | value1 |
517+
// | getKey1 | abc | -1728834971399 | value2 |
518+
// | getKey1 | abc | -1728834971330 | value1 |
519+
// | getKey1 | def | -1728834971748 | value2 |
520+
// | getKey1 | def | -1728834971679 | value1 |
521+
// | getKey1 | def | -1728834971609 | value2 |
522+
// | getKey1 | def | -1728834971540 | value1 |
523+
// | getKey2 | def | -1728834971887 | value2 |
524+
// | getKey2 | def | -1728834971818 | value1 |
525+
// +---------+-----+----------------+--------+
526+
512527
filter = new ColumnPrefixFilter(Bytes.toBytes("e"));
513528
get = new Get(toBytes(key1));
514529
get.setMaxVersions(10);

0 commit comments

Comments
 (0)