Skip to content

Commit 46e71b7

Browse files
committed
hbase client 2.0.0-alpha4
1 parent d192cd4 commit 46e71b7

19 files changed

+606
-912
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@
4545
</distributionManagement>
4646

4747
<properties>
48-
<hadoop.version>2.7.7</hadoop.version>
49-
<hbase.version>2.0.6</hbase.version>
48+
<hadoop.version>3.1.0</hadoop.version>
49+
<hbase.version>2.0.0-alpha4</hbase.version>
5050
<java.source.version>1.8</java.source.version>
5151
<java.target.version>1.8</java.target.version>
5252
<junit.version>4.13.1</junit.version>

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 35 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -620,8 +620,8 @@ private BatchOperation compatOldServerDel(final List<? extends Row> actions, fin
620620
} else if (delete.getFamilyCellMap().size() > 1) {
621621
boolean has_delete_family = delete.getFamilyCellMap().entrySet().stream()
622622
.flatMap(entry -> entry.getValue().stream()).anyMatch(
623-
kv -> kv.getType().getCode() == KeyValue.Type.DeleteFamily.getCode() ||
624-
kv.getType().getCode() == KeyValue.Type.DeleteFamilyVersion.getCode());
623+
kv -> kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() ||
624+
kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode());
625625
if (!has_delete_family) {
626626
return buildBatchOperation(tableNameString,
627627
Collections.singletonList(delete), true,
@@ -1402,8 +1402,8 @@ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, Compa
14021402
}
14031403

14041404
@Override
1405-
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
1406-
return new ObCheckAndMutateBuilderImpl(row, family);
1405+
public long getRpcTimeout(TimeUnit unit) {
1406+
return getRpcTimeout();
14071407
}
14081408

14091409
private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier,
@@ -1723,6 +1723,11 @@ public int getOperationTimeout() {
17231723
return operationTimeout;
17241724
}
17251725

1726+
@Override
1727+
public long getOperationTimeout(TimeUnit unit) {
1728+
return getOperationTimeout();
1729+
}
1730+
17261731
//todo
17271732
@Override
17281733
public void setRpcTimeout(int rpcTimeout) {
@@ -1741,11 +1746,31 @@ public int getReadRpcTimeout() {
17411746
return this.readRpcTimeout;
17421747
}
17431748

1749+
@Override
1750+
public void setReadRpcTimeout(int readRpcTimeout) {
1751+
this.readRpcTimeout = readRpcTimeout;
1752+
}
1753+
@Override
1754+
public long getReadRpcTimeout(TimeUnit unit) {
1755+
return getReadRpcTimeout();
1756+
}
1757+
1758+
@Override
1759+
public long getWriteRpcTimeout(TimeUnit unit) {
1760+
return this.readRpcTimeout;
1761+
}
1762+
17441763
@Override
17451764
public int getWriteRpcTimeout() {
17461765
return this.writeRpcTimeout;
17471766
}
17481767

1768+
@Override
1769+
public void setWriteRpcTimeout(int writeRpcTimeout) {
1770+
this.writeRpcTimeout = writeRpcTimeout;
1771+
}
1772+
1773+
17491774
public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) {
17501775
this.obTableClient.setRuntimeBatchExecutor(runtimeBatchExecutor);
17511776
}
@@ -2016,7 +2041,7 @@ public static ObTableBatchOperation buildObTableBatchOperation(List<Mutation> ro
20162041
private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv,
20172042
ObTableOperationType operationType,
20182043
boolean isTableGroup, byte[] family, Long TTL) {
2019-
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType().getCode());
2044+
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getTypeByte());
20202045
com.alipay.oceanbase.rpc.mutation.Mutation tableMutation = buildMutation(kv, operationType,
20212046
isTableGroup, family, TTL);
20222047
if(isTableGroup) {
@@ -2136,7 +2161,7 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(Cell kv,
21362161
System.arraycopy(oldQualifier, 0, newQualifier, family.length + 1, oldQualifier.length);
21372162
newCell = modifyQualifier(kv, newQualifier);
21382163
}
2139-
Cell.Type kvType = kv.getType();
2164+
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getTypeByte());
21402165
switch (kvType) {
21412166
case Put:
21422167
String[] propertyColumns = V_COLUMNS;
@@ -2182,7 +2207,7 @@ private KeyValue modifyQualifier(Cell original, byte[] newQualifier) {
21822207
byte[] family = CellUtil.cloneFamily(original);
21832208
byte[] value = CellUtil.cloneValue(original);
21842209
long timestamp = original.getTimestamp();
2185-
KeyValue.Type type = KeyValue.Type.codeToType(original.getType().getCode());
2210+
KeyValue.Type type = KeyValue.Type.codeToType(original.getTypeByte());
21862211
// Create a new KeyValue with the modified qualifier
21872212
return new KeyValue(row, family, newQualifier, timestamp, type, value);
21882213
}
@@ -2312,7 +2337,7 @@ private BatchOperation buildBatchOperation(String tableName, List<? extends Row>
23122337
public static ObTableOperation buildObTableOperation(Cell kv,
23132338
ObTableOperationType operationType,
23142339
Long TTL) {
2315-
Cell.Type kvType = kv.getType();
2340+
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getTypeByte());
23162341
String[] propertyColumns = V_COLUMNS;
23172342
Object[] property = new Object[] { CellUtil.cloneValue(kv) };
23182343
if (TTL != Long.MAX_VALUE) {
@@ -2449,7 +2474,7 @@ public Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
24492474
return new Pair<>(getStartKeys(), getEndKeys());
24502475
}
24512476

2452-
private CompareFilter.CompareOp getCompareOp(CompareOperator cmpOp) {
2477+
public static CompareFilter.CompareOp getCompareOp(CompareOperator cmpOp) {
24532478
switch (cmpOp) {
24542479
case LESS:
24552480
return CompareFilter.CompareOp.LESS;
@@ -2467,96 +2492,4 @@ private CompareFilter.CompareOp getCompareOp(CompareOperator cmpOp) {
24672492
return CompareFilter.CompareOp.NO_OP;
24682493
}
24692494
}
2470-
2471-
private class ObCheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
2472-
private final byte[] row;
2473-
private final byte[] family;
2474-
private byte[] qualifier;
2475-
private byte[] value;
2476-
private TimeRange timeRange;
2477-
private CompareOperator cmpOp;
2478-
2479-
ObCheckAndMutateBuilderImpl(byte[] row, byte[] family) {
2480-
this.row = checkNotNull(row, "The provided row is null.");
2481-
this.family = checkNotNull(family, "The provided family is null.");
2482-
}
2483-
2484-
@Override
2485-
public CheckAndMutateBuilder qualifier(byte[] qualifier) {
2486-
this.qualifier = checkNotNull(
2487-
qualifier,
2488-
"The provided qualifier is null. You could"
2489-
+ " use an empty byte array, or do not call this method if you want a null qualifier.");
2490-
return this;
2491-
}
2492-
2493-
@Override
2494-
public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
2495-
this.timeRange = timeRange;
2496-
return this;
2497-
}
2498-
2499-
@Override
2500-
public CheckAndMutateBuilder ifNotExists() {
2501-
this.cmpOp = CompareOperator.EQUAL;
2502-
this.value = null;
2503-
return this;
2504-
}
2505-
2506-
@Override
2507-
public CheckAndMutateBuilder ifMatches(CompareOperator cmpOp, byte[] value) {
2508-
this.cmpOp = checkNotNull(cmpOp, "The provided cmpOp is null.");
2509-
this.value = checkNotNull(value, "The provided value is null.");
2510-
return this;
2511-
}
2512-
2513-
@Override
2514-
public boolean thenPut(Put put) throws IOException {
2515-
checkCmpOp();
2516-
RowMutations rowMutations = new RowMutations(row);
2517-
rowMutations.add(put);
2518-
try {
2519-
return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value,
2520-
timeRange, rowMutations);
2521-
} catch (Exception e) {
2522-
logger.error(LCD.convert("01-00005"), rowMutations, tableNameString, e);
2523-
throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: "
2524-
+ e.getMessage() + " error.", e);
2525-
}
2526-
}
2527-
2528-
@Override
2529-
public boolean thenDelete(Delete delete) throws IOException {
2530-
checkCmpOp();
2531-
RowMutations rowMutations = new RowMutations(row);
2532-
rowMutations.add(delete);
2533-
try {
2534-
return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value,
2535-
timeRange, rowMutations);
2536-
} catch (Exception e) {
2537-
logger.error(LCD.convert("01-00005"), rowMutations, tableNameString, e);
2538-
throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: "
2539-
+ e.getMessage() + " error.", e);
2540-
}
2541-
}
2542-
2543-
@Override
2544-
public boolean thenMutate(RowMutations mutation) throws IOException {
2545-
checkCmpOp();
2546-
try {
2547-
return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value,
2548-
timeRange, mutation);
2549-
} catch (Exception e) {
2550-
logger.error(LCD.convert("01-00005"), mutation, tableNameString, e);
2551-
throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: "
2552-
+ e.getMessage() + " error.", e);
2553-
}
2554-
}
2555-
2556-
private void checkCmpOp() {
2557-
checkNotNull(this.cmpOp,
2558-
"The compare condition is null. Please use"
2559-
+ " ifNotExists/ifEquals/ifMatches before executing the request");
2560-
}
2561-
}
2562-
}
2495+
}

