Skip to content

Commit d2c33c1

Browse files
committed
add ObParams
1 parent 6966c4f commit d2c33c1

File tree

3 files changed

+168
-13
lines changed

3 files changed

+168
-13
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 58 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@
2626
import com.alipay.oceanbase.hbase.util.ObTableClientManager;
2727
import com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory;
2828
import com.alipay.oceanbase.rpc.ObTableClient;
29+
import com.alipay.oceanbase.rpc.filter.ObHBaseParams;
30+
import com.alipay.oceanbase.rpc.filter.ObParamsBase;
31+
import com.alipay.oceanbase.rpc.filter.ObParams;
2932
import com.alipay.oceanbase.rpc.mutation.BatchOperation;
3033
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
3134
import com.alipay.oceanbase.rpc.property.Property;
@@ -181,6 +184,8 @@ public class OHTable implements HTableInterface {
181184
*/
182185
private final Configuration configuration;
183186

187+
private int scannerTimeout;
188+
184189
/**
185190
* Creates an object to access a HBase table.
186191
* Shares oceanbase table obTableClient and other resources with other OHTable instances
@@ -203,6 +208,9 @@ public OHTable(Configuration configuration, String tableName) throws IOException
203208
DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX);
204209
this.keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME,
205210
DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME);
211+
HBaseConfiguration.getInt(configuration, HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
212+
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
213+
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
206214
this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime);
207215
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(configuration);
208216

@@ -364,8 +372,9 @@ public HTableDescriptor getTableDescriptor() {
364372
* @throws IOException e
365373
*/
366374
public boolean exists(Get get) throws IOException {
375+
get.setCheckExistenceOnly(true);
367376
Result r = get(get);
368-
return !r.isEmpty();
377+
return r.getExists();
369378
}
370379

371380
@Override
@@ -380,8 +389,8 @@ public boolean[] existsAll(List<Get> list) throws IOException {
380389
// todo: Optimize after CheckExistenceOnly is finished
381390
Result[] r = get(list);
382391
boolean[] ret = new boolean[r.length];
383-
for (int i = 0; i < r.length; ++i){
384-
ret[i] = !r[i].isEmpty();
392+
for (int i = 0; i < list.size(); ++i) {
393+
ret[i] = exists(list.get(i));
385394
}
386395
return ret;
387396
}
@@ -470,10 +479,20 @@ public Result call() throws IOException {
470479
get.getMaxVersions(), null);
471480
obTableQuery = buildObTableQuery(filter, get.getRow(), true, get.getRow(),
472481
true, -1);
482+
obTableQuery.setObParams(buildObHBaseParams(null, get));
473483
request = buildObTableQueryRequest(obTableQuery, getTargetTableName(tableNameString));
474484

475485
clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
476486
.execute(request);
487+
if (get.isCheckExistenceOnly() ) {
488+
Result result = new Result();
489+
if (clientQueryStreamResult.getCacheRows().size() != 0) {
490+
result.setExists(true);
491+
} else {
492+
result.setExists(false);
493+
}
494+
return result;
495+
}
477496
getKeyValueFromResult(clientQueryStreamResult, keyValueList, true, family);
478497
} else {
479498
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : get.getFamilyMap()
@@ -492,10 +511,20 @@ public Result call() throws IOException {
492511
get.getRow(), true, -1);
493512
}
494513

514+
obTableQuery.setObParams(buildObHBaseParams(null, get));
495515
request = buildObTableQueryRequest(obTableQuery,
496516
getTargetTableName(tableNameString, Bytes.toString(family)));
497517
clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
498518
.execute(request);
519+
if (get.isCheckExistenceOnly() ) {
520+
Result result = new Result();
521+
if (clientQueryStreamResult.getCacheRows().size() != 0) {
522+
result.setExists(true);
523+
} else {
524+
result.setExists(false);
525+
}
526+
return result;
527+
}
499528
getKeyValueFromResult(clientQueryStreamResult, keyValueList, false,
500529
family);
501530
}
@@ -555,17 +584,18 @@ public ResultScanner call() throws IOException {
555584
scan.getMaxVersions(), null);
556585
if (scan.isReversed()) {
557586
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
558-
scan.getStartRow(), true, scan.getBatch());
587+
scan.getStartRow(), true, scan.getCaching());
559588
} else {
560589
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
561-
scan.getStopRow(), false, scan.getBatch());
590+
scan.getStopRow(), false, scan.getCaching());
562591
}
563592
if (scan.isReversed()) { // reverse scan 时设置为逆序
564593
obTableQuery.setScanOrder(ObScanOrder.Reverse);
565594
}
566595
obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : conf.getLong(
567596
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
568597
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE));
598+
obTableQuery.setObParams(buildObHBaseParams(scan, null));
569599
request = buildObTableQueryAsyncRequest(obTableQuery, getTargetTableName(tableNameString));
570600
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
571601
.execute(request);
@@ -579,20 +609,19 @@ public ResultScanner call() throws IOException {
579609
scan.getMaxVersions(), entry.getValue());
580610
if (scan.isReversed()) {
581611
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
582-
scan.getStartRow(), true, scan.getBatch());
612+
scan.getStartRow(), true, scan.getCaching());
583613
} else {
584614
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
585-
scan.getStopRow(), false, scan.getBatch());
615+
scan.getStopRow(), false, scan.getCaching());
586616
}
587617
if (scan.isReversed()) { // reverse scan 时设置为逆序
588618
obTableQuery.setScanOrder(ObScanOrder.Reverse);
589619
}
590620

