Skip to content

Commit 667a16f

Browse files
authored
Merge pull request #252 from oceanbase/fix_hregion_locator_not_return_leader
fix getRegion not return leader host
2 parents c27889e + b23214a commit 667a16f

File tree

2 files changed

+40
-30
lines changed

2 files changed

+40
-30
lines changed

src/main/java/com/alipay/oceanbase/hbase/util/OHRegionLocator.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,18 @@
2828

2929
import java.io.IOException;
3030
import java.util.List;
31+
import java.util.stream.Collectors;
3132

3233
public class OHRegionLocator implements RegionLocator {
33-
private byte[][] startKeys;
34-
private byte[][] endKeys;
35-
private ObTableClient tableClient;
36-
private TableName tableName;
34+
private byte[][] startKeys;
35+
private byte[][] endKeys;
36+
private final ObTableClient tableClient;
37+
private final TableName tableName;
3738

38-
private List<HRegionLocation> regionLocations;
39+
private List<Pair<HRegionLocation, Boolean>> regionLocations;
3940

4041
public OHRegionLocator(byte[][] startKeys, byte[][] endKeys,
41-
List<HRegionLocation> regionLocations, TableName tableName,
42+
List<Pair<HRegionLocation, Boolean>> regionLocations, TableName tableName,
4243
ObTableClient tableClient) {
4344
this.startKeys = startKeys;
4445
this.endKeys = endKeys;
@@ -50,24 +51,28 @@ public OHRegionLocator(byte[][] startKeys, byte[][] endKeys,
5051
@Override
5152
public HRegionLocation getRegionLocation(byte[] bytes) throws IOException {
5253
// check if bytes is in the range of startKeys and endKeys
53-
for (HRegionLocation regionLocation : regionLocations) {
54-
if (regionLocation.getRegionInfo().containsRow(bytes)) {
55-
return regionLocation;
54+
for (Pair<HRegionLocation, Boolean> pair : regionLocations) {
55+
if (pair.getSecond() && pair.getFirst().getRegionInfo().containsRow(bytes)) {
56+
return pair.getFirst();
5657
}
5758
}
5859
return null;
5960
}
6061

62+
public List<Pair<HRegionLocation, Boolean>> getRegionLocationPair() {
63+
return regionLocations;
64+
}
65+
6166
@Override
6267
public HRegionLocation getRegionLocation(byte[] bytes, boolean b) throws IOException {
6368
if (b || regionLocations.isEmpty()) {
6469
OHRegionLocatorExecutor executor = new OHRegionLocatorExecutor(tableName.toString(),
6570
tableClient);
6671
try {
67-
RegionLocator location = executor.getRegionLocator(tableName.toString());
72+
OHRegionLocator location = executor.getRegionLocator(tableName.toString());
6873
this.startKeys = location.getStartKeys();
6974
this.endKeys = location.getEndKeys();
70-
this.regionLocations = location.getAllRegionLocations();
75+
this.regionLocations = location.getRegionLocationPair();
7176
} catch (IOException e) {
7277
if (e.getCause() instanceof ObTableTransportException
7378
&& ((ObTableTransportException) e.getCause()).getErrorCode() == TransportCodes.BOLT_TIMEOUT) {
@@ -82,7 +87,7 @@ public HRegionLocation getRegionLocation(byte[] bytes, boolean b) throws IOExcep
8287

8388
@Override
8489
public List<HRegionLocation> getAllRegionLocations() throws IOException {
85-
return regionLocations;
90+
return regionLocations.stream().map(Pair::getFirst).collect(Collectors.toList());
8691
}
8792

8893
/**

src/main/java/com/alipay/oceanbase/hbase/util/OHRegionLocatorExecutor.java

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.alipay.oceanbase.rpc.meta.ObTableMetaResponse;
2929
import com.alipay.oceanbase.rpc.meta.ObTableRpcMetaType;
3030
import org.apache.hadoop.hbase.*;
31+
import org.apache.hadoop.hbase.util.Pair;
3132

3233
import java.io.IOException;
3334
import java.util.*;
@@ -72,25 +73,25 @@ public OHRegionLocator parse(ObTableMetaResponse response) throws IOException {
7273
"partitions": [
7374
// 表1001的3个分区,每个分区3副本
7475
[0, 50001, "rowkey_1", 0, 1], // leader
75-
[0, 50001, "rowkey_1", 1, 0], // follower
76-
[0, 50001, "rowkey_1", 2, 0], // follower
76+
[0, 50001, "rowkey_1", 1, 2], // follower
77+
[0, 50001, "rowkey_1", 2, 2], // follower
7778
[0, 50002, "rowkey_2", 0, 1],
78-
[0, 50002, "rowkey_2", 1, 0],
79-
[0, 50002, "rowkey_2", 2, 0],
79+
[0, 50002, "rowkey_2", 1, 2],
80+
[0, 50002, "rowkey_2", 2, 2],
8081
[0, 50003, "rowkey_3", 0, 1],
81-
[0, 50003, "rowkey_3", 1, 0],
82-
[0, 50003, "rowkey_3", 2, 0],
82+
[0, 50003, "rowkey_3", 1, 2],
83+
[0, 50003, "rowkey_3", 2, 2],
8384
8485
// 表1002的3个分区,每个分区3副本
8586
[1, 50004, "rowkey_1", 0, 1],
86-
[1, 50004, "rowkey_1", 1, 0],
87-
[1, 50004, "rowkey_1", 2, 0],
87+
[1, 50004, "rowkey_1", 1, 2],
88+
[1, 50004, "rowkey_1", 2, 2],
8889
[1, 50005, "rowkey_2", 0, 1],
89-
[1, 50005, "rowkey_2", 1, 0],
90-
[1, 50005, "rowkey_2", 2, 0],
90+
[1, 50005, "rowkey_2", 1, 2],
91+
[1, 50005, "rowkey_2", 2, 2],
9192
[1, 50006, "rowkey_3", 0, 1],
92-
[1, 50006, "rowkey_3", 1, 0],
93-
[1, 50006, "rowkey_3", 2, 0]
93+
[1, 50006, "rowkey_3", 1, 2],
94+
[1, 50006, "rowkey_3", 2, 2]
9495
]
9596
}
9697
*/
@@ -175,7 +176,7 @@ private OHRegionLocator createRangePartitionLocator(
175176
final byte[][] startKeys = startKeysList.toArray(new byte[0][]);
176177
final byte[][] endKeys = endKeysList.toArray(new byte[0][]);
177178
// Create region locations for all regions in one table
178-
final List<HRegionLocation> regionLocations = IntStream.range(0, regionCountPerTable)
179+
final List regionLocations = IntStream.range(0, regionCountPerTable)
179180
.mapToObj(i -> {
180181
final List<Object> partition = (List<Object>) partitions.get(Math.min(i, regionCountPerTable - 1));
181182
final int replicationIdx = (int) partition.get(3);
@@ -192,7 +193,9 @@ private OHRegionLocator createRangePartitionLocator(
192193
startKeys[boundIndex],
193194
endKeys[boundIndex]
194195
);
195-
return new HRegionLocation(regionInfo, serverName, i);
196+
HRegionLocation location = new HRegionLocation(regionInfo, serverName, i);
197+
Boolean role = (int) partition.get(4) == 1;
198+
return new Pair(location, role);
196199
})
197200
.collect(Collectors.toList());
198201

@@ -215,8 +218,8 @@ private OHRegionLocator createHashPartitionLocator(
215218
final byte[][] endKeys = new byte[1][];
216219
startKeys[0] = HConstants.EMPTY_BYTE_ARRAY;
217220
endKeys[0] = HConstants.EMPTY_BYTE_ARRAY;
218-
219-
final List<HRegionLocation> regionLocations = IntStream.range(0, partitions.size())
221+
final int regionCountPerTable = partitions.size() / tableIdDict.size();
222+
final List regionLocations = IntStream.range(0, regionCountPerTable)
220223
.mapToObj(i -> {
221224
final List<Object> partition = (List<Object>) partitions.get(i);
222225
final int replicationIdx = (int) partition.get(3);
@@ -232,10 +235,12 @@ private OHRegionLocator createHashPartitionLocator(
232235
startKeys[0],
233236
endKeys[0]
234237
);
235-
return new HRegionLocation(regionInfo, serverName, i);
238+
HRegionLocation location = new HRegionLocation(regionInfo, serverName, i);
239+
Boolean role = (int) partition.get(4) == 1;
240+
return new Pair(location, role);
236241
})
237242
.collect(Collectors.toList());
238-
243+
239244
return new OHRegionLocator(startKeys, endKeys, regionLocations, TableName.valueOf(tableName), client);
240245
}
241246

0 commit comments

Comments
 (0)