Skip to content

Commit 14e5578

Browse files
authored
Batch Get 2.x client (#142)
* init get query building in batch; fix mulfi-cf test case bug * deal with batch get * compatible hbase 2.0 * add test case for batch get and processing * add batch get test cases and compatible batch case to hbase 2.0 * change to use propertities * add version control for batch get * compatible new client to old server * batch get compatible to hbase 2.x * revert clear tableClient cache when OHTable close * do compatibility for batch get and batch put + delete * exists and get<List> use batch get * fix existsAll using batch get * keep the original gets * remove useless variable * add setRuntimeMaxWait and setRuntimeBatchMaxWait in OHTable setOperationTimeout
1 parent 2ed38ba commit 14e5578

File tree

4 files changed

+562
-43
lines changed

4 files changed

+562
-43
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 142 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,14 @@
2323
import com.alipay.oceanbase.hbase.filter.HBaseFilterUtils;
2424
import com.alipay.oceanbase.hbase.result.ClientStreamScanner;
2525
import com.alipay.oceanbase.hbase.util.*;
26+
import com.alipay.oceanbase.rpc.ObGlobal;
2627
import com.alipay.oceanbase.rpc.ObTableClient;
2728
import com.alipay.oceanbase.rpc.exception.ObTableException;
29+
import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException;
2830
import com.alipay.oceanbase.rpc.mutation.BatchOperation;
2931
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
32+
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
33+
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
3034
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
3135
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
3236
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
@@ -39,6 +43,7 @@
3943
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
4044
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
4145
import com.alipay.oceanbase.rpc.table.ObKVParams;
46+
import com.alipay.oceanbase.rpc.table.ObTableClientQueryImpl;
4247
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
4348

4449
import com.google.protobuf.Descriptors;
@@ -68,6 +73,8 @@
6873
import static com.alipay.oceanbase.hbase.util.Preconditions.checkNotNull;
6974
import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;
7075
import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.TABLE_HBASE_LOGGER_SPACE;
76+
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.colVal;
77+
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.row;
7178
import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation.getInstance;
7279
import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.*;
7380
import static com.alipay.sofa.common.thread.SofaThreadPoolConstants.SOFA_THREAD_POOL_LOGGING_CAPABILITY;
@@ -523,8 +530,18 @@ public boolean exists(Get get) throws IOException {
523530
@Override
524531
public boolean[] existsAll(List<Get> gets) throws IOException {
525532
boolean[] ret = new boolean[gets.size()];
526-
for (int i = 0; i < gets.size(); ++i) {
527-
ret[i] = exists(gets.get(i));
533+
List<Get> newGets = new ArrayList<>();
534+
// if just checkExistOnly, batch get will not return any result or row count
535+
// therefore we have to set checkExistOnly as false and so the result can be returned
536+
// TODO: adjust ExistOnly in server when using batch get
537+
for (Get get : gets) {
538+
Get newGet = new Get(get);
539+
newGet.setCheckExistenceOnly(false);
540+
newGets.add(newGet);
541+
}
542+
Result[] results = get(newGets);
543+
for (int i = 0; i < results.length; ++i) {
544+
ret[i] = !results[i].isEmpty();
528545
}
529546
return ret;
530547
}
@@ -681,7 +698,7 @@ private void compatOldServerBatch(final List<? extends Row> actions, final Objec
681698
}
682699

683700
@Override
684-
public void batch(final List<? extends Row> actions, final Object[] results) throws IOException{
701+
public void batch(final List<? extends Row> actions, final Object[] results) throws IOException {
685702
if (actions == null) {
686703
return;
687704
}
@@ -693,7 +710,7 @@ public void batch(final List<? extends Row> actions, final Object[] results) thr
693710
BatchError batchError = new BatchError();
694711
obTableClient.setRuntimeBatchExecutor(executePool);
695712
List<Integer> resultMapSingleOp = new LinkedList<>();
696-
if (!CompatibilityUtil.isBatchSupport()) {
713+
if (!ObGlobal.isHBaseBatchSupport()) {
697714
try {
698715
compatOldServerBatch(actions, results, batchError);
699716
} catch (Exception e) {
@@ -716,6 +733,23 @@ public void batch(final List<? extends Row> actions, final Object[] results) thr
716733
results[i] = tmpResults.getResults().get(index);
717734
}
718735
batchError.add((ObTableException) tmpResults.getResults().get(index), actions.get(i), null);
736+
} else if (actions.get(i) instanceof Get) {
737+
if (results != null) {
738+
// get results have been wrapped in MutationResult, need to fetch it
739+
if (tmpResults.getResults().get(index) instanceof MutationResult) {
740+
MutationResult mutationResult = (MutationResult) tmpResults.getResults().get(index);
741+
ObPayload innerResult = mutationResult.getResult();
742+
if (innerResult instanceof ObTableSingleOpResult) {
743+
ObTableSingleOpResult singleOpResult = (ObTableSingleOpResult) innerResult;
744+
List<Cell> cells = generateGetResult(singleOpResult);
745+
results[i] = Result.create(cells);
746+
} else {
747+
throw new ObTableUnexpectedException("Unexpected type of result in MutationResult");
748+
}
749+
} else {
750+
throw new ObTableUnexpectedException("Unexpected type of result in batch");
751+
}
752+
}
719753
} else {
720754
if (results != null) {
721755
results[i] = new Result();
@@ -729,17 +763,57 @@ public void batch(final List<? extends Row> actions, final Object[] results) thr
729763
}
730764
}
731765

766+
private List<Cell> generateGetResult(ObTableSingleOpResult getResult) throws IOException {
767+
List<Cell> cells = new ArrayList<>();
768+
ObTableSingleOpEntity singleOpEntity = getResult.getEntity();
769+
// all values queried by this get are contained in properties
770+
// qualifier in batch get result is always appended after family
771+
List<ObObj> propertiesValues = singleOpEntity.getPropertiesValues();
772+
int valueIdx = 0;
773+
while (valueIdx < propertiesValues.size()) {
774+
// values in propertiesValues like: [ K, Q, T, V, K, Q, T, V ... ]
775+
// we need to retrieve K Q T V and construct them to cells: [ cell_0, cell_1, ... ]
776+
byte[][] familyAndQualifier = new byte[2][];
777+
try {
778+
// split family and qualifier
779+
familyAndQualifier = OHBaseFuncUtils
780+
.extractFamilyFromQualifier((byte[]) propertiesValues.get(valueIdx + 1).getValue());
781+
} catch (Exception e) {
782+
throw new IOException(e);
783+
}
784+
KeyValue kv = new KeyValue((byte[]) propertiesValues.get(valueIdx).getValue(),//K
785+
familyAndQualifier[0], // family
786+
familyAndQualifier[1], // qualifiermat
787+
(Long) propertiesValues.get(valueIdx + 2).getValue(), // T
788+
(byte[]) propertiesValues.get(valueIdx + 3).getValue()// V
789+
);
790+
cells.add(kv);
791+
valueIdx += 4;
792+
}
793+
return cells;
794+
}
795+
732796
private String getTargetTableName(List<? extends Row> actions) {
733797
byte[] family = null;
734798
for (Row action : actions) {
735799
if (action instanceof RowMutations || action instanceof RegionCoprocessorServiceExec) {
736800
throw new FeatureNotSupportedException("not supported yet'");
737801
} else {
738-
Mutation mutation = (Mutation) action;
739-
if (mutation.getFamilyCellMap().size() != 1) {
802+
Set<byte[]> familySet = null;
803+
if (action instanceof Get){
804+
Get get = (Get) action;
805+
familySet = get.familySet();
806+
} else {
807+
Mutation mutation = (Mutation) action;
808+
familySet = mutation.getFamilyCellMap().keySet();
809+
}
810+
if (familySet == null) {
811+
throw new ObTableUnexpectedException("Fail to get family set in action");
812+
}
813+
if (familySet.size() != 1) {
740814
return getTargetTableName(tableNameString);
741815
} else {
742-
byte[] nextFamily = mutation.getFamilyCellMap().keySet().iterator().next();
816+
byte[] nextFamily = familySet.iterator().next();
743817
if (family != null && !Arrays.equals(family, nextFamily)) {
744818
return getTargetTableName(tableNameString);
745819
} else if (family == null) {
@@ -825,8 +899,8 @@ private String getTargetTableName(String tableNameString) {
825899
return tableNameString;
826900
}
827901

828-
// To enable the server to identify the column family to which a qualifier belongs,
829-
// the client writes the column family name into the qualifier.
902+
// To enable the server to identify the column family to which a qualifier belongs,
903+
// the client writes the column family name into the qualifier.
830904
// The server then parses this information to determine the table that needs to be operated on.
831905
private void processColumnFilters(NavigableSet<byte[]> columnFilters,
832906
Map<byte[], NavigableSet<byte[]>> familyMap) {
@@ -862,8 +936,8 @@ public Result call() throws IOException {
862936
if (get.getFamilyMap().keySet() == null
863937
|| get.getFamilyMap().keySet().isEmpty()
864938
|| get.getFamilyMap().size() > 1) {
865-
// In a Get operation where the family map is greater than 1 or equal to 0,
866-
// we handle this by appending the column family to the qualifier on the client side.
939+
// In a Get operation where the family map is greater than 1 or equal to 0,
940+
// we handle this by appending the column family to the qualifier on the client side.
867941
// The server can then use this information to filter the appropriate column families and qualifiers.
868942
if (!get.getColumnFamilyTimeRange().isEmpty()) {
869943
throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now");
@@ -920,8 +994,12 @@ public Result call() throws IOException {
920994
@Override
921995
public Result[] get(List<Get> gets) throws IOException {
922996
Result[] results = new Result[gets.size()];
923-
for (int i = 0; i < gets.size(); i++) {
924-
results[i] = get(gets.get(i));
997+
if (ObGlobal.isHBaseBatchGetSupport()) { // get only supported in BatchSupport version
998+
batch(gets, results);
999+
} else {
1000+
for (int i = 0; i < gets.size(); i++) {
1001+
results[i] = get(gets.get(i));
1002+
}
9251003
}
9261004
return results;
9271005
}
@@ -948,8 +1026,8 @@ public ResultScanner call() throws IOException {
9481026
if (scan.getFamilyMap().keySet() == null
9491027
|| scan.getFamilyMap().keySet().isEmpty()
9501028
|| scan.getFamilyMap().size() > 1) {
951-
// In a Scan operation where the family map is greater than 1 or equal to 0,
952-
// we handle this by appending the column family to the qualifier on the client side.
1029+
// In a Scan operation where the family map is greater than 1 or equal to 0,
1030+
// we handle this by appending the column family to the qualifier on the client side.
9531031
// The server can then use this information to filter the appropriate column families and qualifiers.
9541032
if (!scan.getColumnFamilyTimeRange().isEmpty()) {
9551033
throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now");
@@ -1464,7 +1542,6 @@ public void close() throws IOException {
14641542
if (cleanupPoolOnClose) {
14651543
executePool.shutdown();
14661544
}
1467-
ObTableClientManager.clear();
14681545
}
14691546

14701547
@Override
@@ -1514,6 +1591,8 @@ public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescri
15141591
@Override
15151592
public void setOperationTimeout(int operationTimeout) {
15161593
this.operationTimeout = operationTimeout;
1594+
this.obTableClient.setRuntimeMaxWait(operationTimeout);
1595+
this.obTableClient.setRuntimeBatchMaxWait(operationTimeout);
15171596
this.operationExecuteInPool = this.configuration.getBoolean(
15181597
HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL,
15191598
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
@@ -1834,21 +1913,62 @@ private KeyValue modifyQualifier(Cell original, byte[] newQualifier) {
18341913
byte[] value = CellUtil.cloneValue(original);
18351914
long timestamp = original.getTimestamp();
18361915
KeyValue.Type type = KeyValue.Type.codeToType(original.getType().getCode());
1837-
// Create a new KeyValue with the modified qualifier
1916+
// Create a new KeyValue with the modified qualifier
18381917
return new KeyValue(row, family, newQualifier, timestamp, type, value);
18391918
}
18401919

18411920
private BatchOperation buildBatchOperation(String tableName, List<? extends Row> actions,
18421921
boolean isTableGroup, List<Integer> resultMapSingleOp)
18431922
throws FeatureNotSupportedException,
1844-
IllegalArgumentException {
1923+
IllegalArgumentException,
1924+
IOException {
18451925
BatchOperation batch = obTableClient.batchOperation(tableName);
18461926
int posInList = -1;
18471927
int singleOpResultNum;
18481928
for (Row row : actions) {
18491929
singleOpResultNum = 0;
18501930
posInList++;
1851-
if (row instanceof Put) {
1931+
if (row instanceof Get) {
1932+
if (!ObGlobal.isHBaseBatchGetSupport()) {
1933+
throw new FeatureNotSupportedException("server does not support batch get");
1934+
}
1935+
++singleOpResultNum;
1936+
Get get = (Get) row;
1937+
ObTableQuery obTableQuery;
1938+
// In a Get operation in ls batch, we need to determine whether the get is a table-group operation or not,
1939+
// we handle this by appending the column family to the qualifier on the client side.
1940+
// The server can then use this information to filter the appropriate column families and qualifiers.
1941+
if ((get.getFamilyMap().keySet().isEmpty()
1942+
|| get.getFamilyMap().size() > 1) &&
1943+
!get.getColumnFamilyTimeRange().isEmpty()) {
1944+
throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now");
1945+
} else if (get.getFamilyMap().size() == 1 && !get.getColumnFamilyTimeRange().isEmpty()) {
1946+
byte[] family = get.getFamilyMap().keySet().iterator().next();
1947+
Map<byte[], TimeRange> colFamTimeRangeMap = get.getColumnFamilyTimeRange();
1948+
if (colFamTimeRangeMap.size() > 1) {
1949+
throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now");
1950+
} else if (colFamTimeRangeMap.get(family) == null) {
1951+
throw new IllegalArgumentException("Get family is not matched in ColumnFamilyTimeRange");
1952+
} else {
1953+
TimeRange tr = colFamTimeRangeMap.get(family);
1954+
get.setTimeRange(tr.getMin(), tr.getMax());
1955+
}
1956+
}
1957+
NavigableSet<byte[]> columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1958+
// in batch get, we need to carry family in qualifier to server even this get is a single-cf operation
1959+
// because the entire batch may be a multi-cf batch so do not carry family
1960+
// family in qualifier helps us to know which table to query
1961+
processColumnFilters(columnFilters, get.getFamilyMap());
1962+
obTableQuery = buildObTableQuery(get, columnFilters);
1963+
ObTableClientQueryImpl query = new ObTableClientQueryImpl(tableName, obTableQuery, obTableClient);
1964+
try {
1965+
query.setRowKey(row(colVal("K", Bytes.toString(get.getRow())), colVal("Q", null), colVal("T", null)));
1966+
} catch (Exception e) {
1967+
logger.error("unexpected error occurs when set row key", e);
1968+
throw new IOException(e);
1969+
}
1970+
batch.addOperation(query);
1971+
} else if (row instanceof Put) {
18521972
Put put = (Put) row;
18531973
if (put.isEmpty()) {
18541974
throw new IllegalArgumentException("No columns to insert for #"
@@ -1882,7 +2002,7 @@ private BatchOperation buildBatchOperation(String tableName, List<? extends Row>
18822002
}
18832003
} else {
18842004
throw new FeatureNotSupportedException(
1885-
"not supported other type in batch yet,only support put and delete");
2005+
"not supported other type in batch yet,only support get, put and delete");
18862006
}
18872007
resultMapSingleOp.add(singleOpResultNum);
18882008
}
@@ -1983,8 +2103,8 @@ public static void checkFamilyViolation(Collection<byte[]> families, boolean che
19832103
}
19842104
}
19852105

1986-
// This method is currently only used for append and increment operations.
1987-
// It restricts these two methods to use multi-column family operations.
2106+
// This method is currently only used for append and increment operations.
2107+
// It restricts these two methods to use multi-column family operations.
19882108
// Note: After completing operations on multiple column families, they are deleted using the method described above.
19892109
public static void checkFamilyViolationForOneFamily(Collection<byte[]> families) {
19902110
if (families == null || families.size() == 0) {

src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java

Lines changed: 0 additions & 9 deletions
This file was deleted.

src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -126,18 +126,6 @@ public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableCli
126126
return OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey);
127127
}
128128

129-
public static void clear() throws IOException {
130-
try {
131-
for (Map.Entry<ObTableClientKey, ObTableClient> pair : OB_TABLE_CLIENT_INSTANCE.entrySet()) {
132-
pair.getValue().close();
133-
}
134-
}
135-
catch (Exception e) {
136-
throw new IOException("fail to close tableClient" , e);
137-
}
138-
OB_TABLE_CLIENT_INSTANCE.clear();
139-
}
140-
141129
public static class ObTableClientKey {
142130
private String paramUrl;
143131
private String fullUserName;

0 commit comments

Comments
 (0)