src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.protobuf.Service;
2424
import com.google.protobuf.ServiceException;
2525
import org.apache.hadoop.conf.Configuration;
26+
import org.apache.hadoop.hbase.CompareOperator;
2627
import org.apache.hadoop.hbase.HTableDescriptor;
2728
import org.apache.hadoop.hbase.TableName;
2829
import org.apache.hadoop.hbase.client.*;
@@ -35,8 +36,11 @@
3536
import java.util.List;
3637
import java.util.Map;
3738
import java.util.concurrent.ExecutorService;
39+
import java.util.concurrent.TimeUnit;
3840
import java.util.concurrent.locks.ReentrantLock;
3941

42+
import static com.alipay.oceanbase.hbase.OHTable.getCompareOp;
43+
4044
public class OHTableClient implements Table, Lifecycle {
4145
private byte[] tableName;
4246
private String tableNameString;
@@ -174,9 +178,14 @@ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
174178
}
175179

176180
@Override
177-
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
181+
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, RowMutations mutation) throws IOException {
182+
return checkAndMutate(row, family, qualifier, getCompareOp(op), value, mutation);
183+
}
184+
185+
@Override
186+
public long getRpcTimeout(TimeUnit unit) {
178187
checkStatus();
179-
return ohTable.checkAndMutate(row, family);
188+
return ohTable.getRpcTimeout(unit);
180189
}
181190

