Skip to content

Commit a8bb5d9

Browse files
authored
support reverse scan (#40)
1 parent 5e322b8 commit a8bb5d9

File tree

9 files changed

+436
-143
lines changed

9 files changed

+436
-143
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646

4747
<properties>
4848
<hadoop.version>1.2.1</hadoop.version>
49-
<hbase.version>0.94.27</hbase.version>
49+
<hbase.version>0.98.24-hadoop1</hbase.version>
5050
<java.source.version>1.8</java.source.version>
5151
<java.target.version>1.8</java.target.version>
5252
<junit.version>4.13.1</junit.version>
@@ -60,7 +60,7 @@
6060
<dependencies>
6161
<dependency>
6262
<groupId>org.apache.hbase</groupId>
63-
<artifactId>hbase</artifactId>
63+
<artifactId>hbase-client</artifactId>
6464
<version>${hbase.version}</version>
6565
<exclusions>
6666
<exclusion>

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 76 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -39,27 +39,18 @@
3939
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
4040
import com.alipay.oceanbase.hbase.exception.OperationTimeoutException;
4141

42+
import com.google.protobuf.Descriptors;
43+
import com.google.protobuf.Message;
44+
import com.google.protobuf.Service;
45+
import com.google.protobuf.ServiceException;
4246
import org.apache.hadoop.conf.Configuration;
43-
import org.apache.hadoop.hbase.HConstants;
44-
import org.apache.hadoop.hbase.HTableDescriptor;
45-
import org.apache.hadoop.hbase.KeyValue;
46-
import org.apache.hadoop.hbase.client.Append;
47-
import org.apache.hadoop.hbase.client.Delete;
48-
import org.apache.hadoop.hbase.client.Get;
49-
import org.apache.hadoop.hbase.client.HTableInterface;
50-
import org.apache.hadoop.hbase.client.Increment;
51-
import org.apache.hadoop.hbase.client.Mutation;
52-
import org.apache.hadoop.hbase.client.Put;
53-
import org.apache.hadoop.hbase.client.Result;
54-
import org.apache.hadoop.hbase.client.ResultScanner;
55-
import org.apache.hadoop.hbase.client.Row;
56-
import org.apache.hadoop.hbase.client.RowLock;
57-
import org.apache.hadoop.hbase.client.RowMutations;
58-
import org.apache.hadoop.hbase.client.Scan;
47+
import org.apache.hadoop.hbase.*;
48+
import org.apache.hadoop.hbase.client.*;
5949
import org.apache.hadoop.hbase.client.coprocessor.Batch;
50+
import org.apache.hadoop.hbase.filter.CompareFilter;
6051
import org.apache.hadoop.hbase.filter.Filter;
6152
import org.apache.hadoop.hbase.io.TimeRange;
62-
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
53+
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
6354
import org.apache.hadoop.hbase.util.Bytes;
6455
import org.apache.hadoop.hbase.util.Pair;
6556
import org.slf4j.Logger;
@@ -344,6 +335,10 @@ public byte[] getTableName() {
344335
return tableName;
345336
}
346337

338+
public TableName getName() {
339+
return null;
340+
}
341+
347342
public Configuration getConfiguration() {
348343
return configuration;
349344
}
@@ -368,6 +363,10 @@ public boolean exists(Get get) throws IOException {
368363
return !r.isEmpty();
369364
}
370365

366+
public Boolean[] exists(List<Get> gets) throws IOException {
367+
throw new FeatureNotSupportedException("not supported yet'");
368+
}
369+
371370
public void batch(List<? extends Row> actions, Object[] results) {
372371
throw new FeatureNotSupportedException("not supported yet.");
373372
}
@@ -376,6 +375,14 @@ public Object[] batch(List<? extends Row> actions) {
376375
throw new FeatureNotSupportedException("not supported yet.");
377376
}
378377

378+
public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException {
379+
throw new FeatureNotSupportedException("not supported yet'");
380+
}
381+
382+
public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException, InterruptedException {
383+
throw new FeatureNotSupportedException("not supported yet'");
384+
}
385+
379386
public void getKeyValueFromResult(AbstractQueryStreamResult clientQueryStreamResult,
380387
List<KeyValue> keyValueList, boolean isTableGroup,
381388
byte[] family) throws Exception {
@@ -503,9 +510,16 @@ public ResultScanner call() throws IOException {
503510
|| scan.getFamilyMap().keySet().size() == 0) {
504511
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
505512
scan.getMaxVersions(), null);
506-
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
507-
scan.getStopRow(), false, scan.getBatch());
508-
513+
if (scan.isReversed()) {
514+
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
515+
scan.getStartRow(), true, scan.getBatch());
516+
} else {
517+
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
518+
scan.getStopRow(), false, scan.getBatch());
519+
}
520+
if (scan.isReversed()) { // reverse scan 时设置为逆序
521+
obTableQuery.setScanOrder(ObScanOrder.Reverse);
522+
}
509523
request = buildObTableQueryAsyncRequest(obTableQuery, tableNameString);
510524
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
511525
.execute(request);
@@ -517,22 +531,16 @@ public ResultScanner call() throws IOException {
517531
family = entry.getKey();
518532
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
519533
scan.getMaxVersions(), entry.getValue());
520-
521-
// not support reverse scan.
522-
// 由于 HBase 接口与 OB 接口表达范围的差异,reverse scan 需要交换 startRow 和 stopRow
523-
// if (scan.getReversed()) {
524-
// obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
525-
// scan.getStartRow(), true, scan.getBatch());
526-
// } else {
527-
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
528-
scan.getStopRow(), false, scan.getBatch());
529-
// }
530-
531-
// not support reverse scan.
532-
// if (scan.getReversed()) { // reverse scan 时设置为逆序
533-
// obTableQuery.setScanOrder(ObScanOrder.Reverse);
534-
// }
535-
534+
if (scan.isReversed()) {
535+
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), false,
536+
scan.getStartRow(), true, scan.getBatch());
537+
} else {
538+
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), true,
539+
scan.getStopRow(), false, scan.getBatch());
540+
}
541+
if (scan.isReversed()) { // reverse scan 时设置为逆序
542+
obTableQuery.setScanOrder(ObScanOrder.Reverse);
543+
}
536544
// no support set maxResultSize.
537545
// obTableQuery.setMaxResultSize(scan.getMaxResultSize());
538546

