Skip to content

Commit e040d75

Browse files
JackShi148stuBirdFly
authored andcommitted
OHBufferedMutator in OBKV Hbase 1_x_comp (#64)
* init bufferedMutator * finish validateFamily and asyncExecute * correct log in OHBufferedMutatorImpl * pass self-test * format code * add retry when batch fails * remove test print * format code * make interface more generalized * format BufferedMutator test case * remove redundancy, add some comments * fix type of a bufferedMutator. Optimize by review
1 parent cf33383 commit e040d75

File tree

4 files changed

+788
-37
lines changed

4 files changed

+788
-37
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -520,8 +520,10 @@ public Result call() throws IOException {
520520
.entrySet()) {
521521
family = entry.getKey();
522522
obTableQuery = buildObTableQuery(get, entry.getValue());
523-
request = buildObTableQueryRequest(obTableQuery,
524-
getTargetTableName(tableNameString, Bytes.toString(family)));
523+
request = buildObTableQueryRequest(
524+
obTableQuery,
525+
getTargetTableName(tableNameString, Bytes.toString(family),
526+
configuration));
525527
clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
526528
.execute(request);
527529
getKeyValueFromResult(clientQueryStreamResult, keyValueList, false,
@@ -598,8 +600,10 @@ public ResultScanner call() throws IOException {
598600
scan.getMaxVersions(), entry.getValue());
599601
obTableQuery = buildObTableQuery(filter, scan);
600602

601-
request = buildObTableQueryAsyncRequest(obTableQuery,
602-
getTargetTableName(tableNameString, Bytes.toString(family)));
603+
request = buildObTableQueryAsyncRequest(
604+
obTableQuery,
605+
getTargetTableName(tableNameString, Bytes.toString(family),
606+
configuration));
603607
clientQueryAsyncStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
604608
.execute(request);
605609
return new ClientStreamScanner(clientQueryAsyncStreamResult,
@@ -749,7 +753,7 @@ private void innerDelete(Delete delete) throws IOException {
749753
.next();
750754

751755
BatchOperation batch = buildBatchOperation(
752-
getTargetTableName(tableNameString, Bytes.toString(entry.getKey())),
756+
getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration),
753757
entry.getValue(), false, null);
754758
BatchOperationResult results = batch.execute();
755759

@@ -851,7 +855,7 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, Co
851855
ObTableBatchOperation batch = buildObTableBatchOperation(keyValueList, false, null);
852856

853857
ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery,
854-
batch, getTargetTableName(tableNameString, Bytes.toString(family)));
858+
batch, getTargetTableName(tableNameString, Bytes.toString(family), configuration));
855859
ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient
856860
.execute(request);
857861
return result.getAffectedRows() > 0;
@@ -889,7 +893,8 @@ public Result append(Append append) throws IOException {
889893
queryAndMutate.setTableQuery(obTableQuery);
890894
queryAndMutate.setMutations(batchOperation);
891895
ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery,
892-
batchOperation, getTargetTableName(tableNameString, Bytes.toString(f)));
896+
batchOperation,
897+
getTargetTableName(tableNameString, Bytes.toString(f), configuration));
893898
request.setReturningAffectedEntity(true);
894899
ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient
895900
.execute(request);
@@ -949,7 +954,7 @@ public Result increment(Increment increment) throws IOException {
949954
queryAndMutate.setTableQuery(obTableQuery);
950955

951956
ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery,
952-
batch, getTargetTableName(tableNameString, Bytes.toString(f)));
957+
batch, getTargetTableName(tableNameString, Bytes.toString(f), configuration));
953958
request.setReturningAffectedEntity(true);
954959
ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient
955960
.execute(request);
@@ -999,7 +1004,7 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo
9991004
queryAndMutate.setTableQuery(obTableQuery);
10001005

10011006
ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery,
1002-
batch, getTargetTableName(tableNameString, Bytes.toString(family)));
1007+
batch, getTargetTableName(tableNameString, Bytes.toString(family), configuration));
10031008
request.setReturningAffectedEntity(true);
10041009
ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient
10051010
.execute(request);
@@ -1063,7 +1068,7 @@ public void flushCommits() throws IOException {
10631068
.getSecond().size());
10641069
try {
10651070
String targetTableName = getTargetTableName(this.tableNameString,
1066-
entry.getKey());
1071+
entry.getKey(), configuration);
10671072

10681073
BatchOperation batch = buildBatchOperation(targetTableName, entry
10691074
.getValue().getSecond(), false, null);
@@ -1308,21 +1313,23 @@ <T> T executeServerCallable(final ServerCallable<T> serverCallable) throws IOExc
13081313
}
13091314
}
13101315

