Skip to content

hbase client 2.0.0-alpha4 #239

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 15 commits into
base: hbase_compat_3_2.0
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,16 @@
</distributionManagement>

<properties>
<hadoop.version>2.7.7</hadoop.version>
<hbase.version>2.0.6</hbase.version>
<hadoop.version>3.1.0</hadoop.version>
<hbase.version>2.0.0-alpha4</hbase.version>
<java.source.version>1.8</java.source.version>
<java.target.version>1.8</java.target.version>
<junit.version>4.13.1</junit.version>
<powermock.version>2.0.9</powermock.version>
<project.build.sourceEncoding>${project.encoding}</project.build.sourceEncoding>
<project.encoding>UTF-8</project.encoding>
<slf4j.version>1.7.21</slf4j.version>
<table.client.version>2.0.0</table.client.version>
<table.client.version>2.0.1-SNAPSHOT</table.client.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -198,6 +198,11 @@
<version>1.2.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.19.0</version>
</dependency>
</dependencies>
<repositories>
<repository>
Expand Down
179 changes: 58 additions & 121 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public class OHTable implements Table {
/**
* the buffer of put request
*/
private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
/**
* when the put request reach the write buffer size the do put will
* flush commits automatically
Expand Down Expand Up @@ -459,8 +459,8 @@ private void finishSetUp() {
}

public static OHConnectionConfiguration setUserDefinedNamespace(String tableNameString,
OHConnectionConfiguration ohConnectionConf)
throws IllegalArgumentException {
OHConnectionConfiguration ohConnectionConf)
throws IllegalArgumentException {
if (tableNameString.indexOf(':') != -1) {
String[] params = tableNameString.split(":");
if (params.length != 2) {
Expand Down Expand Up @@ -500,13 +500,15 @@ public Configuration getConfiguration() {

@Override
public HTableDescriptor getTableDescriptor() throws IOException {
OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString, obTableClient);
OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString,
obTableClient);
return executor.getTableDescriptor();
}

@Override
public TableDescriptor getDescriptor() throws IOException {
OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString, obTableClient);
OHTableDescriptorExecutor executor = new OHTableDescriptorExecutor(tableNameString,
obTableClient);
return executor.getTableDescriptor();
}

Expand Down Expand Up @@ -620,8 +622,8 @@ private BatchOperation compatOldServerDel(final List<? extends Row> actions, fin
} else if (delete.getFamilyCellMap().size() > 1) {
boolean has_delete_family = delete.getFamilyCellMap().entrySet().stream()
.flatMap(entry -> entry.getValue().stream()).anyMatch(
kv -> kv.getType().getCode() == KeyValue.Type.DeleteFamily.getCode() ||
kv.getType().getCode() == KeyValue.Type.DeleteFamilyVersion.getCode());
kv -> kv.getTypeByte() == KeyValue.Type.DeleteFamily.getCode() ||
kv.getTypeByte() == KeyValue.Type.DeleteFamilyVersion.getCode());
if (!has_delete_family) {
return buildBatchOperation(tableNameString,
Collections.singletonList(delete), true,
Expand Down Expand Up @@ -909,10 +911,12 @@ private void processColumnFilters(NavigableSet<byte[]> columnFilters,
byte[] family = entry.getKey();
if (entry.getValue() != null) {
for (byte[] columnName : entry.getValue()) {
byte[] newQualifier = new byte[family.length + 1/* length of "." */ + columnName.length];
byte[] newQualifier = new byte[family.length + 1/* length of "." */
+ columnName.length];
System.arraycopy(family, 0, newQualifier, 0, family.length);
newQualifier[family.length] = 0x2E; // 0x2E in utf-8 is "."
System.arraycopy(columnName, 0, newQualifier, family.length + 1, columnName.length);
System.arraycopy(columnName, 0, newQualifier, family.length + 1,
columnName.length);
columnFilters.add(newQualifier);
}
} else {
Expand Down Expand Up @@ -1402,8 +1406,8 @@ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, Compa
}

@Override
public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
return new ObCheckAndMutateBuilderImpl(row, family);
public long getRpcTimeout(TimeUnit unit) {
return getRpcTimeout();
}

private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier,
Expand Down Expand Up @@ -1723,6 +1727,11 @@ public int getOperationTimeout() {
return operationTimeout;
}

@Override
public long getOperationTimeout(TimeUnit unit) {
return getOperationTimeout();
}

