Skip to content

Commit f0f4f2f

Browse files
committed
hbase 2.0.0-alpha4
1 parent 975f59e commit f0f4f2f

32 files changed

+822
-1177
lines changed

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@
4545
</distributionManagement>
4646

4747
<properties>
48-
<hadoop.version>2.7.7</hadoop.version>
49-
<hbase.version>2.0.6</hbase.version>
48+
<hadoop.version>3.1.0</hadoop.version>
49+
<hbase.version>2.0.0-alpha4</hbase.version>
5050
<java.source.version>1.8</java.source.version>
5151
<java.target.version>1.8</java.target.version>
5252
<junit.version>4.13.1</junit.version>

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 58 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public class OHTable implements Table {
154154
/**
155155
* the buffer of put request
156156
*/
157-
private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
157+
private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
158158
/**
159159
* when the put request reach the write buffer size the do put will
160160
* flush commits automatically
@@ -459,8 +459,8 @@ private void finishSetUp() {
459459
}
460460

461461
public static OHConnectionConfiguration setUserDefinedNamespace(String tableNameString,
462-
OHConnectionConfiguration ohConnectionConf)
463-
throws IllegalArgumentException {
462+
OHConnectionConfiguration ohConnectionConf)
463+
throws IllegalArgumentException {
464464
if (tableNameString.indexOf(':') != -1) {
465465
String[] params = tableNameString.split(":");
466466
if (params.length != 2) {
@@ -500,13 +500,15 @@ public Configuration getConfiguration() {
500500

501501
@Override
502502
public HTableDescriptor getTableDescriptor() throws IOException {
503-
OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString, obTableClient);
503+
OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString,
504+
obTableClient);
504505
return executor.getTableDescriptor();
505506
}
506507

507508
@Override
508509
public TableDescriptor getDescriptor() throws IOException {
509-
OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString, obTableClient);
510+
OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString,
511+
obTableClient);
510512
return executor.getTableDescriptor();
511513
}
512514

@@ -620,8 +622,8 @@ private BatchOperation compatOldServerDel(final List<? extends Row> actions, fin
620622
} else if (delete.getFamilyCellMap().size() > 1) {
621623
boolean has_delete_family = delete.getFamilyCellMap().entrySet().stream()
622624
.flatMap(entry -> entry.getValue().stream()).anyMatch(
623-
kv -> kv.getType().getCode() == KeyValue.Type.DeleteFamily.getCode() ||
624-
kv.getType().getCode() == KeyValue.Type.DeleteFamilyVersion.getCode());
625+
kv -> kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() ||
626+
kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode());
625627
if (!has_delete_family) {
626628
return buildBatchOperation(tableNameString,
627629
Collections.singletonList(delete), true,
@@ -909,10 +911,12 @@ private void processColumnFilters(NavigableSet<byte[]> columnFilters,
909911
byte[] family = entry.getKey();
910912
if (entry.getValue() != null) {
911913
for (byte[] columnName : entry.getValue()) {
912-
byte[] newQualifier = new byte[family.length + 1/* length of "." */ + columnName.length];
914+
byte[] newQualifier = new byte[family.length + 1/* length of "." */
915+
+ columnName.length];
913916
System.arraycopy(family, 0, newQualifier, 0, family.length);
914917
newQualifier[family.length] = 0x2E; // 0x2E in utf-8 is "."
915-
System.arraycopy(columnName, 0, newQualifier, family.length + 1, columnName.length);
918+
System.arraycopy(columnName, 0, newQualifier, family.length + 1,
919+
columnName.length);
916920
columnFilters.add(newQualifier);
917921
}
918922
} else {
@@ -1402,8 +1406,8 @@ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, Compa
14021406
}
14031407

14041408
@Override
1405-
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
1406-
return new ObCheckAndMutateBuilderImpl(row, family);
1409+
public long getRpcTimeout(TimeUnit unit) {
1410+
return getRpcTimeout();
14071411
}
14081412

14091413
private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier,
@@ -1723,6 +1727,11 @@ public int getOperationTimeout() {
17231727
return operationTimeout;
17241728
}
17251729

