Skip to content

Commit 6dc8360

Browse files
authored
Merge pull request #143 from oceanbase/getscanners_2.0
[CP] Patch getscanners to hbase_2.x
2 parents 14e5578 + ead8446 commit 6dc8360

File tree

5 files changed

+465
-2
lines changed

5 files changed

+465
-2
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.alipay.oceanbase.rpc.ObTableClient;
2828
import com.alipay.oceanbase.rpc.exception.ObTableException;
2929
import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException;
30+
import com.alipay.oceanbase.rpc.location.model.partition.Partition;
3031
import com.alipay.oceanbase.rpc.mutation.BatchOperation;
3132
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
3233
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
@@ -1086,6 +1087,105 @@ public ResultScanner call() throws IOException {
10861087
return executeServerCallable(serverCallable);
10871088
}
10881089

1090+
public List<ResultScanner> getScanners(final Scan scan) throws IOException {
1091+
1092+
if (scan.getFamilyMap().keySet().isEmpty()) {
1093+
// check nothing, use table group;
1094+
} else {
1095+
checkFamilyViolation(scan.getFamilyMap().keySet(), false);
1096+
}
1097+
1098+
//be careful about the packet size ,may the packet exceed the max result size ,leading to error
1099+
ServerCallable<List<ResultScanner>> serverCallable = new ServerCallable<List<ResultScanner>>(
1100+
configuration, obTableClient, tableNameString, scan.getStartRow(), scan.getStopRow(),
1101+
operationTimeout) {
1102+
public List<ResultScanner> call() throws IOException {
1103+
byte[] family = new byte[] {};
1104+
List<ResultScanner> resultScanners = new ArrayList<ResultScanner>();
1105+
ObTableClientQueryAsyncStreamResult clientQueryAsyncStreamResult;
1106+
ObTableQueryAsyncRequest request;
1107+
ObTableQuery obTableQuery;
1108+
ObHTableFilter filter;
1109+
try {
1110+
if (scan.getFamilyMap().keySet() == null
1111+
|| scan.getFamilyMap().keySet().isEmpty()
1112+
|| scan.getFamilyMap().size() > 1) {
1113+
// In a Scan operation where the family map is greater than 1 or equal to 0,
1114+
// we handle this by appending the column family to the qualifier on the client side.
1115+
// The server can then use this information to filter the appropriate column families and qualifiers.
1116+
if (!scan.getColumnFamilyTimeRange().isEmpty()) {
1117+
throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now");
1118+
}
1119+
NavigableSet<byte[]> columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1120+
processColumnFilters(columnFilters, scan.getFamilyMap());
1121+
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
1122+
scan.getMaxVersions(), columnFilters);
1123+
obTableQuery = buildObTableQuery(filter, scan);
1124+
1125+
request = buildObTableQueryAsyncRequest(obTableQuery,
1126+
getTargetTableName(tableNameString));
1127+
String phyTableName = obTableClient.getPhyTableNameFromTableGroup(
1128+
request.getObTableQueryRequest(), tableNameString);
1129+
List<Partition> partitions = obTableClient.getPartition(phyTableName, false);
1130+
for (Partition partition : partitions) {
1131+
request.getObTableQueryRequest().setTableQueryPartId(
1132+
partition.getPartId());
1133+
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
1134+
.execute(request);
1135+
ClientStreamScanner clientScanner = new ClientStreamScanner(
1136+
clientQueryAsyncStreamResult, tableNameString, scan, true);
1137+
resultScanners.add(clientScanner);
1138+
}
1139+
return resultScanners;
1140+
} else {
1141+
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap()
1142+
.entrySet()) {
1143+
family = entry.getKey();
1144+
if (!scan.getColumnFamilyTimeRange().isEmpty()) {
1145+
Map<byte[], TimeRange> colFamTimeRangeMap = scan.getColumnFamilyTimeRange();
1146+
if (colFamTimeRangeMap.size() > 1) {
1147+
throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now");
1148+
} else if (colFamTimeRangeMap.get(family) == null) {
1149+
throw new IllegalArgumentException("Scan family is not matched in ColumnFamilyTimeRange");
1150+
} else {
1151+
TimeRange tr = colFamTimeRangeMap.get(family);
1152+
scan.setTimeRange(tr.getMin(), tr.getMax());
1153+
}
1154+
}
1155+
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
1156+
scan.getMaxVersions(), entry.getValue());
1157+
obTableQuery = buildObTableQuery(filter, scan);
1158+
1159+
String targetTableName = getTargetTableName(tableNameString, Bytes.toString(family),
1160+
configuration);
1161+
request = buildObTableQueryAsyncRequest(obTableQuery, targetTableName);
1162+
List<Partition> partitions = obTableClient
1163+
.getPartition(targetTableName, false);
1164+
for (Partition partition : partitions) {
1165+
request.getObTableQueryRequest().setTableQueryPartId(
1166+
partition.getPartId());
1167+
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
1168+
.execute(request);
1169+
ClientStreamScanner clientScanner = new ClientStreamScanner(
1170+
clientQueryAsyncStreamResult, tableNameString, scan, false);
1171+
resultScanners.add(clientScanner);
1172+
}
1173+
return resultScanners;
1174+
}
1175+
}
1176+
} catch (Exception e) {
1177+
logger.error(LCD.convert("01-00003"), tableNameString, Bytes.toString(family),
1178+
e);
1179+
throw new IOException("scan table:" + tableNameString + " family "
1180+
+ Bytes.toString(family) + " error.", e);
1181+
}
1182+
1183+
throw new IOException("scan table:" + tableNameString + "has no family");
1184+
}
1185+
};
1186+
return executeServerCallable(serverCallable);
1187+
}
1188+
10891189
@Override
10901190
public ResultScanner getScanner(final byte[] family) throws IOException {
10911191
Scan scan = new Scan();

src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,11 @@ public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOExcept
290290
return ohTable.getScanner(family, qualifier);
291291
}
292292

293+
public List<ResultScanner> getScanners(Scan scan) throws IOException {
294+
checkStatus();
295+
return ohTable.getScanners(scan);
296+
}
297+
293298
@Override
294299
public void put(Put put) throws IOException {
295300
checkStatus();

src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -769,6 +769,16 @@ public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOExcept
769769
return table.getScanner(family, qualifier);
770770
}
771771

772+
public List<ResultScanner> getScanners(Scan scan) throws IOException {
773+
if (table instanceof OHTableClient) {
774+
return ((OHTableClient) table).getScanners(scan);
775+
} else if (table instanceof OHTable) {
776+
return ((OHTable) table).getScanners(scan);
777+
}
778+
779+
throw new IllegalArgumentException("just support for OHTable and OHTableClient");
780+
}
781+
772782
@Override
773783
public void put(Put put) throws IOException {
774784
table.put(put);

0 commit comments

Comments
 (0)