Skip to content

Commit 3411491

Browse files
authored
hbase support batchCallBack (#86)
1 parent 697c5d0 commit 3411491

File tree

2 files changed

+45
-6
lines changed

2 files changed

+45
-6
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -526,15 +526,26 @@ public Object[] batch(List<? extends Row> actions) throws IOException {
526526
@Override
527527
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
528528
Batch.Callback<R> callback) throws IOException,
529-
InterruptedException {
530-
throw new FeatureNotSupportedException("not supported yet'");
529+
InterruptedException {
530+
try {
531+
batch(actions, results);
532+
} finally {
533+
if (results != null) {
534+
for (int i = 0; i < results.length; i++) {
535+
if (!(results[i] instanceof ObTableException)) {
536+
callback.update(null, actions.get(i).getRow(), (R) results[i]);
537+
}
538+
}
539+
}
540+
}
531541
}
532542

533543
@Override
534-
public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback)
535-
throws IOException,
536-
InterruptedException {
537-
throw new FeatureNotSupportedException("not supported yet'");
544+
public <R> Object[] batchCallback(
545+
final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException, InterruptedException {
546+
Object[] results = new Object[actions.size()];
547+
batchCallback(actions, results, callback);
548+
return results;
538549
}
539550

540551
public static int compareByteArray(byte[] bt1, byte[] bt2) {

src/test/java/com/alipay/oceanbase/hbase/OHTableMultiColumnFamilyTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717

1818
package com.alipay.oceanbase.hbase;
1919

20+
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
2021
import org.apache.hadoop.hbase.KeyValue;
2122
import org.apache.hadoop.hbase.client.*;
23+
import org.apache.hadoop.hbase.client.coprocessor.Batch;
2224
import org.apache.hadoop.hbase.filter.PrefixFilter;
2325
import org.junit.*;
2426
import org.junit.rules.ExpectedException;
@@ -146,6 +148,32 @@ public void testMultiColumnFamilyBatch() throws Exception {
146148
result = hTable.get(get);
147149
keyValues = result.raw();
148150
assertEquals(6, keyValues.length);
151+
152+
batchLsit.clear();
153+
final long[] updateCounter = new long[] { 0L };
154+
delete = new Delete(toBytes("Key5"));
155+
delete.deleteColumns(family1, family1_column2);
156+
delete.deleteColumns(family2, family2_column1);
157+
delete.deleteFamily(family3);
158+
batchLsit.add(delete);
159+
for (int i = 0; i < rows; ++i) {
160+
Put put = new Put(toBytes("Key" + i));
161+
put.add(family1, family1_column1, family1_value);
162+
put.add(family1, family1_column2, family1_value);
163+
put.add(family1, family1_column3, family1_value);
164+
put.add(family2, family2_column1, family2_value);
165+
put.add(family2, family2_column2, family2_value);
166+
put.add(family3, family3_column1, family3_value);
167+
batchLsit.add(put);
168+
}
169+
hTable.batchCallback(batchLsit, new Batch.Callback<MutationResult>() {
170+
@Override
171+
public void update(byte[] region, byte[] row, MutationResult result) {
172+
updateCounter[0]++;
173+
}
174+
});
175+
assertEquals(11, updateCounter[0]);
176+
149177
}
150178

151179
@Test

0 commit comments

Comments
 (0)