Skip to content

Commit 5f90652

Browse files
committed
fix region locator in flink
1 parent 5196714 commit 5f90652

File tree

2 files changed

+33
-21
lines changed

2 files changed

+33
-21
lines changed

src/main/java/com/alipay/oceanbase/hbase/util/OHRegionLocator.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,24 @@
1010
import org.apache.hadoop.hbase.util.Pair;
1111

1212
import java.io.IOException;
13-
import java.util.Collections;
1413
import java.util.List;
1514

1615
public class OHRegionLocator implements RegionLocator {
1716
private byte[][] startKeys;
1817
private byte[][] endKeys;
18+
private ObTableClient tableClient;
1919
private TableName tableName;
2020

2121
private List<HRegionLocation> regionLocations;
2222

23-
public OHRegionLocator(byte[][] startKeys, byte[][] endKeys, List<HRegionLocation> regionLocations) {
23+
public OHRegionLocator(byte[][] startKeys, byte[][] endKeys,
24+
List<HRegionLocation> regionLocations,
25+
TableName tableName, ObTableClient tableClient) {
2426
this.startKeys = startKeys;
2527
this.endKeys = endKeys;
2628
this.regionLocations = regionLocations;
29+
this.tableName = tableName;
30+
this.tableClient = tableClient;
2731
}
2832

2933
@Override
@@ -39,7 +43,7 @@ public HRegionLocation getRegionLocation(byte[] bytes) throws IOException {
3943

4044
@Override
4145
public HRegionLocation getRegionLocation(byte[] bytes, boolean b) throws IOException {
42-
if (b) {
46+
if (b || regionLocations.isEmpty()) {
4347
OHRegionLocatorExecutor executor = new OHRegionLocatorExecutor(tableName.toString(), tableClient);
4448
try {
4549
RegionLocator location = executor.getRegionLocator(tableName.toString());
@@ -108,8 +112,6 @@ public TableName getName() {
108112
return tableName;
109113
}
110114

111-
private ObTableClient tableClient;
112-
113115
@Override
114116
public void close() throws IOException {
115117
return;

src/main/java/com/alipay/oceanbase/hbase/util/OHRegionLocatorExecutor.java

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.alipay.oceanbase.rpc.ObTableClient;
77
import com.alipay.oceanbase.rpc.constant.Constants;
88
import com.alipay.oceanbase.rpc.exception.ObTableException;
9+
import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException;
910
import com.alipay.oceanbase.rpc.location.model.TableEntry;
1011
import com.alipay.oceanbase.rpc.meta.ObTableMetaRequest;
1112
import com.alipay.oceanbase.rpc.meta.ObTableMetaResponse;
@@ -116,28 +117,37 @@ private OHRegionLocator createRangePartitionLocator(
116117
final List<Object> tableIdDict,
117118
final List<Object> replicaDict,
118119
final List<Object> partitions) {
119-
final int partitionCount = partitions.size();
120-
final int regionCount = partitionCount + 1;
121-
final byte[][] startKeys = new byte[regionCount][];
122-
final byte[][] endKeys = new byte[regionCount][];
123-
124-
for (int i = 0; i < regionCount; i++) {
120+
if ((partitions.size() % tableIdDict.size()) != 0) {
121+
throw new ObTableUnexpectedException(
122+
"The number of partitions should be an integer multiple of the number of tables");
123+
}
124+
// the size of partitions the multiple of the number of zones, the number of tablets and the number of tables
125+
final int regionCount = partitions.size() / tableIdDict.size(); // get tablet boundaries of leaders and followers
126+
final List<byte[]> startKeysList = new ArrayList<>();
127+
final List<byte[]> endKeysList = new ArrayList<>();
128+
129+
for (int i = 0; i < regionCount; ++i) {
130+
boolean isLeader = ((int) ((List<Object>) partitions.get(i)).get(4) == 1);
131+
if (!isLeader) { // only record leader's boundary
132+
continue;
133+
}
125134
if (i == 0) {
126-
startKeys[i] = HConstants.EMPTY_BYTE_ARRAY;
127-
endKeys[i] = ((List<Object>) partitions.get(i)).get(2).toString().getBytes();
135+
startKeysList.add(HConstants.EMPTY_BYTE_ARRAY);
136+
endKeysList.add(((List<Object>) partitions.get(i)).get(2).toString().getBytes());
128137
} else if (i == regionCount - 1) {
129-
startKeys[i] = ((List<Object>) partitions.get(i - 1)).get(2).toString().getBytes();
130-
endKeys[i] = HConstants.EMPTY_BYTE_ARRAY;
138+
startKeysList.add(((List<Object>) partitions.get(i - 1)).get(2).toString().getBytes());
139+
endKeysList.add(HConstants.EMPTY_BYTE_ARRAY);
131140
} else {
132-
startKeys[i] = ((List<Object>) partitions.get(i - 1)).get(2).toString().getBytes();
133-
endKeys[i] = ((List<Object>) partitions.get(i)).get(2).toString().getBytes();
141+
startKeysList.add(((List<Object>) partitions.get(i - 1)).get(2).toString().getBytes());
142+
endKeysList.add(((List<Object>) partitions.get(i)).get(2).toString().getBytes());
134143
}
135144
}
136-
145+
final byte[][] startKeys = startKeysList.toArray(new byte[0][]);
146+
final byte[][] endKeys = endKeysList.toArray(new byte[0][]);
137147
// Create region locations for all regions
138148
final List<HRegionLocation> regionLocations = IntStream.range(0, regionCount)
139149
.mapToObj(i -> {
140-
final List<Object> partition = (List<Object>) partitions.get(Math.min(i, partitionCount - 1));
150+
final List<Object> partition = (List<Object>) partitions.get(Math.min(i, regionCount - 1));
141151
final int replicationIdx = (int) partition.get(3);
142152
final List<Object> hostInfo = (List<Object>) replicaDict.get(replicationIdx);
143153

@@ -155,7 +165,7 @@ private OHRegionLocator createRangePartitionLocator(
155165
})
156166
.collect(Collectors.toList());
157167

158-
return new OHRegionLocator(startKeys, endKeys, regionLocations);
168+
return new OHRegionLocator(startKeys, endKeys, regionLocations, TableName.valueOf(tableName), client);
159169
}
160170

161171
/**
@@ -195,7 +205,7 @@ private OHRegionLocator createHashPartitionLocator(
195205
})
196206
.collect(Collectors.toList());
197207

198-
return new OHRegionLocator(startKeys, endKeys, regionLocations);
208+
return new OHRegionLocator(startKeys, endKeys, regionLocations, TableName.valueOf(tableName), client);
199209
}
200210

201211
public OHRegionLocator getRegionLocator(final String tableName) throws IOException {

0 commit comments

Comments
 (0)