Skip to content

Commit 8a7c8c5

Browse files
authored
Merge pull request #141 from oceanbase/batch_get
Batch Get 1.x client
2 parents 7a32095 + 33dd840 commit 8a7c8c5

File tree

5 files changed

+549
-680
lines changed

5 files changed

+549
-680
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 146 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,15 @@
2323
import com.alipay.oceanbase.hbase.filter.HBaseFilterUtils;
2424
import com.alipay.oceanbase.hbase.result.ClientStreamScanner;
2525
import com.alipay.oceanbase.hbase.util.*;
26+
import com.alipay.oceanbase.rpc.ObGlobal;
2627
import com.alipay.oceanbase.rpc.ObTableClient;
2728
import com.alipay.oceanbase.rpc.exception.ObTableException;
2829
import com.alipay.oceanbase.rpc.location.model.partition.Partition;
30+
import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException;
2931
import com.alipay.oceanbase.rpc.mutation.BatchOperation;
3032
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
33+
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
34+
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
3135
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
3236
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
3337
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.*;
@@ -40,11 +44,13 @@
4044
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
4145
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
4246
import com.alipay.oceanbase.rpc.table.ObKVParams;
47+
import com.alipay.oceanbase.rpc.table.ObTableClientQueryImpl;
4348

4449
import com.google.protobuf.Descriptors;
4550
import com.google.protobuf.Message;
4651
import com.google.protobuf.Service;
4752
import com.google.protobuf.ServiceException;
53+
import jdk.nashorn.internal.objects.Global;
4854
import org.apache.hadoop.classification.InterfaceAudience;
4955
import org.apache.hadoop.conf.Configuration;
5056
import org.apache.hadoop.hbase.*;
@@ -67,6 +73,8 @@
6773
import static com.alipay.oceanbase.hbase.constants.OHConstants.*;
6874
import static com.alipay.oceanbase.hbase.util.Preconditions.checkArgument;
6975
import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;
76+
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.colVal;
77+
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.row;
7078
import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperation.getInstance;
7179
import static com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationType.*;
7280
import static com.alipay.sofa.common.thread.SofaThreadPoolConstants.SOFA_THREAD_POOL_LOGGING_CAPABILITY;
@@ -458,16 +466,19 @@ public boolean exists(Get get) throws IOException {
458466

459467
@Override
460468
public boolean[] existsAll(List<Get> gets) throws IOException {
461-
if (gets.isEmpty()) {
462-
return new boolean[] {};
463-
}
464-
if (gets.size() == 1) {
465-
return new boolean[] { exists(gets.get(0)) };
466-
}
467-
Result[] r = get(gets);
468-
boolean[] ret = new boolean[r.length];
469-
for (int i = 0; i < gets.size(); ++i) {
470-
ret[i] = exists(gets.get(i));
469+
boolean[] ret = new boolean[gets.size()];
470+
List<Get> newGets = new ArrayList<>();
471+
// if just checkExistOnly, batch get will not return any result or row count
472+
// therefore we have to set checkExistOnly as false and so the result can be returned
473+
// TODO: adjust ExistOnly in server when using batch get
474+
for (Get get : gets) {
475+
Get newGet = new Get(get);
476+
newGet.setCheckExistenceOnly(false);
477+
newGets.add(newGet);
478+
}
479+
Result[] results = get(newGets);
480+
for (int i = 0; i < results.length; ++i) {
481+
ret[i] = !results[i].isEmpty();
471482
}
472483
return ret;
473484
}
@@ -625,7 +636,7 @@ private void compatOldServerBatch(final List<? extends Row> actions, final Objec
625636
}
626637

627638
@Override
628-
public void batch(final List<? extends Row> actions, final Object[] results) throws IOException{
639+
public void batch(final List<? extends Row> actions, final Object[] results) throws IOException {
629640
if (actions == null) {
630641
return;
631642
}
@@ -637,7 +648,7 @@ public void batch(final List<? extends Row> actions, final Object[] results) thr
637648
BatchError batchError = new BatchError();
638649
obTableClient.setRuntimeBatchExecutor(executePool);
639650
List<Integer> resultMapSingleOp = new LinkedList<>();
640-
if (!CompatibilityUtil.isBatchSupport()) {
651+
if (!ObGlobal.isHBaseBatchSupport()) {
641652
try {
642653
compatOldServerBatch(actions, results, batchError);
643654
} catch (Exception e) {
@@ -660,6 +671,23 @@ public void batch(final List<? extends Row> actions, final Object[] results) thr
660671
results[i] = tmpResults.getResults().get(index);
661672
}
662673
batchError.add((ObTableException) tmpResults.getResults().get(index), actions.get(i), null);
674+
} else if (actions.get(i) instanceof Get) {
675+
if (results != null) {
676+
// get results have been wrapped in MutationResult, need to fetch it
677+
if (tmpResults.getResults().get(index) instanceof MutationResult) {
678+
MutationResult mutationResult = (MutationResult) tmpResults.getResults().get(index);
679+
ObPayload innerResult = mutationResult.getResult();
680+
if (innerResult instanceof ObTableSingleOpResult) {
681+
ObTableSingleOpResult singleOpResult = (ObTableSingleOpResult) innerResult;
682+
List<Cell> cells = generateGetResult(singleOpResult);
683+
results[i] = Result.create(cells);
684+
} else {
685+
throw new ObTableUnexpectedException("Unexpected type of result in MutationResult");
686+
}
687+
} else {
688+
throw new ObTableUnexpectedException("Unexpected type of result in batch");
689+
}
690+
}
663691
} else {
664692
if (results != null) {
665693
results[i] = new Result();
@@ -673,17 +701,58 @@ public void batch(final List<? extends Row> actions, final Object[] results) thr
673701
}
674702
}
675703

704+
private List<Cell> generateGetResult(ObTableSingleOpResult getResult) throws IOException {
705+
List<Cell> cells = new ArrayList<>();
706+
ObTableSingleOpEntity singleOpEntity = getResult.getEntity();
707+
// all values queried by this get are contained in properties
708+
// qualifier in batch get result is always appended after family
709+
List<ObObj> propertiesValues = singleOpEntity.getPropertiesValues();
710+
int valueIdx = 0;
711+
while (valueIdx < propertiesValues.size()) {
712+
// values in propertiesValues like: [ K, Q, T, V, K, Q, T, V ... ]
713+
// we need to retrieve K Q T V and construct them to cells: [ cell_0, cell_1, ... ]
714+
byte[][] familyAndQualifier = new byte[2][];
715+
try {
716+
// split family and qualifier
717+
familyAndQualifier = OHBaseFuncUtils
718+
.extractFamilyFromQualifier((byte[]) propertiesValues.get(valueIdx + 1).getValue());
719+
} catch (Exception e) {
720+
throw new IOException(e);
721+
}
722+
KeyValue kv = new KeyValue((byte[]) propertiesValues.get(valueIdx).getValue(),//K
723+
familyAndQualifier[0], // family
724+
familyAndQualifier[1], // qualifiermat
725+
(Long) propertiesValues.get(valueIdx + 2).getValue(), // T
726+
(byte[]) propertiesValues.get(valueIdx + 3).getValue()// V
727+
);
728+
cells.add(kv);
729+
valueIdx += 4;
730+
}
731+
return cells;
732+
}
733+
734+
676735
private String getTargetTableName(List<? extends Row> actions) {
677736
byte[] family = null;
678737
for (Row action : actions) {
679738
if (action instanceof RowMutations || action instanceof RegionCoprocessorServiceExec) {
680739
throw new FeatureNotSupportedException("not supported yet'");
681740
} else {
682-
Mutation mutation = (Mutation) action;
683-
if (mutation.getFamilyCellMap().size() != 1) {
741+
Set<byte[]> familySet = null;
742+
if (action instanceof Get) {
743+
Get get = (Get) action;
744+
familySet = get.familySet();
745+
} else {
746+
Mutation mutation = (Mutation) action;
747+
familySet = mutation.getFamilyCellMap().keySet();
748+
}
749+
if (familySet == null) {
750+
throw new ObTableUnexpectedException("Fail to get family set in action");
751+
}
752+
if (familySet.size() != 1) {
684753
return getTargetTableName(tableNameString);
685754
} else {
686-
byte[] nextFamily = mutation.getFamilyCellMap().keySet().iterator().next();
755+
byte[] nextFamily = familySet.iterator().next();
687756
if (family != null && !Arrays.equals(family, nextFamily)) {
688757
return getTargetTableName(tableNameString);
689758
} else if (family == null) {
@@ -702,6 +771,7 @@ public Object[] batch(List<? extends Row> actions) throws IOException {
702771
return results;
703772
}
704773

774+
@Override
705775
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
706776
Batch.Callback<R> callback) throws IOException,
707777
InterruptedException {
@@ -879,17 +949,21 @@ public Result call() throws IOException {
879949
@Override
880950
public Result[] get(List<Get> gets) throws IOException {
881951
Result[] results = new Result[gets.size()];
882-
List<Future<Result>> futures = new LinkedList<>();
883-
for (int i = 0; i < gets.size(); i++) {
884-
int index = i;
885-
Future<Result> future = executePool.submit(() -> get(gets.get(index)));
886-
futures.add(future);
887-
}
888-
for (int i = 0; i < gets.size(); i++) {
889-
try {
890-
results[i] = futures.get(i).get();
891-
} catch (Exception e) {
892-
throw new RuntimeException("gets occur error. index:{" + i + "}", e);
952+
if (ObGlobal.isHBaseBatchGetSupport()) { // get only supported in BatchSupport version
953+
batch(gets, results);
954+
} else {
955+
List<Future<Result>> futures = new LinkedList<>();
956+
for (int i = 0; i < gets.size(); i++) {
957+
int index = i;
958+
Future<Result> future = executePool.submit(() -> get(gets.get(index)));
959+
futures.add(future);
960+
}
961+
for (int i = 0; i < gets.size(); i++) {
962+
try {
963+
results[i] = futures.get(i).get();
964+
} catch (Exception e) {
965+
throw new RuntimeException("gets occur error. index:{" + i + "}", e);
966+
}
893967
}
894968
}
895969
return results;
@@ -1668,6 +1742,8 @@ public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescri
16681742
@Override
16691743
public void setOperationTimeout(int operationTimeout) {
16701744
this.operationTimeout = operationTimeout;
1745+
this.obTableClient.setRuntimeMaxWait(operationTimeout);
1746+
this.obTableClient.setRuntimeBatchMaxWait(operationTimeout);
16711747
this.operationExecuteInPool = this.configuration.getBoolean(
16721748
HBASE_CLIENT_OPERATION_EXECUTE_IN_POOL,
16731749
(this.operationTimeout != HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
@@ -1994,14 +2070,55 @@ private KeyValue modifyQualifier(KeyValue original, byte[] newQualifier) {
19942070
private BatchOperation buildBatchOperation(String tableName, List<? extends Row> actions,
19952071
boolean isTableGroup, List<Integer> resultMapSingleOp)
19962072
throws FeatureNotSupportedException,
1997-
IllegalArgumentException {
2073+
IllegalArgumentException,
2074+
IOException {
19982075
BatchOperation batch = obTableClient.batchOperation(tableName);
19992076
int posInList = -1;
20002077
int singleOpResultNum;
20012078
for (Row row : actions) {
20022079
singleOpResultNum = 0;
20032080
posInList++;
2004-
if (row instanceof Put) {
2081+
if (row instanceof Get) {
2082+
if (!ObGlobal.isHBaseBatchGetSupport()) {
2083+
throw new FeatureNotSupportedException("server does not support batch get");
2084+
}
2085+
++singleOpResultNum;
2086+
Get get = (Get) row;
2087+
ObTableQuery obTableQuery;
2088+
// In a Get operation in ls batch, we need to determine whether the get is a table-group operation or not,
2089+
// we handle this by appending the column family to the qualifier on the client side.
2090+
// The server can then use this information to filter the appropriate column families and qualifiers.
2091+
if ((get.getFamilyMap().keySet().isEmpty()
2092+
|| get.getFamilyMap().size() > 1) &&
2093+
!get.getColumnFamilyTimeRange().isEmpty()) {
2094+
throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now");
2095+
} else if (get.getFamilyMap().size() == 1 && !get.getColumnFamilyTimeRange().isEmpty()) {
2096+
byte[] family = get.getFamilyMap().keySet().iterator().next();
2097+
Map<byte[], TimeRange> colFamTimeRangeMap = get.getColumnFamilyTimeRange();
2098+
if (colFamTimeRangeMap.size() > 1) {
2099+
throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now");
2100+
} else if (colFamTimeRangeMap.get(family) == null) {
2101+
throw new IllegalArgumentException("Get family is not matched in ColumnFamilyTimeRange");
2102+
} else {
2103+
TimeRange tr = colFamTimeRangeMap.get(family);
2104+
get.setTimeRange(tr.getMin(), tr.getMax());
2105+
}
2106+
}
2107+
NavigableSet<byte[]> columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR);
2108+
// in batch get, we need to carry family in qualifier to server even this get is a single-cf operation
2109+
// because the entire batch may be a multi-cf batch so do not carry family
2110+
// family in qualifier helps us to know which table to query
2111+
processColumnFilters(columnFilters, get.getFamilyMap());
2112+
obTableQuery = buildObTableQuery(get, columnFilters);
2113+
ObTableClientQueryImpl query = new ObTableClientQueryImpl(tableName, obTableQuery, obTableClient);
2114+
try {
2115+
query.setRowKey(row(colVal("K", Bytes.toString(get.getRow())), colVal("Q", null), colVal("T", null)));
2116+
} catch (Exception e) {
2117+
logger.error("unexpected error occurs when set row key", e);
2118+
throw new IOException(e);
2119+
}
2120+
batch.addOperation(query);
2121+
} else if (row instanceof Put) {
20052122
Put put = (Put) row;
20062123
if (put.isEmpty()) {
20072124
throw new IllegalArgumentException("No columns to insert for #"
@@ -2049,7 +2166,7 @@ private BatchOperation buildBatchOperation(String tableName, List<? extends Row>
20492166
}
20502167
} else {
20512168
throw new FeatureNotSupportedException(
2052-
"not supported other type in batch yet,only support put and delete");
2169+
"not supported other type in batch yet,only support get, put and delete");
20532170
}
20542171
resultMapSingleOp.add(singleOpResultNum);
20552172
}

src/main/java/com/alipay/oceanbase/hbase/util/CompatibilityUtil.java

Lines changed: 0 additions & 9 deletions
This file was deleted.

0 commit comments

Comments
 (0)