Skip to content

Commit 51374c7

Browse files
shenyunlongeemjwu
andauthored
lsop (#41)
* adapt ls_op_batch in put and delete (#21) * adaptoin ls_op_batch * upgrate table.client.version to 1.2.9-SNAPSHOT * add test case * [Fix] conflict resolution error * [Fix] compile error cuz of duplicate import and format code --------- Co-authored-by: eemjwu <34029771+eemjwu@users.noreply.github.com>
1 parent a8bb5d9 commit 51374c7

File tree

10 files changed

+304
-97
lines changed

10 files changed

+304
-97
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
<project.build.sourceEncoding>${project.encoding}</project.build.sourceEncoding>
5555
<project.encoding>UTF-8</project.encoding>
5656
<slf4j.version>1.7.21</slf4j.version>
57-
<table.client.version>1.2.10</table.client.version>
57+
<table.client.version>1.2.12</table.client.version>
5858
</properties>
5959

6060
<dependencies>

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 112 additions & 62 deletions
Large diffs are not rendered by default.

src/main/java/com/alipay/oceanbase/hbase/OHTableClient.java

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,20 @@ public CoprocessorRpcChannel coprocessorService(byte[] row) {
115115
}
116116

117117
@Override
118-
public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) throws ServiceException, Throwable {
118+
public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
119+
byte[] startKey, byte[] endKey,
120+
Batch.Call<T, R> callable)
121+
throws ServiceException,
122+
Throwable {
119123
throw new FeatureNotSupportedException("not supported yet'");
120124
}
121125

122126
@Override
123-
public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) throws ServiceException, Throwable {
127+
public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
128+
byte[] endKey, Batch.Call<T, R> callable,
129+
Batch.Callback<R> callback)
130+
throws ServiceException,
131+
Throwable {
124132
throw new FeatureNotSupportedException("not supported yet'");
125133
}
126134

@@ -130,7 +138,6 @@ private void checkStatus() throws IllegalStateException {
130138
}
131139
}
132140

133-
134141
@Override
135142
public void setAutoFlush(boolean autoFlush) {
136143
checkStatus();
@@ -161,17 +168,30 @@ public void setWriteBufferSize(long writeBufferSize) throws IOException {
161168
}
162169

163170
@Override
164-
public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
171+
public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor,
172+
Message request,
173+
byte[] startKey,
174+
byte[] endKey,
175+
R responsePrototype)
176+
throws ServiceException,
177+
Throwable {
165178
throw new FeatureNotSupportedException("not supported yet'");
166179
}
167180

168181
@Override
169-
public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback) throws ServiceException, Throwable {
182+
public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor,
183+
Message request, byte[] startKey,
184+
byte[] endKey, R responsePrototype,
185+
Batch.Callback<R> callback)
186+
throws ServiceException,
187+
Throwable {
170188
throw new FeatureNotSupportedException("not supported yet'");
171189
}
172190

173191
@Override
174-
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
192+
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
193+
CompareFilter.CompareOp compareOp, byte[] value,
194+
RowMutations mutation) throws IOException {
175195
throw new FeatureNotSupportedException("not supported yet'");
176196
}
177197

@@ -225,12 +245,16 @@ public Object[] batch(List<? extends Row> actions) throws IOException, Interrupt
225245
}
226246

227247
@Override
228-
public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException {
248+
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
249+
Batch.Callback<R> callback) throws IOException,
250+
InterruptedException {
229251
throw new FeatureNotSupportedException("not supported yet'");
230252
}
231253

232254
@Override
233-
public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException, InterruptedException {
255+
public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback)
256+
throws IOException,
257+
InterruptedException {
234258
throw new FeatureNotSupportedException("not supported yet'");
235259
}
236260

@@ -336,7 +360,8 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo
336360
}
337361

338362
@Override
339-
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException {
363+
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
364+
Durability durability) throws IOException {
340365
throw new FeatureNotSupportedException("not supported yet'");
341366
}
342367

src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -714,12 +714,16 @@ public Object[] batch(List<? extends Row> actions) throws IOException, Interrupt
714714
}
715715

716716
@Override
717-
public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException {
717+
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
718+
Batch.Callback<R> callback) throws IOException,
719+
InterruptedException {
718720
throw new FeatureNotSupportedException("not supported yet'");
719721
}
720722