1730+
@Override
1731+
public long getOperationTimeout(TimeUnit unit) {
1732+
return getOperationTimeout();
1733+
}
1734+
17261735
//todo
17271736
@Override
17281737
public void setRpcTimeout(int rpcTimeout) {
@@ -1741,11 +1750,31 @@ public int getReadRpcTimeout() {
17411750
return this.readRpcTimeout;
17421751
}
17431752

1753+
@Override
1754+
public void setReadRpcTimeout(int readRpcTimeout) {
1755+
this.readRpcTimeout = readRpcTimeout;
1756+
}
1757+
1758+
@Override
1759+
public long getReadRpcTimeout(TimeUnit unit) {
1760+
return getReadRpcTimeout();
1761+
}
1762+
1763+
@Override
1764+
public long getWriteRpcTimeout(TimeUnit unit) {
1765+
return this.readRpcTimeout;
1766+
}
1767+
17441768
@Override
17451769
public int getWriteRpcTimeout() {
17461770
return this.writeRpcTimeout;
17471771
}
17481772

1773+
@Override
1774+
public void setWriteRpcTimeout(int writeRpcTimeout) {
1775+
this.writeRpcTimeout = writeRpcTimeout;
1776+
}
1777+
17491778
public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) {
17501779
this.obTableClient.setRuntimeBatchExecutor(runtimeBatchExecutor);
17511780
}
@@ -1877,7 +1906,7 @@ private ObHTableFilter buildObHTableFilter(byte[] filterString, TimeRange timeRa
18771906
if (columnQualifier == null) {
18781907
obHTableFilter.addSelectColumnQualifier(new byte[0]);
18791908
} else {
1880-
obHTableFilter.addSelectColumnQualifier(columnQualifier);
1909+
obHTableFilter.addSelectColumnQualifier(columnQualifier);
18811910
}
18821911
}
18831912
}
@@ -1935,11 +1964,11 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) {
19351964
filter.setOffsetPerRowPerCf(scan.getRowOffsetPerColumnFamily());
19361965
}
19371966
if (scan.isReversed()) {
1938-
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), scan.includeStopRow(), scan.getStartRow(),
1939-
scan.includeStartRow(), true, ts);
1967+
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), scan.includeStopRow(),
1968+
scan.getStartRow(), scan.includeStartRow(), true, ts);
19401969
} else {
1941-
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),
1942-
scan.includeStopRow(), false, ts);
1970+
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), scan.includeStartRow(),
1971+
scan.getStopRow(), scan.includeStopRow(), false, ts);
19431972
}
19441973
obTableQuery.setBatchSize(scan.getBatch());
19451974
obTableQuery.setLimit(scan.getLimit());
@@ -2016,13 +2045,14 @@ public static ObTableBatchOperation buildObTableBatchOperation(List<Mutation> ro
20162045
private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv,
20172046
ObTableOperationType operationType,
20182047
boolean isTableGroup, byte[] family, Long TTL) {
2019-
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType().getCode());
2048+
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getTypeByte());
20202049
com.alipay.oceanbase.rpc.mutation.Mutation tableMutation = buildMutation(kv, operationType,
20212050
isTableGroup, family, TTL);
2022-
if(isTableGroup) {
2051+
if (isTableGroup) {
20232052
// construct new_kv otherwise filter will fail to match targeted columns
20242053
byte[] oldQualifier = CellUtil.cloneQualifier(kv);
2025-
byte[] newQualifier = new byte[family.length + 1/* length of "." */ + oldQualifier.length];
2054+
byte[] newQualifier = new byte[family.length + 1/* length of "." */
2055+
+ oldQualifier.length];
20262056
System.arraycopy(family, 0, newQualifier, 0, family.length);
20272057
newQualifier[family.length] = 0x2E; // 0x2E in utf-8 is "."
20282058
System.arraycopy(oldQualifier, 0, newQualifier, family.length + 1, oldQualifier.length);
@@ -2105,12 +2135,10 @@ private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv,
21052135
range.setEndKey(ObRowKey.getInstance(CellUtil.cloneRow(kv), ObObj.getMax(),
21062136
ObObj.getMax()));
21072137
if (!isTableGroup) {
2108-
filter = buildObHTableFilter(null,
2109-
new TimeRange(0, kv.getTimestamp() + 1),
2138+
filter = buildObHTableFilter(null, new TimeRange(0, kv.getTimestamp() + 1),
21102139
Integer.MAX_VALUE);
21112140
} else {
2112-
filter = buildObHTableFilter(null,
2113-
new TimeRange(0, kv.getTimestamp() + 1),
2141+
filter = buildObHTableFilter(null, new TimeRange(0, kv.getTimestamp() + 1),
21142142
Integer.MAX_VALUE, CellUtil.cloneQualifier(kv));
21152143
}
21162144
}
@@ -2130,13 +2158,14 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(Cell kv,
21302158
Cell newCell = kv;
21312159
if (isTableGroup && family != null) {
21322160
byte[] oldQualifier = CellUtil.cloneQualifier(kv);
2133-
byte[] newQualifier = new byte[family.length + 1/* length of "." */ + oldQualifier.length];
2161+
byte[] newQualifier = new byte[family.length + 1/* length of "." */
2162+
+ oldQualifier.length];
21342163
System.arraycopy(family, 0, newQualifier, 0, family.length);
21352164
newQualifier[family.length] = 0x2E; // 0x2E in utf-8 is "."
21362165
System.arraycopy(oldQualifier, 0, newQualifier, family.length + 1, oldQualifier.length);
21372166
newCell = modifyQualifier(kv, newQualifier);
21382167
}
2139-
Cell.Type kvType = kv.getType();
2168+
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getTypeByte());
21402169
switch (kvType) {
21412170
case Put:
21422171
String[] propertyColumns = V_COLUMNS;
@@ -2182,7 +2211,7 @@ private KeyValue modifyQualifier(Cell original, byte[] newQualifier) {
21822211
byte[] family = CellUtil.cloneFamily(original);
21832212
byte[] value = CellUtil.cloneValue(original);
21842213
long timestamp = original.getTimestamp();
2185-
KeyValue.Type type = KeyValue.Type.codeToType(original.getType().getCode());
2214+
KeyValue.Type type = KeyValue.Type.codeToType(original.getTypeByte());
21862215
// Create a new KeyValue with the modified qualifier
21872216
return new KeyValue(row, family, newQualifier, timestamp, type, value);
21882217
}
@@ -2312,7 +2341,7 @@ private BatchOperation buildBatchOperation(String tableName, List<? extends Row>
23122341
public static ObTableOperation buildObTableOperation(Cell kv,
23132342
ObTableOperationType operationType,
23142343
Long TTL) {
2315-
Cell.Type kvType = kv.getType();
2344+
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getTypeByte());
23162345
String[] propertyColumns = V_COLUMNS;
23172346
Object[] property = new Object[] { CellUtil.cloneValue(kv) };
23182347
if (TTL != Long.MAX_VALUE) {
@@ -2449,7 +2478,7 @@ public Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
24492478
return new Pair<>(getStartKeys(), getEndKeys());
24502479
}
24512480

2452-
private CompareFilter.CompareOp getCompareOp(CompareOperator cmpOp) {
2481+
public static CompareFilter.CompareOp getCompareOp(CompareOperator cmpOp) {
24532482
switch (cmpOp) {
24542483
case LESS:
24552484
return CompareFilter.CompareOp.LESS;
@@ -2467,96 +2496,4 @@ private CompareFilter.CompareOp getCompareOp(CompareOperator cmpOp) {
24672496
return CompareFilter.CompareOp.NO_OP;
24682497
}
24692498
}
2470-
2471-
private class ObCheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
2472-
private final byte[] row;
2473-
private final byte[] family;
2474-
private byte[] qualifier;
2475-
private byte[] value;
2476-
private TimeRange timeRange;
2477-
private CompareOperator cmpOp;
2478-
2479-
ObCheckAndMutateBuilderImpl(byte[] row, byte[] family) {
2480-
this.row = checkNotNull(row, "The provided row is null.");
2481-
this.family = checkNotNull(family, "The provided family is null.");
2482-
}
2483-
2484-
@Override
2485-
public CheckAndMutateBuilder qualifier(byte[] qualifier) {
2486-
this.qualifier = checkNotNull(
2487-
qualifier,
2488-
"The provided qualifier is null. You could"
2489-
+ " use an empty byte array, or do not call this method if you want a null qualifier.");
2490-
return this;
2491-
}
2492-
2493-
@Override
2494-
public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
2495-
this.timeRange = timeRange;
2496-
return this;
2497-
}
2498-
2499-
@Override
2500-
public CheckAndMutateBuilder ifNotExists() {
2501-
this.cmpOp = CompareOperator.EQUAL;
2502-
this.value = null;
2503-
return this;
2504-
}
2505-
2506-
@Override
2507-
public CheckAndMutateBuilder ifMatches(CompareOperator cmpOp, byte[] value) {
2508-
this.cmpOp = checkNotNull(cmpOp, "The provided cmpOp is null.");
2509-
this.value = checkNotNull(value, "The provided value is null.");
2510-
return this;
2511-
}
2512-
2513-
@Override
2514-
public boolean thenPut(Put put) throws IOException {
2515-
checkCmpOp();
2516-
RowMutations rowMutations = new RowMutations(row);
2517-
rowMutations.add(put);
2518-
try {
2519-
return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value,
2520-
timeRange, rowMutations);
2521-
} catch (Exception e) {
2522-
logger.error(LCD.convert("01-00005"), rowMutations, tableNameString, e);
2523-
throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: "
2524-
+ e.getMessage() + " error.", e);
2525-
}
2526-
}
2527-
2528-
@Override
2529-
public boolean thenDelete(Delete delete) throws IOException {
2530-
checkCmpOp();
2531-
RowMutations rowMutations = new RowMutations(row);
2532-
rowMutations.add(delete);
2533-
try {
2534-
return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value,
2535-
timeRange, rowMutations);
2536-
} catch (Exception e) {
2537-
logger.error(LCD.convert("01-00005"), rowMutations, tableNameString, e);
2538-
throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: "
2539-
+ e.getMessage() + " error.", e);
2540-
}
2541-
}
2542-
2543-
@Override
2544-
public boolean thenMutate(RowMutations mutation) throws IOException {
2545-
checkCmpOp();
2546-
try {
2547-
return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value,
2548-
timeRange, mutation);
2549-
} catch (Exception e) {
2550-
logger.error(LCD.convert("01-00005"), mutation, tableNameString, e);
2551-
throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: "
2552-
+ e.getMessage() + " error.", e);
2553-
}
2554-
}
2555-
2556-
private void checkCmpOp() {
2557-
checkNotNull(this.cmpOp,
2558-
"The compare condition is null. Please use"
2559-
+ " ifNotExists/ifEquals/ifMatches before executing the request");
2560-
}
2561-
}
2562-
}
2499+
}

0 commit comments

Comments
 (0)