Skip to content

Commit 145a3e3

Browse files
authored
add getScanners to accelerate data synchronization (#55)
* getScanners passed single-thread and multi-thread tests with non-partition and key/range partition table * update non-partition test * support OHTablePool.PooledOHTable getScanners * correct exception message * correct format * add random_string_generator in test * delete useless dependency in OHTable * merge pull get_scanners into master * fix bugs after merge * move hbase-site.xml from src to test
1 parent a2103d8 commit 145a3e3

File tree

7 files changed

+1106
-14
lines changed

7 files changed

+1106
-14
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
<project.build.sourceEncoding>${project.encoding}</project.build.sourceEncoding>
5555
<project.encoding>UTF-8</project.encoding>
5656
<slf4j.version>1.7.21</slf4j.version>
57-
<table.client.version>1.3.0</table.client.version>
57+
<table.client.version>1.3.1-SNAPSHOT</table.client.version>
5858
</properties>
5959

6060
<dependencyManagement>

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 96 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.alipay.oceanbase.hbase.util.*;
2626
import com.alipay.oceanbase.rpc.ObTableClient;
2727
import com.alipay.oceanbase.rpc.exception.ObTableException;
28+
import com.alipay.oceanbase.rpc.location.model.partition.Partition;
2829
import com.alipay.oceanbase.rpc.mutation.BatchOperation;
2930
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
3031
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
@@ -701,7 +702,6 @@ public Object[] batch(List<? extends Row> actions) throws IOException {
701702
return results;
702703
}
703704

704-
@Override
705705
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
706706
Batch.Callback<R> callback) throws IOException,
707707
InterruptedException {
@@ -784,8 +784,8 @@ private String getTargetTableName(String tableNameString) {
784784
return tableNameString;
785785
}
786786

787-
// To enable the server to identify the column family to which a qualifier belongs,
788-
// the client writes the column family name into the qualifier.
787+
// To enable the server to identify the column family to which a qualifier belongs,
788+
// the client writes the column family name into the qualifier.
789789
// The server then parses this information to determine the table that needs to be operated on.
790790
private void processColumnFilters(NavigableSet<byte[]> columnFilters,
791791
Map<byte[], NavigableSet<byte[]>> familyMap) {
@@ -821,8 +821,8 @@ public Result call() throws IOException {
821821
if (get.getFamilyMap().keySet() == null
822822
|| get.getFamilyMap().keySet().isEmpty()
823823
|| get.getFamilyMap().size() > 1) {
824-
// In a Get operation where the family map is greater than 1 or equal to 0,
825-
// we handle this by appending the column family to the qualifier on the client side.
824+
// In a Get operation where the family map is greater than 1 or equal to 0,
825+
// we handle this by appending the column family to the qualifier on the client side.
826826
// The server can then use this information to filter the appropriate column families and qualifiers.
827827
if (!get.getColumnFamilyTimeRange().isEmpty()) {
828828
throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now");
@@ -928,8 +928,8 @@ public ResultScanner call() throws IOException {
928928
if (scan.getFamilyMap().keySet() == null
929929
|| scan.getFamilyMap().keySet().isEmpty()
930930
|| scan.getFamilyMap().size() > 1) {
931-
// In a Scan operation where the family map is greater than 1 or equal to 0,
932-
// we handle this by appending the column family to the qualifier on the client side.
931+
// In a Scan operation where the family map is greater than 1 or equal to 0,
932+
// we handle this by appending the column family to the qualifier on the client side.
933933
// The server can then use this information to filter the appropriate column families and qualifiers.
934934
if (!scan.getColumnFamilyTimeRange().isEmpty()) {
935935
throw new FeatureNotSupportedException("setColumnFamilyTimeRange is only supported in single column family for now");
@@ -988,6 +988,92 @@ public ResultScanner call() throws IOException {
988988
return executeServerCallable(serverCallable);
989989
}
990990

991+
public List<ResultScanner> getScanners(final Scan scan) throws IOException {
992+
993+
if (scan.getFamilyMap().keySet().isEmpty()) {
994+
// check nothing, use table group;
995+
} else {
996+
checkFamilyViolation(scan.getFamilyMap().keySet(), false);
997+
}
998+
999+
//be careful about the packet size ,may the packet exceed the max result size ,leading to error
1000+
ServerCallable<List<ResultScanner>> serverCallable = new ServerCallable<List<ResultScanner>>(
1001+
configuration, obTableClient, tableNameString, scan.getStartRow(), scan.getStopRow(),
1002+
operationTimeout) {
1003+
public List<ResultScanner> call() throws IOException {
1004+
byte[] family = new byte[] {};
1005+
ObTableClientQueryAsyncStreamResult clientQueryAsyncStreamResult;
1006+
ObTableQueryAsyncRequest request;
1007+
ObTableQuery obTableQuery;
1008+
ObHTableFilter filter;
1009+
try {
1010+
if (scan.getFamilyMap().keySet() == null
1011+
|| scan.getFamilyMap().keySet().isEmpty()
1012+
|| scan.getFamilyMap().size() > 1) {
1013+
// In a Scan operation where the family map is greater than 1 or equal to 0,
1014+
// we handle this by appending the column family to the qualifier on the client side.
1015+
// The server can then use this information to filter the appropriate column families and qualifiers.
1016+
NavigableSet<byte[]> columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR);
1017+
processColumnFilters(columnFilters, scan.getFamilyMap());
1018+
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
1019+
scan.getMaxVersions(), columnFilters);
1020+
obTableQuery = buildObTableQuery(filter, scan);
1021+
List<ResultScanner> resultScanners = new ArrayList<ResultScanner>();
1022+
1023+
request = buildObTableQueryAsyncRequest(obTableQuery,
1024+
getTargetTableName(tableNameString));
1025+
String phyTableName = obTableClient.getPhyTableNameFromTableGroup(
1026+
request.getObTableQueryRequest(), tableNameString);
1027+
List<Partition> partitions = obTableClient.getPartition(phyTableName, false);
1028+
for (Partition partition : partitions) {
1029+
request.getObTableQueryRequest().setTableQueryPartId(
1030+
partition.getPartId());
1031+
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
1032+
.execute(request);
1033+
ClientStreamScanner clientScanner = new ClientStreamScanner(
1034+
clientQueryAsyncStreamResult, tableNameString, family, true);
1035+
resultScanners.add(clientScanner);
1036+
}
1037+
return resultScanners;
1038+
} else {
1039+
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap()
1040+
.entrySet()) {
1041+
family = entry.getKey();
1042+
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
1043+
scan.getMaxVersions(), entry.getValue());
1044+
obTableQuery = buildObTableQuery(filter, scan);
1045+
1046+
List<ResultScanner> resultScanners = new ArrayList<ResultScanner>();
1047+
String targetTableName = getTargetTableName(tableNameString, Bytes.toString(family),
1048+
configuration);
1049+
request = buildObTableQueryAsyncRequest(obTableQuery, targetTableName);
1050+
List<Partition> partitions = obTableClient
1051+
.getPartition(targetTableName, false);
1052+
for (Partition partition : partitions) {
1053+
request.getObTableQueryRequest().setTableQueryPartId(
1054+
partition.getPartId());
1055+
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
1056+
.execute(request);
1057+
ClientStreamScanner clientScanner = new ClientStreamScanner(
1058+
clientQueryAsyncStreamResult, tableNameString, family, false);
1059+
resultScanners.add(clientScanner);
1060+
}
1061+
return resultScanners;
1062+
}
1063+
}
1064+
} catch (Exception e) {
1065+
logger.error(LCD.convert("01-00003"), tableNameString, Bytes.toString(family),
1066+
e);
1067+
throw new IOException("scan table:" + tableNameString + " family "
1068+
+ Bytes.toString(family) + " error.", e);
1069+
}
1070+
1071+
throw new IOException("scan table:" + tableNameString + "has no family");
1072+
}
1073+
};
1074+
return executeServerCallable(serverCallable);
1075+
}
1076+
9911077
@Override
9921078
public ResultScanner getScanner(final byte[] family) throws IOException {
9931079
Scan scan = new Scan();
@@ -1901,7 +1987,7 @@ private KeyValue modifyQualifier(KeyValue original, byte[] newQualifier) {
19011987
byte[] value = original.getValue();
19021988
long timestamp = original.getTimestamp();
19031989
byte type = original.getTypeByte();
1904-
// Create a new KeyValue with the modified qualifier
1990+
// Create a new KeyValue with the modified qualifier
19051991
return new KeyValue(row, family, newQualifier, timestamp, KeyValue.Type.codeToType(type),
19061992
value);
19071993
}
@@ -2059,8 +2145,8 @@ public static void checkFamilyViolation(Collection<byte[]> families, boolean che
20592145
}
20602146
}
20612147

2062-
// This method is currently only used for append and increment operations.
2063-
// It restricts these two methods to use multi-column family operations.
2148+
// This method is currently only used for append and increment operations.
2149+
// It restricts these two methods to use multi-column family operations.
20642150
// Note: After completing operations on multiple column families, they are deleted using the method described above.
20652151
public static void checkFamilyViolationForOneFamily(Collection<byte[]> families) {
20662152
if (families == null || families.size() == 0) {

src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,11 @@ public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
318318
return ohTable.getRowOrBefore(row, family);
319319
}
320320

321+
public List<ResultScanner> getScanners(Scan scan) throws IOException {
322+
checkStatus();
323+
return ohTable.getScanners(scan);
324+
}
325+
321326
@Override
322327
public ResultScanner getScanner(Scan scan) throws IOException {
323328
checkStatus();

src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,16 @@ public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
783783
return table.getRowOrBefore(row, family);
784784
}
785785

786+
public List<ResultScanner> getScanners(Scan scan) throws IOException {
787+
if (table instanceof OHTableClient) {
788+
return ((OHTableClient) table).getScanners(scan);
789+
} else if (table instanceof OHTable) {
790+
return ((OHTable) table).getScanners(scan);
791+
}
792+
793+
throw new IllegalArgumentException("just support for OHTable and OHTableClient");
794+
}
795+
786796
@Override
787797
public ResultScanner getScanner(Scan scan) throws IOException {
788798
return table.getScanner(scan);

0 commit comments

Comments
 (0)