1311-
private String getTargetTableName(String tableNameString, String familyString) {
1316+
public static String getTargetTableName(String tableNameString, String familyString,
1317+
Configuration conf) {
13121318
checkArgument(tableNameString != null, "tableNameString is null");
13131319
checkArgument(familyString != null, "familyString is null");
1314-
if (configuration.getBoolean(HBASE_HTABLE_TEST_LOAD_ENABLE, false)) {
1315-
return getTestLoadTargetTableName(tableNameString, familyString);
1320+
if (conf.getBoolean(HBASE_HTABLE_TEST_LOAD_ENABLE, false)) {
1321+
return getTestLoadTargetTableName(tableNameString, familyString, conf);
13161322
}
13171323
return getNormalTargetTableName(tableNameString, familyString);
13181324
}
13191325

1320-
private String getNormalTargetTableName(String tableNameString, String familyString) {
1326+
private static String getNormalTargetTableName(String tableNameString, String familyString) {
13211327
return tableNameString + "$" + familyString;
13221328
}
13231329

1324-
private String getTestLoadTargetTableName(String tableNameString, String familyString) {
1325-
String suffix = configuration.get(HBASE_HTABLE_TEST_LOAD_SUFFIX,
1330+
private static String getTestLoadTargetTableName(String tableNameString, String familyString,
1331+
Configuration conf) {
1332+
String suffix = conf.get(HBASE_HTABLE_TEST_LOAD_SUFFIX,
13261333
DEFAULT_HBASE_HTABLE_TEST_LOAD_SUFFIX);
13271334
return tableNameString + suffix + "$" + familyString;
13281335
}
@@ -1477,9 +1484,9 @@ private ObTableQuery buildObTableQuery(final Get get, Collection<byte[]> columnQ
14771484
return obTableQuery;
14781485
}
14791486

1480-
private ObTableBatchOperation buildObTableBatchOperation(List<KeyValue> keyValueList,
1481-
boolean putToAppend,
1482-
List<byte[]> qualifiers) {
1487+
public static ObTableBatchOperation buildObTableBatchOperation(List<KeyValue> keyValueList,
1488+
boolean putToAppend,
1489+
List<byte[]> qualifiers) {
14831490
ObTableBatchOperation batch = new ObTableBatchOperation();
14841491
for (KeyValue kv : keyValueList) {
14851492
if (qualifiers != null) {
@@ -1536,7 +1543,7 @@ private BatchOperation buildBatchOperation(String tableName, List<KeyValue> keyV
15361543
return batch;
15371544
}
15381545

1539-
private ObTableOperation buildObTableOperation(KeyValue kv, boolean putToAppend) {
1546+
public static ObTableOperation buildObTableOperation(KeyValue kv, boolean putToAppend) {
15401547
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType());
15411548
switch (kvType) {
15421549
case Put:
@@ -1585,13 +1592,15 @@ private ObTableQueryAsyncRequest buildObTableQueryAsyncRequest(ObTableQuery obTa
15851592
return asyncRequest;
15861593
}
15871594

1588-
private ObTableBatchOperationRequest buildObTableBatchOperationRequest(ObTableBatchOperation obTableBatchOperation,
1589-
String targetTableName) {
1595+
public static ObTableBatchOperationRequest buildObTableBatchOperationRequest(ObTableBatchOperation obTableBatchOperation,
1596+
String targetTableName,
1597+
ExecutorService pool) {
15901598
ObTableBatchOperationRequest request = new ObTableBatchOperationRequest();
15911599
request.setTableName(targetTableName);
15921600
request.setReturningAffectedRows(true);
15931601
request.setEntityType(ObTableEntityType.HKV);
15941602
request.setBatchOperation(obTableBatchOperation);
1603+
request.setPool(pool);
15951604
return request;
15961605
}
15971606

@@ -1609,7 +1618,7 @@ private ObTableQueryAndMutateRequest buildObTableQueryAndMutateRequest(ObTableQu
16091618
return request;
16101619
}
16111620

1612-
private void checkFamilyViolation(Collection<byte[]> families) {
1621+
public static void checkFamilyViolation(Collection<byte[]> families) {
16131622
if (families == null || families.size() == 0) {
16141623
throw new FeatureNotSupportedException("family is empty.");
16151624
}
@@ -1637,7 +1646,8 @@ public void refreshTableEntry(String familyString, boolean hasTestLoad) throws E
16371646
getNormalTargetTableName(tableNameString, familyString), true, true);
16381647
if (hasTestLoad) {
16391648
this.obTableClient.getOrRefreshTableEntry(
1640-
getTestLoadTargetTableName(tableNameString, familyString), true, true);
1649+
getTestLoadTargetTableName(tableNameString, familyString, configuration), true,
1650+
true);
16411651
}
16421652
}
16431653

0 commit comments

Comments
 (0)