721723
@Override
722-
public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException, InterruptedException {
724+
public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback)
725+
throws IOException,
726+
InterruptedException {
723727
throw new FeatureNotSupportedException("not supported yet'");
724728
}
725729

@@ -798,7 +802,8 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo
798802
}
799803

800804
@Override
801-
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException {
805+
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
806+
Durability durability) throws IOException {
802807
throw new FeatureNotSupportedException("not supported yet'");
803808
}
804809

@@ -833,16 +838,25 @@ public CoprocessorRpcChannel coprocessorService(byte[] row) {
833838
}
834839

835840
@Override
836-
public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) throws ServiceException, Throwable {
841+
public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
842+
byte[] startKey,
843+
byte[] endKey,
844+
Batch.Call<T, R> callable)
845+
throws ServiceException,
846+
Throwable {
837847
throw new FeatureNotSupportedException("not supported yet'");
838848
}
839849

840850
@Override
841-
public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) throws ServiceException, Throwable {
851+
public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey,
852+
byte[] endKey,
853+
Batch.Call<T, R> callable,
854+
Batch.Callback<R> callback)
855+
throws ServiceException,
856+
Throwable {
842857
throw new FeatureNotSupportedException("not supported yet'");
843858
}
844859

845-
846860
@Override
847861
public String toString() {
848862
return "PooledOHTable{" + ", table=" + table + '}';
@@ -893,17 +907,30 @@ public void setWriteBufferSize(long writeBufferSize) throws IOException {
893907
}
894908

895909
@Override
896-
public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable {
910+
public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor,
911+
Message request,
912+
byte[] startKey,
913+
byte[] endKey,
914+
R responsePrototype)
915+
throws ServiceException,
916+
Throwable {
897917
throw new FeatureNotSupportedException("not supported yet'");
898918
}
899919

900920
@Override
901-
public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback) throws ServiceException, Throwable {
921+
public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor,
922+
Message request, byte[] startKey,
923+
byte[] endKey, R responsePrototype,
924+
Batch.Callback<R> callback)
925+
throws ServiceException,
926+
Throwable {
902927
throw new FeatureNotSupportedException("not supported yet'");
903928
}
904929

905930
@Override
906-
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, byte[] value, RowMutations mutation) throws IOException {
931+
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
932+
CompareFilter.CompareOp compareOp, byte[] value,
933+
RowMutations mutation) throws IOException {
907934
throw new FeatureNotSupportedException("not supported yet'");
908935
}
909936

