diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index f66dcd8c..345fd894 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -220,8 +220,7 @@ public OHTable(Configuration configuration, String tableName) throws IOException DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME); this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime); OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration); - int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + int numRetries = ohConnectionConf.getNumRetries(); this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace( this.tableNameString, ohConnectionConf)); this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout()); @@ -273,8 +272,7 @@ public OHTable(Configuration configuration, final byte[] tableName, this.executePool = executePool; this.cleanupPoolOnClose = false; OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration); - int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + int numRetries = ohConnectionConf.getNumRetries(); this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace( this.tableNameString, ohConnectionConf)); this.obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout()); @@ -345,8 +343,7 @@ public OHTable(TableName tableName, Connection connection, DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK); this.writeBufferSize = connectionConfig.getWriteBufferSize(); this.tableName = tableName.getName(); - int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + int numRetries = connectionConfig.getNumRetries(); this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace( this.tableNameString, connectionConfig)); this.obTableClient.setRpcExecuteTimeout(rpcTimeout); @@ -389,8 +386,7 @@ public OHTable(Connection connection, ObTableBuilderBase builder, this.putWriteBufferCheck = this.configuration.getInt(HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK, DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK); this.writeBufferSize = connectionConfig.getWriteBufferSize(); - int numRetries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + int numRetries = connectionConfig.getNumRetries(); this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace( this.tableNameString, connectionConfig)); this.obTableClient.setRpcExecuteTimeout(rpcTimeout); @@ -462,7 +458,7 @@ private void finishSetUp() { WRITE_BUFFER_SIZE_DEFAULT); } - private OHConnectionConfiguration setUserDefinedNamespace(String tableNameString, + public static OHConnectionConfiguration setUserDefinedNamespace(String tableNameString, OHConnectionConfiguration ohConnectionConf) throws IllegalArgumentException { if (tableNameString.indexOf(':') != -1) { diff --git a/src/main/java/com/alipay/oceanbase/hbase/execute/AbstractObTableMetaExecutor.java b/src/main/java/com/alipay/oceanbase/hbase/execute/AbstractObTableMetaExecutor.java new file mode 100644 index 00000000..d6d99da1 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/execute/AbstractObTableMetaExecutor.java @@ -0,0 +1,31 @@ +package com.alipay.oceanbase.hbase.execute; + +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.exception.ObTableException; +import com.alipay.oceanbase.rpc.meta.ObTableMetaRequest; +import com.alipay.oceanbase.rpc.meta.ObTableMetaResponse; +import com.alipay.oceanbase.rpc.table.ObTable; + +import java.io.IOException; + +public abstract class AbstractObTableMetaExecutor implements ObTableMetaExecutor { + + @Override + public T execute(ObTableClient client, ObTableMetaRequest request) throws IOException { + if (request.getMetaType() != getMetaType()) { + throw new IOException("Invalid meta type, expected " + getMetaType()); + } + ObTable table = client.getRandomTable(); + ObTableMetaResponse response; + try { + response = (ObTableMetaResponse) client.executeWithRetry( + table, + request, + null /*tableName*/ + ); + } catch (Exception e) { + throw new IOException("Failed to execute request", e); + } + return parse(response); + } +} diff --git a/src/main/java/com/alipay/oceanbase/hbase/execute/ObTableMetaExecutor.java b/src/main/java/com/alipay/oceanbase/hbase/execute/ObTableMetaExecutor.java new file mode 100644 index 00000000..4e8a0ffb --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/execute/ObTableMetaExecutor.java @@ -0,0 +1,32 @@ +package com.alipay.oceanbase.hbase.execute; + +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.meta.ObTableMetaRequest; +import com.alipay.oceanbase.rpc.meta.ObTableMetaResponse; +import com.alipay.oceanbase.rpc.meta.ObTableRpcMetaType; + +import java.io.IOException; + +public interface ObTableMetaExecutor { + /** + * 执行元数据请求 + * @param request 元数据请求 + * @return 解析后的元数据对象 + * @throws IOException 如果执行失败或解析失败 + */ + T execute(ObTableClient client, ObTableMetaRequest request) throws IOException; + + /** + * 解析元数据响应, 用户需要重写 + * @param response 元数据响应 + * @return 解析后的元数据对象 + * @throws IOException 如果解析失败 + */ + T parse(ObTableMetaResponse response) throws IOException; + + /** + * 获取元信息类型, 用户需要重写 + * @return 元信息类型 + */ + ObTableRpcMetaType getMetaType() throws IOException; +} diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHAdmin.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHAdmin.java new file mode 100644 index 00000000..c2a41d57 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHAdmin.java @@ -0,0 +1,1057 @@ +package com.alipay.oceanbase.hbase.util; + +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.bolt.transport.TransportCodes; +import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; +import com.alipay.oceanbase.rpc.exception.ObTableTransportException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.*; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.client.replication.TableCFs; +import org.apache.hadoop.hbase.client.security.SecurityCapability; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.quotas.QuotaFilter; +import org.apache.hadoop.hbase.quotas.QuotaRetriever; +import org.apache.hadoop.hbase.quotas.QuotaSettings; +import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; +import org.apache.hadoop.hbase.replication.ReplicationException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; +import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException; +import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException; +import org.apache.hadoop.hbase.snapshot.SnapshotCreationException; +import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.Future; +import java.util.regex.Pattern; + +public class OHAdmin implements Admin { + private boolean aborted = false; + private final OHConnectionImpl connection; + private final Configuration conf; + OHAdmin(OHConnectionImpl connection) { + this.connection = connection; + this.conf = connection.getConfiguration(); + } + + @Override + public int getOperationTimeout() { + return connection.getOHConnectionConfiguration().getOperationTimeout(); + } + + @Override + public void abort(String msg, Throwable t) { + // do nothing, just throw the message and exception + this.aborted = true; + throw new RuntimeException(msg, t); + } + + @Override + public boolean isAborted() { + return this.aborted; + } + + @Override + public Connection getConnection() { + return this.connection; + } + + @Override + public boolean tableExists(TableName tableName) throws IOException { + OHConnectionConfiguration connectionConf = new OHConnectionConfiguration(conf); + ObTableClient tableClient = ObTableClientManager.getOrCreateObTableClientByTableName(tableName, connectionConf); + OHTableExistsExecutor executor = new OHTableExistsExecutor(tableClient); + return executor.tableExists(tableName.getNameAsString()); + } + + @Override + public HTableDescriptor[] listTables() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List listTableDescriptors() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public HTableDescriptor[] listTables(Pattern pattern) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List listTableDescriptors(Pattern pattern) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public HTableDescriptor[] listTables(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public HTableDescriptor[] listTables(Pattern pattern, boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List listTableDescriptors(Pattern pattern, boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public HTableDescriptor[] listTables(String s, boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public TableName[] listTableNames() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public TableName[] listTableNames(Pattern pattern) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public TableName[] listTableNames(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public TableName[] listTableNames(Pattern pattern, boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public TableName[] listTableNames(String s, boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public HTableDescriptor getTableDescriptor(TableName tableName) throws TableNotFoundException, IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public TableDescriptor getDescriptor(TableName tableName) throws TableNotFoundException, IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void createTable(TableDescriptor tableDescriptor) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void createTable(TableDescriptor tableDescriptor, byte[] bytes, byte[] bytes1, int i) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void createTable(TableDescriptor tableDescriptor, byte[][] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future createTableAsync(TableDescriptor tableDescriptor, byte[][] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void deleteTable(TableName tableName) throws IOException { + OHConnectionConfiguration connectionConf = new OHConnectionConfiguration(conf); + ObTableClient tableClient = ObTableClientManager.getOrCreateObTableClientByTableName(tableName, connectionConf); + OHDeleteTableExecutor executor = new OHDeleteTableExecutor(tableClient); + try { + executor.deleteTable(tableName.getNameAsString()); + } catch (IOException e) { + if (e.getCause() instanceof ObTableTransportException + && ((ObTableTransportException) e.getCause()).getErrorCode() == TransportCodes.BOLT_TIMEOUT) { + throw new TimeoutIOException(e.getCause()); + } else { + throw e; + } + } + } + + @Override + public Future deleteTableAsync(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public HTableDescriptor[] deleteTables(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public HTableDescriptor[] deleteTables(Pattern pattern) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void truncateTable(TableName tableName, boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future truncateTableAsync(TableName tableName, boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void enableTable(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future enableTableAsync(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public HTableDescriptor[] enableTables(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public HTableDescriptor[] enableTables(Pattern pattern) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future disableTableAsync(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void disableTable(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public HTableDescriptor[] disableTables(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public HTableDescriptor[] disableTables(Pattern pattern) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean isTableEnabled(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean isTableDisabled(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean isTableAvailable(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean isTableAvailable(TableName tableName, byte[][] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Pair getAlterStatus(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Pair getAlterStatus(byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamilyDescriptor) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future addColumnFamilyAsync(TableName tableName, ColumnFamilyDescriptor columnFamilyDescriptor) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void deleteColumn(TableName tableName, byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void deleteColumnFamily(TableName tableName, byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future deleteColumnFamilyAsync(TableName tableName, byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void modifyColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamilyDescriptor) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future modifyColumnFamilyAsync(TableName tableName, ColumnFamilyDescriptor columnFamilyDescriptor) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void closeRegion(String s, String s1) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void closeRegion(byte[] bytes, String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean closeRegionWithEncodedRegionName(String s, String s1) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void closeRegion(ServerName serverName, HRegionInfo hRegionInfo) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List getOnlineRegions(ServerName serverName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List getRegions(ServerName serverName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void flush(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void flushRegion(byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void flushRegionServer(ServerName serverName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void compact(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void compactRegion(byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void compact(TableName tableName, byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void compactRegion(byte[] bytes, byte[] bytes1) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void compact(TableName tableName, CompactType compactType) throws IOException, InterruptedException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void compact(TableName tableName, byte[] bytes, CompactType compactType) throws IOException, InterruptedException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void majorCompact(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void majorCompactRegion(byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void majorCompact(TableName tableName, byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void majorCompactRegion(byte[] bytes, byte[] bytes1) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void majorCompact(TableName tableName, CompactType compactType) throws IOException, InterruptedException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void majorCompact(TableName tableName, byte[] bytes, CompactType compactType) throws IOException, InterruptedException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void compactRegionServer(ServerName serverName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void majorCompactRegionServer(ServerName serverName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void move(byte[] bytes, byte[] bytes1) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void assign(byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void unassign(byte[] bytes, boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void offline(byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean balancerSwitch(boolean b, boolean b1) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean balance() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean balance(boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean isBalancerEnabled() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public CacheEvictionStats clearBlockCache(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean normalize() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean isNormalizerEnabled() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean normalizerSwitch(boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean catalogJanitorSwitch(boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public int runCatalogJanitor() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean isCatalogJanitorEnabled() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean cleanerChoreSwitch(boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean runCleanerChore() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean isCleanerChoreEnabled() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void mergeRegions(byte[] bytes, byte[] bytes1, boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future mergeRegionsAsync(byte[] bytes, byte[] bytes1, boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future mergeRegionsAsync(byte[][] bytes, boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void split(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void splitRegion(byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void split(TableName tableName, byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void splitRegion(byte[] bytes, byte[] bytes1) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future splitRegionAsync(byte[] bytes, byte[] bytes1) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void modifyTable(TableName tableName, TableDescriptor tableDescriptor) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void modifyTable(TableDescriptor tableDescriptor) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future modifyTableAsync(TableName tableName, TableDescriptor tableDescriptor) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future modifyTableAsync(TableDescriptor tableDescriptor) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void shutdown() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void stopMaster() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean isMasterInMaintenanceMode() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void stopRegionServer(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public ClusterMetrics getClusterMetrics(EnumSet enumSet) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List getRegionMetrics(ServerName serverName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List getRegionMetrics(ServerName serverName, TableName tableName) throws IOException { + if (tableName == null) { + throw new FeatureNotSupportedException("does not support tableName is null"); + } + OHConnectionConfiguration connectionConf = new OHConnectionConfiguration(conf); + ObTableClient tableClient = ObTableClientManager.getOrCreateObTableClientByTableName(tableName, connectionConf); + OHRegionMetricsExecutor executor = new OHRegionMetricsExecutor(tableClient); + return executor.getRegionMetrics(tableName.getNameAsString()); + } + + @Override + public Configuration getConfiguration() { + return connection.getConfiguration(); + } + + @Override + public void createNamespace(NamespaceDescriptor namespaceDescriptor) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future createNamespaceAsync(NamespaceDescriptor namespaceDescriptor) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void modifyNamespace(NamespaceDescriptor namespaceDescriptor) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future modifyNamespaceAsync(NamespaceDescriptor namespaceDescriptor) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void deleteNamespace(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future deleteNamespaceAsync(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public NamespaceDescriptor getNamespaceDescriptor(String s) throws NamespaceNotFoundException, IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public NamespaceDescriptor[] listNamespaceDescriptors() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public HTableDescriptor[] listTableDescriptorsByNamespace(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List listTableDescriptorsByNamespace(byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public TableName[] listTableNamesByNamespace(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List getTableRegions(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List getRegions(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public synchronized void close() throws IOException { + } + + @Override + public HTableDescriptor[] getTableDescriptorsByTableName(List list) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List listTableDescriptors(List list) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public HTableDescriptor[] getTableDescriptors(List list) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean abortProcedure(long l, boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future abortProcedureAsync(long l, boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public String getProcedures() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public String getLocks() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void rollWALWriter(ServerName serverName) throws IOException, FailedLogCloseException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public CompactionState getCompactionState(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public CompactionState getCompactionState(TableName tableName, CompactType compactType) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public CompactionState getCompactionStateForRegion(byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public long getLastMajorCompactionTimestamp(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public long getLastMajorCompactionTimestampForRegion(byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void snapshot(String s, TableName tableName) throws IOException, SnapshotCreationException, IllegalArgumentException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void snapshot(byte[] bytes, TableName tableName) throws IOException, SnapshotCreationException, IllegalArgumentException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void snapshot(String s, TableName tableName, SnapshotType snapshotType) throws IOException, SnapshotCreationException, IllegalArgumentException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void snapshot(SnapshotDescription snapshotDescription) throws IOException, SnapshotCreationException, IllegalArgumentException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void snapshotAsync(SnapshotDescription snapshotDescription) throws IOException, SnapshotCreationException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean isSnapshotFinished(SnapshotDescription snapshotDescription) throws IOException, HBaseSnapshotException, UnknownSnapshotException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void restoreSnapshot(byte[] bytes) throws IOException, RestoreSnapshotException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void restoreSnapshot(String s) throws IOException, RestoreSnapshotException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future restoreSnapshotAsync(String s) throws IOException, RestoreSnapshotException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void restoreSnapshot(byte[] bytes, boolean b) throws IOException, RestoreSnapshotException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void restoreSnapshot(String s, boolean b) throws IOException, RestoreSnapshotException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void restoreSnapshot(String s, boolean b, boolean b1) throws IOException, RestoreSnapshotException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void cloneSnapshot(byte[] bytes, TableName tableName) throws IOException, TableExistsException, RestoreSnapshotException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void cloneSnapshot(String s, TableName tableName, boolean b) throws IOException, TableExistsException, RestoreSnapshotException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void cloneSnapshot(String s, TableName tableName) throws IOException, TableExistsException, RestoreSnapshotException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Future cloneSnapshotAsync(String s, TableName tableName) throws IOException, TableExistsException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void execProcedure(String s, String s1, Map map) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public byte[] execProcedureWithReturn(String s, String s1, Map map) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean isProcedureFinished(String s, String s1, Map map) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List listSnapshots() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List listSnapshots(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List listSnapshots(Pattern pattern) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List listTableSnapshots(String s, String s1) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List listTableSnapshots(Pattern pattern, Pattern pattern1) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void deleteSnapshot(byte[] bytes) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void deleteSnapshot(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void deleteSnapshots(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void deleteSnapshots(Pattern pattern) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void deleteTableSnapshots(String s, String s1) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void deleteTableSnapshots(Pattern pattern, Pattern pattern1) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void setQuota(QuotaSettings quotaSettings) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public QuotaRetriever getQuotaRetriever(QuotaFilter quotaFilter) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List getQuota(QuotaFilter quotaFilter) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public CoprocessorRpcChannel coprocessorService() { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public CoprocessorRpcChannel coprocessorService(ServerName serverName) { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void updateConfiguration(ServerName serverName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void updateConfiguration() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List getSecurityCapabilities() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean splitSwitch(boolean b, boolean b1) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean mergeSwitch(boolean b, boolean b1) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean isSplitEnabled() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public boolean isMergeEnabled() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void addReplicationPeer(String s, ReplicationPeerConfig replicationPeerConfig, boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void removeReplicationPeer(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void enableReplicationPeer(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void disableReplicationPeer(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public ReplicationPeerConfig getReplicationPeerConfig(String s) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void updateReplicationPeerConfig(String s, ReplicationPeerConfig replicationPeerConfig) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void appendReplicationPeerTableCFs(String s, Map> map) throws ReplicationException, IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void removeReplicationPeerTableCFs(String s, Map> map) throws ReplicationException, IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List listReplicationPeers() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List listReplicationPeers(Pattern pattern) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void decommissionRegionServers(List list, boolean b) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List listDecommissionedRegionServers() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void recommissionRegionServer(ServerName serverName, List list) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List listReplicatedTableCFs() throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void enableTableReplication(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void disableTableReplication(TableName tableName) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public void clearCompactionQueues(ServerName serverName, Set set) throws IOException, InterruptedException { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public List clearDeadServers(List list) throws IOException { + throw new FeatureNotSupportedException("does not support yet"); + } +} diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java index 9b98dd0b..3c966a18 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionConfiguration.java @@ -57,6 +57,7 @@ public class OHConnectionConfiguration { private final int rpcConnectTimeout; private final long writeBufferPeriodicFlushTimeoutMs; private final long writeBufferPeriodicFlushTimerTickMs; + private final int numRetries; public OHConnectionConfiguration(Configuration conf) { this.paramUrl = conf.get(HBASE_OCEANBASE_PARAM_URL); @@ -100,6 +101,8 @@ public OHConnectionConfiguration(Configuration conf) { } } this.rpcConnectTimeout = rpcConnectTimeout; + this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, + HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.scannerCaching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, Integer.MAX_VALUE); this.scannerMaxResultSize = conf.getLong( @@ -209,4 +212,8 @@ public long getWriteBufferPeriodicFlushTimeoutMs() { public long getWriteBufferPeriodicFlushTimerTickMs() { return this.writeBufferPeriodicFlushTimerTickMs; } + + public int getNumRetries() { + return this.numRetries; + } } diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java index 41a0690a..d134ca8d 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java @@ -19,6 +19,7 @@ import com.alipay.oceanbase.hbase.OHTable; import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; +import com.alipay.oceanbase.rpc.ObTableClient; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; @@ -140,12 +141,17 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws I @Override public RegionLocator getRegionLocator(TableName tableName) throws IOException { - throw new FeatureNotSupportedException("not supported yet'"); + // need to use new connection configuration + // to avoid change the database in original param url by namespace in tableName + OHConnectionConfiguration connectionConf = new OHConnectionConfiguration(conf); + ObTableClient obTableClient = ObTableClientManager.getOrCreateObTableClientByTableName(tableName, connectionConf); + OHRegionLocatorExecutor executor = new OHRegionLocatorExecutor(obTableClient); + return executor.getRegionLocator(String.valueOf(tableName)); } @Override public Admin getAdmin() throws IOException { - throw new FeatureNotSupportedException("not supported yet'"); + return new OHAdmin(this); } private void shutdownBatchPool(ExecutorService pool) { diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHDeleteTableExecutor.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHDeleteTableExecutor.java new file mode 100644 index 00000000..47280128 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHDeleteTableExecutor.java @@ -0,0 +1,43 @@ +package com.alipay.oceanbase.hbase.util; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.annotation.JSONField; +import com.alipay.oceanbase.hbase.execute.AbstractObTableMetaExecutor; +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.meta.ObTableMetaRequest; +import com.alipay.oceanbase.rpc.meta.ObTableMetaResponse; +import com.alipay.oceanbase.rpc.meta.ObTableRpcMetaType; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class OHDeleteTableExecutor extends AbstractObTableMetaExecutor { + private final ObTableClient tableClient; + + OHDeleteTableExecutor(ObTableClient tableClient) { + this.tableClient = tableClient; + } + + @Override + public ObTableRpcMetaType getMetaType() { + return ObTableRpcMetaType.HTABLE_DELETE_TABLE; + } + + + @Override + public Void parse(ObTableMetaResponse response) throws IOException { + // do nothing, error will be thrown from table + return null; + } + + public Void deleteTable(String tableName) throws IOException { + ObTableMetaRequest request = new ObTableMetaRequest(); + request.setMetaType(getMetaType()); + Map requestDataMap = new HashMap<>(); + requestDataMap.put("table_name", tableName); + String jsonData = JSON.toJSONString(requestDataMap); + request.setData(jsonData); + return execute(tableClient, request); + } +} diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionLocator.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionLocator.java new file mode 100644 index 00000000..f4474d92 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionLocator.java @@ -0,0 +1,84 @@ +package com.alipay.oceanbase.hbase.util; + +import com.alipay.oceanbase.rpc.ObTableClient; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.util.Pair; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +public class OHRegionLocator implements RegionLocator { + public OHRegionLocator(byte[][] startKeys, byte[][] endKeys) { + + } + + @Override + public HRegionLocation getRegionLocation(byte[] bytes) throws IOException { + return null; + } + + @Override + public HRegionLocation getRegionLocation(byte[] bytes, boolean b) throws IOException { + return null; + } + + @Override + public List getAllRegionLocations() throws IOException { + return Collections.emptyList(); + } + + /** + * Gets the starting row key for every region in the currently open table. + *

+ * This is mainly useful for the MapReduce integration. + * + * @return Array of region starting row keys + * @throws IOException if a remote or network exception occurs + */ + @Override + public byte[][] getStartKeys() throws IOException { + return null; + } + + /** + * Gets the ending row key for every region in the currently open table. + *

+ * This is mainly useful for the MapReduce integration. + * + * @return Array of region ending row keys + * @throws IOException if a remote or network exception occurs + */ + @Override + public byte[][] getEndKeys() throws IOException { + return null; + } + + /** + * Gets the starting and ending row keys for every region in the currently + * open table. + *

+ * This is mainly useful for the MapReduce integration. + * + * @return Pair of arrays of region starting and ending row keys + * @throws IOException if a remote or network exception occurs + */ + @Override + public Pair getStartEndKeys() throws IOException { + return null; + } + + @Override + public TableName getName() { + return null; + } + + private ObTableClient tableClient; + + @Override + public void close() throws IOException { + + } +} diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionLocatorExecutor.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionLocatorExecutor.java new file mode 100644 index 00000000..0f896827 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionLocatorExecutor.java @@ -0,0 +1,49 @@ +package com.alipay.oceanbase.hbase.util; + +import com.alibaba.fastjson.JSON; +import com.alipay.oceanbase.hbase.execute.AbstractObTableMetaExecutor; +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.exception.ObTableException; +import com.alipay.oceanbase.rpc.location.model.TableEntry; +import com.alipay.oceanbase.rpc.meta.ObTableMetaRequest; +import com.alipay.oceanbase.rpc.meta.ObTableMetaResponse; +import com.alipay.oceanbase.rpc.meta.ObTableRpcMetaType; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class OHRegionLocatorExecutor extends AbstractObTableMetaExecutor { + OHRegionLocatorExecutor(ObTableClient client) { + this.client = client; + } + + @Override + public ObTableRpcMetaType getMetaType() { + return ObTableRpcMetaType.HTABLE_REGION_LOCATOR; + } + + @Override + public OHRegionLocator parse(ObTableMetaResponse response) throws IOException { + try { + String jsonData = response.getData(); + // process json + return new OHRegionLocator(null, null); + } catch (IllegalArgumentException e) { + throw new IOException("msg", e); + } + } + + public OHRegionLocator getRegionLocator(String tableName) throws IOException { + ObTableMetaRequest request = new ObTableMetaRequest(); + request.setMetaType(getMetaType()); + Map requestData = new HashMap<>(); + requestData.put("table_name", tableName); + String jsonData = JSON.toJSONString(requestData); + request.setData(jsonData); + + return execute(client, request); + } + + private final ObTableClient client; +} diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionMetrics.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionMetrics.java new file mode 100644 index 00000000..8f161575 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionMetrics.java @@ -0,0 +1,121 @@ +package com.alipay.oceanbase.hbase.util; + +import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; +import org.apache.hadoop.hbase.RegionMetrics; +import org.apache.hadoop.hbase.Size; + +import java.util.Collections; +import java.util.Map; + +public class OHRegionMetrics implements RegionMetrics { + private final String tablegroup; + private final byte[] name; // tablet_name, id in String + private final Size storeFileSize; // tablet storage used in ssTable + private final Size memStoreSize; // tablet storage used in memTable + + OHRegionMetrics(String tablegroup, byte[] name, Size storeFileSize, Size memStoreSize) { + this.tablegroup = tablegroup; + this.name = name; + this.storeFileSize = storeFileSize; + this.memStoreSize = memStoreSize; + } + + public String getTablegroup() { + return tablegroup; + } + + @Override + public byte[] getRegionName() { + return name; + } + + @Override + public int getStoreCount() { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public int getStoreFileCount() { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Size getStoreFileSize() { + return storeFileSize; + } + + @Override + public Size getMemStoreSize() { + return memStoreSize; + } + + @Override + public long getReadRequestCount() { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public long getWriteRequestCount() { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public long getFilteredReadRequestCount() { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Size getStoreFileIndexSize() { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Size getStoreFileRootLevelIndexSize() { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Size getStoreFileUncompressedDataIndexSize() { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Size getBloomFilterSize() { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public long getCompactingCellCount() { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public long getCompactedCellCount() { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public long getCompletedSequenceId() { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Map getStoreSequenceId() { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public Size getUncompressedStoreFileSize() { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public float getDataLocality() { + throw new FeatureNotSupportedException("does not support yet"); + } + + @Override + public long getLastMajorCompactionTimestamp() { + throw new FeatureNotSupportedException("does not support yet"); + } +} diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionMetricsExecutor.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionMetricsExecutor.java new file mode 100644 index 00000000..430f568f --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHRegionMetricsExecutor.java @@ -0,0 +1,71 @@ +package com.alipay.oceanbase.hbase.util; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.alipay.oceanbase.hbase.execute.AbstractObTableMetaExecutor; +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.meta.ObTableMetaRequest; +import com.alipay.oceanbase.rpc.meta.ObTableMetaResponse; +import com.alipay.oceanbase.rpc.meta.ObTableRpcMetaType; +import org.apache.hadoop.hbase.RegionMetrics; +import org.apache.hadoop.hbase.Size; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class OHRegionMetricsExecutor extends AbstractObTableMetaExecutor> { + private final ObTableClient tableClient; + OHRegionMetricsExecutor(ObTableClient tableClient) { + this.tableClient = tableClient; + } + @Override + public ObTableRpcMetaType getMetaType() throws IOException { + return ObTableRpcMetaType.HTABLE_REGION_METRICS; + } + + /* + * { + tableName: "tablegroup_name", + regionList:{ + "regions": [200051, 200052, 200053, 200191, 200192, 200193, ...], + "memTableSize":[123, 321, 321, 123, 321, 321, ...], + "ssTableSize":[5122, 4111, 5661, 5122, 4111, 5661, ...] + } + } + * */ + @Override + public List parse(ObTableMetaResponse response) throws IOException { + List metricsList = new ArrayList<>(); + JSONObject metrcisJSONObject = JSON.parseObject(response.getData()); + String tableGroupName = metrcisJSONObject.getString("tableName"); + JSONObject regionList = metrcisJSONObject.getJSONObject("regionList"); + List regions = regionList.getJSONArray("regions").toJavaList(Integer.class); + List memTableSizeList = regionList.getJSONArray("memTableSize").toJavaList(Integer.class); + List ssTableSizeList = regionList.getJSONArray("ssTableSize").toJavaList(Integer.class); + if (regions.isEmpty() || regions.size() != memTableSizeList.size() || memTableSizeList.size() != ssTableSizeList.size()) { + throw new IOException("size length has to be the same"); + } + for (int i = 0; i < regions.size(); ++i) { + String name_str = Integer.toString(regions.get(i)); + byte[] name = name_str.getBytes(); + Size storeFileSize = new Size(((double) ssTableSizeList.get(i)) / (1024 * 1024) , Size.Unit.MEGABYTE); // The unit in original HBase is MEGABYTE, for us it is BYTE + Size memStoreSize = new Size(((double) memTableSizeList.get(i)) / (1024 * 1024), Size.Unit.MEGABYTE); // The unit in original HBase is MEGABYTE, for us it is BYTE + OHRegionMetrics ohRegionMetrics = new OHRegionMetrics(tableGroupName, name, storeFileSize, memStoreSize); + metricsList.add(ohRegionMetrics); + } + return metricsList; + } + + public List getRegionMetrics(String tableName) throws IOException { + ObTableMetaRequest request = new ObTableMetaRequest(); + request.setMetaType(getMetaType()); + Map requestData = new HashMap<>(); + requestData.put("table_name", tableName); + String jsonData = JSON.toJSONString(requestData); + request.setData(jsonData); + return execute(tableClient, request); + } +} diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHTableExistsExecutor.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHTableExistsExecutor.java new file mode 100644 index 00000000..7b827226 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHTableExistsExecutor.java @@ -0,0 +1,43 @@ +package com.alipay.oceanbase.hbase.util; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.alipay.oceanbase.hbase.execute.AbstractObTableMetaExecutor; +import com.alipay.oceanbase.rpc.ObTableClient; +import com.alipay.oceanbase.rpc.meta.ObTableMetaRequest; +import com.alipay.oceanbase.rpc.meta.ObTableMetaResponse; +import com.alipay.oceanbase.rpc.meta.ObTableRpcMetaType; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class OHTableExistsExecutor extends AbstractObTableMetaExecutor { + private final ObTableClient tableClient; + + OHTableExistsExecutor(ObTableClient tableClient) { + this.tableClient = tableClient; + } + + @Override + public ObTableRpcMetaType getMetaType() throws IOException { + return ObTableRpcMetaType.HTABLE_EXISTS; + } + + @Override + public Boolean parse(ObTableMetaResponse response) throws IOException { + String jsonData = response.getData(); + JSONObject object = JSONObject.parseObject(jsonData); + return object.getBoolean("exists"); + } + + public Boolean tableExists(String tableName) throws IOException { + ObTableMetaRequest request = new ObTableMetaRequest(); + request.setMetaType(getMetaType()); + Map requestData = new HashMap<>(); + requestData.put("table_name", tableName); + String jsonData = JSON.toJSONString(requestData); + request.setData(jsonData); + return execute(tableClient, request); + } +} diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java index 5db940aa..e0d50cea 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java @@ -19,8 +19,10 @@ import com.alipay.oceanbase.rpc.ObTableClient; import com.alipay.oceanbase.rpc.constant.Constants; +import com.alipay.oceanbase.hbase.OHTable; import com.google.common.base.Objects; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.TableName; import java.io.IOException; import java.util.Map; @@ -126,6 +128,22 @@ public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableCli return OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey); } + public static ObTableClient getOrCreateObTableClientByTableName(TableName tableName, OHConnectionConfiguration connectionConfig) throws IllegalArgumentException, + IOException { + String tableNameString = tableName.getNameAsString(); + ObTableClient obTableClient = getOrCreateObTableClient( + OHTable.setUserDefinedNamespace(tableNameString, connectionConfig)); + ObTableClientManager.initTimeoutAndRetryTimes(obTableClient, connectionConfig); + return obTableClient; + } + + private static void initTimeoutAndRetryTimes(ObTableClient obTableClient, OHConnectionConfiguration ohConnectionConf) { + obTableClient.setRpcExecuteTimeout(ohConnectionConf.getRpcTimeout()); + obTableClient.setRuntimeRetryTimes(ohConnectionConf.getNumRetries()); + obTableClient.setRuntimeMaxWait(ohConnectionConf.getOperationTimeout()); + obTableClient.setRuntimeBatchMaxWait(ohConnectionConf.getOperationTimeout()); + } + public static class ObTableClientKey { private String paramUrl; private String fullUserName; diff --git a/src/test/java/com/alipay/oceanbase/hbase/OHTableAdminInterfaceTest.java b/src/test/java/com/alipay/oceanbase/hbase/OHTableAdminInterfaceTest.java index 6d53616b..a499d73b 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/OHTableAdminInterfaceTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/OHTableAdminInterfaceTest.java @@ -17,16 +17,34 @@ package com.alipay.oceanbase.hbase; +import com.alipay.oceanbase.hbase.util.OHRegionMetrics; import com.alipay.oceanbase.hbase.util.ObHTableTestUtil; +import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException; +import com.alipay.oceanbase.rpc.exception.ObTableException; +import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.RegionMetrics; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.junit.Assert; import org.junit.Test; import java.io.IOException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import static com.alipay.oceanbase.hbase.constants.OHConstants.HBASE_HTABLE_TEST_LOAD_ENABLE; +import static org.apache.hadoop.hbase.util.Bytes.toBytes; +import static org.junit.Assert.*; +import static org.junit.Assert.assertFalse; public class OHTableAdminInterfaceTest { public OHTablePool setUpLoadPool() throws IOException { @@ -249,4 +267,309 @@ public void testGetStartEndKeysOHTablePoolLoadNon() throws Exception { Assert.assertEquals(0, startEndKeys.getFirst()[0].length); Assert.assertEquals(0, startEndKeys.getSecond()[0].length); } + + @Test + public void testAdminGetRegionMetrics() throws Exception { + java.sql.Connection conn = ObHTableTestUtil.getConnection(); + Statement st = conn.createStatement(); + st.execute("CREATE TABLEGROUP IF NOT EXISTS test_multi_cf SHARDING = 'ADAPTIVE';\n" + + "\n" + + "CREATE TABLE IF NOT EXISTS `test_multi_cf$family_with_group1` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(1024) DEFAULT NULL,\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = test_multi_cf PARTITION BY KEY(`K`) PARTITIONS 3;\n" + + "\n" + + "CREATE TABLE IF NOT EXISTS `test_multi_cf$family_with_group2` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(1024) DEFAULT NULL,\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = test_multi_cf PARTITION BY KEY(`K`) PARTITIONS 3;\n" + + "\n" + + "CREATE TABLE IF NOT EXISTS `test_multi_cf$family_with_group3` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(1024) DEFAULT NULL,\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = test_multi_cf PARTITION BY KEY(`K`) PARTITIONS 3;\n" + + "\n" + + "CREATE DATABASE IF NOT EXISTS `n1`;\n" + + "use `n1`;\n" + + "CREATE TABLEGROUP IF NOT EXISTS `n1:test_multi_cf` SHARDING = 'ADAPTIVE';\n" + + "CREATE TABLE IF NOT EXISTS `n1:test_multi_cf$family_with_group1` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(1024) DEFAULT NULL,\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = `n1:test_multi_cf` PARTITION BY KEY(`K`) PARTITIONS 3;\n" + + "CREATE TABLE IF NOT EXISTS `n1:test_multi_cf$family_with_group2` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(1024) DEFAULT NULL,\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = `n1:test_multi_cf` PARTITION BY KEY(`K`) PARTITIONS 3;\n" + + "CREATE TABLE IF NOT EXISTS `n1:test_multi_cf$family_with_group3` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(1024) DEFAULT NULL,\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = `n1:test_multi_cf` PARTITION BY KEY(`K`) PARTITIONS 3;"); + st.close(); + conn.close(); + String tablegroup1 = "test_multi_cf"; + String tablegroup2 = "n1:test_multi_cf"; + Configuration conf = ObHTableTestUtil.newConfiguration(); + Connection connection = ConnectionFactory.createConnection(conf); + Admin admin = connection.getAdmin(); + IOException thrown = assertThrows(IOException.class, + () -> { + admin.getRegionMetrics(null, TableName.valueOf("tablegroup_not_exists")); + }); + Assert.assertTrue(thrown.getCause() instanceof ObTableException); + Assert.assertEquals(ResultCodes.OB_TABLEGROUP_NOT_EXIST.errorCode, ((ObTableException) thrown.getCause()).getErrorCode()); + assertThrows(FeatureNotSupportedException.class, + () -> { + admin.getRegionMetrics(ServerName.valueOf("localhost,1,1")); + }); + // insert 300 thousand of rows in each table under tablegroup test_multi_cf + batchInsert(100000, tablegroup1); + List metrics = admin.getRegionMetrics(null, TableName.valueOf(tablegroup1)); + for (RegionMetrics regionMetrics : metrics) { + System.out.println("region name: " + regionMetrics.getNameAsString() + + ", storeFileSize: " + regionMetrics.getStoreFileSize() + + ", memFileSize: " + regionMetrics.getMemStoreSize()); + } + // concurrently read while writing 150 thousand of rows to 2 tablegroups + ExecutorService executorService = Executors.newFixedThreadPool(10); + CountDownLatch latch = new CountDownLatch(100); + List exceptionCatcher = new ArrayList<>(); + for (int i = 0; i < 100; ++i) { + int taskId = i; + executorService.submit(() -> { + try { + if (taskId % 2 == 1) { + List regionMetrics = null; + // test get regionMetrics from different namespaces + if (taskId % 3 != 0) { + regionMetrics = admin.getRegionMetrics(null, TableName.valueOf(tablegroup1)); + } else { + regionMetrics = admin.getRegionMetrics(null, TableName.valueOf(tablegroup2)); + } + for (RegionMetrics m : regionMetrics) { + System.out.println("task: " + taskId + ", tablegroup: " + ((OHRegionMetrics) m).getTablegroup() + + ", region name: " + m.getNameAsString() + + ", storeFileSize: " + m.getStoreFileSize() + + ", memFileSize: " + m.getMemStoreSize()); + } + } else { + if (taskId % 8 == 0) { + batchInsert(1000, tablegroup2); + } else { + batchInsert(1000, tablegroup1); + } + System.out.println("task: " + taskId + ", batchInsert"); + } + } catch (Exception e) { + e.printStackTrace(); + exceptionCatcher.add(e); + } finally { + latch.countDown(); + } + }); + } + try { + latch.await(); + } catch (Exception e) { + e.printStackTrace(); + exceptionCatcher.add(e); + } + executorService.shutdownNow(); + Assert.assertTrue(exceptionCatcher.isEmpty()); + } + + @Test + public void testAdminDeleteTable() throws Exception { + java.sql.Connection conn = ObHTableTestUtil.getConnection(); + Statement st = conn.createStatement(); + st.execute("CREATE TABLEGROUP IF NOT EXISTS test_multi_cf SHARDING = 'ADAPTIVE';\n" + + "\n" + + "CREATE TABLE IF NOT EXISTS `test_multi_cf$family_with_group1` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(1024) DEFAULT NULL,\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = test_multi_cf PARTITION BY KEY(`K`) PARTITIONS 3;\n" + + "\n" + + "CREATE TABLE IF NOT EXISTS `test_multi_cf$family_with_group2` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(1024) DEFAULT NULL,\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = test_multi_cf PARTITION BY KEY(`K`) PARTITIONS 3;\n" + + "\n" + + "CREATE TABLE IF NOT EXISTS `test_multi_cf$family_with_group3` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(1024) DEFAULT NULL,\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = test_multi_cf PARTITION BY KEY(`K`) PARTITIONS 3;\n" + + "\n" + + "CREATE DATABASE IF NOT EXISTS `n1`;\n" + + "use `n1`;\n" + + "CREATE TABLEGROUP IF NOT EXISTS `n1:test` SHARDING = 'ADAPTIVE';\n" + + "CREATE TABLE IF NOT EXISTS `n1:test$family_group` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(1024) DEFAULT NULL,\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = `n1:test`;" + + "\n" + + "CREATE TABLE IF NOT EXISTS `n1:test$family1` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(1024) DEFAULT NULL,\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = `n1:test`;"); + st.close(); + conn.close(); + Configuration conf = ObHTableTestUtil.newConfiguration(); + Connection connection = ConnectionFactory.createConnection(conf); + Admin admin = connection.getAdmin(); + assertTrue(admin.tableExists(TableName.valueOf("n1", "test"))); + assertTrue(admin.tableExists(TableName.valueOf("test_multi_cf"))); + IOException thrown = assertThrows(IOException.class, + () -> { + admin.deleteTable(TableName.valueOf("tablegroup_not_exists")); + }); + Assert.assertTrue(thrown.getCause() instanceof ObTableException); + Assert.assertEquals(ResultCodes.OB_TABLEGROUP_NOT_EXIST.errorCode, ((ObTableException) thrown.getCause()).getErrorCode()); + admin.deleteTable(TableName.valueOf("n1", "test")); + admin.deleteTable(TableName.valueOf("test_multi_cf")); + assertFalse(admin.tableExists(TableName.valueOf("n1", "test"))); + assertFalse(admin.tableExists(TableName.valueOf("test_multi_cf"))); + } + + @Test + public void testAdminTableExists() throws Exception { + java.sql.Connection conn = ObHTableTestUtil.getConnection(); + Statement st = conn.createStatement(); + st.execute("CREATE TABLEGROUP IF NOT EXISTS test_multi_cf SHARDING = 'ADAPTIVE';\n" + + "\n" + + "CREATE TABLE IF NOT EXISTS `test_multi_cf$family_with_group1` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(1024) DEFAULT NULL,\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = test_multi_cf PARTITION BY KEY(`K`) PARTITIONS 3;\n" + + "\n" + + "CREATE TABLE IF NOT EXISTS `test_multi_cf$family_with_group2` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(1024) DEFAULT NULL,\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = test_multi_cf PARTITION BY KEY(`K`) PARTITIONS 3;\n" + + "\n" + + "CREATE TABLE IF NOT EXISTS `test_multi_cf$family_with_group3` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(1024) DEFAULT NULL,\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = test_multi_cf PARTITION BY KEY(`K`) PARTITIONS 3;\n" + + "\n" + + "CREATE DATABASE IF NOT EXISTS `n1`;\n" + + "use `n1`;\n" + + "CREATE TABLEGROUP IF NOT EXISTS `n1:test` SHARDING = 'ADAPTIVE';\n" + + "CREATE TABLE IF NOT EXISTS `n1:test$family_group` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(1024) DEFAULT NULL,\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = `n1:test`;" + + "\n" + + "CREATE TABLE IF NOT EXISTS `n1:test$family1` (\n" + + " `K` varbinary(1024) NOT NULL,\n" + + " `Q` varbinary(256) NOT NULL,\n" + + " `T` bigint(20) NOT NULL,\n" + + " `V` varbinary(1024) DEFAULT NULL,\n" + + " PRIMARY KEY (`K`, `Q`, `T`)\n" + + ") TABLEGROUP = `n1:test`;"); + st.close(); + conn.close(); + Configuration conf = ObHTableTestUtil.newConfiguration(); + Connection connection = ConnectionFactory.createConnection(conf); + Admin admin = connection.getAdmin(); + // TableName cannot contain $ symbol + Assert.assertThrows(IllegalArgumentException.class, + () -> { + TableName.valueOf("random_string$"); + }); + Assert.assertFalse(admin.tableExists(TableName.valueOf("tablegroup_not_exists"))); + Assert.assertTrue(admin.tableExists(TableName.valueOf("test_multi_cf"))); + Assert.assertTrue(admin.tableExists(TableName.valueOf("n1", "test"))); + } + + private void batchInsert(int rows, String tablegroup) throws Exception { + byte[] family1 = Bytes.toBytes("family_with_group1"); + byte[] family2 = Bytes.toBytes("family_with_group2"); + byte[] family3 = Bytes.toBytes("family_with_group3"); + byte[] family1_column1 = "family1_column1".getBytes(); + byte[] family1_column2 = "family1_column2".getBytes(); + byte[] family1_column3 = "family1_column3".getBytes(); + byte[] family2_column1 = "family2_column1".getBytes(); + byte[] family2_column2 = "family2_column2".getBytes(); + byte[] family2_column3 = "family2_column3".getBytes(); + byte[] family3_column1 = "family3_column1".getBytes(); + byte[] family3_column2 = "family3_column2".getBytes(); + byte[] family3_column3 = "family3_column3".getBytes(); + byte[] family1_value1 = Bytes.toBytes("family1_value1"); + byte[] family1_value2 = Bytes.toBytes("family1_value2"); + byte[] family1_value3 = Bytes.toBytes("family1_value3"); + byte[] family2_value1 = Bytes.toBytes("family2_value1"); + byte[] family2_value2 = Bytes.toBytes("family2_value2"); + byte[] family2_value3 = Bytes.toBytes("family2_value3"); + byte[] family3_value1 = Bytes.toBytes("family3_value1"); + byte[] family3_value2 = Bytes.toBytes("family3_value2"); + byte[] family3_value3 = Bytes.toBytes("family3_value3"); + Configuration conf = ObHTableTestUtil.newConfiguration(); + Connection connection = ConnectionFactory.createConnection(conf); + Table table = connection.getTable(TableName.valueOf(tablegroup)); + List batchLsit = new LinkedList<>(); + for (int i = 0; i < rows; ++i) { + Put put = new Put(toBytes("Key" + i)); + put.addColumn(family1, family1_column1, family1_value1); + put.addColumn(family1, family1_column2, family1_value2); + put.addColumn(family1, family1_column3, family1_value3); + put.addColumn(family2, family2_column1, family2_value1); + put.addColumn(family2, family2_column2, family2_value2); + put.addColumn(family2, family2_column3, family2_value3); + put.addColumn(family3, family3_column1, family3_value1); + put.addColumn(family3, family3_column2, family3_value2); + put.addColumn(family3, family3_column3, family3_value3); + batchLsit.add(put); + if (i % 100 == 0) { // 100 rows one batch to avoid OB_TIMEOUT + Object[] results = new Object[batchLsit.size()]; + table.batch(batchLsit, results); + batchLsit.clear(); + } + } + Object[] results = new Object[batchLsit.size()]; + table.batch(batchLsit, results); + } } diff --git a/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableTimeSeriesDeleteTest.java b/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableTimeSeriesDeleteTest.java index 5669da70..3e85b904 100644 --- a/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableTimeSeriesDeleteTest.java +++ b/src/test/java/com/alipay/oceanbase/hbase/secondary/OHTableTimeSeriesDeleteTest.java @@ -75,7 +75,7 @@ public static void testDeleteColumnImpl(String tableName) throws Exception { for (int i = 0; i < values.length; i++) { Put put = new Put(toBytes(key)); - put.add(family.getBytes(), columns[i].getBytes(), ts[i], toBytes(values[i])); + put.addColumn(family.getBytes(), columns[i].getBytes(), ts[i], toBytes(values[i])); hTable.put(put); } @@ -126,7 +126,7 @@ public static void testDeleteColumnsImpl(String tableName) throws Exception { for (int i = 0; i < values.length; i++) { Put put = new Put(toBytes(key)); - put.add(family.getBytes(), columns[i].getBytes(), ts[i] + i, toBytes(values[i] + i)); + put.addColumn(family.getBytes(), columns[i].getBytes(), ts[i] + i, toBytes(values[i] + i)); hTable.put(put); } @@ -177,7 +177,7 @@ public static void testDeleteFamilyImpl(String tableName) throws Exception { for (int i = 0; i < values.length; i++) { Put put = new Put(toBytes(key)); - put.add(family.getBytes(), columns[i].getBytes(), ts[i] + i, toBytes(values[i] + i)); + put.addColumn(family.getBytes(), columns[i].getBytes(), ts[i] + i, toBytes(values[i] + i)); hTable.put(put); } @@ -229,8 +229,8 @@ public static void testDeleteFamilyWithTimestampImpl(String tableName) throws Ex for (int i = 0; i < 3; i++) { Put put = new Put(toBytes(key)); - put.add(family.getBytes(), columns[0].getBytes(), curTs + i, toBytes(value + i)); - put.add(family.getBytes(), columns[1].getBytes(), curTs + i, toBytes(value + i)); + put.addColumn(family.getBytes(), columns[0].getBytes(), curTs + i, toBytes(value + i)); + put.addColumn(family.getBytes(), columns[1].getBytes(), curTs + i, toBytes(value + i)); hTable.put(put); } @@ -264,8 +264,8 @@ public static void testDeleteFamilyVersionImpl(String tableName) throws Exceptio for (int i = 0; i < 3; i++) { Put put = new Put(toBytes(key)); - put.add(family.getBytes(), columns[0].getBytes(), curTs + i, toBytes(value + i)); - put.add(family.getBytes(), columns[1].getBytes(), curTs + i, toBytes(value + i)); + put.addColumn(family.getBytes(), columns[0].getBytes(), curTs + i, toBytes(value + i)); + put.addColumn(family.getBytes(), columns[1].getBytes(), curTs + i, toBytes(value + i)); hTable.put(put); }