Skip to content

Commit b10890e

Browse files
committed
patch getscanners to hbase_2.x
1 parent 14e5578 commit b10890e

File tree

5 files changed

+452
-2
lines changed

5 files changed

+452
-2
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.alipay.oceanbase.rpc.ObTableClient;
2828
import com.alipay.oceanbase.rpc.exception.ObTableException;
2929
import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException;
30+
import com.alipay.oceanbase.rpc.location.model.partition.Partition;
3031
import com.alipay.oceanbase.rpc.mutation.BatchOperation;
3132
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
3233
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
@@ -1086,6 +1087,92 @@ public ResultScanner call() throws IOException {
10861087
return executeServerCallable(serverCallable);
10871088
}
10881089

1090+
public List<ResultScanner> getScanners(final Scan scan) throws IOException {
1091+
1092+
if (scan.getFamilyMap().keySet().isEmpty()) {
1093+
// check nothing, use table group;
1094+
} else {
1095+
checkFamilyViolation(scan.getFamilyMap().keySet(), false);
1096+
}
1097+
1098+
//be careful about the packet size ,may the packet exceed the max result size ,leading to error
1099+
ServerCallable<List<ResultScanner>> serverCallable = new ServerCallable<List<ResultScanner>>(
1100+
configuration, obTableClient, tableNameString, scan.getStartRow(), scan.getStopRow(),
1101+
operationTimeout) {
1102+
public List<ResultScanner> call() throws IOException {
1103+
byte[] family = new byte[] {};
1104+
ObTableClientQueryAsyncStreamResult clientQueryAsyncStreamResult;
1105+
ObTableQueryAsyncRequest request;
1106+
ObTableQuery obTableQuery;
1107+
ObHTableFilter filter;
1108+
try {
1109+
if (scan.getFamilyMap().keySet() == null
1110+
|| scan.getFamilyMap().keySet().isEmpty()
1111+
|| scan.getFamilyMap().size() > 1) {
1112+
// In a Scan operation where the family map is greater than 1 or equal to 0,
1113+
// we handle this by appending the column family to the qualifier on the client side.
1114+
// The server can then use this information to filter the appropriate column families and qualifiers.
1115+
NavigableSet<byte[]> columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1116+
processColumnFilters(columnFilters, scan.getFamilyMap());
1117+
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
1118+
scan.getMaxVersions(), columnFilters);
1119+
obTableQuery = buildObTableQuery(filter, scan);
1120+
List<ResultScanner> resultScanners = new ArrayList<ResultScanner>();
1121+
1122+
request = buildObTableQueryAsyncRequest(obTableQuery,
1123+
getTargetTableName(tableNameString));
1124+
String phyTableName = obTableClient.getPhyTableNameFromTableGroup(
1125+
request.getObTableQueryRequest(), tableNameString);
1126+
List<Partition> partitions = obTableClient.getPartition(phyTableName, false);
1127+
for (Partition partition : partitions) {
1128+
request.getObTableQueryRequest().setTableQueryPartId(
1129+
partition.getPartId());
1130+
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
1131+
.execute(request);
1132+
ClientStreamScanner clientScanner = new ClientStreamScanner(
1133+
clientQueryAsyncStreamResult, tableNameString, scan, true);
1134+
resultScanners.add(clientScanner);
1135+
}
1136+
return resultScanners;
1137+
} else {
1138+
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap()
1139+
.entrySet()) {
1140+
family = entry.getKey();
1141+
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
1142+
scan.getMaxVersions(), entry.getValue());
1143+
obTableQuery = buildObTableQuery(filter, scan);
1144+
1145+
List<ResultScanner> resultScanners = new ArrayList<ResultScanner>();
1146+
String targetTableName = getTargetTableName(tableNameString, Bytes.toString(family),
1147+
configuration);
1148+
request = buildObTableQueryAsyncRequest(obTableQuery, targetTableName);
1149+
List<Partition> partitions = obTableClient
1150+
.getPartition(targetTableName, false);
1151+
for (Partition partition : partitions) {
1152+
request.getObTableQueryRequest().setTableQueryPartId(
1153+
partition.getPartId());
1154+
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
1155+
.execute(request);
1156+
ClientStreamScanner clientScanner = new ClientStreamScanner(
1157+
clientQueryAsyncStreamResult, tableNameString, scan, false);
1158+
resultScanners.add(clientScanner);
1159+
}
1160+
return resultScanners;
1161+
}
1162+
}
1163+
} catch (Exception e) {
1164+
logger.error(LCD.convert("01-00003"), tableNameString, Bytes.toString(family),
1165+
e);
1166+
throw new IOException("scan table:" + tableNameString + " family "
1167+
+ Bytes.toString(family) + " error.", e);
1168+
}
1169+
1170+
throw new IOException("scan table:" + tableNameString + "has no family");
1171+
}
1172+
};
1173+
return executeServerCallable(serverCallable);
1174+
}
1175+
10891176
@Override
10901177
public ResultScanner getScanner(final byte[] family) throws IOException {
10911178
Scan scan = new Scan();

src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,11 @@ public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOExcept
290290
return ohTable.getScanner(family, qualifier);
291291
}
292292

293+
public List<ResultScanner> getScanners(Scan scan) throws IOException {
294+
checkStatus();
295+
return ohTable.getScanners(scan);
296+
}
297+
293298
@Override
294299
public void put(Put put) throws IOException {
295300
checkStatus();

src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -769,6 +769,16 @@ public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOExcept
769769
return table.getScanner(family, qualifier);
770770
}
771771

772+
public List<ResultScanner> getScanners(Scan scan) throws IOException {
773+
if (table instanceof OHTableClient) {
774+
return ((OHTableClient) table).getScanners(scan);
775+
} else if (table instanceof OHTable) {
776+
return ((OHTable) table).getScanners(scan);
777+
}
778+
779+
throw new IllegalArgumentException("just support for OHTable and OHTableClient");
780+
}
781+
772782
@Override
773783
public void put(Put put) throws IOException {
774784
table.put(put);

0 commit comments

Comments
 (0)