Skip to content

Commit 7bf29e8

Browse files
JackShi148stuBirdFly
authored andcommitted
OHBufferedMutator set and add runtimeBatchExecutor in ObTableClient (#68)
* init bufferedMutator * finish validateFamily and asyncExecute * correct log in OHBufferedMutatorImpl * pass self-test * format code * add retry when batch fails * remove test print * format code * make interface more generalized * format BufferedMutator test case * remove redundancy, add some comments * fix type of a bufferedMutator. Optimize by review * OHBufferedMutator set and use runtimeBatchExecutor in ObTableClient
1 parent 1b584ad commit 7bf29e8

File tree

2 files changed

+10
-10
lines changed

2 files changed

+10
-10
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,8 @@ public Result call() throws IOException {
545545
family = entry.getKey();
546546
obTableQuery = buildObTableQuery(get, entry.getValue());
547547
request = buildObTableQueryAsyncRequest(obTableQuery,
548-
getTargetTableName(tableNameString, Bytes.toString(family)));
548+
getTargetTableName(tableNameString, Bytes.toString(family),
549+
configuration));
549550
clientQueryStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
550551
.execute(request);
551552
getKeyValueFromResult(clientQueryStreamResult, keyValueList, false,
@@ -810,7 +811,7 @@ private void innerDelete(Delete delete) throws IOException {
810811
.next();
811812

812813
BatchOperation batch = buildBatchOperation(
813-
getTargetTableName(tableNameString, Bytes.toString(entry.getKey())),
814+
getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration),
814815
entry.getValue(), false, null);
815816
results = batch.execute();
816817
}
@@ -1729,14 +1730,12 @@ private ObTableQueryAsyncRequest buildObTableQueryAsyncRequest(ObTableQuery obTa
17291730
}
17301731

17311732
public static ObTableBatchOperationRequest buildObTableBatchOperationRequest(ObTableBatchOperation obTableBatchOperation,
1732-
String targetTableName,
1733-
ExecutorService pool) {
1733+
String targetTableName) {
17341734
ObTableBatchOperationRequest request = new ObTableBatchOperationRequest();
17351735
request.setTableName(targetTableName);
17361736
request.setReturningAffectedRows(true);
17371737
request.setEntityType(ObTableEntityType.HKV);
17381738
request.setBatchOperation(obTableBatchOperation);
1739-
request.setPool(pool);
17401739
return request;
17411740
}
17421741

@@ -1754,7 +1753,7 @@ private ObTableQueryAndMutateRequest buildObTableQueryAndMutateRequest(ObTableQu
17541753
return request;
17551754
}
17561755

1757-
private void checkFamilyViolation(Collection<byte[]> families) {
1756+
public static void checkFamilyViolation(Collection<byte[]> families) {
17581757
for (byte[] family : families) {
17591758
if (isBlank(Bytes.toString(family))) {
17601759
throw new IllegalArgumentException("family is blank");

src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,13 +88,13 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam
8888
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(ohConnection
8989
.getOHConnectionConfiguration());
9090

91-
// init params in OHBufferedMutatorImpl:
92-
// TableName + pool + Configuration + listener + writeBufferSize + maxKeyValueSize + rpcTimeout + operationTimeout
91+
// init params in OHBufferedMutatorImpl
9392
this.tableName = params.getTableName();
9493
this.conf = ohConnection.getConfiguration();
9594
this.connectionConfig = ohConnection.getOHConnectionConfiguration();
9695
this.listener = params.getListener();
9796
this.pool = params.getPool();
97+
this.obTableClient.setRuntimeBatchExecutor(pool);
9898

9999
this.writeBufferSize = params.getWriteBufferSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params
100100
.getWriteBufferSize() : connectionConfig.getWriteBufferSize();
@@ -211,7 +211,7 @@ private void asyncExecute(boolean flushAll) throws IOException {
211211
ObTableBatchOperation batch = buildObTableBatchOperation(execBuffer);
212212
// table_name$cf_name
213213
String targetTableName = OHTable.getTargetTableName(tableNameString, Bytes.toString(family), conf);
214-
request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName, pool);
214+
request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName);
215215
} catch (Exception ex) {
216216
LOGGER.error("Errors occur before mutation operation", ex);
217217
throw new IllegalArgumentException("Errors occur before mutation operation", ex);
@@ -229,7 +229,7 @@ private void asyncExecute(boolean flushAll) throws IOException {
229229
byte[] family = m.getFamilyMap().firstKey();
230230
ObTableBatchOperation batch = buildObTableBatchOperation(Collections.singletonList(m));
231231
String targetTableName = OHTable.getTargetTableName(tableNameString, Bytes.toString(family), conf);
232-
request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName, pool);
232+
request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName);
233233
ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request);
234234
}
235235
} catch (Exception newEx) {
@@ -278,6 +278,7 @@ public void close() throws IOException {
278278
try {
279279
asyncExecute(true);
280280
} finally {
281+
// the pool in ObTableClient will be shut down too
281282
this.pool.shutdown();
282283
try {
283284
if (!pool.awaitTermination(600, TimeUnit.SECONDS)) {

0 commit comments

Comments
 (0)