Skip to content

Commit df11904

Browse files
committed
init old server bufferedMutator compatible
1 parent 28619e1 commit df11904

File tree

1 file changed

+177
-20
lines changed

1 file changed

+177
-20
lines changed

src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java

Lines changed: 177 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,18 @@
1818
package com.alipay.oceanbase.hbase.util;
1919

2020
import com.alipay.oceanbase.hbase.OHTable;
21+
import com.alipay.oceanbase.rpc.ObTableClient;
22+
import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException;
23+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperation;
24+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationRequest;
25+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperationResult;
2126
import com.google.common.annotations.VisibleForTesting;
2227
import org.apache.hadoop.classification.InterfaceAudience;
2328
import org.apache.hadoop.conf.Configuration;
29+
import org.apache.hadoop.hbase.KeyValue;
2430
import org.apache.hadoop.hbase.TableName;
2531
import org.apache.hadoop.hbase.client.*;
32+
import org.apache.hadoop.hbase.util.Bytes;
2633
import org.slf4j.Logger;
2734

2835
import java.io.IOException;
@@ -31,6 +38,10 @@
3138
import java.util.concurrent.ExecutorService;
3239
import java.util.concurrent.TimeUnit;
3340
import java.util.concurrent.atomic.AtomicLong;
41+
import java.util.concurrent.atomic.AtomicReference;
42+
43+
import static com.alipay.oceanbase.hbase.util.TableHBaseLoggerFactory.LCD;
44+
import static com.alipay.oceanbase.rpc.ObGlobal.*;
3445

