Skip to content

Commit cf09ce6

Browse files
authored
Merge pull request #161 from oceanbase/fix_put_comp_conn
Fix put comp conn
2 parents 3c16ebf + ee6ba73 commit cf09ce6

File tree

2 files changed

+31
-28
lines changed

2 files changed

+31
-28
lines changed

src/main/java/com/alipay/oceanbase/hbase/OHTable.java

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -122,13 +122,6 @@ public class OHTable implements HTableInterface {
122122
*/
123123
private boolean closeClientOnClose = true;
124124

125-
/**
126-
* If the connection this ObTable obtains is created by the ObTable itself,
127-
* should set true and close the connection when this ObTable closes;
128-
* otherwise set false
129-
*/
130-
private final boolean cleanupConnectionOnClose;
131-
132125
/**
133126
* when the operationExecuteInPool is true the <code>Get</code>
134127
* will be executed in the pool.
@@ -166,11 +159,6 @@ public class OHTable implements HTableInterface {
166159

167160
private int scannerTimeout;
168161

169-
/**
170-
* the connection to obtain bufferedMutator for Put operations
171-
*/
172-
private OHConnectionImpl connection;
173-
174162
/**
175163
* the bufferedMutator to execute Puts
176164
*/
@@ -199,8 +187,6 @@ public OHTable(Configuration configuration, String tableName) throws IOException
199187
this.configuration = configuration;
200188
this.tableName = tableName.getBytes();
201189
this.tableNameString = tableName;
202-
this.connection = (OHConnectionImpl) ConnectionFactory.createConnection(configuration);
203-
this.cleanupConnectionOnClose = true;
204190

205191
int maxThreads = configuration.getInt(HBASE_HTABLE_PRIVATE_THREADS_MAX,
206192
DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX);
@@ -258,8 +244,6 @@ public OHTable(Configuration configuration, final byte[] tableName,
258244
this.configuration = configuration;
259245
this.tableName = tableName;
260246
this.tableNameString = Bytes.toString(tableName);
261-
this.connection = (OHConnectionImpl) ConnectionFactory.createConnection(configuration);
262-
this.cleanupConnectionOnClose = true;
263247
this.executePool = executePool;
264248
this.cleanupPoolOnClose = false;
265249
OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration);
@@ -296,7 +280,6 @@ public OHTable(final byte[] tableName, final ObTableClient obTableClient,
296280
this.tableNameString = Bytes.toString(tableName);
297281
this.cleanupPoolOnClose = false;
298282
this.closeClientOnClose = false;
299-
this.cleanupConnectionOnClose = false;
300283
this.executePool = executePool;
301284
this.obTableClient = obTableClient;
302285
this.configuration = HBaseConfiguration.create();
@@ -314,8 +297,6 @@ public OHTable(TableName tableName, Connection connection,
314297
this.tableNameString = Bytes.toString(tableName.getName());
315298
this.configuration = connection.getConfiguration();
316299
this.executePool = executePool;
317-
this.connection = (OHConnectionImpl) connection;
318-
this.cleanupConnectionOnClose = false;
319300
if (executePool == null) {
320301
int maxThreads = configuration.getInt(HBASE_HTABLE_PRIVATE_THREADS_MAX,
321302
DEFAULT_HBASE_HTABLE_PRIVATE_THREADS_MAX);
@@ -1531,11 +1512,6 @@ public void close() throws IOException {
15311512
if (cleanupPoolOnClose) {
15321513
executePool.shutdown();
15331514
}
1534-
if (cleanupConnectionOnClose) {
1535-
if (this.connection != null) {
1536-
this.connection.close();
1537-
}
1538-
}
15391515
this.isClosed = true;
15401516
}
15411517

@@ -2262,10 +2238,9 @@ public Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
22622238

22632239
private BufferedMutator getBufferedMutator() throws IOException {
22642240
if (this.mutator == null) {
2265-
this.mutator = (OHBufferedMutatorImpl) this.connection.getBufferedMutator(
2266-
new BufferedMutatorParams(TableName.valueOf(this.tableNameString))
2267-
.pool(this.executePool).writeBufferSize(this.writeBufferSize)
2268-
.maxKeyValueSize(this.maxKeyValueSize), this);
2241+
this.mutator = new OHBufferedMutatorImpl(this.configuration, new BufferedMutatorParams(
2242+
TableName.valueOf(this.tableNameString)).pool(this.executePool)
2243+
.writeBufferSize(this.writeBufferSize).maxKeyValueSize(this.maxKeyValueSize), this);
22692244
}
22702245
return this.mutator;
22712246
}

src/main/java/com/alipay/oceanbase/hbase/util/OHBufferedMutatorImpl.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.alipay.oceanbase.hbase.util;
1919

2020
import com.alipay.oceanbase.hbase.OHTable;
21+
import com.alipay.oceanbase.rpc.exception.ObTableUnexpectedException;
2122
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableBatchOperation;
2223
import org.apache.hadoop.classification.InterfaceAudience;
2324
import org.apache.hadoop.conf.Configuration;
@@ -87,6 +88,33 @@ public OHBufferedMutatorImpl(OHConnectionImpl ohConnection, BufferedMutatorParam
8788
}
8889
}
8990

91+
/**
92+
* only used for OHTable get bufferedMutator
93+
* */
94+
public OHBufferedMutatorImpl(Configuration conf, BufferedMutatorParams params, OHTable ohTable)
95+
throws IOException {
96+
// create an OHTable object to do batch work
97+
if (ohTable == null) {
98+
throw new ObTableUnexpectedException("The ohTable is null.");
99+
}
100+
this.ohTable = ohTable;
101+
// init params in OHBufferedMutatorImpl
102+
this.tableName = params.getTableName();
103+
this.conf = conf;
104+
this.listener = params.getListener();
105+
106+
OHConnectionConfiguration connectionConfig = new OHConnectionConfiguration(conf);
107+
this.pool = params.getPool();
108+
this.rpcTimeout = connectionConfig.getRpcTimeout();
109+
this.operationTimeout = connectionConfig.getOperationTimeout();
110+
111+
this.writeBufferSize = params.getWriteBufferSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params
112+
.getWriteBufferSize() : connectionConfig.getWriteBufferSize();
113+
this.maxKeyValueSize = params.getMaxKeyValueSize() != OHConnectionImpl.BUFFERED_PARAM_UNSET ? params
114+
.getMaxKeyValueSize() : connectionConfig.getMaxKeyValueSize();
115+
116+
}
117+
90118
@Override
91119
public TableName getName() {
92120
return this.tableName;

0 commit comments

Comments
 (0)