1
1
package com .alipay .oceanbase .hbase .util ;
2
2
3
3
import com .alibaba .fastjson .JSON ;
4
+ import com .alibaba .fastjson .JSONObject ;
4
5
import com .alipay .oceanbase .hbase .execute .AbstractObTableMetaExecutor ;
5
6
import com .alipay .oceanbase .rpc .ObTableClient ;
7
+ import com .alipay .oceanbase .rpc .constant .Constants ;
6
8
import com .alipay .oceanbase .rpc .exception .ObTableException ;
7
9
import com .alipay .oceanbase .rpc .location .model .TableEntry ;
8
10
import com .alipay .oceanbase .rpc .meta .ObTableMetaRequest ;
9
11
import com .alipay .oceanbase .rpc .meta .ObTableMetaResponse ;
10
12
import com .alipay .oceanbase .rpc .meta .ObTableRpcMetaType ;
13
+ import org .apache .hadoop .hbase .*;
11
14
12
15
import java .io .IOException ;
13
- import java .util .HashMap ;
14
- import java .util .Map ;
16
+ import java .util .*;
17
+ import java .util .stream .Collectors ;
18
+ import java .util .stream .IntStream ;
15
19
16
20
public class OHRegionLocatorExecutor extends AbstractObTableMetaExecutor <OHRegionLocator > {
17
- OHRegionLocatorExecutor (ObTableClient client ) {
21
+ private final String tableName ;
22
+ private final ObTableClient client ;
23
+
24
+ OHRegionLocatorExecutor (String tableName , ObTableClient client ) {
25
+ this .tableName = tableName ;
18
26
this .client = client ;
19
27
}
20
28
@@ -26,24 +34,140 @@ public ObTableRpcMetaType getMetaType() {
26
34
@ Override
27
35
public OHRegionLocator parse (ObTableMetaResponse response ) throws IOException {
28
36
try {
29
- String jsonData = response .getData ();
30
- // process json
31
- return new OHRegionLocator (null , null );
37
+ final String jsonData = response .getData ();
38
+ final JSONObject jsonMap = Optional .<JSONObject >ofNullable (JSON .parseObject (jsonData ))
39
+ .orElseThrow (() -> new IOException ("jsonMap is null" ));
40
+
41
+ final List <Object > partitions = Optional .<List <Object >>ofNullable (jsonMap .getJSONArray ("partitions" ))
42
+ .orElseThrow (() -> new IOException ("partitions is null" ));
43
+
44
+ final List <Object > tableIdDict = Optional .<List <Object >>ofNullable (jsonMap .getJSONArray ("table_id_dict" ))
45
+ .orElseThrow (() -> new IOException ("tableIdDict is null" ));
46
+ final List <Object > replicaDict = Optional .<List <Object >>ofNullable (jsonMap .getJSONArray ("replica_dict" ))
47
+ .orElseThrow (() -> new IOException ("replicaDict is null" ));
48
+
49
+ final boolean isHashLikePartition = partitions .stream ()
50
+ .map (obj -> (List <Object >) obj )
51
+ .filter (partition -> {
52
+ if (partition .size () <= 3 ) {
53
+ throw new IllegalArgumentException ("partition size is not 3" );
54
+ }
55
+ return true ;
56
+ })
57
+ .allMatch (partition -> {
58
+ final byte [] highBound = partition .get (2 ).toString ().getBytes ();
59
+ return Arrays .equals (highBound , Constants .EMPTY_STRING .getBytes ());
60
+ });
61
+ return isHashLikePartition ?
62
+ createHashPartitionLocator (tableIdDict , replicaDict , partitions ) :
63
+ createRangePartitionLocator (tableIdDict , replicaDict , partitions );
32
64
} catch (IllegalArgumentException e ) {
33
- throw new IOException ("msg" , e );
65
+ throw new IOException ("Invalid partition data: " + e .getMessage (), e );
66
+ }
67
+ }
68
+
69
+ /**
70
+ * Creates a region locator for range partitions
71
+ * @param tableIdDict table ID dictionary
72
+ * @param replicaDict replica dictionary
73
+ * @param partitions list of partition data
74
+ * @return OHRegionLocator for range partitions
75
+ */
76
+ private OHRegionLocator createRangePartitionLocator (
77
+ final List <Object > tableIdDict ,
78
+ final List <Object > replicaDict ,
79
+ final List <Object > partitions ) {
80
+ final int partitionCount = partitions .size ();
81
+ final int regionCount = partitionCount + 1 ;
82
+ final byte [][] startKeys = new byte [regionCount ][];
83
+ final byte [][] endKeys = new byte [regionCount ][];
84
+
85
+ for (int i = 0 ; i < regionCount ; i ++) {
86
+ if (i == 0 ) {
87
+ startKeys [i ] = HConstants .EMPTY_BYTE_ARRAY ;
88
+ endKeys [i ] = ((List <Object >) partitions .get (i )).get (2 ).toString ().getBytes ();
89
+ } else if (i == regionCount - 1 ) {
90
+ startKeys [i ] = ((List <Object >) partitions .get (i - 1 )).get (2 ).toString ().getBytes ();
91
+ endKeys [i ] = HConstants .EMPTY_BYTE_ARRAY ;
92
+ } else {
93
+ startKeys [i ] = ((List <Object >) partitions .get (i - 1 )).get (2 ).toString ().getBytes ();
94
+ endKeys [i ] = ((List <Object >) partitions .get (i )).get (2 ).toString ().getBytes ();
95
+ }
34
96
}
97
+
98
+ // Create region locations for all regions
99
+ final List <HRegionLocation > regionLocations = IntStream .range (0 , regionCount )
100
+ .mapToObj (i -> {
101
+ final List <Object > partition = (List <Object >) partitions .get (Math .min (i , partitionCount - 1 ));
102
+ final int replicationIdx = (int ) partition .get (3 );
103
+ final List <Object > hostInfo = (List <Object >) replicaDict .get (replicationIdx );
104
+
105
+ final ServerName serverName = ServerName .valueOf (
106
+ hostInfo .get (0 ).toString (),
107
+ (int ) hostInfo .get (1 ),
108
+ i
109
+ );
110
+ final HRegionInfo regionInfo = new HRegionInfo (
111
+ TableName .valueOf (tableName ),
112
+ startKeys [i ],
113
+ endKeys [i ]
114
+ );
115
+ return new HRegionLocation (regionInfo , serverName , i );
116
+ })
117
+ .collect (Collectors .toList ());
118
+
119
+ return new OHRegionLocator (startKeys , endKeys , regionLocations );
120
+ }
121
+
122
+ /**
123
+ * Creates a region locator for hash partitions
124
+ * @param tableIdDict table ID dictionary
125
+ * @param replicaDict replica dictionary
126
+ * @param partitions list of partition data
127
+ * @return OHRegionLocator for hash partitions
128
+ */
129
+ private OHRegionLocator createHashPartitionLocator (
130
+ final List <Object > tableIdDict ,
131
+ final List <Object > replicaDict ,
132
+ final List <Object > partitions ) {
133
+
134
+ final byte [][] startKeys = new byte [1 ][];
135
+ final byte [][] endKeys = new byte [1 ][];
136
+ startKeys [0 ] = HConstants .EMPTY_BYTE_ARRAY ;
137
+ endKeys [0 ] = HConstants .EMPTY_BYTE_ARRAY ;
138
+
139
+ final List <HRegionLocation > regionLocations = IntStream .range (0 , partitions .size ())
140
+ .mapToObj (i -> {
141
+ final List <Object > partition = (List <Object >) partitions .get (i );
142
+ final int replicationIdx = (int ) partition .get (3 );
143
+ final List <Object > hostInfo = (List <Object >) replicaDict .get (replicationIdx );
144
+
145
+ final ServerName serverName = ServerName .valueOf (
146
+ hostInfo .get (0 ).toString (),
147
+ (int ) hostInfo .get (1 ),
148
+ i
149
+ );
150
+ final HRegionInfo regionInfo = new HRegionInfo (
151
+ TableName .valueOf (tableName ),
152
+ startKeys [0 ],
153
+ endKeys [0 ]
154
+ );
155
+ return new HRegionLocation (regionInfo , serverName , i );
156
+ })
157
+ .collect (Collectors .toList ());
158
+
159
+ return new OHRegionLocator (startKeys , endKeys , regionLocations );
35
160
}
36
161
37
- public OHRegionLocator getRegionLocator (String tableName ) throws IOException {
38
- ObTableMetaRequest request = new ObTableMetaRequest ();
162
+ public OHRegionLocator getRegionLocator (final String tableName ) throws IOException {
163
+ final ObTableMetaRequest request = new ObTableMetaRequest ();
39
164
request .setMetaType (getMetaType ());
40
- Map <String , Object > requestData = new HashMap <>();
165
+ final Map <String , String > requestData = new HashMap <>();
41
166
requestData .put ("table_name" , tableName );
42
- String jsonData = JSON .toJSONString (requestData );
167
+
168
+ final String jsonData = JSON .toJSONString (requestData );
43
169
request .setData (jsonData );
44
170
45
171
return execute (client , request );
46
172
}
47
-
48
- private final ObTableClient client ;
49
173
}
0 commit comments