src/main/java/com/alipay/oceanbase/hbase/constants/OHConstants.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,13 @@ public final class OHConstants {
8282
public static final String[] ALL_COLUMNS = new String[] { "K",
8383
"Q", "T", "V" };
8484

85+
/**
86+
* ocenbase hbase model rowkey column is consist of following column
87+
* K, Q, T hbase value
88+
*/
89+
public static final String[] ROW_KEY_COLUMNS = new String[] { "K",
90+
"Q", "T" };
91+
8592
/**
8693
* ocenbase hbase model value column is consist of following column
8794
* V hbase value

src/main/java/com/alipay/oceanbase/hbase/filter/HBaseFilterUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ private static String toParseableString(PageFilter filter) {
115115
}
116116

117117
private static String toParseableString(ColumnPaginationFilter filter) {
118-
return filter.getClass().getSimpleName() + '(' + filter.getLimit() + ',' + filter.getOffset() + ')';
118+
return filter.getClass().getSimpleName() + '(' + filter.getLimit() + ','
119+
+ filter.getOffset() + ')';
119120
}
120121

121122
private static String toParseableString(ColumnCountGetFilter filter) {

src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,100 @@ public void testMultiPut() throws IOException {
353353
hTable.put(puts);
354354
}
355355

356+
@Test
357+
public void testMultiPartitionPut() throws IOException {
358+
String[] keys = new String[] { "putKey1", "putKey2", "putKey3", "putKey4", "putKey5",
359+
"putKey6", "putKey7", "putKey8", "putKey9", "putKey10" };
360+
361+
String column1 = "column1";
362+
String column2 = "column2";
363+
String column3 = "column3";
364+
String value = "value";
365+
String family = "familyPartition";
366+
// put
367+
{
368+
List<Put> puts = new ArrayList<Put>();
369+
for (String key : keys) {
370+
Put put = new Put(Bytes.toBytes(key));
371+
put.add(toBytes(family), toBytes(column1), toBytes(value));
372+
put.add(toBytes(family), toBytes(column2), System.currentTimeMillis(),
373+
toBytes(value));
374+
puts.add(put);
375+
}
376+
377+
for (String key : keys) {
378+
// put same k, q, t
379+
Put put = new Put(Bytes.toBytes(key));
380+
put.add(toBytes(family), toBytes(column3), 100L, toBytes(value));
381+
put.add(toBytes(family), toBytes(column3), 100L, toBytes(value));
382+
puts.add(put);
383+
}
384+
hTable.put(puts);
385+
}
386+
// get
387+
{
388+
List<Get> gets = new ArrayList<Get>();
389+
for (String key : keys) {
390+
Get get = new Get(Bytes.toBytes(key));
391+
get.addColumn(toBytes(family), toBytes(column1));
392+
get.addColumn(toBytes(family), toBytes(column2));
393+
get.addColumn(toBytes(family), toBytes(column3));
394+
gets.add(get);
395+
}
396+
Result[] res = hTable.get(gets);
397+
assertEquals(res.length, 10);
398+
assertEquals(res[0].raw().length, 3);
399+
}
400+
}
401+
402+
@Test
403+
public void testMultiPartitionDel() throws IOException {
404+
String[] keys = new String[] { "putKey1", "putKey2", "putKey3", "putKey4", "putKey5",
405+
"putKey6", "putKey7", "putKey8", "putKey9", "putKey10" };
406+
407+
String column1 = "column1";
408+
String column2 = "column2";
409+
String column3 = "column3";
410+
String value = "value";
411+
String family = "familyPartition";
412+
// delete
413+
{
414+
List<Delete> deletes = new ArrayList<Delete>();
415+
for (String key : keys) {
416+
Delete del = new Delete(Bytes.toBytes(key));
417+
del.deleteColumns(toBytes(family), toBytes(column1));
418+
del.deleteColumns(toBytes(family), toBytes(column2), System.currentTimeMillis());
419+
deletes.add(del);
420+
}
421+
422+
for (String key : keys) {
423+
// del same k, q, t
424+
Delete del = new Delete(Bytes.toBytes(key));
425+
del.deleteColumn(toBytes(family), toBytes(column3), 100L);
426+
del.deleteColumn(toBytes(family), toBytes(column3), 100L);
427+
deletes.add(del);
428+
}
429+
hTable.delete(deletes);
430+
}
431+
// get
432+
{
433+
List<Get> gets = new ArrayList<Get>();
434+
for (String key : keys) {
435+
Get get = new Get(Bytes.toBytes(key));
436+
get.addColumn(toBytes(family), toBytes(column1));
437+
get.addColumn(toBytes(family), toBytes(column2));
438+
get.addColumn(toBytes(family), toBytes(column3));
439+
gets.add(get);
440+
}
441+
Result[] res = hTable.get(gets);
442+
assertEquals(res.length, 10);
443+
int i = 0;
444+
for (i = 0; i < res.length; ++i) {
445+
assertEquals(res[i].raw().length, 0);
446+
}
447+
}
448+
}
449+
356450
public void tryPut(HTableInterface hTable, Put put) throws Exception {
357451
hTable.put(put);
358452
Thread.sleep(1);
@@ -622,7 +716,6 @@ public void testGetFilter() throws Exception {
622716
tryPut(hTable, putKey2Column2Value1);
623717
tryPut(hTable, putKey2Column2Value2);
624718

625-
626719
// show table (time maybe different)
627720
//+---------+---------+----------------+--------+
628721
//| K | Q | T | V |
@@ -738,7 +831,7 @@ public void testGetFilter() throws Exception {
738831
filterList = new FilterList();
739832
emptyFilterList = new FilterList();
740833
filterList.addFilter(emptyFilterList);
741-
filterList.addFilter(new ColumnPaginationFilter(3,1));
834+
filterList.addFilter(new ColumnPaginationFilter(3, 1));
742835
get = new Get(toBytes(key1));
743836
get.setMaxVersions(10);
744837
get.addFamily(toBytes(family));
@@ -1172,7 +1265,6 @@ public void testScan() throws Exception {
11721265
hTable.delete(deleteZKey2Family);
11731266
}
11741267

1175-
11761268
@Test
11771269
public void testReversedScan() throws Exception {
11781270
String key1 = "scanKey1x";

0 commit comments

Comments
 (0)