Skip to content

Commit 13acb2b

Browse files
committed
correct bufferedMutator to original hbase (#127)
* add DepentdentFilter and SingleColumnValueExcludeFilter * add singleColumnValueExcludeFilter and DependentColumnFilter; add test cases and optimize bufferedMutator test cases * merge obkv branch-1.3 and correct test case for the mulfi-cf bug * optimize test case and add sql to create self-defined database * correct bufferedMutator to adjust original hbase * directly throw exceptions before execution * fix incorrect close
1 parent 67f6370 commit 13acb2b

File tree

3 files changed

+21
-25
lines changed

3 files changed

+21
-25
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ public byte[] getTableName() {
439439

440440
@Override
441441
public TableName getName() {
442-
throw new FeatureNotSupportedException("not supported yet.");
442+
return TableName.valueOf(this.tableNameString);
443443
}
444444

445445
@Override

src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import com.alipay.oceanbase.hbase.OHTable;
2121
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperation;
22-
import com.google.common.annotations.VisibleForTesting;
2322
import org.apache.hadoop.classification.InterfaceAudience;
2423
import org.apache.hadoop.conf.Configuration;
2524
import org.apache.hadoop.hbase.KeyValue;
@@ -48,9 +47,7 @@ public class OHBufferedMutatorImpl implements BufferedMutator {
4847
private volatile Configuration conf;
4948

5049
private OHTable ohTable;
51-
@VisibleForTesting
5250
final ConcurrentLinkedQueue<Mutation> asyncWriteBuffer = new ConcurrentLinkedQueue<Mutation>();
53-
@VisibleForTesting
5451
AtomicLong currentAsyncBufferSize = new AtomicLong(0);
5552

5653
private long writeBufferSize;
@@ -166,7 +163,6 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException {
166163
}
167164

168165
/**
169-
* This execute only supports for server version of 4_3_5.
170166
* Send the operations in the buffer to the servers. Does not wait for the server's answer. If
171167
* there is an error, either throw the error, or use the listener to deal with the error.
172168
*
@@ -189,38 +185,25 @@ private void batchExecute(boolean flushAll) throws IOException {
189185
if (execBuffer.isEmpty()) {
190186
return;
191187
}
192-
ohTable.batch(execBuffer);
188+
Object[] results = new Object[execBuffer.size()];
189+
ohTable.batch(execBuffer, results);
193190
// if commit all successfully, clean execBuffer
194191
execBuffer.clear();
195192
} catch (Exception ex) {
193+
// do not recollect error operations, notify outside
194+
LOGGER.error(LCD.convert("01-00026"), ex);
196195
if (ex.getCause() instanceof RetriesExhaustedWithDetailsException) {
197-
LOGGER.error(LCD.convert("01-00011"), tableName.getNameAsString()
198-
+ ": One or more of the operations have failed after retries.", ex.getCause());
196+
LOGGER.error(tableName + ": One or more of the operations have failed after retries.");
199197
RetriesExhaustedWithDetailsException retryException = (RetriesExhaustedWithDetailsException) ex.getCause();
200-
// recollect mutations and log error information
201-
execBuffer.clear();
202-
for (int i = 0; i < retryException.getNumExceptions(); ++i) {
203-
Row failedOp = retryException.getRow(i);
204-
execBuffer.add((Mutation) failedOp);
205-
LOGGER.error(LCD.convert("01-00011"), failedOp, tableName.getNameAsString(),
206-
currentAsyncBufferSize.get(), retryException.getCause(i));
207-
}
208198
if (listener != null) {
209199
listener.onException(retryException, this);
210200
} else {
211201
throw retryException;
212202
}
213203
} else {
214-
LOGGER.error(LCD.convert("01-00011"), tableName.getNameAsString()
215-
+ ": Errors unrelated to operations occur during mutation operation", ex);
204+
LOGGER.error("Errors unrelated to operations occur during mutation operation", ex);
216205
throw ex;
217206
}
218-
} finally {
219-
for (Mutation mutation : execBuffer) {
220-
long size = mutation.heapSize();
221-
currentAsyncBufferSize.addAndGet(size);
222-
asyncWriteBuffer.add(mutation);
223-
}
224207
}
225208
}
226209

@@ -230,7 +213,7 @@ public void close() throws IOException {
230213
return;
231214
}
232215
try {
233-
flush();
216+
batchExecute(true);
234217
} finally {
235218
// the pool in ObTableClient will be shut down too
236219
this.pool.shutdown();

src/main/java/com/alipay/oceanbase/hbase/util/ObTableClientManager.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Map;
2929
import java.util.Properties;
3030
import java.util.concurrent.ConcurrentHashMap;
31+
import java.util.concurrent.ExecutionException;
3132
import java.util.concurrent.locks.ReentrantLock;
3233

3334
import static com.alipay.oceanbase.hbase.constants.OHConstants.*;
@@ -128,6 +129,18 @@ public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableCli
128129
return OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey);
129130
}
130131

132+
public static void clear() throws IOException {
133+
try {
134+
for (Map.Entry<ObTableClientKey, ObTableClient> pair : OB_TABLE_CLIENT_INSTANCE.entrySet()) {
135+
pair.getValue().close();
136+
}
137+
}
138+
catch (Exception e) {
139+
throw new IOException("fail to close tableClient" , e);
140+
}
141+
OB_TABLE_CLIENT_INSTANCE.clear();
142+
}
143+
131144
public static class ObTableClientKey {
132145
private String paramUrl;
133146
private String fullUserName;

0 commit comments

Comments
 (0)