Skip to content

Commit 5fedabf

Browse files
committed
fix bug
1 parent d67cbf5 commit 5fedabf

File tree

4 files changed

+114
-21
lines changed

4 files changed

+114
-21
lines changed

pom.xml

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
</distributionManagement>
4646

4747
<properties>
48-
<hadoop.version>2.5.1</hadoop.version>
48+
<hadoop.version>2.7.7</hadoop.version>
4949
<hbase.version>2.0.6</hbase.version>
5050
<java.source.version>1.8</java.source.version>
5151
<java.target.version>1.8</java.target.version>
@@ -103,10 +103,6 @@
103103
<artifactId>jettison</artifactId>
104104
<groupId>org.codehaus.jettison</groupId>
105105
</exclusion>
106-
<exclusion>
107-
<artifactId>commons-collections</artifactId>
108-
<groupId>commons-collections</groupId>
109-
</exclusion>
110106
</exclusions>
111107
</dependency>
112108
<dependency>

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -750,6 +750,12 @@ private void doPut(List<Put> puts) throws IOException {
750750

751751
// we need to periodically see if the writebuffer is full instead of waiting until the end of the List
752752
n++;
753+
if (n % putWriteBufferCheck == 0 && currentWriteBufferSize > writeBufferSize) {
754+
flushCommits();
755+
}
756+
}
757+
if (autoFlush || currentWriteBufferSize > writeBufferSize) {
758+
flushCommits();
753759
}
754760
}
755761

@@ -1142,6 +1148,106 @@ public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, lo
11421148
return incrementColumnValue(row, family, qualifier, amount);
11431149
}
11441150

