Skip to content

Commit 7b79c08

Browse files
committed
bufferedMutator compatible to old server
1 parent 65cba8d commit 7b79c08

File tree

1 file changed

+12
-151
lines changed

1 file changed

+12
-151
lines changed

src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java

Lines changed: 12 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,13 @@
1818
package com.alipay.oceanbase.hbase.util;
1919

2020
import com.alipay.oceanbase.hbase.OHTable;
21-
import com.alipay.oceanbase.rpc.ObTableClient;
22-
import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException;
2321
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperation;
24-
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationRequest;
25-
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult;
2622
import com.google.common.annotations.VisibleForTesting;
2723
import org.apache.hadoop.classification.InterfaceAudience;
2824
import org.apache.hadoop.conf.Configuration;
2925
import org.apache.hadoop.hbase.KeyValue;
3026
import org.apache.hadoop.hbase.TableName;
3127
import org.apache.hadoop.hbase.client.*;
32-
import org.apache.hadoop.hbase.util.Bytes;
3328
import org.slf4j.Logger;
3429

3530
import java.io.IOException;
@@ -38,7 +33,6 @@
3833
import java.util.concurrent.ExecutorService;
3934
import java.util.concurrent.TimeUnit;
4035
import java.util.concurrent.atomic.AtomicLong;
41-
import java.util.concurrent.atomic.AtomicReference;
4236

4337
import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;
4438
import static com.alipay.oceanbase.rpc.ObGlobal.*;
@@ -54,23 +48,19 @@ public class OHBufferedMutatorImpl implements BufferedMutator {
5448
private volatile Configuration conf;
5549

5650
private OHTable ohTable;
57-
private ObTableClient obTableClient;
5851
@VisibleForTesting
5952
final ConcurrentLinkedQueue<Mutation> asyncWriteBuffer = new ConcurrentLinkedQueue<Mutation>();
6053
@VisibleForTesting
6154
AtomicLong currentAsyncBufferSize = new AtomicLong(0);
62-
private AtomicReference<Class<?>> type = new AtomicReference<>(null);
6355

6456
private long writeBufferSize;
6557
private final int maxKeyValueSize;
6658
private boolean closed = false;
6759
private final ExecutorService pool;
6860
private int rpcTimeout;
6961
private int operationTimeout;
70-
private static final long OB_VERSION_4_3_5_0 = calcVersion(4, (short) 3, (byte) 5, (byte) 0);
71-
private static final long OB_VERSION_4_3_0_0 = calcVersion(4, (short) 3, (byte) 0, (byte) 0);
72-
private static final long OB_VERSION_4_2_5_1 = calcVersion(4, (short) 2, (byte) 5, (byte) 1);
73-
private static final long OB_VERSION_4_3_4_0 = calcVersion(4, (short) 3, (byte) 4, (byte) 0);
62+
private static final long OB_VERSION_4_2_5_1 = calcVersion(4, (short) 2,
63+
(byte) 5, (byte) 1);
7464

7565
public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params,
7666
OHTable ohTable) throws IOException {
@@ -92,18 +82,11 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam
9282
this.maxKeyValueSize = params.getMaxKeyValueSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params
9383
.getMaxKeyValueSize() : connectionConfig.getMaxKeyValueSize();
9484

95-
if (isBatchSupport()) {
96-
// create an OHTable object to do batch work
97-
if (ohTable != null) {
98-
this.ohTable = ohTable;
99-
} else {
100-
this.ohTable = new OHTable(tableName, ohConnection, connectionConfig, pool);
101-
}
85+
// create an OHTable object to do batch work
86+
if (ohTable != null) {
87+
this.ohTable = ohTable;
10288
} else {
103-
// create an ObTableClient object to execute batch operation request
104-
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(connectionConfig);
105-
this.obTableClient.setRuntimeBatchExecutor(pool);
106-
this.obTableClient.setRpcExecuteTimeout(rpcTimeout);
89+
this.ohTable = new OHTable(tableName, ohConnection, connectionConfig, pool);
10790
}
10891
}
10992

@@ -142,33 +125,15 @@ public void mutate(List<? extends Mutation> mutations) throws IOException {
142125
}
143126

144127
long toAddSize = 0;
145-
if (isBatchSupport()) {
146-
for (Mutation m : mutations) {
147-
validateOperation(m);
148-
toAddSize += m.heapSize();
149-
}
150-
} else {
151-
// version below 4_3_5 need the same type in one bufferedMutator
152-
for (Mutation m : mutations) {
153-
validateOperation(m);
154-
Class<?> curType = m.getClass();
155-
// set the type of this BufferedMutator
156-
type.compareAndSet(null, curType);
157-
if (!type.get().equals(curType)) {
158-
throw new IllegalArgumentException("Not support different type in one batch.");
159-
}
160-
toAddSize += m.heapSize();
161-
}
128+
for (Mutation m : mutations) {
129+
validateOperation(m);
130+
toAddSize += m.heapSize();
162131
}
163132
currentAsyncBufferSize.addAndGet(toAddSize);
164133
asyncWriteBuffer.addAll(mutations);
165134

166135
if (currentAsyncBufferSize.get() > writeBufferSize) {
167-
if (isBatchSupport()) {
168-
batchExecute(false);
169-
} else {
170-
normalExecute(false);
171-
}
136+
batchExecute(false);
172137
}
173138
}
174139

@@ -259,102 +224,6 @@ private void batchExecute(boolean flushAll) throws IOException {
259224
}
260225
}
261226