182191
@Override
@@ -197,6 +206,48 @@ public void setRpcTimeout(int i) {
197206
ohTable.setRpcTimeout(i);
198207
}
199208

209+
@Override
210+
public long getReadRpcTimeout(TimeUnit unit) {
211+
checkStatus();
212+
return ohTable.getReadRpcTimeout(unit);
213+
}
214+
215+
@Override
216+
public int getReadRpcTimeout() {
217+
checkStatus();
218+
return ohTable.getReadRpcTimeout();
219+
}
220+
221+
@Override
222+
public void setReadRpcTimeout(int readRpcTimeout) {
223+
checkStatus();
224+
ohTable.setReadRpcTimeout(readRpcTimeout);
225+
}
226+
227+
@Override
228+
public long getWriteRpcTimeout(TimeUnit unit) {
229+
checkStatus();
230+
return ohTable.getWriteRpcTimeout(unit);
231+
}
232+
233+
@Override
234+
public int getWriteRpcTimeout() {
235+
checkStatus();
236+
return ohTable.getWriteRpcTimeout();
237+
}
238+
239+
@Override
240+
public void setWriteRpcTimeout(int writeRpcTimeout) {
241+
checkStatus();
242+
ohTable.setWriteRpcTimeout(writeRpcTimeout);
243+
}
244+
245+
@Override
246+
public long getOperationTimeout(TimeUnit unit) {
247+
checkStatus();
248+
return ohTable.getOperationTimeout(unit);
249+
}
250+
200251
@Override
201252
public int getRpcTimeout() {
202253
checkStatus();
@@ -323,6 +374,11 @@ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
323374
return ohTable.checkAndPut(row, family, qualifier, compareOp, value, put);
324375
}
325376

377+
@Override
378+
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Put put) throws IOException {
379+
return checkAndPut(row, family, qualifier, getCompareOp(op), value, put);
380+
}
381+
326382
@Override
327383
public void delete(Delete delete) throws IOException {
328384
checkStatus();
@@ -350,6 +406,11 @@ public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
350406
return ohTable.checkAndDelete(row, family, qualifier, compareOp, value, delete);
351407
}
352408

409+
@Override
410+
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Delete delete) throws IOException {
411+
return ohTable.checkAndDelete(row, family, qualifier, getCompareOp(op), value, delete);
412+
}
413+
353414
// Not support.
354415
@Override
355416
public void mutateRow(RowMutations rm) throws IOException {

0 commit comments

Comments
 (0)