1151+
public void flushCommits() throws IOException {
1152+
1153+
try {
1154+
boolean[] resultSuccess = new boolean[writeBuffer.size()];
1155+
try {
1156+
Map<String, Pair<List<Integer>, List<Cell>>> familyMap = new HashMap<String, Pair<List<Integer>, List<Cell>>>();
1157+
for (int i = 0; i < writeBuffer.size(); i++) {
1158+
Put aPut = writeBuffer.get(i);
1159+
Map<byte[], List<Cell>> innerFamilyMap = aPut.getFamilyCellMap();
1160+
if (innerFamilyMap.size() > 1) {
1161+
// Bypass logic: directly construct BatchOperation for puts with family map size > 1
1162+
try {
1163+
BatchOperation batch = buildBatchOperation(this.tableNameString,
1164+
innerFamilyMap, false, null);
1165+
BatchOperationResult results = batch.execute();
1166+
1167+
boolean hasError = results.hasError();
1168+
resultSuccess[i] = !hasError;
1169+
if (hasError) {
1170+
throw results.getFirstException();
1171+
}
1172+
} catch (Exception e) {
1173+
logger.error(LCD.convert("01-00008"), tableNameString, null, autoFlush,
1174+
writeBuffer.size(), e);
1175+
throw new IOException("put table " + tableNameString + " error codes "
1176+
+ null + "auto flush " + autoFlush
1177+
+ " current buffer size " + writeBuffer.size(), e);
1178+
}
1179+
} else {
1180+
// Existing logic for puts with family map size = 1
1181+
for (Map.Entry<byte[], List<Cell>> entry : innerFamilyMap.entrySet()) {
1182+
String family = Bytes.toString(entry.getKey());
1183+
Pair<List<Integer>, List<Cell>> keyValueWithIndex = familyMap
1184+
.get(family);
1185+
if (keyValueWithIndex == null) {
1186+
keyValueWithIndex = new Pair<List<Integer>, List<Cell>>(
1187+
new ArrayList<Integer>(), new ArrayList<Cell>());
1188+
familyMap.put(family, keyValueWithIndex);
1189+
}
1190+
keyValueWithIndex.getFirst().add(i);
1191+
keyValueWithIndex.getSecond().addAll(entry.getValue());
1192+
}
1193+
}
1194+
}
1195+
for (Map.Entry<String, Pair<List<Integer>, List<Cell>>> entry : familyMap
1196+
.entrySet()) {
1197+
List<Integer> errorCodeList = new ArrayList<Integer>(entry.getValue()
1198+
.getSecond().size());
1199+
try {
1200+
String targetTableName = getTargetTableName(this.tableNameString,
1201+
entry.getKey(), configuration);
1202+
1203+
BatchOperation batch = buildBatchOperation(targetTableName, entry
1204+
.getValue().getSecond(), false, null);
1205+
BatchOperationResult results = batch.execute();
1206+
1207+
errorCodeList = results.getErrorCodeList();
1208+
boolean hasError = results.hasError();
1209+
for (Integer index : entry.getValue().getFirst()) {
1210+
resultSuccess[index] = !hasError;
1211+
}
1212+
if (hasError) {
1213+
throw results.getFirstException();
1214+
}
1215+
} catch (Exception e) {
1216+
logger.error(LCD.convert("01-00008"), tableNameString, errorCodeList,
1217+
autoFlush, writeBuffer.size(), e);
1218+
throw new IOException("put table " + tableNameString + " error codes "
1219+
+ errorCodeList + "auto flush " + autoFlush
1220+
+ " current buffer size " + writeBuffer.size(), e);
1221+
}
1222+
1223+
}
1224+
1225+
} finally {
1226+
// mutate list so that it is empty for complete success, or contains
1227+
// only failed records results are returned in the same order as the
1228+
// requests in list walk the list backwards, so we can remove from list
1229+
// without impacting the indexes of earlier members
1230+
for (int i = resultSuccess.length - 1; i >= 0; i--) {
1231+
if (resultSuccess[i]) {
1232+
// successful Puts are removed from the list here.
1233+
writeBuffer.remove(i);
1234+
}
1235+
}
1236+
}
1237+
} finally {
1238+
if (clearBufferOnFail) {
1239+
writeBuffer.clear();
1240+
currentWriteBufferSize = 0;
1241+
} else {
1242+
// the write buffer was adjusted by processBatchOfPuts
1243+
currentWriteBufferSize = 0;
1244+
for (Put aPut : writeBuffer) {
1245+
currentWriteBufferSize += aPut.heapSize();
1246+
}
1247+
}
1248+
}
1249+
}
1250+
11451251
@Override
11461252
public void close() throws IOException {
11471253
if (cleanupPoolOnClose) {

src/main/java/com/alipay/oceanbase/hbase/util/OHConnectionImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public class OHConnectionImpl implements Connection {
5656

5757
private final OHConnectionConfiguration connectionConfig;
5858

59-
OHConnectionImpl(Configuration conf, final boolean managed, ExecutorService pool,
59+
OHConnectionImpl(Configuration conf, ExecutorService pool,
6060
final User user) throws IOException {
6161
this.conf = conf;
6262
this.batchPool = pool;

src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ PRIMARY KEY (`K`, `Q`, `T`)
105105
}
106106

107107
// test scan with empty family
108-
Scan scan = new Scan();
108+
Scan scan = new Scan(toBytes(key));
109109
ResultScanner scanner = hTable.getScanner(scan);
110110
for (Result result : scanner) {
111111
for (Cell keyValue : result.rawCells()) {
@@ -189,10 +189,8 @@ private void testBasic(String family) throws Exception {
189189
try {
190190
for (int j = 0; j < 10; j++) {
191191
put = new Put((key + "_" + j).getBytes());
192-
kv = new KeyValue(toBytes(key), family.getBytes(), column1.getBytes(), timestamp + 2, toBytes(value));
193-
put.add(kv);
194-
kv = new KeyValue(toBytes(key), family.getBytes(), column2.getBytes(), timestamp + 2, toBytes(value));
195-
put.add(kv);
192+
put.addColumn(family.getBytes(), column1.getBytes(), timestamp + 2, toBytes(value));
193+
put.addColumn(family.getBytes(), column2.getBytes(), timestamp + 2, toBytes(value));
196194
hTable.put(put);
197195
}
198196

@@ -230,6 +228,7 @@ private void testBasic(String family) throws Exception {
230228
// scan.setBatch(1);
231229

232230
scan.setMaxVersions(9);
231+
scanner = hTable.getScanner(scan);
233232
i = 0;
234233
count = 0;
235234
for (Result result : scanner) {
@@ -1041,7 +1040,7 @@ public void testRowRangeFilter() throws Exception {
10411040
for (Result result : scanner) {
10421041
for (Cell keyValue : result.rawCells()) {
10431042
System.out.printf("Rowkey: %s, Column Family: %s, Column Qualifier: %s, Timestamp: %d, Value: %s%n",
1044-
Bytes.toString(result.getRow()),
1043+
Bytes.toString(CellUtil.cloneRow(keyValue)),
10451044
Bytes.toString(CellUtil.cloneFamily(keyValue)),
10461045
Bytes.toString(CellUtil.cloneQualifier(keyValue)),
10471046
keyValue.getTimestamp(),
@@ -3204,13 +3203,6 @@ public void testGet() throws Exception {
32043203
// | zScanKey2 | column1 | -1729223352450 | value1 |
32053204
// +-----------+---------+----------------+--------+
32063205

3207-
// test closestRowBefore
3208-
get = new Get("scanKey2x2".getBytes());
3209-
get.addFamily(family.getBytes());
3210-
get.setClosestRowBefore(true);
3211-
r = hTable.get(get);
3212-
assertEquals(key2, Bytes.toString(r.getRow()));
3213-
32143206
// test exists
32153207
LinkedList<Get> gets = new LinkedList<>();
32163208
Get get1 = new Get(key1.getBytes());
@@ -5291,9 +5283,8 @@ public void testPutColumnFamilyNotExists() throws Exception {
52915283
@Test
52925284
public void testGetColumnFamilyNull() throws Exception {
52935285
Get get = new Get(("key_c_f").getBytes());
5294-
expectedException.expect(NullPointerException.class);
52955286
get.addFamily(null);
5296-
expectedException.expect(FeatureNotSupportedException.class);
5287+
expectedException.expect(IllegalArgumentException.class);
52975288
expectedException.expectMessage("family is empty");
52985289
hTable.get(get);
52995290
}

0 commit comments

Comments
 (0)