262-
/**
263-
* This execute supports for server version below 4_3_5.
264-
* Send the operations in the buffer to the servers. Does not wait for the server's answer. If
265-
* there is an error, either throw the error, or use the listener to deal with the error.
266-
*
267-
* @param flushAll - if true, sends all the writes and wait for all of them to finish before
268-
* returning.
269-
*/
270-
private void normalExecute(boolean flushAll) throws IOException {
271-
LinkedList<Mutation> execBuffer = new LinkedList<>();
272-
ObTableBatchOperationRequest request = null;
273-
// namespace n1, n1:table_name
274-
// namespace default, table_name
275-
String tableNameString = tableName.getNameAsString();
276-
try {
277-
long dequeuedSize = 0L;
278-
Mutation m;
279-
while ((writeBufferSize <= 0 || dequeuedSize < (writeBufferSize * 2) || flushAll)
280-
&& (m = asyncWriteBuffer.poll()) != null) {
281-
execBuffer.add(m);
282-
long size = m.heapSize();
283-
currentAsyncBufferSize.addAndGet(-size);
284-
dequeuedSize += size;
285-
}
286-
// in concurrent situation, asyncWriteBuffer may be empty here
287-
// for other threads flush all buffer
288-
if (execBuffer.isEmpty()) {
289-
return;
290-
}
291-
try{
292-
// for now, operations' family is the same
293-
byte[] family = execBuffer.getFirst().getFamilyMap().firstKey();
294-
ObTableBatchOperation batch = buildObTableBatchOperation(execBuffer);
295-
// table_name$cf_name
296-
String targetTableName = OHTable.getTargetTableName(tableNameString, Bytes.toString(family), conf);
297-
request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName);
298-
} catch (Exception ex) {
299-
LOGGER.error(LCD.convert("01-00011"), tableName.getNameAsString()
300-
+ ": Errors unrelated to operations occur before mutation operation", ex);
301-
throw new ObTableUnexpectedException(tableName.getNameAsString() + ": Errors occur before mutation operation", ex);
302-
}
303-
try {
304-
ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request);
305-
} catch (Exception ex) {
306-
LOGGER.debug(LCD.convert("01-00011"), tableName.getNameAsString() +
307-
": Errors occur during mutation operation", ex);
308-
m = null;
309-
try {
310-
// retry every single operation
311-
while (!execBuffer.isEmpty()) {
312-
// poll elements from execBuffer to recollect remaining operations
313-
m = execBuffer.poll();
314-
byte[] family = m.getFamilyMap().firstKey();
315-
ObTableBatchOperation batch = buildObTableBatchOperation(Collections.singletonList(m));
316-
String targetTableName = OHTable.getTargetTableName(tableNameString, Bytes.toString(family), conf);
317-
request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName);
318-
ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request);
319-
}
320-
} catch (Exception newEx) {
321-
if (m != null) {
322-
execBuffer.addFirst(m);
323-
}
324-
// if retry fails, only recollect remaining operations
325-
while(!execBuffer.isEmpty()) {
326-
m = execBuffer.poll();
327-
long size = m.heapSize();
328-
asyncWriteBuffer.add(m);
329-
currentAsyncBufferSize.addAndGet(size);
330-
}
331-
throw newEx;
332-
}
333-
}
334-
} catch (Exception ex) {
335-
LOGGER.error(LCD.convert("01-00011"), tableName.getNameAsString() +
336-
": Errors occur during mutation operation", ex);
337-
// if the cause is illegal argument, directly throw to user
338-
if (ex instanceof ObTableUnexpectedException) {
339-
throw (ObTableUnexpectedException) ex;
340-
}
341-
// TODO: need to collect error information and actions in old version
342-
// TODO: maybe keep in ObTableBatchOperationResult
343-
List<Throwable> throwables = new ArrayList<Throwable>();
344-
List<Row> actions = new ArrayList<Row>();
345-
List<String> addresses = new ArrayList<String>();
346-
throwables.add(ex);
347-
RetriesExhaustedWithDetailsException error = new RetriesExhaustedWithDetailsException(
348-
new ArrayList<Throwable>(throwables),
349-
new ArrayList<Row>(actions), new ArrayList<String>(addresses));
350-
if (listener == null) {
351-
throw error;
352-
} else {
353-
listener.onException(error, this);
354-
}
355-
}
356-
}
357-
358227
@Override
359228
public void close() throws IOException {
360229
if (closed) {
@@ -396,16 +265,12 @@ private ObTableBatchOperation buildObTableBatchOperation(List<? extends Mutation
396265
return OHTable.buildObTableBatchOperation(keyValueList, false, null);
397266
}
398267

399-
boolean isBatchSupport() {
400-
return OB_VERSION >= OB_VERSION_4_3_5_0;
401-
}
402-
403268
/**
404269
* Only 4_2_5 BP1 - 4_3_0 and after 4_3_4 support multi-cf
405270
* */
406271
boolean isMultiFamilySupport() {
407272
return (OB_VERSION >= OB_VERSION_4_2_5_1 && OB_VERSION < OB_VERSION_4_3_0_0)
408-
|| (OB_VERSION >= OB_VERSION_4_3_4_0);
273+
|| (OB_VERSION >= OB_VERSION_4_3_4_0);
409274
}
410275

411276
/**
@@ -414,11 +279,7 @@ boolean isMultiFamilySupport() {
414279
*/
415280
@Override
416281
public void flush() throws IOException {
417-
if (isBatchSupport()) {
418-
batchExecute(true);
419-
} else {
420-
normalExecute(true);
421-
}
282+
batchExecute(true);
422283
}
423284

424285
@Override

0 commit comments

Comments
 (0)