18
18
package com .alipay .oceanbase .hbase .util ;
19
19
20
20
import com .alipay .oceanbase .hbase .OHTable ;
21
- import com .google . common . annotations . VisibleForTesting ;
21
+ import com .alipay . oceanbase . rpc . protocol . payload . impl . execute . ObTableBatchOperation ;
22
22
import org .apache .hadoop .classification .InterfaceAudience ;
23
23
import org .apache .hadoop .conf .Configuration ;
24
+ import org .apache .hadoop .hbase .KeyValue ;
24
25
import org .apache .hadoop .hbase .TableName ;
25
26
import org .apache .hadoop .hbase .client .*;
26
27
import org .slf4j .Logger ;
31
32
import java .util .concurrent .ExecutorService ;
32
33
import java .util .concurrent .TimeUnit ;
33
34
import java .util .concurrent .atomic .AtomicLong ;
34
- import static com .alipay .oceanbase .rpc .util .TableClientLoggerFactory .LCD ;
35
+
36
+ import static com .alipay .oceanbase .hbase .util .TableHBaseLoggerFactory .LCD ;
37
+ import static com .alipay .oceanbase .rpc .ObGlobal .*;
35
38
36
39
@ InterfaceAudience .Private
37
40
public class OHBufferedMutatorImpl implements BufferedMutator {
@@ -40,13 +43,11 @@ public class OHBufferedMutatorImpl implements BufferedMutator {
40
43
41
44
private final ExceptionListener listener ;
42
45
43
- private final OHTable ohTable ;
44
46
private final TableName tableName ;
45
47
private volatile Configuration conf ;
46
48
47
- @ VisibleForTesting
49
+ private OHTable ohTable ;
48
50
final ConcurrentLinkedQueue <Mutation > asyncWriteBuffer = new ConcurrentLinkedQueue <Mutation >();
49
- @ VisibleForTesting
50
51
AtomicLong currentAsyncBufferSize = new AtomicLong (0 );
51
52
52
53
private long writeBufferSize ;
@@ -55,9 +56,11 @@ public class OHBufferedMutatorImpl implements BufferedMutator {
55
56
private final ExecutorService pool ;
56
57
private int rpcTimeout ;
57
58
private int operationTimeout ;
59
+ private static final long OB_VERSION_4_2_5_1 = calcVersion (4 , (short ) 2 ,
60
+ (byte ) 5 , (byte ) 1 );
58
61
59
- public OHBufferedMutatorImpl (OHConnectionImpl ohConnection , BufferedMutatorParams params )
60
- throws IOException {
62
+ public OHBufferedMutatorImpl (OHConnectionImpl ohConnection , BufferedMutatorParams params ,
63
+ OHTable ohTable ) throws IOException {
61
64
if (ohConnection == null || ohConnection .isClosed ()) {
62
65
throw new IllegalArgumentException ("Connection is null or closed." );
63
66
}
@@ -77,7 +80,11 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam
77
80
.getMaxKeyValueSize () : connectionConfig .getMaxKeyValueSize ();
78
81
79
82
// create an OHTable object to do batch work
80
- this .ohTable = new OHTable (tableName , ohConnection , connectionConfig , pool );
83
+ if (ohTable != null ) {
84
+ this .ohTable = ohTable ;
85
+ } else {
86
+ this .ohTable = new OHTable (tableName , ohConnection , connectionConfig , pool );
87
+ }
81
88
}
82
89
83
90
@ Override
@@ -119,14 +126,12 @@ public void mutate(List<? extends Mutation> mutations) throws IOException {
119
126
validateOperation (m );
120
127
toAddSize += m .heapSize ();
121
128
}
122
-
123
129
currentAsyncBufferSize .addAndGet (toAddSize );
124
130
asyncWriteBuffer .addAll (mutations );
125
131
126
132
if (currentAsyncBufferSize .get () > writeBufferSize ) {
127
- execute (false );
133
+ batchExecute (false );
128
134
}
129
-
130
135
}
131
136
132
137
/**
@@ -142,10 +147,18 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException {
142
147
}
143
148
if (mt instanceof Put ) {
144
149
// family empty check is in validatePut
145
- HTable .validatePut ((Put ) mt , maxKeyValueSize );
146
- OHTable .checkFamilyViolation (mt .getFamilyMap ().keySet (), true );
150
+ OHTable .validatePut ((Put ) mt , maxKeyValueSize );
151
+ if (isMultiFamilySupport ()) {
152
+ OHTable .checkFamilyViolation (mt .getFamilyMap ().keySet (), true );
153
+ } else {
154
+ OHTable .checkFamilyViolationForOneFamily (mt .getFamilyMap ().keySet ());
155
+ }
147
156
} else {
148
- OHTable .checkFamilyViolation (mt .getFamilyMap ().keySet (), false );
157
+ if (isMultiFamilySupport ()) {
158
+ OHTable .checkFamilyViolation (mt .getFamilyMap ().keySet (), false );
159
+ } else {
160
+ OHTable .checkFamilyViolationForOneFamily (mt .getFamilyMap ().keySet ());
161
+ }
149
162
}
150
163
}
151
164
@@ -156,7 +169,7 @@ private void validateOperation(Mutation mt) throws IllegalArgumentException {
156
169
* @param flushAll - if true, sends all the writes and wait for all of them to finish before
157
170
* returning.
158
171
*/
159
- private void execute (boolean flushAll ) throws IOException {
172
+ private void batchExecute (boolean flushAll ) throws IOException {
160
173
LinkedList <Mutation > execBuffer = new LinkedList <>();
161
174
long dequeuedSize = 0L ;
162
175
try {
@@ -172,19 +185,16 @@ private void execute(boolean flushAll) throws IOException {
172
185
if (execBuffer .isEmpty ()) {
173
186
return ;
174
187
}
175
- ohTable .batch (execBuffer );
188
+ Object [] results = new Object [execBuffer .size ()];
189
+ ohTable .batch (execBuffer , results );
176
190
// if commit all successfully, clean execBuffer
177
191
execBuffer .clear ();
178
192
} catch (Exception ex ) {
179
- LOGGER .error (LCD .convert ("01-00026" ), ex );
193
+ // do not recollect error operations, notify outside
194
+ LOGGER .error ("error happens: table name = " , tableName .getNameAsString (), ex );
180
195
if (ex .getCause () instanceof RetriesExhaustedWithDetailsException ) {
181
- LOGGER .error (tableName + ": One or more of the operations have failed after retries." );
196
+ LOGGER .error (tableName . getNameAsString () + ": One or more of the operations have failed after retries." , ex );
182
197
RetriesExhaustedWithDetailsException retryException = (RetriesExhaustedWithDetailsException ) ex .getCause ();
183
- // recollect mutations
184
- execBuffer .clear ();
185
- for (int i = 0 ; i < retryException .getNumExceptions (); ++i ) {
186
- execBuffer .add ((Mutation ) retryException .getRow (i ));
187
- }
188
198
if (listener != null ) {
189
199
listener .onException (retryException , this );
190
200
} else {
@@ -194,12 +204,6 @@ private void execute(boolean flushAll) throws IOException {
194
204
LOGGER .error ("Errors unrelated to operations occur during mutation operation" , ex );
195
205
throw ex ;
196
206
}
197
- } finally {
198
- for (Mutation mutation : execBuffer ) {
199
- long size = mutation .heapSize ();
200
- currentAsyncBufferSize .addAndGet (size );
201
- asyncWriteBuffer .add (mutation );
202
- }
203
207
}
204
208
}
205
209
@@ -209,7 +213,7 @@ public void close() throws IOException {
209
213
return ;
210
214
}
211
215
try {
212
- execute (true );
216
+ batchExecute (true );
213
217
} finally {
214
218
// the pool in ObTableClient will be shut down too
215
219
this .pool .shutdown ();
@@ -234,13 +238,21 @@ public void setWriteBufferSize(long writeBufferSize) throws IOException {
234
238
}
235
239
}
236
240
241
+ /**
242
+ * Only 4_2_5 BP1 - 4_3_0 and after 4_3_4 support multi-cf
243
+ * */
244
+ boolean isMultiFamilySupport () {
245
+ return (OB_VERSION >= OB_VERSION_4_2_5_1 && OB_VERSION < OB_VERSION_4_3_0_0 )
246
+ || (OB_VERSION >= OB_VERSION_4_3_4_0 );
247
+ }
248
+
237
249
/**
238
250
* Force to commit all operations
239
251
* do not care whether the pool is shut down or this BufferedMutator is closed
240
252
*/
241
253
@ Override
242
254
public void flush () throws IOException {
243
- execute (true );
255
+ batchExecute (true );
244
256
}
245
257
246
258
@ Override
@@ -258,6 +270,10 @@ public void setOperationTimeout(int operationTimeout) {
258
270
this .ohTable .setOperationTimeout (operationTimeout );
259
271
}
260
272
273
+ public long getCurrentBufferSize () {
274
+ return currentAsyncBufferSize .get ();
275
+ }
276
+
261
277
@ Deprecated
262
278
public List <Row > getWriteBuffer () {
263
279
return Arrays .asList (asyncWriteBuffer .toArray (new Row [0 ]));
0 commit comments