36
36
import com .alipay .oceanbase .rpc .protocol .payload .impl .execute .query .*;
37
37
import com .alipay .oceanbase .rpc .protocol .payload .impl .execute .syncquery .ObTableQueryAsyncRequest ;
38
38
import com .alipay .oceanbase .rpc .stream .ObTableClientQueryAsyncStreamResult ;
39
+ import com .alipay .oceanbase .rpc .stream .ObTableClientQueryStreamResult ;
39
40
import com .alipay .oceanbase .rpc .table .ObHBaseParams ;
40
41
import com .alipay .oceanbase .rpc .table .ObKVParams ;
41
42
import com .alipay .sofa .common .thread .SofaThreadPoolExecutor ;
@@ -856,8 +857,6 @@ public Result get(final Get get) throws IOException {
856
857
public Result call () throws IOException {
857
858
List <Cell > keyValueList = new ArrayList <>();
858
859
byte [] family = new byte [] {};
859
- ObTableClientQueryAsyncStreamResult clientQueryStreamResult ;
860
- ObTableQueryAsyncRequest request ;
861
860
ObTableQuery obTableQuery ;
862
861
try {
863
862
if (get .getFamilyMap ().keySet () == null
@@ -872,10 +871,10 @@ public Result call() throws IOException {
872
871
NavigableSet <byte []> columnFilters = new TreeSet <>(Bytes .BYTES_COMPARATOR );
873
872
processColumnFilters (columnFilters , get .getFamilyMap ());
874
873
obTableQuery = buildObTableQuery (get , columnFilters );
875
- request = buildObTableQueryAsyncRequest (obTableQuery ,
874
+ ObTableQueryAsyncRequest request = buildObTableQueryAsyncRequest (obTableQuery ,
876
875
getTargetTableName (tableNameString ));
877
876
878
- clientQueryStreamResult = (ObTableClientQueryAsyncStreamResult ) obTableClient
877
+ ObTableClientQueryAsyncStreamResult clientQueryStreamResult = (ObTableClientQueryAsyncStreamResult ) obTableClient
879
878
.execute (request );
880
879
getMaxRowFromResult (clientQueryStreamResult , keyValueList , true , family );
881
880
} else {
@@ -894,10 +893,10 @@ public Result call() throws IOException {
894
893
}
895
894
}
896
895
obTableQuery = buildObTableQuery (get , entry .getValue ());
897
- request = buildObTableQueryAsyncRequest (obTableQuery ,
896
+ ObTableQueryRequest request = buildObTableQueryRequest (obTableQuery ,
898
897
getTargetTableName (tableNameString , Bytes .toString (family ),
899
898
configuration ));
900
- clientQueryStreamResult = (ObTableClientQueryAsyncStreamResult ) obTableClient
899
+ ObTableClientQueryStreamResult clientQueryStreamResult = (ObTableClientQueryStreamResult ) obTableClient
901
900
.execute (request );
902
901
getMaxRowFromResult (clientQueryStreamResult , keyValueList , false ,
903
902
family );
@@ -1923,6 +1922,15 @@ public static ObTableOperation buildObTableOperation(Cell kv, boolean putToAppen
1923
1922
}
1924
1923
}
1925
1924
1925
+ private ObTableQueryRequest buildObTableQueryRequest (ObTableQuery obTableQuery ,
1926
+ String targetTableName ) {
1927
+ ObTableQueryRequest request = new ObTableQueryRequest ();
1928
+ request .setEntityType (ObTableEntityType .HKV );
1929
+ request .setTableQuery (obTableQuery );
1930
+ request .setTableName (targetTableName );
1931
+ return request ;
1932
+ }
1933
+
1926
1934
private ObTableQueryAsyncRequest buildObTableQueryAsyncRequest (ObTableQuery obTableQuery ,
1927
1935
String targetTableName ) {
1928
1936
ObTableQueryRequest request = new ObTableQueryRequest ();
0 commit comments