Skip to content

Commit fe943d2

Browse files
committed
adapt hbase namespace
1 parent 5712ffa commit fe943d2

File tree

6 files changed

+167
-34
lines changed

6 files changed

+167
-34
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,7 @@ public OHTable(Configuration configuration, String tableName) throws IOException
220220
DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME);
221221
this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime);
222222
OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration);
223-
int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
224-
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
223+
int numRetries = ohConnectionConf.getNumRetries();
225224
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
226225
this.tableNameString, ohConnectionConf));
227226
this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout());
@@ -273,8 +272,7 @@ public OHTable(Configuration configuration, final byte[] tableName,
273272
this.executePool = executePool;
274273
this.cleanupPoolOnClose = false;
275274
OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration);
276-
int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
277-
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
275+
int numRetries = ohConnectionConf.getNumRetries();
278276
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
279277
this.tableNameString, ohConnectionConf));
280278
this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout());
@@ -345,8 +343,7 @@ public OHTable(TableName tableName, Connection connection,
345343
DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK);
346344
this.writeBufferSize = connectionConfig.getWriteBufferSize();
347345
this.tableName = tableName.getName();
348-
int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
349-
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
346+
int numRetries = connectionConfig.getNumRetries();
350347
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
351348
this.tableNameString, connectionConfig));
352349
this.obTableClient.setRpcExecuteTimeout(rpcTimeout);
@@ -389,8 +386,7 @@ public OHTable(Connection connection, ObTableBuilderBase builder,
389386
this.putWriteBufferCheck = this.configuration.getInt(HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK,
390387
DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK);
391388
this.writeBufferSize = connectionConfig.getWriteBufferSize();
392-
int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
393-
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
389+
int numRetries = connectionConfig.getNumRetries();
394390
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
395391
this.tableNameString, connectionConfig));
396392
this.obTableClient.setRpcExecuteTimeout(rpcTimeout);
@@ -462,7 +458,7 @@ private void finishSetUp() {
462458
WRITE_BUFFER_SIZE_DEFAULT);
463459
}
464460