//todo
@Override
public void setRpcTimeout(int rpcTimeout) {
Expand All @@ -1741,11 +1750,31 @@ public int getReadRpcTimeout() {
return this.readRpcTimeout;
}

@Override
public void setReadRpcTimeout(int readRpcTimeout) {
this.readRpcTimeout = readRpcTimeout;
}

@Override
public long getReadRpcTimeout(TimeUnit unit) {
return getReadRpcTimeout();
}

@Override
public long getWriteRpcTimeout(TimeUnit unit) {
return this.readRpcTimeout;
}

@Override
public int getWriteRpcTimeout() {
return this.writeRpcTimeout;
}

@Override
public void setWriteRpcTimeout(int writeRpcTimeout) {
this.writeRpcTimeout = writeRpcTimeout;
}

public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) {
this.obTableClient.setRuntimeBatchExecutor(runtimeBatchExecutor);
}
Expand Down Expand Up @@ -1877,7 +1906,7 @@ private ObHTableFilter buildObHTableFilter(byte[] filterString, TimeRange timeRa
if (columnQualifier == null) {
obHTableFilter.addSelectColumnQualifier(new byte[0]);
} else {
obHTableFilter.addSelectColumnQualifier(columnQualifier);
obHTableFilter.addSelectColumnQualifier(columnQualifier);
}
}
}
Expand Down Expand Up @@ -1935,11 +1964,11 @@ private ObTableQuery buildObTableQuery(ObHTableFilter filter, final Scan scan) {
filter.setOffsetPerRowPerCf(scan.getRowOffsetPerColumnFamily());
}
if (scan.isReversed()) {
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), scan.includeStopRow(), scan.getStartRow(),
scan.includeStartRow(), true, ts);
obTableQuery = buildObTableQuery(filter, scan.getStopRow(), scan.includeStopRow(),
scan.getStartRow(), scan.includeStartRow(), true, ts);
} else {
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),
scan.includeStopRow(), false, ts);
obTableQuery = buildObTableQuery(filter, scan.getStartRow(), scan.includeStartRow(),
scan.getStopRow(), scan.includeStopRow(), false, ts);
}
obTableQuery.setBatchSize(scan.getBatch());
obTableQuery.setLimit(scan.getLimit());
Expand Down Expand Up @@ -2016,13 +2045,14 @@ public static ObTableBatchOperation buildObTableBatchOperation(List<Mutation> ro
private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv,
ObTableOperationType operationType,
boolean isTableGroup, byte[] family, Long TTL) {
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType().getCode());
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getTypeByte());
com.alipay.oceanbase.rpc.mutation.Mutation tableMutation = buildMutation(kv, operationType,
isTableGroup, family, TTL);
if(isTableGroup) {
if (isTableGroup) {
// construct new_kv otherwise filter will fail to match targeted columns
byte[] oldQualifier = CellUtil.cloneQualifier(kv);
byte[] newQualifier = new byte[family.length + 1/* length of "." */ + oldQualifier.length];
byte[] newQualifier = new byte[family.length + 1/* length of "." */
+ oldQualifier.length];
System.arraycopy(family, 0, newQualifier, 0, family.length);
newQualifier[family.length] = 0x2E; // 0x2E in utf-8 is "."
System.arraycopy(oldQualifier, 0, newQualifier, family.length + 1, oldQualifier.length);
Expand Down Expand Up @@ -2105,12 +2135,10 @@ private QueryAndMutate buildDeleteQueryAndMutate(KeyValue kv,
range.setEndKey(ObRowKey.getInstance(CellUtil.cloneRow(kv), ObObj.getMax(),
ObObj.getMax()));
if (!isTableGroup) {
filter = buildObHTableFilter(null,
new TimeRange(0, kv.getTimestamp() + 1),
filter = buildObHTableFilter(null, new TimeRange(0, kv.getTimestamp() + 1),
Integer.MAX_VALUE);
} else {
filter = buildObHTableFilter(null,
new TimeRange(0, kv.getTimestamp() + 1),
filter = buildObHTableFilter(null, new TimeRange(0, kv.getTimestamp() + 1),
Integer.MAX_VALUE, CellUtil.cloneQualifier(kv));
}
}
Expand All @@ -2130,13 +2158,14 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(Cell kv,
Cell newCell = kv;
if (isTableGroup && family != null) {
byte[] oldQualifier = CellUtil.cloneQualifier(kv);
byte[] newQualifier = new byte[family.length + 1/* length of "." */ + oldQualifier.length];
byte[] newQualifier = new byte[family.length + 1/* length of "." */
+ oldQualifier.length];
System.arraycopy(family, 0, newQualifier, 0, family.length);
newQualifier[family.length] = 0x2E; // 0x2E in utf-8 is "."
System.arraycopy(oldQualifier, 0, newQualifier, family.length + 1, oldQualifier.length);
newCell = modifyQualifier(kv, newQualifier);
}
Cell.Type kvType = kv.getType();
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getTypeByte());
switch (kvType) {
case Put:
String[] propertyColumns = V_COLUMNS;
Expand Down Expand Up @@ -2182,7 +2211,7 @@ private KeyValue modifyQualifier(Cell original, byte[] newQualifier) {
byte[] family = CellUtil.cloneFamily(original);
byte[] value = CellUtil.cloneValue(original);
long timestamp = original.getTimestamp();
KeyValue.Type type = KeyValue.Type.codeToType(original.getType().getCode());
KeyValue.Type type = KeyValue.Type.codeToType(original.getTypeByte());
// Create a new KeyValue with the modified qualifier
return new KeyValue(row, family, newQualifier, timestamp, type, value);
}
Expand Down Expand Up @@ -2312,7 +2341,7 @@ private BatchOperation buildBatchOperation(String tableName, List<? extends Row>
public static ObTableOperation buildObTableOperation(Cell kv,
ObTableOperationType operationType,
Long TTL) {
Cell.Type kvType = kv.getType();
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getTypeByte());
String[] propertyColumns = V_COLUMNS;
Object[] property = new Object[] { CellUtil.cloneValue(kv) };
if (TTL != Long.MAX_VALUE) {
Expand Down Expand Up @@ -2449,7 +2478,7 @@ public Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
return new Pair<>(getStartKeys(), getEndKeys());
}

private CompareFilter.CompareOp getCompareOp(CompareOperator cmpOp) {
public static CompareFilter.CompareOp getCompareOp(CompareOperator cmpOp) {
switch (cmpOp) {
case LESS:
return CompareFilter.CompareOp.LESS;
Expand All @@ -2467,96 +2496,4 @@ private CompareFilter.CompareOp getCompareOp(CompareOperator cmpOp) {
return CompareFilter.CompareOp.NO_OP;
}
}

private class ObCheckAndMutateBuilderImpl implements CheckAndMutateBuilder {
private final byte[] row;
private final byte[] family;
private byte[] qualifier;
private byte[] value;
private TimeRange timeRange;
private CompareOperator cmpOp;

ObCheckAndMutateBuilderImpl(byte[] row, byte[] family) {
this.row = checkNotNull(row, "The provided row is null.");
this.family = checkNotNull(family, "The provided family is null.");
}

@Override
public CheckAndMutateBuilder qualifier(byte[] qualifier) {
this.qualifier = checkNotNull(
qualifier,
"The provided qualifier is null. You could"
+ " use an empty byte array, or do not call this method if you want a null qualifier.");
return this;
}

@Override
public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
this.timeRange = timeRange;
return this;
}

@Override
public CheckAndMutateBuilder ifNotExists() {
this.cmpOp = CompareOperator.EQUAL;
this.value = null;
return this;
}

@Override
public CheckAndMutateBuilder ifMatches(CompareOperator cmpOp, byte[] value) {
this.cmpOp = checkNotNull(cmpOp, "The provided cmpOp is null.");
this.value = checkNotNull(value, "The provided value is null.");
return this;
}

@Override
public boolean thenPut(Put put) throws IOException {
checkCmpOp();
RowMutations rowMutations = new RowMutations(row);
rowMutations.add(put);
try {
return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value,
timeRange, rowMutations);
} catch (Exception e) {
logger.error(LCD.convert("01-00005"), rowMutations, tableNameString, e);
throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: "
+ e.getMessage() + " error.", e);
}
}

@Override
public boolean thenDelete(Delete delete) throws IOException {
checkCmpOp();
RowMutations rowMutations = new RowMutations(row);
rowMutations.add(delete);
try {
return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value,
timeRange, rowMutations);
} catch (Exception e) {
logger.error(LCD.convert("01-00005"), rowMutations, tableNameString, e);
throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: "
+ e.getMessage() + " error.", e);
}
}

@Override
public boolean thenMutate(RowMutations mutation) throws IOException {
checkCmpOp();
try {
return checkAndMutation(row, family, qualifier, getCompareOp(cmpOp), value,
timeRange, mutation);
} catch (Exception e) {
logger.error(LCD.convert("01-00005"), mutation, tableNameString, e);
throw new IOException("checkAndMutate type table: " + tableNameString + " e.msg: "
+ e.getMessage() + " error.", e);
}
}

private void checkCmpOp() {
checkNotNull(this.cmpOp,
"The compare condition is null. Please use"
+ " ifNotExists/ifEquals/ifMatches before executing the request");
}
}
}
}
Loading