@@ -802,19 +810,18 @@ public Result increment(Increment increment) throws IOException {
802810
List<byte[]> qualifiers = new ArrayList<byte[]>();
803811

804812
byte[] rowKey = increment.getRow();
805-
Map.Entry<byte[], NavigableMap<byte[], Long>> entry = increment.getFamilyMap()
806-
.entrySet().iterator().next();
813+
Map.Entry<byte[], List<Cell>> entry = increment.getFamilyCellMap()
814+
.entrySet().iterator().next();
807815

808816
byte[] f = entry.getKey();
809817

810818
ObTableBatchOperation batch = new ObTableBatchOperation();
811-
for (Map.Entry<byte[], Long> qualifiersIncrements : entry.getValue().entrySet()) {
812-
byte[] qualifier = qualifiersIncrements.getKey();
819+
entry.getValue().forEach(cell -> {
820+
byte[] qualifier = cell.getQualifier();
813821
qualifiers.add(qualifier);
814822
batch.addTableOperation(getInstance(INCREMENT, new Object[] { rowKey, qualifier,
815-
Long.MAX_VALUE }, V_COLUMNS,
816-
new Object[] { Bytes.toBytes(qualifiersIncrements.getValue()) }));
817-
}
823+
Long.MAX_VALUE }, V_COLUMNS, new Object[] { cell.getValue() }));
824+
});
818825

819826
ObHTableFilter filter = buildObHTableFilter(null, increment.getTimeRange(), 1,
820827
qualifiers);
@@ -890,6 +897,10 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo
890897
}
891898
}
892899

900+
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException {
901+
throw new FeatureNotSupportedException("not supported yet'");
902+
}
903+
893904
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
894905
boolean writeToWAL) throws IOException {
895906
// WAL ignored
@@ -1001,31 +1012,18 @@ public void close() throws IOException {
10011012
}
10021013
}
10031014

1004-
public RowLock lockRow(byte[] row) {
1005-
throw new FeatureNotSupportedException("not supported yet.");
1015+
public CoprocessorRpcChannel coprocessorService(byte[] row) {
1016+
throw new FeatureNotSupportedException("not supported yet'");
10061017
}
10071018

1008-
public void unlockRow(RowLock rl) {
1009-
throw new FeatureNotSupportedException("not supported yet.");
1019+
public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) throws ServiceException, Throwable {
1020+
throw new FeatureNotSupportedException("not supported yet'");
10101021
}
10111022

1012-
public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol, byte[] row) {
1013-
throw new FeatureNotSupportedException("not supported yet.");
1023+
public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) throws ServiceException, Throwable {
1024+
throw new FeatureNotSupportedException("not supported yet'");
10141025
}
10151026

1016-
public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(Class<T> protocol,
1017-
byte[] startKey,
1018-
byte[] endKey,
1019-
Batch.Call<T, R> callable) {
1020-
throw new FeatureNotSupportedException("not supported yet.");
1021-
}
1022-
1023-
public <T extends CoprocessorProtocol, R> void coprocessorExec(Class<T> protocol,
1024-
byte[] startKey, byte[] endKey,
1025-
Batch.Call<T, R> callable,
1026-
Batch.Callback<R> callback) {
1027-
throw new FeatureNotSupportedException("not supported yet.");
1028-
}
10291027

10301028
/**
10311029
* See {@link #setAutoFlush(boolean, boolean)}
@@ -1066,6 +1064,10 @@ public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
10661064
this.clearBufferOnFail = autoFlush || clearBufferOnFail;
10671065
}
10681066

1067+
public void setAutoFlushTo(boolean autoFlush) {
1068+
throw new FeatureNotSupportedException("not supported yet'");
1069+
}
1070+
10691071
/**
10701072
* Returns the maximum size in bytes of the write buffer for this HTable.
10711073
* <p>
@@ -1094,6 +1096,18 @@ public void setWriteBufferSize(long writeBufferSize) throws IOException {
10941096
}
10951097
}
10961098

1099+
public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
1100+
throw new FeatureNotSupportedException("not supported yet'");
1101+
}
1102+
1103+
public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback) throws ServiceException, Throwable {
1104+
throw new FeatureNotSupportedException("not supported yet'");
1105+
}
1106+
1107+
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
1108+
throw new FeatureNotSupportedException("not supported yet'");
1109+
}
1110+
10971111
public void setOperationTimeout(int operationTimeout) {
10981112
this.operationTimeout = operationTimeout;
10991113
this.operationExecuteInPool = this.configuration.getBoolean(

0 commit comments

Comments
 (0)