Skip to content

Commit 1b584ad

Browse files
maochongxinstuBirdFly
authored andcommitted
hbase_multi_column_family_dev (#67)
* hbase_multi_column_family_dev * fix * fix revierw
1 parent e040d75 commit 1b584ad

File tree

3 files changed

+826
-67
lines changed

3 files changed

+826
-67
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 187 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -489,9 +489,27 @@ private String getTargetTableName(String tableNameString) {
489489
return tableNameString;
490490
}
491491

492+
// To enable the server to identify the column family to which a qualifier belongs,
493+
// the client writes the column family name into the qualifier.
494+
// The server then parses this information to determine the table that needs to be operated on.
495+
private void processColumnFilters(NavigableSet<byte[]> columnFilters, Map<byte[], NavigableSet<byte[]>> familyMap) {
496+
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
497+
if (entry.getValue() != null) {
498+
for (byte[] columnName : entry.getValue()) {
499+
String columnNameStr = Bytes.toString(columnName);
500+
columnNameStr = Bytes.toString(entry.getKey()) + "." + columnNameStr;
501+
columnFilters.add(columnNameStr.getBytes());
502+
}
503+
} else {
504+
String columnNameStr = Bytes.toString(entry.getKey()) + ".";
505+
columnFilters.add(columnNameStr.getBytes());
506+
}
507+
}
508+
}
509+
492510
@Override
493511
public Result get(final Get get) throws IOException {
494-
if (get.getFamilyMap().keySet() == null || get.getFamilyMap().keySet().size() == 0) {
512+
if (get.getFamilyMap().keySet() == null || get.getFamilyMap().keySet().isEmpty()) {
495513
// check nothing, use table group;
496514
} else {
497515
checkFamilyViolation(get.getFamilyMap().keySet());
@@ -502,29 +520,33 @@ public Result get(final Get get) throws IOException {
502520
public Result call() throws IOException {
503521
List<KeyValue> keyValueList = new ArrayList<KeyValue>();
504522
byte[] family = new byte[] {};
505-
ObTableClientQueryStreamResult clientQueryStreamResult;
506-
ObTableQueryRequest request;
523+
ObTableClientQueryAsyncStreamResult clientQueryStreamResult;
524+
ObTableQueryAsyncRequest request;
507525
ObTableQuery obTableQuery;
508526
try {
509527
if (get.getFamilyMap().keySet() == null
510-
|| get.getFamilyMap().keySet().size() == 0) {
511-
obTableQuery = buildObTableQuery(get, null);
512-
request = buildObTableQueryRequest(obTableQuery,
528+
|| get.getFamilyMap().keySet().isEmpty()
529+
|| get.getFamilyMap().size() > 1) {
530+
// In a Get operation where the family map is greater than 1 or equal to 0,
531+
// we handle this by appending the column family to the qualifier on the client side.
532+
// The server can then use this information to filter the appropriate column families and qualifiers.
533+
NavigableSet<byte[]> columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR);
534+
processColumnFilters(columnFilters, get.getFamilyMap());
535+
obTableQuery = buildObTableQuery(get, columnFilters);
536+
request = buildObTableQueryAsyncRequest(obTableQuery,
513537
getTargetTableName(tableNameString));
514538

515-
clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
539+
clientQueryStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
516540
.execute(request);
517541
getKeyValueFromResult(clientQueryStreamResult, keyValueList, true, family);
518542
} else {
519543
for (Map.Entry<byte[], NavigableSet<byte[]>> entry : get.getFamilyMap()
520544
.entrySet()) {
521545
family = entry.getKey();
522546
obTableQuery = buildObTableQuery(get, entry.getValue());
523-
request = buildObTableQueryRequest(
524-
obTableQuery,
525-
getTargetTableName(tableNameString, Bytes.toString(family),
526-
configuration));
527-
clientQueryStreamResult = (ObTableClientQueryStreamResult) obTableClient
547+
request = buildObTableQueryAsyncRequest(obTableQuery,
548+
getTargetTableName(tableNameString, Bytes.toString(family)));
549+
clientQueryStreamResult = (ObTableClientQueryAsyncStreamResult) obTableClient
528550
.execute(request);
529551
getKeyValueFromResult(clientQueryStreamResult, keyValueList, false,
530552
family);
@@ -581,9 +603,16 @@ public ResultScanner call() throws IOException {
581603
ObTableQuery obTableQuery;
582604
ObHTableFilter filter;
583605
try {
584-
if (scan.getFamilyMap().keySet().isEmpty()) {
606+
if (scan.getFamilyMap().keySet() == null
607+
|| scan.getFamilyMap().keySet().isEmpty()
608+
|| scan.getFamilyMap().size() > 1) {
609+
// In a Scan operation where the family map is greater than 1 or equal to 0,
610+
// we handle this by appending the column family to the qualifier on the client side.
611+
// The server can then use this information to filter the appropriate column families and qualifiers.
612+
NavigableSet<byte[]> columnFilters = new TreeSet<>(Bytes.BYTES_COMPARATOR);
613+
processColumnFilters(columnFilters, scan.getFamilyMap());
585614
filter = buildObHTableFilter(scan.getFilter(), scan.getTimeRange(),
586-
scan.getMaxVersions(), null);
615+
scan.getMaxVersions(), columnFilters);
587616
obTableQuery = buildObTableQuery(filter, scan);
588617

589618
request = buildObTableQueryAsyncRequest(obTableQuery,
@@ -744,18 +773,47 @@ public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
744773

745774
private void innerDelete(Delete delete) throws IOException {
746775
checkArgument(delete.getRow() != null, "row is null");
747-
checkArgument(!delete.isEmpty(), "delete is empty");
748776
List<Integer> errorCodeList = new ArrayList<Integer>();
777+
BatchOperationResult results = null;
778+
749779
try {
750780
checkFamilyViolation(delete.getFamilyMap().keySet());
781+
if (delete.getFamilyMap().isEmpty()) {
782+
// For a Delete operation without any qualifiers, we construct a DeleteFamily request.
783+
// The server then performs the operation on all column families.
784+
KeyValue kv = new KeyValue(delete.getRow(), delete.getTimeStamp(),
785+
KeyValue.Type.DeleteFamily);
786+
787+
BatchOperation batch = buildBatchOperation(tableNameString, Arrays.asList(kv), false, null);
788+
results = batch.execute();
789+
} else if (delete.getFamilyMap().size() > 1) {
790+
// Currently, the Delete Family operation type cannot transmit qualifiers to the server.
791+
// As a result, the server cannot identify which families need to be deleted.
792+
// Therefore, this process is handled sequentially.
793+
boolean has_delete_family = delete.getFamilyMap().entrySet().stream()
794+
.flatMap(entry -> entry.getValue().stream())
795+
.anyMatch(kv -> KeyValue.Type.codeToType(kv.getType()) == KeyValue.Type.DeleteFamily);
796+
if (!has_delete_family) {
797+
BatchOperation batch = buildBatchOperation(tableNameString,
798+
delete.getFamilyMap(), false, null);
799+
results = batch.execute();
800+
} else {
801+
for (Map.Entry<byte[], List<KeyValue>> entry : delete.getFamilyMap().entrySet()) {
802+
BatchOperation batch = buildBatchOperation(
803+
getTargetTableName(tableNameString, Bytes.toString(entry.getKey())),
804+
entry.getValue(), false, null);
805+
results = batch.execute();
806+
}
807+
}
808+
} else {
809+
Map.Entry<byte[], List<KeyValue>> entry = delete.getFamilyMap().entrySet().iterator()
810+
.next();
751811

752-
Map.Entry<byte[], List<KeyValue>> entry = delete.getFamilyMap().entrySet().iterator()
753-
.next();
754-
755-
BatchOperation batch = buildBatchOperation(
756-
getTargetTableName(tableNameString, Bytes.toString(entry.getKey()), configuration),
757-
entry.getValue(), false, null);
758-
BatchOperationResult results = batch.execute();
812+
BatchOperation batch = buildBatchOperation(
813+
getTargetTableName(tableNameString, Bytes.toString(entry.getKey())),
814+
entry.getValue(), false, null);
815+
results = batch.execute();
816+
}
759817

760818
errorCodeList = results.getErrorCodeList();
761819
boolean hasError = results.hasError();
@@ -842,7 +900,7 @@ private boolean checkAndMutation(byte[] row, byte[] family, byte[] qualifier, Co
842900
List<KeyValue> keyValueList = new LinkedList<>();
843901
// only one family operation is allowed
844902
for (Mutation mutation : mutations) {
845-
checkFamilyViolation(mutation.getFamilyMap().keySet());
903+
checkFamilyViolationForOneFamily(mutation.getFamilyMap().keySet());
846904
checkArgument(Arrays.equals(family, mutation.getFamilyMap().firstEntry().getKey()),
847905
"mutation family is not equal check family");
848906
// Support for multiple families in the future
@@ -876,7 +934,7 @@ public void mutateRow(RowMutations rm) {
876934
@Override
877935
public Result append(Append append) throws IOException {
878936

879-
checkFamilyViolation(append.getFamilyMap().keySet());
937+
checkFamilyViolationForOneFamily(append.getFamilyMap().keySet());
880938
checkArgument(!append.isEmpty(), "append is empty.");
881939
try {
882940
byte[] r = append.getRow();
@@ -926,7 +984,7 @@ public Result append(Append append) throws IOException {
926984
@Override
927985
public Result increment(Increment increment) throws IOException {
928986

929-
checkFamilyViolation(increment.getFamilyMap().keySet());
987+
checkFamilyViolationForOneFamily(increment.getFamilyMap().keySet());
930988

931989
try {
932990
List<byte[]> qualifiers = new ArrayList<byte[]>();
@@ -1048,18 +1106,39 @@ public void flushCommits() throws IOException {
10481106
for (int i = 0; i < writeBuffer.size(); i++) {
10491107
Put aPut = writeBuffer.get(i);
10501108
Map<byte[], List<KeyValue>> innerFamilyMap = aPut.getFamilyMap();
1051-
// multi family can not ensure automatic
1052-
for (Map.Entry<byte[], List<KeyValue>> entry : innerFamilyMap.entrySet()) {
1053-
String family = Bytes.toString(entry.getKey());
1054-
Pair<List<Integer>, List<KeyValue>> keyValueWithIndex = familyMap
1055-
.get(family);
1056-
if (keyValueWithIndex == null) {
1057-
keyValueWithIndex = new Pair<List<Integer>, List<KeyValue>>(
1058-
new ArrayList<Integer>(), new ArrayList<KeyValue>());
1059-
familyMap.put(family, keyValueWithIndex);
1109+
if (innerFamilyMap.size() > 1) {
1110+
// Bypass logic: directly construct BatchOperation for puts with family map size > 1
1111+
try {
1112+
BatchOperation batch = buildBatchOperation(this.tableNameString,
1113+
innerFamilyMap, false, null);
1114+
BatchOperationResult results = batch.execute();
1115+
1116+
boolean hasError = results.hasError();
1117+
resultSuccess[i] = !hasError;
1118+
if (hasError) {
1119+
throw results.getFirstException();
1120+
}
1121+
} catch (Exception e) {
1122+
logger.error(LCD.convert("01-00008"), tableNameString, null, autoFlush,
1123+
writeBuffer.size(), e);
1124+
throw new IOException("put table " + tableNameString + " error codes "
1125+
+ null + "auto flush " + autoFlush
1126+
+ " current buffer size " + writeBuffer.size(), e);
1127+
}
1128+
} else {
1129+
// Existing logic for puts with family map size = 1
1130+
for (Map.Entry<byte[], List<KeyValue>> entry : innerFamilyMap.entrySet()) {
1131+
String family = Bytes.toString(entry.getKey());
1132+
Pair<List<Integer>, List<KeyValue>> keyValueWithIndex = familyMap
1133+
.get(family);
1134+
if (keyValueWithIndex == null) {
1135+
keyValueWithIndex = new Pair<List<Integer>, List<KeyValue>>(
1136+
new ArrayList<Integer>(), new ArrayList<KeyValue>());
1137+
familyMap.put(family, keyValueWithIndex);
1138+
}
1139+
keyValueWithIndex.getFirst().add(i);
1140+
keyValueWithIndex.getSecond().addAll(entry.getValue());
10601141
}
1061-
keyValueWithIndex.getFirst().add(i);
1062-
keyValueWithIndex.getSecond().addAll(entry.getValue());
10631142
}
10641143
}
10651144
for (Map.Entry<String, Pair<List<Integer>, List<KeyValue>>> entry : familyMap
@@ -1339,7 +1418,7 @@ private ObHTableFilter buildObHTableFilter(Filter filter, TimeRange timeRange, i
13391418
ObHTableFilter obHTableFilter = new ObHTableFilter();
13401419

13411420
if (filter != null) {
1342-
obHTableFilter.setFilterString(HBaseFilterUtils.toParseableString(filter));
1421+
obHTableFilter.setFilterString(HBaseFilterUtils.toParseableString(filter).getBytes());
13431422
}
13441423

13451424
if (timeRange != null) {
@@ -1379,7 +1458,7 @@ private ObHTableFilter buildObHTableFilter(String filterString, TimeRange timeRa
13791458
ObHTableFilter obHTableFilter = new ObHTableFilter();
13801459

13811460
if (filterString != null) {
1382-
obHTableFilter.setFilterString(filterString);
1461+
obHTableFilter.setFilterString(filterString.getBytes());
13831462
}
13841463

13851464
if (timeRange != null) {
@@ -1499,6 +1578,29 @@ public static ObTableBatchOperation buildObTableBatchOperation(List<KeyValue> ke
14991578
return batch;
15001579
}
15011580

1581+
private ObTableBatchOperation buildObTableBatchOperation(Map<byte[], List<KeyValue>> familyMap,
1582+
boolean putToAppend,
1583+
List<byte[]> qualifiers) {
1584+
ObTableBatchOperation batch = new ObTableBatchOperation();
1585+
for (Map.Entry<byte[], List<KeyValue>> entry : familyMap.entrySet()) {
1586+
byte[] family = entry.getKey();
1587+
List<KeyValue> keyValueList = entry.getValue();
1588+
for (KeyValue kv : keyValueList) {
1589+
if (qualifiers != null) {
1590+
qualifiers
1591+
.add((Bytes.toString(family) + "." + Bytes.toString(kv.getQualifier()))
1592+
.getBytes());
1593+
}
1594+
KeyValue new_kv = modifyQualifier(kv,
1595+
(Bytes.toString(family) + "." + Bytes.toString(kv.getQualifier())).getBytes());
1596+
batch.addTableOperation(buildObTableOperation(new_kv, putToAppend));
1597+
}
1598+
}
1599+
batch.setSameType(true);
1600+
batch.setSamePropertiesNames(true);
1601+
return batch;
1602+
}
1603+
15021604
private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv,
15031605
boolean putToAppend) {
15041606
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType());
@@ -1529,6 +1631,40 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv,
15291631
throw new IllegalArgumentException("illegal mutation type " + kvType);
15301632
}
15311633
}
1634+
private KeyValue modifyQualifier(KeyValue original, byte[] newQualifier) {
1635+
// Extract existing components
1636+
byte[] row = original.getRow();
1637+
byte[] family = original.getFamily();
1638+
byte[] value = original.getValue();
1639+
long timestamp = original.getTimestamp();
1640+
byte type = original.getTypeByte();
1641+
// Create a new KeyValue with the modified qualifier
1642+
return new KeyValue(row, family, newQualifier, timestamp, KeyValue.Type.codeToType(type),
1643+
value);
1644+
}
1645+
1646+
private BatchOperation buildBatchOperation(String tableName,
1647+
Map<byte[], List<KeyValue>> familyMap,
1648+
boolean putToAppend, List<byte[]> qualifiers) {
1649+
BatchOperation batch = obTableClient.batchOperation(tableName);
1650+
1651+
for (Map.Entry<byte[], List<KeyValue>> entry : familyMap.entrySet()) {
1652+
byte[] family = entry.getKey();
1653+
List<KeyValue> keyValueList = entry.getValue();
1654+
for (KeyValue kv : keyValueList) {
1655+
if (qualifiers != null) {
1656+
qualifiers.add(kv.getQualifier());
1657+
}
1658+
KeyValue new_kv = modifyQualifier(kv,
1659+
(Bytes.toString(family) + "." + Bytes.toString(kv.getQualifier())).getBytes());
1660+
batch.addOperation(buildMutation(new_kv, putToAppend));
1661+
}
1662+
}
1663+
1664+
batch.setEntityType(ObTableEntityType.HKV);
1665+
return batch;
1666+
}
1667+
15321668

15331669
private BatchOperation buildBatchOperation(String tableName, List<KeyValue> keyValueList,
15341670
boolean putToAppend, List<byte[]> qualifiers) {
@@ -1618,24 +1754,31 @@ private ObTableQueryAndMutateRequest buildObTableQueryAndMutateRequest(ObTableQu
16181754
return request;
16191755
}
16201756

1621-
public static void checkFamilyViolation(Collection<byte[]> families) {
1757+
private void checkFamilyViolation(Collection<byte[]> families) {
1758+
for (byte[] family : families) {
1759+
if (isBlank(Bytes.toString(family))) {
1760+
throw new IllegalArgumentException("family is blank");
1761+
}
1762+
}
1763+
}
1764+
1765+
1766+
// This method is currently only used for append and increment operations.
1767+
// It restricts these two methods to use multi-column family operations.
1768+
// Note: After completing operations on multiple column families, they are deleted using the method described above.
1769+
private void checkFamilyViolationForOneFamily(Collection<byte[]> families) {
16221770
if (families == null || families.size() == 0) {
16231771
throw new FeatureNotSupportedException("family is empty.");
16241772
}
16251773

16261774
if (families.size() > 1) {
16271775
throw new FeatureNotSupportedException("multi family is not supported yet.");
16281776
}
1629-
16301777
for (byte[] family : families) {
1631-
if (family == null || family.length == 0) {
1632-
throw new IllegalArgumentException("family is empty");
1633-
}
16341778
if (isBlank(Bytes.toString(family))) {
16351779
throw new IllegalArgumentException("family is blank");
16361780
}
16371781
}
1638-
16391782
}
16401783

16411784
public void refreshTableEntry(String familyString, boolean hasTestLoad) throws Exception {

0 commit comments

Comments
 (0)