From c30c0c0b68675902cdbf9428e9245bb9e8b621d6 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Mon, 31 Mar 2025 14:44:45 +0800 Subject: [PATCH 1/3] add isReturnResults flag to QueryAndMutateRequest in Increment and Append --- src/main/java/com/alipay/oceanbase/hbase/OHTable.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java index a8c2c1bc..b06e24c0 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTable.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTable.java @@ -1357,13 +1357,10 @@ public Result append(Append append) throws IOException { // the later hbase has supported timeRange ObHTableFilter filter = buildObHTableFilter(null, null, 1, qualifiers); ObTableQuery obTableQuery = buildObTableQuery(filter, r, true, r, true, false); - ObTableQueryAndMutate queryAndMutate = new ObTableQueryAndMutate(); - queryAndMutate.setTableQuery(obTableQuery); - queryAndMutate.setMutations(batchOperation); ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, batchOperation, getTargetTableName(tableNameString, Bytes.toString(f), configuration)); - request.setReturningAffectedEntity(true); + request.setReturningAffectedEntity(append.isReturnResults()); ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient .execute(request); ObTableQueryResult queryResult = result.getAffectedEntity(); @@ -1412,7 +1409,7 @@ public Result increment(Increment increment) throws IOException { ObTableQueryAndMutateRequest request = buildObTableQueryAndMutateRequest(obTableQuery, batch, getTargetTableName(tableNameString, Bytes.toString(f), configuration)); - request.setReturningAffectedEntity(true); + request.setReturningAffectedEntity(increment.isReturnResults()); ObTableQueryAndMutateResult result = (ObTableQueryAndMutateResult) obTableClient .execute(request); ObTableQueryResult queryResult = result.getAffectedEntity(); @@ -2161,7 +2158,6 @@ private ObTableQueryAndMutateRequest buildObTableQueryAndMutateRequest(ObTableQu request.setTableName(targetTableName); request.setTableQueryAndMutate(queryAndMutate); request.setEntityType(ObTableEntityType.HKV); - request.setReturningAffectedEntity(true); return request; } From 83892b3ed763231e83877b00db6e8e8677616970 Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Mon, 31 Mar 2025 14:56:42 +0800 Subject: [PATCH 2/3] fix runtimeBatchExecutor not set to OHTable in OHTableClient init --- .../java/com/alipay/oceanbase/hbase/OHTableClient.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java b/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java index c20c2bcb..3a97a74d 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java +++ b/src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java @@ -44,7 +44,7 @@ public class OHTableClient implements HTableInterface, Lifecycle { private OHTable ohTable; private volatile boolean initialized = false; private final Configuration conf; - private ExecutorService runtimeBatchExecutor; + private ExecutorService runtimeBatchExecutor = null; public void setRuntimeBatchExecutor(ExecutorService runtimeBatchExecutor) { this.runtimeBatchExecutor = runtimeBatchExecutor; @@ -82,7 +82,11 @@ public void init() throws Exception { lock.lock(); try { if (!initialized) { - ohTable = new OHTable(conf, tableNameString); + if (runtimeBatchExecutor != null) { + ohTable = new OHTable(conf, tableName, runtimeBatchExecutor); + } else { + ohTable = new OHTable(conf, tableNameString); + } initialized = true; } } finally { From 168171498a2e1441103d7a44843762349f45652d Mon Sep 17 00:00:00 2001 From: JackShi148 Date: Wed, 2 Apr 2025 14:42:37 +0800 Subject: [PATCH 3/3] set close and flush as synchronized to match original hbase --- .../alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java index 2aba69f5..2e865e9c 100644 --- a/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java +++ b/src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java @@ -236,7 +236,7 @@ private void batchExecute(boolean flushAll) throws IOException { } @Override - public void close() throws IOException { + public synchronized void close() throws IOException { if (closed) { return; } @@ -279,7 +279,7 @@ boolean isMultiFamilySupport() { * do not care whether the pool is shut down or this BufferedMutator is closed */ @Override - public void flush() throws IOException { + public synchronized void flush() throws IOException { batchExecute(true); }