3546
@InterfaceAudience.Private
3647
public class OHBufferedMutatorImpl implements BufferedMutator {
@@ -39,21 +50,24 @@ public class OHBufferedMutatorImpl implements BufferedMutator {
3950

4051
private final ExceptionListener listener;
4152

42-
private final OHTable ohTable;
4353
private final TableName tableName;
4454
private volatile Configuration conf;
4555

56+
private OHTable ohTable;
57+
private ObTableClient obTableClient;
4658
@VisibleForTesting
4759
final ConcurrentLinkedQueue<Mutation> asyncWriteBuffer = new ConcurrentLinkedQueue<Mutation>();
4860
@VisibleForTesting
4961
AtomicLong currentAsyncBufferSize = new AtomicLong(0);
62+
private AtomicReference<Class<?>> type = new AtomicReference<>(null);
5063

5164
private long writeBufferSize;
5265
private final int maxKeyValueSize;
5366
private boolean closed = false;
5467
private final ExecutorService pool;
5568
private int rpcTimeout;
5669
private int operationTimeout;
70+
private static final long OB_VERSION_4_3_5_0 = calcVersion(4, (short) 3, (byte) 5, (byte) 0);
5771

5872
public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params,
5973
OHTable ohTable) throws IOException {
@@ -75,11 +89,18 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam
7589
this.maxKeyValueSize = params.getMaxKeyValueSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params
7690
.getMaxKeyValueSize() : connectionConfig.getMaxKeyValueSize();
7791

78-
// create an OHTable object to do batch work
79-
if (ohTable != null) {
80-
this.ohTable = ohTable;
92+
if (isBatchSupport()) {
93+
// create an OHTable object to do batch work
94+
if (ohTable != null) {
95+
this.ohTable = ohTable;
96+
} else {
97+
this.ohTable = new OHTable(tableName, ohConnection, connectionConfig, pool);
98+
}
8199
} else {
82-
this.ohTable = new OHTable(tableName, ohConnection, connectionConfig, pool);
100+
// create an ObTableClient object to execute batch operation request
101+
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(connectionConfig);
102+
this.obTableClient.setRuntimeBatchExecutor(pool);
103+
this.obTableClient.setRpcExecuteTimeout(rpcTimeout);
83104
}
84105
}
85106

@@ -118,18 +139,39 @@ public void mutate(List<? extends Mutation> mutations) throws IOException {
118139
}
119140

120141
long toAddSize = 0;
121-
for (Mutation m : mutations) {
122-
validateOperation(m);
123-
toAddSize += m.heapSize();
124-
}
142+
if (isBatchSupport()) {
143+
for (Mutation m : mutations) {
144+
validateOperation(m);
145+
toAddSize += m.heapSize();
146+
}
125147

126-
currentAsyncBufferSize.addAndGet(toAddSize);
127-
asyncWriteBuffer.addAll(mutations);
148+
currentAsyncBufferSize.addAndGet(toAddSize);
149+
asyncWriteBuffer.addAll(mutations);
128150

129-
if (currentAsyncBufferSize.get() > writeBufferSize) {
130-
execute(false);
131-
}
151+
if (currentAsyncBufferSize.get() > writeBufferSize) {
152+
batchExecute(false);
153+
}
154+
} else {
155+
// check if every mutation's family is the same
156+
// check if mutations are the same type
157+
for (Mutation m : mutations) {
158+
validateOperation(m);
159+
Class<?> curType = m.getClass();
160+
// set the type of this BufferedMutator
161+
type.compareAndSet(null, curType);
162+
if (!type.get().equals(curType)) {
163+
throw new IllegalArgumentException("Not support different type in one batch.");
164+
}
165+
toAddSize += m.heapSize();
166+
}
132167

168+
currentAsyncBufferSize.addAndGet(toAddSize);
169+
asyncWriteBuffer.addAll(mutations);
170+
171+
if (currentAsyncBufferSize.get() > writeBufferSize) {
172+
normalExecute(false);
173+
}
174+
}
133175
}
134176

135177
/**
@@ -153,13 +195,14 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException {
153195
}
154196

155197
/**
198+
* This execute only supports for server version of 4_3_5.
156199
* Send the operations in the buffer to the servers. Does not wait for the server's answer. If
157200
* there is an error, either throw the error, or use the listener to deal with the error.
158201
*
159202
* @param flushAll - if true, sends all the writes and wait for all of them to finish before
160203
* returning.
161204
*/
162-
private void execute(boolean flushAll) throws IOException {
205+
private void batchExecute(boolean flushAll) throws IOException {
163206
LinkedList<Mutation> execBuffer = new LinkedList<>();
164207
long dequeuedSize = 0L;
165208
try {
@@ -180,15 +223,15 @@ private void execute(boolean flushAll) throws IOException {
180223
execBuffer.clear();
181224
} catch (Exception ex) {
182225
if (ex.getCause() instanceof RetriesExhaustedWithDetailsException) {
183-
LOGGER.error(TableHBaseLoggerFactory.LCD.convert("01-00011"), tableName.getNameAsString()
226+
LOGGER.error(LCD.convert("01-00011"), tableName.getNameAsString()
184227
+ ": One or more of the operations have failed after retries.", ex.getCause());
185228
RetriesExhaustedWithDetailsException retryException = (RetriesExhaustedWithDetailsException) ex.getCause();
186229
// recollect mutations and log error information
187230
execBuffer.clear();
188231
for (int i = 0; i < retryException.getNumExceptions(); ++i) {
189232
Row failedOp = retryException.getRow(i);
190233
execBuffer.add((Mutation) failedOp);
191-
LOGGER.error(TableHBaseLoggerFactory.LCD.convert("01-00011"), failedOp, tableName.getNameAsString(),
234+
LOGGER.error(LCD.convert("01-00011"), failedOp, tableName.getNameAsString(),
192235
currentAsyncBufferSize.get(), retryException.getCause(i));
193236
}
194237
if (listener != null) {
@@ -197,7 +240,7 @@ private void execute(boolean flushAll) throws IOException {
197240
throw retryException;
198241
}
199242
} else {
200-
LOGGER.error(TableHBaseLoggerFactory.LCD.convert("01-00011"), tableName.getNameAsString()
243+
LOGGER.error(LCD.convert("01-00011"), tableName.getNameAsString()
201244
+ ": Errors unrelated to operations occur during mutation operation", ex);
202245
throw ex;
203246
}
@@ -210,13 +253,109 @@ private void execute(boolean flushAll) throws IOException {
210253
}
211254
}
212255

256+
/**
257+
* This execute supports for server version below 4_3_5.
258+
* Send the operations in the buffer to the servers. Does not wait for the server's answer. If
259+
* there is an error, either throw the error, or use the listener to deal with the error.
260+
*
261+
* @param flushAll - if true, sends all the writes and wait for all of them to finish before
262+
* returning.
263+
*/
264+
private void normalExecute(boolean flushAll) throws IOException {
265+
LinkedList<Mutation> execBuffer = new LinkedList<>();
266+
ObTableBatchOperationRequest request = null;
267+
// namespace n1, n1:table_name
268+
// namespace default, table_name
269+
String tableNameString = tableName.getNameAsString();
270+
try {
271+
long dequeuedSize = 0L;
272+
Mutation m;
273+
while ((writeBufferSize <= 0 || dequeuedSize < (writeBufferSize * 2) || flushAll)
274+
&& (m = asyncWriteBuffer.poll()) != null) {
275+
execBuffer.add(m);
276+
long size = m.heapSize();
277+
currentAsyncBufferSize.addAndGet(-size);
278+
dequeuedSize += size;
279+
}
280+
// in concurrent situation, asyncWriteBuffer may be empty here
281+
// for other threads flush all buffer
282+
if (execBuffer.isEmpty()) {
283+
return;
284+
}
285+
try{
286+
// for now, operations' family is the same
287+
byte[] family = execBuffer.getFirst().getFamilyMap().firstKey();
288+
ObTableBatchOperation batch = buildObTableBatchOperation(execBuffer);
289+
// table_name$cf_name
290+
String targetTableName = OHTable.getTargetTableName(tableNameString, Bytes.toString(family), conf);
291+
request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName);
292+
} catch (Exception ex) {
293+
LOGGER.error(LCD.convert("01-00011"), tableName.getNameAsString()
294+
+ ": Errors unrelated to operations occur before mutation operation", ex);
295+
throw new ObTableUnexpectedException(tableName.getNameAsString() + ": Errors occur before mutation operation", ex);
296+
}
297+
try {
298+
ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request);
299+
} catch (Exception ex) {
300+
LOGGER.debug(LCD.convert("01-00011"), tableName.getNameAsString() +
301+
": Errors occur during mutation operation", ex);
302+
m = null;
303+
try {
304+
// retry every single operation
305+
while (!execBuffer.isEmpty()) {
306+
// poll elements from execBuffer to recollect remaining operations
307+
m = execBuffer.poll();
308+
byte[] family = m.getFamilyMap().firstKey();
309+
ObTableBatchOperation batch = buildObTableBatchOperation(Collections.singletonList(m));
310+
String targetTableName = OHTable.getTargetTableName(tableNameString, Bytes.toString(family), conf);
311+
request = OHTable.buildObTableBatchOperationRequest(batch, targetTableName);
312+
ObTableBatchOperationResult result = (ObTableBatchOperationResult) obTableClient.execute(request);
313+
}
314+
} catch (Exception newEx) {
315+
if (m != null) {
316+
execBuffer.addFirst(m);
317+
}
318+
// if retry fails, only recollect remaining operations
319+
while(!execBuffer.isEmpty()) {
320+
m = execBuffer.poll();
321+
long size = m.heapSize();
322+
asyncWriteBuffer.add(m);
323+
currentAsyncBufferSize.addAndGet(size);
324+
}
325+
throw newEx;
326+
}
327+
}
328+
} catch (Exception ex) {
329+
LOGGER.error(LCD.convert("01-00011"), tableName.getNameAsString() +
330+
": Errors occur during mutation operation", ex);
331+
// if the cause is illegal argument, directly throw to user
332+
if (ex instanceof ObTableUnexpectedException) {
333+
throw (ObTableUnexpectedException) ex;
334+
}
335+
// TODO: need to collect error information and actions in old version
336+
// TODO: maybe keep in ObTableBatchOperationResult
337+
List<Throwable> throwables = new ArrayList<Throwable>();
338+
List<Row> actions = new ArrayList<Row>();
339+
List<String> addresses = new ArrayList<String>();
340+
throwables.add(ex);
341+
RetriesExhaustedWithDetailsException error = new RetriesExhaustedWithDetailsException(
342+
new ArrayList<Throwable>(throwables),
343+
new ArrayList<Row>(actions), new ArrayList<String>(addresses));
344+
if (listener == null) {
345+
throw error;
346+
} else {
347+
listener.onException(error, this);
348+
}
349+
}
350+
}
351+
213352
@Override
214353
public void close() throws IOException {
215354
if (closed) {
216355
return;
217356
}
218357
try {
219-
execute(true);
358+
flush();
220359
} finally {
221360
// the pool in ObTableClient will be shut down too
222361
this.pool.shutdown();
@@ -241,13 +380,31 @@ public void setWriteBufferSize(long writeBufferSize) throws IOException {
241380
}
242381
}
243382

383+
private ObTableBatchOperation buildObTableBatchOperation(List<? extends Mutation> execBuffer) {
384+
List<KeyValue> keyValueList = new LinkedList<>();
385+
for (Mutation mutation : execBuffer) {
386+
for (Map.Entry<byte[], List<KeyValue>> entry : mutation.getFamilyMap().entrySet()) {
387+
keyValueList.addAll(entry.getValue());
388+
}
389+
}
390+
return OHTable.buildObTableBatchOperation(keyValueList, false, null);
391+
}
392+
393+
boolean isBatchSupport() {
394+
return OB_VERSION >= OB_VERSION_4_3_5_0;
395+
}
396+
244397
/**
245398
* Force to commit all operations
246399
* do not care whether the pool is shut down or this BufferedMutator is closed
247400
*/
248401
@Override
249402
public void flush() throws IOException {
250-
execute(true);
403+
if (isBatchSupport()) {
404+
batchExecute(true);
405+
} else {
406+
normalExecute(true);
407+
}
251408
}
252409

253410
@Override

0 commit comments

Comments
 (0)