Skip to content

Commit 1743301

Browse files
committed
simple support single cf in bufferedmutator; wait for batch compatible
1 parent df11904 commit 1743301

File tree

2 files changed

+30
-16
lines changed

2 files changed

+30
-16
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1530,7 +1530,7 @@ public void flushCommits() throws IOException {
15301530
mutator.flush();
15311531
} catch (Exception e) {
15321532
throw new IOException("put table " + tableNameString + " error codes " + null
1533-
+ "auto flush " + autoFlush + " current buffer size "
1533+
+ " auto flush " + autoFlush + " current buffer size "
15341534
+ mutator.getCurrentBufferSize(), e);
15351535
}
15361536
}

src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ public class OHBufferedMutatorImpl implements BufferedMutator {
6868
private int rpcTimeout;
6969
private int operationTimeout;
7070
private static final long OB_VERSION_4_3_5_0 = calcVersion(4, (short) 3, (byte) 5, (byte) 0);
71+
private static final long OB_VERSION_4_3_0_0 = calcVersion(4, (short) 3, (byte) 0, (byte) 0);
72+
private static final long OB_VERSION_4_2_5_1 = calcVersion(4, (short) 2, (byte) 5, (byte) 1);
73+
private static final long OB_VERSION_4_3_4_0 = calcVersion(4, (short) 3, (byte) 4, (byte) 0);
7174

7275
public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParams params,
7376
OHTable ohTable) throws IOException {
@@ -144,16 +147,8 @@ public void mutate(List<? extends Mutation> mutations) throws IOException {
144147
validateOperation(m);
145148
toAddSize += m.heapSize();
146149
}
147-
148-
currentAsyncBufferSize.addAndGet(toAddSize);
149-
asyncWriteBuffer.addAll(mutations);
150-
151-
if (currentAsyncBufferSize.get() > writeBufferSize) {
152-
batchExecute(false);
153-
}
154150
} else {
155-
// check if every mutation's family is the same
156-
// check if mutations are the same type
151+
// version below 4_3_5 need the same type in one bufferedMutator
157152
for (Mutation m : mutations) {
158153
validateOperation(m);
159154
Class<?> curType = m.getClass();
@@ -164,11 +159,14 @@ public void mutate(List<? extends Mutation> mutations) throws IOException {
164159
}
165160
toAddSize += m.heapSize();
166161
}
162+
}
163+
currentAsyncBufferSize.addAndGet(toAddSize);
164+
asyncWriteBuffer.addAll(mutations);
167165

168-
currentAsyncBufferSize.addAndGet(toAddSize);
169-
asyncWriteBuffer.addAll(mutations);
170-
171-
if (currentAsyncBufferSize.get() > writeBufferSize) {
166+
if (currentAsyncBufferSize.get() > writeBufferSize) {
167+
if (isBatchSupport()) {
168+
batchExecute(false);
169+
} else {
172170
normalExecute(false);
173171
}
174172
}
@@ -188,9 +186,17 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException {
188186
if (mt instanceof Put) {
189187
// family empty check is in validatePut
190188
OHTable.validatePut((Put) mt, maxKeyValueSize);
191-
OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), true);
189+
if (isMultiFamilySupport()) {
190+
OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), true);
191+
} else {
192+
OHTable.checkFamilyViolationForOneFamily(mt.getFamilyMap().keySet());
193+
}
192194
} else {
193-
OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), false);
195+
if (isMultiFamilySupport()) {
196+
OHTable.checkFamilyViolation(mt.getFamilyMap().keySet(), false);
197+
} else {
198+
OHTable.checkFamilyViolationForOneFamily(mt.getFamilyMap().keySet());
199+
}
194200
}
195201
}
196202

@@ -394,6 +400,14 @@ boolean isBatchSupport() {
394400
return OB_VERSION >= OB_VERSION_4_3_5_0;
395401
}
396402

403+
/**
404+
* Only 4_2_5 BP1 - 4_3_0 and after 4_3_4 support multi-cf
405+
* */
406+
boolean isMultiFamilySupport() {
407+
return (OB_VERSION >= OB_VERSION_4_2_5_1 && OB_VERSION < OB_VERSION_4_3_0_0)
408+
|| (OB_VERSION >= OB_VERSION_4_3_4_0);
409+
}
410+
397411
/**
398412
* Force to commit all operations
399413
* do not care whether the pool is shut down or this BufferedMutator is closed

0 commit comments

Comments
 (0)