591-
// no support set maxResultSize.
592621
obTableQuery.setMaxResultSize(scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : conf.getLong(
593622
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
594623
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE));
595-
624+
obTableQuery.setObParams(buildObHBaseParams(scan, null));
596625
request = buildObTableQueryAsyncRequest(obTableQuery,
597626
getTargetTableName(tableNameString, Bytes.toString(family)));
598627
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
@@ -614,6 +643,24 @@ public ResultScanner call() throws IOException {
614643
return executeServerCallable(serverCallable);
615644
}
616645

646+
public ObParams buildObHBaseParams(Scan scan, Get get) {
647+
ObParams obParams = new ObParams();
648+
ObHBaseParams obHBaseParams = new ObHBaseParams();
649+
if (scan != null) {
650+
obHBaseParams.setBatch(scan.getBatch());
651+
obHBaseParams.setCallTimeout(scannerTimeout);
652+
obHBaseParams.setRaw(scan.isRaw());
653+
obHBaseParams.setCacheBlock(scan.isGetScan());
654+
obHBaseParams.setAllowPartialResults(scan.getAllowPartialResults());
655+
}
656+
if (get != null) {
657+
obHBaseParams.setCheckExistenceOnly(get.isCheckExistenceOnly());
658+
obHBaseParams.setCacheBlock(get.getCacheBlocks());
659+
}
660+
obParams.setObParamsBase(obHBaseParams);
661+
return obParams;
662+
}
663+
617664
public ResultScanner getScanner(final byte[] family) throws IOException {
618665
Scan scan = new Scan();
619666
scan.addFamily(family);
@@ -1510,10 +1557,10 @@ public void refreshTableEntry(String familyString, boolean hasTestLoad) throws E
15101557
return;
15111558
}
15121559
this.obTableClient.getOrRefreshTableEntry(
1513-
getNormalTargetTableName(tableNameString, familyString), true, true);
1560+
getNormalTargetTableName(tableNameString, familyString), true, true, false);
15141561
if (hasTestLoad) {
15151562
this.obTableClient.getOrRefreshTableEntry(
1516-
getTestLoadTargetTableName(tableNameString, familyString), true, true);
1563+
getTestLoadTargetTableName(tableNameString, familyString), true, true, false);
15171564
}
15181565
}
15191566

