modifyTableAsync(TableName tableName, TableDescriptor tableDescriptor)
+ throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
@@ -672,25 +734,67 @@ public void stopRegionServer(String s) throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
+ /**
+ * Get whole cluster status, containing status about:
+ *
+ * hbase version
+ * cluster id
+ * primary/backup master(s)
+ * master's coprocessors
+ * live/dead regionservers
+ * balancer
+ * regions in transition
+ *
+ *
+ * @return cluster status
+ * @throws IOException if a remote or network exception occurs
+ */
@Override
- public ClusterMetrics getClusterMetrics(EnumSet enumSet) throws IOException {
+ public ClusterStatus getClusterStatus() throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
+ /**
+ * Get cluster status with a set of {@link Option} to get desired status.
+ *
+ * @param options
+ * @return cluster status
+ * @throws IOException if a remote or network exception occurs
+ */
@Override
- public List getRegionMetrics(ServerName serverName) throws IOException {
+ public ClusterStatus getClusterStatus(EnumSet options) throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
+ /**
+ * Get {@link RegionLoad} of all regions hosted on a regionserver.
+ *
+ * @param serverName region server from which regionload is required.
+ * @return region load map of all regions hosted on a region server
+ * @throws IOException if a remote or network exception occurs
+ */
@Override
- public List getRegionMetrics(ServerName serverName, TableName tableName) throws IOException {
- if (tableName == null) {
- throw new FeatureNotSupportedException("does not support tableName is null");
- }
+ public Map getRegionLoad(ServerName serverName) throws IOException {
+ throw new FeatureNotSupportedException("does not support yet");
+ }
+
+ /**
+ * Get {@link RegionLoad} of all regions hosted on a regionserver for a table.
+ *
+ * @param serverName region server from which regionload is required.
+ * @param tableName get region load of regions belonging to the table
+ * @return region load map of all regions of a table hosted on a region server
+ * @throws IOException if a remote or network exception occurs
+ */
+ @Override
+ public Map getRegionLoad(ServerName serverName, TableName tableName)
+ throws IOException {
OHConnectionConfiguration connectionConf = new OHConnectionConfiguration(conf);
- ObTableClient tableClient = ObTableClientManager.getOrCreateObTableClientByTableName(tableName, connectionConf);
- OHRegionMetricsExecutor executor = new OHRegionMetricsExecutor(tableClient);
- return executor.getRegionMetrics(tableName.getNameAsString());
+ ObTableClient tableClient = ObTableClientManager.getOrCreateObTableClientByTableName(
+ tableName, connectionConf);
+ OHRegionLoadExecutor executor = new OHRegionLoadExecutor(tableName.getNameAsString(),
+ tableClient);
+ return executor.getRegionLoad();
}
@Override
@@ -704,7 +808,8 @@ public void createNamespace(NamespaceDescriptor namespaceDescriptor) throws IOEx
}
@Override
- public Future createNamespaceAsync(NamespaceDescriptor namespaceDescriptor) throws IOException {
+ public Future createNamespaceAsync(NamespaceDescriptor namespaceDescriptor)
+ throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
@@ -714,7 +819,8 @@ public void modifyNamespace(NamespaceDescriptor namespaceDescriptor) throws IOEx
}
@Override
- public Future modifyNamespaceAsync(NamespaceDescriptor namespaceDescriptor) throws IOException {
+ public Future modifyNamespaceAsync(NamespaceDescriptor namespaceDescriptor)
+ throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
@@ -729,7 +835,8 @@ public Future deleteNamespaceAsync(String s) throws IOException {
}
@Override
- public NamespaceDescriptor getNamespaceDescriptor(String s) throws NamespaceNotFoundException, IOException {
+ public NamespaceDescriptor getNamespaceDescriptor(String s) throws NamespaceNotFoundException,
+ IOException {
throw new FeatureNotSupportedException("does not support yet");
}
@@ -768,7 +875,8 @@ public synchronized void close() throws IOException {
}
@Override
- public HTableDescriptor[] getTableDescriptorsByTableName(List list) throws IOException {
+ public HTableDescriptor[] getTableDescriptorsByTableName(List list)
+ throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
@@ -807,13 +915,25 @@ public void rollWALWriter(ServerName serverName) throws IOException, FailedLogCl
throw new FeatureNotSupportedException("does not support yet");
}
+ /**
+ * Helper that delegates to getClusterStatus().getMasterCoprocessors().
+ *
+ * @return an array of master coprocessors
+ * @see ClusterStatus#getMasterCoprocessors()
+ */
+ @Override
+ public String[] getMasterCoprocessors() throws IOException {
+ throw new FeatureNotSupportedException("does not support yet");
+ }
+
@Override
public CompactionState getCompactionState(TableName tableName) throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
@Override
- public CompactionState getCompactionState(TableName tableName, CompactType compactType) throws IOException {
+ public CompactionState getCompactionState(TableName tableName, CompactType compactType)
+ throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
@@ -833,32 +953,45 @@ public long getLastMajorCompactionTimestampForRegion(byte[] bytes) throws IOExce
}
@Override
- public void snapshot(String s, TableName tableName) throws IOException, SnapshotCreationException, IllegalArgumentException {
+ public void snapshot(String s, TableName tableName) throws IOException,
+ SnapshotCreationException,
+ IllegalArgumentException {
throw new FeatureNotSupportedException("does not support yet");
}
@Override
- public void snapshot(byte[] bytes, TableName tableName) throws IOException, SnapshotCreationException, IllegalArgumentException {
+ public void snapshot(byte[] bytes, TableName tableName) throws IOException,
+ SnapshotCreationException,
+ IllegalArgumentException {
throw new FeatureNotSupportedException("does not support yet");
}
@Override
- public void snapshot(String s, TableName tableName, SnapshotType snapshotType) throws IOException, SnapshotCreationException, IllegalArgumentException {
+ public void snapshot(String s, TableName tableName, SnapshotType snapshotType)
+ throws IOException,
+ SnapshotCreationException,
+ IllegalArgumentException {
throw new FeatureNotSupportedException("does not support yet");
}
@Override
- public void snapshot(SnapshotDescription snapshotDescription) throws IOException, SnapshotCreationException, IllegalArgumentException {
+ public void snapshot(SnapshotDescription snapshotDescription) throws IOException,
+ SnapshotCreationException,
+ IllegalArgumentException {
throw new FeatureNotSupportedException("does not support yet");
}
@Override
- public void snapshotAsync(SnapshotDescription snapshotDescription) throws IOException, SnapshotCreationException {
+ public void snapshotAsync(SnapshotDescription snapshotDescription) throws IOException,
+ SnapshotCreationException {
throw new FeatureNotSupportedException("does not support yet");
}
@Override
- public boolean isSnapshotFinished(SnapshotDescription snapshotDescription) throws IOException, HBaseSnapshotException, UnknownSnapshotException {
+ public boolean isSnapshotFinished(SnapshotDescription snapshotDescription)
+ throws IOException,
+ HBaseSnapshotException,
+ UnknownSnapshotException {
throw new FeatureNotSupportedException("does not support yet");
}
@@ -878,7 +1011,8 @@ public Future restoreSnapshotAsync(String s) throws IOException, RestoreSn
}
@Override
- public void restoreSnapshot(byte[] bytes, boolean b) throws IOException, RestoreSnapshotException {
+ public void restoreSnapshot(byte[] bytes, boolean b) throws IOException,
+ RestoreSnapshotException {
throw new FeatureNotSupportedException("does not support yet");
}
@@ -888,27 +1022,35 @@ public void restoreSnapshot(String s, boolean b) throws IOException, RestoreSnap
}
@Override
- public void restoreSnapshot(String s, boolean b, boolean b1) throws IOException, RestoreSnapshotException {
+ public void restoreSnapshot(String s, boolean b, boolean b1) throws IOException,
+ RestoreSnapshotException {
throw new FeatureNotSupportedException("does not support yet");
}
@Override
- public void cloneSnapshot(byte[] bytes, TableName tableName) throws IOException, TableExistsException, RestoreSnapshotException {
+ public void cloneSnapshot(byte[] bytes, TableName tableName) throws IOException,
+ TableExistsException,
+ RestoreSnapshotException {
throw new FeatureNotSupportedException("does not support yet");
}
@Override
- public void cloneSnapshot(String s, TableName tableName, boolean b) throws IOException, TableExistsException, RestoreSnapshotException {
+ public void cloneSnapshot(String s, TableName tableName, boolean b) throws IOException,
+ TableExistsException,
+ RestoreSnapshotException {
throw new FeatureNotSupportedException("does not support yet");
}
@Override
- public void cloneSnapshot(String s, TableName tableName) throws IOException, TableExistsException, RestoreSnapshotException {
+ public void cloneSnapshot(String s, TableName tableName) throws IOException,
+ TableExistsException,
+ RestoreSnapshotException {
throw new FeatureNotSupportedException("does not support yet");
}
@Override
- public Future cloneSnapshotAsync(String s, TableName tableName) throws IOException, TableExistsException {
+ public Future cloneSnapshotAsync(String s, TableName tableName) throws IOException,
+ TableExistsException {
throw new FeatureNotSupportedException("does not support yet");
}
@@ -918,12 +1060,14 @@ public void execProcedure(String s, String s1, Map map) throws I
}
@Override
- public byte[] execProcedureWithReturn(String s, String s1, Map map) throws IOException {
+ public byte[] execProcedureWithReturn(String s, String s1, Map map)
+ throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
@Override
- public boolean isProcedureFinished(String s, String s1, Map map) throws IOException {
+ public boolean isProcedureFinished(String s, String s1, Map map)
+ throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
@@ -948,7 +1092,8 @@ public List listTableSnapshots(String s, String s1) throws
}
@Override
- public List listTableSnapshots(Pattern pattern, Pattern pattern1) throws IOException {
+ public List listTableSnapshots(Pattern pattern, Pattern pattern1)
+ throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
@@ -992,11 +1137,6 @@ public QuotaRetriever getQuotaRetriever(QuotaFilter quotaFilter) throws IOExcept
throw new FeatureNotSupportedException("does not support yet");
}
- @Override
- public List getQuota(QuotaFilter quotaFilter) throws IOException {
- throw new FeatureNotSupportedException("does not support yet");
- }
-
@Override
public CoprocessorRpcChannel coprocessorService() {
throw new FeatureNotSupportedException("does not support yet");
@@ -1023,27 +1163,13 @@ public List getSecurityCapabilities() throws IOException {
}
@Override
- public boolean splitSwitch(boolean b, boolean b1) throws IOException {
- throw new FeatureNotSupportedException("does not support yet");
- }
-
- @Override
- public boolean mergeSwitch(boolean b, boolean b1) throws IOException {
- throw new FeatureNotSupportedException("does not support yet");
- }
-
- @Override
- public boolean isSplitEnabled() throws IOException {
+ public boolean[] splitOrMergeEnabledSwitch(boolean enabled, boolean synchronous,
+ MasterSwitchType... switchTypes) throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
@Override
- public boolean isMergeEnabled() throws IOException {
- throw new FeatureNotSupportedException("does not support yet");
- }
-
- @Override
- public void addReplicationPeer(String s, ReplicationPeerConfig replicationPeerConfig, boolean b) throws IOException {
+ public boolean splitOrMergeEnabledSwitch(MasterSwitchType switchType) throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
@@ -1068,17 +1194,8 @@ public ReplicationPeerConfig getReplicationPeerConfig(String s) throws IOExcepti
}
@Override
- public void updateReplicationPeerConfig(String s, ReplicationPeerConfig replicationPeerConfig) throws IOException {
- throw new FeatureNotSupportedException("does not support yet");
- }
-
- @Override
- public void appendReplicationPeerTableCFs(String s, Map> map) throws ReplicationException, IOException {
- throw new FeatureNotSupportedException("does not support yet");
- }
-
- @Override
- public void removeReplicationPeerTableCFs(String s, Map> map) throws ReplicationException, IOException {
+ public void updateReplicationPeerConfig(String s, ReplicationPeerConfig replicationPeerConfig)
+ throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
@@ -1088,7 +1205,8 @@ public List listReplicationPeers() throws IOExceptio
}
@Override
- public List listReplicationPeers(Pattern pattern) throws IOException {
+ public List listReplicationPeers(Pattern pattern)
+ throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
@@ -1103,7 +1221,8 @@ public List listDecommissionedRegionServers() throws IOException {
}
@Override
- public void recommissionRegionServer(ServerName serverName, List list) throws IOException {
+ public void recommissionRegionServer(ServerName serverName, List list)
+ throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
@@ -1123,7 +1242,18 @@ public void disableTableReplication(TableName tableName) throws IOException {
}
@Override
- public void clearCompactionQueues(ServerName serverName, Set set) throws IOException, InterruptedException {
+ public void clearCompactionQueues(ServerName serverName, Set set) throws IOException,
+ InterruptedException {
+ throw new FeatureNotSupportedException("does not support yet");
+ }
+
+ /**
+ * List dead region servers.
+ *
+ * @return List of dead region servers.
+ */
+ @Override
+ public List listDeadServers() throws IOException {
throw new FeatureNotSupportedException("does not support yet");
}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java
index d6e63d40..69c8e2e7 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java
@@ -19,11 +19,11 @@
import com.alipay.oceanbase.hbase.OHTable;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import java.io.IOException;
@@ -59,7 +59,7 @@ public class OHBufferedMutatorImpl implements BufferedMutator {
private final AtomicLong writeBufferPeriodicFlushTimeoutMs = new AtomicLong(
0);
private final AtomicLong writeBufferPeriodicFlushTimerTickMs = new AtomicLong(
- MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
+ 0);
private Timer writeBufferPeriodicFlushTimer = null;
private final long writeBufferSize;
@@ -96,14 +96,6 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam
params.getOperationTimeout() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params
.getOperationTimeout() : connectionConfig.getOperationTimeout());
- long newPeriodicFlushTimeoutMs = params.getWriteBufferPeriodicFlushTimeoutMs() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params
- .getWriteBufferPeriodicFlushTimeoutMs() : connectionConfig
- .getWriteBufferPeriodicFlushTimeoutMs();
- long newPeriodicFlushTimeIntervalMs = params.getWriteBufferPeriodicFlushTimerTickMs() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params
- .getWriteBufferPeriodicFlushTimerTickMs() : connectionConfig
- .getWriteBufferPeriodicFlushTimerTickMs();
- this.setWriteBufferPeriodicFlush(newPeriodicFlushTimeoutMs, newPeriodicFlushTimeIntervalMs);
-
this.writeBufferSize = params.getWriteBufferSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params
.getWriteBufferSize() : connectionConfig.getWriteBufferSize();
this.maxKeyValueSize = params.getMaxKeyValueSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params
@@ -210,41 +202,6 @@ public void timeTriggerForWriteBufferPeriodicFlush() {
}
}
- /**
- * set time for periodic flush timer
- * @param timeoutMs control when to flush from collecting first mutation
- * @param timerTickMs control time interval to trigger the timer
- * */
- @Override
- public synchronized void setWriteBufferPeriodicFlush(long timeoutMs, long timerTickMs) {
- long originalTimeoutMs = this.writeBufferPeriodicFlushTimeoutMs.get();
- long originalTimeTickMs = this.writeBufferPeriodicFlushTimerTickMs.get();
-
- writeBufferPeriodicFlushTimeoutMs.set(Math.max(0, timeoutMs));
- writeBufferPeriodicFlushTimerTickMs.set(Math.max(
- MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS, timerTickMs));
-
- // if time parameters are updated, stop the old timer
- if (writeBufferPeriodicFlushTimeoutMs.get() != originalTimeoutMs
- || writeBufferPeriodicFlushTimerTickMs.get() != originalTimeTickMs) {
- if (writeBufferPeriodicFlushTimer != null) {
- writeBufferPeriodicFlushTimer.cancel();
- writeBufferPeriodicFlushTimer = null;
- }
- }
-
- if (writeBufferPeriodicFlushTimer == null && writeBufferPeriodicFlushTimeoutMs.get() > 0) {
- writeBufferPeriodicFlushTimer = new Timer(true);
- writeBufferPeriodicFlushTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- OHBufferedMutatorImpl.this.timeTriggerForWriteBufferPeriodicFlush();
- }
- }, this.writeBufferPeriodicFlushTimerTickMs.get(),
- this.writeBufferPeriodicFlushTimerTickMs.get());
- }
- }
-
/**
* Send the operations in the buffer to the servers. Does not wait for the server's answer. If
* there is an error, either throw the error, or use the listener to deal with the error.
@@ -275,15 +232,11 @@ private void execute(boolean flushAll) throws IOException {
// if commit all successfully, clean execBuffer
execBuffer.clear();
} catch (Exception ex) {
- LOGGER.error(LCD.convert("01-00026"), ex);
- if (ex.getCause() instanceof RetriesExhaustedWithDetailsException) {
- LOGGER.error(tableName + ": One or more of the operations have failed after retries.");
- RetriesExhaustedWithDetailsException retryException = (RetriesExhaustedWithDetailsException) ex.getCause();
- // recollect failed mutations
- execBuffer.clear();
- for (int i = 0; i < retryException.getNumExceptions(); ++i) {
- execBuffer.add((Mutation) retryException.getRow(i));
- }
+ // do not recollect error operations, notify outside
+ LOGGER.error("error happens, table name: {}", tableName.getNameAsString(), ex);
+ if (ex instanceof RetriesExhaustedWithDetailsException) {
+ LOGGER.error("TableName: {}, One or more of the operations have failed after retries.", tableName.getNameAsString(), ex);
+ RetriesExhaustedWithDetailsException retryException = (RetriesExhaustedWithDetailsException) ex;
if (listener != null) {
listener.onException(retryException, this);
} else {
@@ -293,31 +246,16 @@ private void execute(boolean flushAll) throws IOException {
LOGGER.error("Errors unrelated to operations occur during mutation operation", ex);
throw ex;
}
- } finally {
- for (Mutation mutation : execBuffer) {
- long size = mutation.heapSize();
- currentAsyncBufferSize.addAndGet(size);
- asyncWriteBuffer.add(mutation);
- undealtMutationCount.incrementAndGet();
- }
}
}
- /**
- * reset the time parameters and cancel the timer (if exists)
- * */
- @Override
- public void disableWriteBufferPeriodicFlush() {
- setWriteBufferPeriodicFlush(0, MIN_WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS);
- }
-
@Override
public void close() throws IOException {
if (closed) {
return;
}
// reset timeout, timeTick and Timer
- disableWriteBufferPeriodicFlush();
+ // disableWriteBufferPeriodicFlush();
try {
execute(true);
} finally {
@@ -347,16 +285,6 @@ public void flush() throws IOException {
execute(true);
}
- @Override
- public long getWriteBufferPeriodicFlushTimeoutMs() {
- return writeBufferPeriodicFlushTimeoutMs.get();
- }
-
- @Override
- public long getWriteBufferPeriodicFlushTimerTickMs() {
- return writeBufferPeriodicFlushTimerTickMs.get();
- }
-
@Override
public long getWriteBufferSize() {
return this.writeBufferSize;
@@ -374,9 +302,6 @@ public void setOperationTimeout(int operationTimeout) {
this.ohTable.setOperationTimeout(operationTimeout);
}
- /**
- * Count the mutations which haven't been processed.
- */
@VisibleForTesting
public int size() {
return undealtMutationCount.get();
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java
index 3c966a18..0ab83451 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java
@@ -25,10 +25,6 @@
import java.util.Properties;
import static com.alipay.oceanbase.hbase.constants.OHConstants.*;
-import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
-import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
-import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS;
-import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT;
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_SOCKET_TIMEOUT_CONNECT;
import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT;
@@ -55,8 +51,6 @@ public class OHConnectionConfiguration {
private final int readRpcTimeout;
private final int writeRpcTimeout;
private final int rpcConnectTimeout;
- private final long writeBufferPeriodicFlushTimeoutMs;
- private final long writeBufferPeriodicFlushTimerTickMs;
private final int numRetries;
public OHConnectionConfiguration(Configuration conf) {
@@ -84,11 +78,6 @@ public OHConnectionConfiguration(Configuration conf) {
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
- this.writeBufferPeriodicFlushTimeoutMs = conf.getLong(
- WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS, WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT);
- this.writeBufferPeriodicFlushTimerTickMs = conf.getLong(
- WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS,
- WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT);
int rpcConnectTimeout = -1;
if (conf.get(SOCKET_TIMEOUT_CONNECT) != null) {
rpcConnectTimeout = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
@@ -102,7 +91,7 @@ public OHConnectionConfiguration(Configuration conf) {
}
this.rpcConnectTimeout = rpcConnectTimeout;
this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.scannerCaching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
Integer.MAX_VALUE);
this.scannerMaxResultSize = conf.getLong(
@@ -205,14 +194,6 @@ public String getDatabase() {
return this.database;
}
- public long getWriteBufferPeriodicFlushTimeoutMs() {
- return this.writeBufferPeriodicFlushTimeoutMs;
- }
-
- public long getWriteBufferPeriodicFlushTimerTickMs() {
- return this.writeBufferPeriodicFlushTimerTickMs;
- }
-
public int getNumRetries() {
return this.numRetries;
}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java
index fd666159..811feca8 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java
@@ -144,8 +144,10 @@ public RegionLocator getRegionLocator(TableName tableName) throws IOException {
// need to use new connection configuration
// to avoid change the database in original param url by namespace in tableName
OHConnectionConfiguration connectionConf = new OHConnectionConfiguration(conf);
- ObTableClient obTableClient = ObTableClientManager.getOrCreateObTableClientByTableName(tableName, connectionConf);
- OHRegionLocatorExecutor executor = new OHRegionLocatorExecutor(tableName.toString(), obTableClient);
+ ObTableClient obTableClient = ObTableClientManager.getOrCreateObTableClientByTableName(
+ tableName, connectionConf);
+ OHRegionLocatorExecutor executor = new OHRegionLocatorExecutor(tableName.toString(),
+ obTableClient);
return executor.getRegionLocator(String.valueOf(tableName));
}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHCreateTableExecutor.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHCreateTableExecutor.java
index a56e8097..4bab8e3b 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/OHCreateTableExecutor.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHCreateTableExecutor.java
@@ -17,7 +17,7 @@
package com.alipay.oceanbase.hbase.util;
-import com.alibaba.fastjson.JSON;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.alipay.oceanbase.hbase.execute.AbstractObTableMetaExecutor;
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.meta.ObTableMetaRequest;
@@ -53,7 +53,7 @@ public void createTable(TableDescriptor tableDescriptor, byte[][] splitKeys) thr
final ObTableMetaRequest request = new ObTableMetaRequest();
request.setMetaType(getMetaType());
Map requestData = new HashMap<>();
- requestData.put("htable_name", tableDescriptor.getTableName().getName());
+ requestData.put("htable_name", tableDescriptor.getTableName().getNameAsString());
Map> columnFamilies = new HashMap<>();
for (ColumnFamilyDescriptor columnDescriptor : tableDescriptor.getColumnFamilies()) {
Map columnFamily = new HashMap<>();
@@ -62,7 +62,8 @@ public void createTable(TableDescriptor tableDescriptor, byte[][] splitKeys) thr
columnFamilies.put(columnDescriptor.getNameAsString(), columnFamily);
}
requestData.put("column_families", columnFamilies);
- String jsonData = JSON.toJSONString(requestData);
+ ObjectMapper objectMapper = new ObjectMapper();
+ String jsonData = objectMapper.writeValueAsString(requestData);
request.setData(jsonData);
execute(client, request);
}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHDeleteTableExecutor.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHDeleteTableExecutor.java
index 47280128..1f689596 100644
--- a/src/main/java/com/alipay/oceanbase/hbase/util/OHDeleteTableExecutor.java
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHDeleteTableExecutor.java
@@ -1,7 +1,23 @@
+/*-
+ * #%L
+ * com.oceanbase:obkv-hbase-client
+ * %%
+ * Copyright (C) 2022 - 2025 OceanBase Group
+ * %%
+ * OBKV HBase Client Framework is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * #L%
+ */
+
package com.alipay.oceanbase.hbase.util;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.annotation.JSONField;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.alipay.oceanbase.hbase.execute.AbstractObTableMetaExecutor;
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.meta.ObTableMetaRequest;
@@ -24,7 +40,6 @@ public ObTableRpcMetaType getMetaType() {
return ObTableRpcMetaType.HTABLE_DELETE_TABLE;
}
-
@Override
public Void parse(ObTableMetaResponse response) throws IOException {
// do nothing, error will be thrown from table
@@ -36,7 +51,8 @@ public Void deleteTable(String tableName) throws IOException {
request.setMetaType(getMetaType());
Map requestDataMap = new HashMap<>();
requestDataMap.put("table_name", tableName);
- String jsonData = JSON.toJSONString(requestDataMap);
+ ObjectMapper objectMapper = new ObjectMapper();
+ String jsonData = objectMapper.writeValueAsString(requestDataMap);
request.setData(jsonData);
return execute(tableClient, request);
}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionLoad.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionLoad.java
new file mode 100644
index 00000000..697d9646
--- /dev/null
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionLoad.java
@@ -0,0 +1,63 @@
+/*-
+ * #%L
+ * com.oceanbase:obkv-hbase-client
+ * %%
+ * Copyright (C) 2022 - 2025 OceanBase Group
+ * %%
+ * OBKV HBase Client Framework is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * #L%
+ */
+
+package com.alipay.oceanbase.hbase.util;
+
+import org.apache.hadoop.hbase.RegionLoad;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
+
+public class OHRegionLoad extends RegionLoad {
+ private final byte[] name; // tablet_name, id in String
+ private int storeFileSize; // tablet storage used in ssTable
+ private int memStoreSize; // tablet storage used in memTable
+
+ public OHRegionLoad(byte[] name, int storeFileSize, int memStoreSize) {
+ super(null);
+ this.name = name;
+ this.storeFileSize = storeFileSize;
+ this.memStoreSize = memStoreSize;
+ }
+
+ @Override
+ public byte[] getName() {
+ return name;
+ }
+
+ /**
+ * @return the number of stores
+ */
+ @Override
+ public int getStores() {
+ return 1;
+ }
+
+ /**
+ * @return the number of storefiles
+ */
+ @Override
+ public int getStorefiles() {
+ return 1;
+ }
+
+ /**
+ * @return the total size of the storefiles, in MB
+ */
+ @Override
+ public int getStorefileSizeMB() {
+ return memStoreSize + storeFileSize / 1024 / 1024;
+ }
+}
diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionLoadExecutor.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionLoadExecutor.java
new file mode 100644
index 00000000..c8d7b0d5
--- /dev/null
+++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionLoadExecutor.java
@@ -0,0 +1,99 @@
+/*-
+ * #%L
+ * com.oceanbase:obkv-hbase-client
+ * %%
+ * Copyright (C) 2022 - 2025 OceanBase Group
+ * %%
+ * OBKV HBase Client Framework is licensed under Mulan PSL v2.
+ * You can use this software according to the terms and conditions of the Mulan PSL v2.
+ * You may obtain a copy of Mulan PSL v2 at:
+ * http://license.coscl.org.cn/MulanPSL2
+ * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+ * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+ * See the Mulan PSL v2 for more details.
+ * #L%
+ */
+
+package com.alipay.oceanbase.hbase.util;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.alipay.oceanbase.hbase.execute.AbstractObTableMetaExecutor;
+import com.alipay.oceanbase.rpc.ObTableClient;
+import com.alipay.oceanbase.rpc.meta.ObTableMetaRequest;
+import com.alipay.oceanbase.rpc.meta.ObTableMetaResponse;
+import com.alipay.oceanbase.rpc.meta.ObTableRpcMetaType;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.hbase.RegionLoad;
+
+import java.io.IOException;
+import java.util.*;
+
+public class OHRegionLoadExecutor extends AbstractObTableMetaExecutor