465-
private OHConnectionConfiguration setUserDefinedNamespace(String tableNameString,
461+
public static OHConnectionConfiguration setUserDefinedNamespace(String tableNameString,
466462
OHConnectionConfiguration ohConnectionConf)
467463
throws IllegalArgumentException {
468464
if (tableNameString.indexOf(':') != -1) {

src/main/java/com/alipay/oceanbase/hbase/util/OHAdmin.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,17 @@
3030
import java.util.regex.Pattern;
3131

3232
public class OHAdmin implements Admin {
33-
private final ObTableClient tableClient;
33+
private boolean aborted = false;
3434
private final OHConnectionImpl connection;
35-
private boolean aborted = false;
36-
OHAdmin(ObTableClient tableClient, OHConnectionImpl connection) {
37-
this.tableClient = tableClient;
35+
private final Configuration conf;
36+
OHAdmin(OHConnectionImpl connection) {
3837
this.connection = connection;
38+
this.conf = connection.getConfiguration();
3939
}
4040

4141
@Override
4242
public int getOperationTimeout() {
43-
return (int) tableClient.getRuntimeMaxWait();
43+
return connection.getOHConnectionConfiguration().getOperationTimeout();
4444
}
4545

4646
@Override
@@ -62,6 +62,8 @@ public Connection getConnection() {
6262

6363
@Override
6464
public boolean tableExists(TableName tableName) throws IOException {
65+
OHConnectionConfiguration connectionConf = new OHConnectionConfiguration(conf);
66+
ObTableClient tableClient = ObTableClientManager.getOrCreateObTableClientByTableName(tableName, connectionConf);
6567
OHTableExistsExecutor executor = new OHTableExistsExecutor(tableClient);
6668
return executor.tableExists(tableName.getNameAsString());
6769
}
@@ -163,6 +165,8 @@ public Future<Void> createTableAsync(TableDescriptor tableDescriptor, byte[][] b
163165

164166
@Override
165167
public void deleteTable(TableName tableName) throws IOException {
168+
OHConnectionConfiguration connectionConf = new OHConnectionConfiguration(conf);
169+
ObTableClient tableClient = ObTableClientManager.getOrCreateObTableClientByTableName(tableName, connectionConf);
166170
OHDeleteTableExecutor executor = new OHDeleteTableExecutor(tableClient);
167171
try {
168172
executor.deleteTable(tableName.getNameAsString());
@@ -603,10 +607,12 @@ public List<RegionMetrics> getRegionMetrics(ServerName serverName) throws IOExce
603607

604608
@Override
605609
public List<RegionMetrics> getRegionMetrics(ServerName serverName, TableName tableName) throws IOException {
606-
OHRegionMetricsExecutor executor = new OHRegionMetricsExecutor(tableClient);
607610
if (tableName == null) {
608611
throw new FeatureNotSupportedException("does not support tableName is null");
609612
}
613+
OHConnectionConfiguration connectionConf = new OHConnectionConfiguration(conf);
614+
ObTableClient tableClient = ObTableClientManager.getOrCreateObTableClientByTableName(tableName, connectionConf);
615+
OHRegionMetricsExecutor executor = new OHRegionMetricsExecutor(tableClient);
610616
return executor.getRegionMetrics(tableName.getNameAsString());
611617
}
612618

src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ public class OHConnectionConfiguration {
5757
private final int rpcConnectTimeout;
5858
private final long writeBufferPeriodicFlushTimeoutMs;
5959
private final long writeBufferPeriodicFlushTimerTickMs;
60+
private final int numRetries;
6061

6162
public OHConnectionConfiguration(Configuration conf) {
6263
this.paramUrl = conf.get(HBASE_OCEANBASE_PARAM_URL);
@@ -100,6 +101,8 @@ public OHConnectionConfiguration(Configuration conf) {
100101
}
101102
}
102103
this.rpcConnectTimeout = rpcConnectTimeout;
104+
this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
105+
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
103106
this.scannerCaching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
104107
Integer.MAX_VALUE);
105108
this.scannerMaxResultSize = conf.getLong(
@@ -209,4 +212,8 @@ public long getWriteBufferPeriodicFlushTimeoutMs() {
209212
public long getWriteBufferPeriodicFlushTimerTickMs() {
210213
return this.writeBufferPeriodicFlushTimerTickMs;
211214
}
215+
216+
public int getNumRetries() {
217+
return this.numRetries;
218+
}
212219
}

src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -141,21 +141,17 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws I
141141

142142
@Override
143143
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
144-
ObTableClient obTableClient = ObTableClientManager.getOrCreateObTableClient(connectionConfig);
145-
int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
146-
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
147-
ObTableClientManager.initTimeoutAndRetryTimes(obTableClient, connectionConfig, numRetries);
144+
// need to use new connection configuration
145+
// to avoid change the database in original param url by namespace in tableName
146+
OHConnectionConfiguration connectionConf = new OHConnectionConfiguration(conf);
147+
ObTableClient obTableClient = ObTableClientManager.getOrCreateObTableClientByTableName(tableName, connectionConf);
148148
OHRegionLocatorExecutor executor = new OHRegionLocatorExecutor(obTableClient);
149149
return executor.getRegionLocator(String.valueOf(tableName));
150150
}
151151

152152
@Override
153153
public Admin getAdmin() throws IOException {
154-
ObTableClient obTableClient = ObTableClientManager.getOrCreateObTableClient(connectionConfig);
155-
int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
156-
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
157-
ObTableClientManager.initTimeoutAndRetryTimes(obTableClient, connectionConfig, numRetries);
158-
return new OHAdmin(obTableClient, this);
154+
return new OHAdmin(this);
159155
}
160156

161157
private void shutdownBatchPool(ExecutorService pool) {

src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919

2020
import com.alipay.oceanbase.rpc.ObTableClient;
2121
import com.alipay.oceanbase.rpc.constant.Constants;
22+
import com.alipay.oceanbase.hbase.OHTable;
2223
import com.google.common.base.Objects;
2324
import org.apache.hadoop.classification.InterfaceAudience;
25+
import org.apache.hadoop.hbase.TableName;
2426

2527
import java.io.IOException;
2628
import java.util.Map;
@@ -126,9 +128,18 @@ public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableCli
126128
return OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey);
127129
}
128130

129-
public static void initTimeoutAndRetryTimes(ObTableClient obTableClient, OHConnectionConfiguration ohConnectionConf, int numRetries) {
131+
public static ObTableClient getOrCreateObTableClientByTableName(TableName tableName, OHConnectionConfiguration connectionConfig) throws IllegalArgumentException,
132+
IOException {
133+
String tableNameString = tableName.getNameAsString();
134+
ObTableClient obTableClient = getOrCreateObTableClient(
135+
OHTable.setUserDefinedNamespace(tableNameString, connectionConfig));
136+
ObTableClientManager.initTimeoutAndRetryTimes(obTableClient, connectionConfig);
137+
return obTableClient;
138+
}
139+
140+
private static void initTimeoutAndRetryTimes(ObTableClient obTableClient, OHConnectionConfiguration ohConnectionConf) {
130141
obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout());
131-
obTableClient.setRuntimeRetryTimes(numRetries);
142+
obTableClient.setRuntimeRetryTimes(ohConnectionConf.getNumRetries());
132143
obTableClient.setRuntimeMaxWait(ohConnectionConf.getOperationTimeout());
133144
obTableClient.setRuntimeBatchMaxWait(ohConnectionConf.getOperationTimeout());
134145
}

0 commit comments

Comments
 (0)