src/main/java/com/alipay/oceanbase/hbase/result/ClientStreamScanner.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,10 @@ public Result next() throws IOException {
106106
KeyValue startKeyValue = new KeyValue(sk, family, sq, st, sv);
107107
List<KeyValue> keyValues = new ArrayList<KeyValue>();
108108
keyValues.add(startKeyValue);
109-
110-
while (streamNext = streamResult.next()) {
109+
//
110+
int size = streamResult.getCacheRows().size();
111+
int current = 0;
112+
while (streamNext = streamResult.next() && current < size){
111113
List<ObObj> row = streamResult.getRow();
112114
if (this.isTableGroup) {
113115
// split family and qualifier
@@ -124,6 +126,7 @@ public Result next() throws IOException {
124126
if (Arrays.equals(sk, k)) {
125127
// when rowKey is equal to the previous rowKey ,merge the result into the same result
126128
keyValues.add(new KeyValue(k, family, q, t, v));
129+
current++;
127130
} else {
128131
break;
129132
}

src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.alipay.oceanbase.hbase.exception.FeatureNotSupportedException;
2121

22+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.AbstractQueryStreamResult;
2223
import org.apache.hadoop.hbase.Cell;
2324
import org.apache.hadoop.hbase.CellUtil;
2425
import org.apache.hadoop.hbase.HConstants;
@@ -51,6 +52,110 @@ public abstract class HTableTestBase {
5152

5253
protected HTableInterface hTable;
5354

55+
@Test
56+
// todo: heyu
57+
public void testScanWithObParams() throws Exception {
58+
String key1 = "scanKey1x";
59+
String key2 = "scanKey2x";
60+
String key3 = "scanKey3x";
61+
String key4 = "scanKey4x";
62+
String column1 = "column1";
63+
String column2 = "column2";
64+
String value1 = "value1";
65+
String value2 = "value2";
66+
String family = "family1";
67+
68+
// delete previous data
69+
Delete deleteKey1Family = new Delete(toBytes(key1));
70+
deleteKey1Family.deleteFamily(toBytes(family));
71+
Delete deleteKey2Family = new Delete(toBytes(key2));
72+
deleteKey2Family.deleteFamily(toBytes(family));
73+
Delete deleteKey3Family = new Delete(toBytes(key3));
74+
deleteKey3Family.deleteFamily(toBytes(family));
75+
Delete deleteKey4Family = new Delete(toBytes(key4));
76+
deleteKey4Family.deleteFamily(toBytes(family));
77+
78+
hTable.delete(deleteKey1Family);
79+
hTable.delete(deleteKey2Family);
80+
hTable.delete(deleteKey3Family);
81+
hTable.delete(deleteKey4Family);
82+
83+
Put putKey1Column1Value1 = new Put(toBytes(key1));
84+
putKey1Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));
85+
86+
Put putKey1Column1Value2 = new Put(toBytes(key1));
87+
putKey1Column1Value2.add(toBytes(family), toBytes(column1), toBytes(value2));
88+
89+
Put putKey1Column2Value2 = new Put(toBytes(key1));
90+
putKey1Column2Value2.add(toBytes(family), toBytes(column2), toBytes(value2));
91+
92+
Put putKey1Column2Value1 = new Put(toBytes(key1));
93+
putKey1Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1));
94+
95+
Put putKey2Column1Value1 = new Put(toBytes(key2));
96+
putKey2Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));
97+
98+
Put putKey2Column1Value2 = new Put(toBytes(key2));
99+
putKey2Column1Value2.add(toBytes(family), toBytes(column1), toBytes(value2));
100+
101+
Put putKey2Column2Value2 = new Put(toBytes(key2));
102+
putKey2Column2Value2.add(toBytes(family), toBytes(column2), toBytes(value2));
103+
104+
Put putKey2Column2Value1 = new Put(toBytes(key2));
105+
putKey2Column2Value1.add(toBytes(family), toBytes(column2), toBytes(value1));
106+
107+
Put putKey3Column1Value1 = new Put(toBytes(key3));
108+
putKey3Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));
109+
110+
Put putKey4Column1Value1 = new Put(toBytes(key4));
111+
putKey4Column1Value1.add(toBytes(family), toBytes(column1), toBytes(value1));
112+
113+
tryPut(hTable, putKey1Column1Value1);
114+
tryPut(hTable, putKey1Column1Value2);
115+
tryPut(hTable, putKey1Column1Value1); // 2 * putKey1Column1Value1
116+
tryPut(hTable, putKey1Column2Value1);
117+
tryPut(hTable, putKey1Column2Value2);
118+
tryPut(hTable, putKey1Column2Value1); // 2 * putKey1Column2Value1
119+
tryPut(hTable, putKey1Column2Value2); // 2 * putKey1Column2Value2
120+
tryPut(hTable, putKey2Column2Value1);
121+
tryPut(hTable, putKey2Column2Value2);
122+
tryPut(hTable, putKey3Column1Value1);
123+
tryPut(hTable, putKey4Column1Value1);
124+
125+
Scan scan;
126+
127+
scan = new Scan();
128+
scan.addFamily(family.getBytes());
129+
scan.setStartRow("scanKey1x".getBytes());
130+
scan.setStopRow("scanKey5x".getBytes());
131+
scan.setMaxVersions(10);
132+
scan.setCaching(1);
133+
scan.setBatch(3);
134+
ResultScanner scanner = hTable.getScanner(scan);
135+
Result result = scanner.next();
136+
Assert.assertEquals(3, result.size());
137+
scanner.close();
138+
139+
scan.setMaxResultSize(10);
140+
scan.setBatch(-1);
141+
ResultScanner scanner1 = hTable.getScanner(scan);
142+
result = scanner1.next();
143+
Assert.assertEquals(7, result.size()); // 返回第一行全部数据,因为不允许行内部分返回
144+
145+
scanner1.close();
146+
147+
scan.setAllowPartialResults(true);
148+
ResultScanner scanner2 = hTable.getScanner(scan);
149+
result = scanner2.next();
150+
Assert.assertEquals(1, result.size());
151+
152+
153+
hTable.delete(deleteKey1Family);
154+
hTable.delete(deleteKey2Family);
155+
hTable.delete(deleteKey3Family);
156+
hTable.delete(deleteKey4Family);
157+
}
158+
54159
@Test
55160
public void testTableGroup() throws IOError, IOException {
56161
/*

0 commit comments

Comments
 (0)