Skip to content

Commit 2debf6e

Browse files
committed
添加PutBuffer工具类,添加获取Table的接口
1 parent 528d6d2 commit 2debf6e

File tree

6 files changed

+168
-33
lines changed

6 files changed

+168
-33
lines changed

README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,17 @@ try {
197197
e.printStackTrace();
198198
}
199199
```
200+
- 使用Put缓冲器入库
201+
```java
202+
PutBuffer buffer = client.createPutBuffer("test_table", 1000, 5000);
203+
for (int i = 0; i < 100_000; i++) {
204+
Put put = new Put(Bytes.toBytes("ggg_132131" + i));
205+
put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("phone"), Bytes.toBytes("1891234567" + i));
206+
207+
buffer.put(put);
208+
}
209+
buffer.flush();
210+
```
200211
- 提交Put异步请求
201212
```java
202213
client.putAsync("tb_test", putList, new PutCallback() {

src/com/skey/evehbase/client/EveHBase.java

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import com.skey.evehbase.util.GenericUtils;
88
import com.skey.evehbase.util.HResultUtils;
99
import com.skey.evehbase.util.IOUtils;
10+
import com.skey.evehbase.util.PutBuffer;
1011
import org.apache.hadoop.conf.Configuration;
1112
import org.apache.hadoop.hbase.*;
1213
import org.apache.hadoop.hbase.client.*;
@@ -111,15 +112,33 @@ public void create(@Nonnull EveTable eveTable) {
111112
if (LOG.isErrorEnabled()) LOG.error("Create table failed.", e);
112113
} finally {
113114
close(admin, "Close admin failed ");
115+
if (LOG.isInfoEnabled()) LOG.info("Exiting create.");
114116
}
115-
if (LOG.isInfoEnabled()) LOG.info("Exiting create.");
116117
}
117118

118119
@Override
119-
public void disable(EveTable eveTable) {
120+
public Table getTable(String tableName) {
121+
if (LOG.isInfoEnabled()) LOG.info("Entering getTable.");
122+
123+
Table table = null;
124+
try {
125+
table = conn.getTable(TableName.valueOf(tableName));
126+
127+
if (LOG.isInfoEnabled()) LOG.info("GetTable successfully.");
128+
} catch (IOException e) {
129+
if (LOG.isErrorEnabled()) LOG.error("Get table failed.", e);
130+
} finally {
131+
if (LOG.isInfoEnabled()) LOG.info("Exiting getTable.");
132+
}
133+
134+
return table;
135+
}
136+
137+
@Override
138+
public void disable(String tableName) {
120139
if (LOG.isInfoEnabled()) LOG.info("Entering disable.");
121140

122-
TableName tn = eveTable.getTableName();
141+
TableName tn = TableName.valueOf(tableName);
123142

124143
Admin admin = null;
125144
try {
@@ -141,15 +160,15 @@ public void disable(EveTable eveTable) {
141160
if (LOG.isErrorEnabled()) LOG.error("Disable table failed.", e);
142161
} finally {
143162
close(admin, "Close admin failed ");
163+
if (LOG.isInfoEnabled()) LOG.info("Exiting disable.");
144164
}
145-
if (LOG.isInfoEnabled()) LOG.info("Exiting disable.");
146165
}
147166

148167
@Override
149-
public void enable(EveTable eveTable) {
168+
public void enable(String tableName) {
150169
if (LOG.isInfoEnabled()) LOG.info("Entering enable.");
151170

152-
TableName tn = eveTable.getTableName();
171+
TableName tn = TableName.valueOf(tableName);
153172

154173
Admin admin = null;
155174
try {
@@ -171,15 +190,15 @@ public void enable(EveTable eveTable) {
171190
if (LOG.isErrorEnabled()) LOG.error("Enable table failed.", e);
172191
} finally {
173192
close(admin, "Close admin failed ");
193+
if (LOG.isInfoEnabled()) LOG.info("Exiting enable.");
174194
}
175-
if (LOG.isInfoEnabled()) LOG.info("Exiting enable.");
176195
}
177196

178197
@Override
179-
public void delete(EveTable eveTable) {
198+
public void delete(String tableName) {
180199
if (LOG.isInfoEnabled()) LOG.info("Entering delete.");
181200

182-
TableName tn = eveTable.getTableName();
201+
TableName tn = TableName.valueOf(tableName);
183202

184203
Admin admin = null;
185204
try {
@@ -201,15 +220,15 @@ public void delete(EveTable eveTable) {
201220
if (LOG.isErrorEnabled()) LOG.error("Delete table failed.", e);
202221
} finally {
203222
close(admin, "Close admin failed ");
223+
if (LOG.isInfoEnabled()) LOG.info("Exiting delete.");
204224
}
205-
if (LOG.isInfoEnabled()) LOG.info("Exiting delete.");
206225
}
207226

208227
@Override
209-
public void disableAndDelete(EveTable eveTable) {
228+
public void disableAndDelete(String tableName) {
210229
if (LOG.isInfoEnabled()) LOG.info("Entering DisableAndDelete.");
211230

212-
TableName tn = eveTable.getTableName();
231+
TableName tn = TableName.valueOf(tableName);
213232

214233
Admin admin = null;
215234
try {
@@ -232,8 +251,8 @@ public void disableAndDelete(EveTable eveTable) {
232251
if (LOG.isErrorEnabled()) LOG.error("DisableAndDelete table failed.", e);
233252
} finally {
234253
close(admin, "Close admin failed ");
254+
if (LOG.isInfoEnabled()) LOG.info("Exiting DisableAndDelete.");
235255
}
236-
if (LOG.isInfoEnabled()) LOG.info("Exiting DisableAndDelete.");
237256
}
238257

239258
@Override
@@ -269,8 +288,8 @@ public void multiSplit(@Nonnull String tableName, @Nonnull String... splitKeys)
269288
} finally {
270289
close(table, "Close table failed ");
271290
close(admin, "Close admin failed ");
291+
if (LOG.isInfoEnabled()) LOG.info("Exiting multiSplit.");
272292
}
273-
if (LOG.isInfoEnabled()) LOG.info("Exiting multiSplit.");
274293
}
275294

276295
@Override
@@ -313,8 +332,14 @@ public void createIndex(@Nonnull String tableName, @Nonnull String familyName,
313332
} finally {
314333
close(admin, "Close admin failed ");
315334
close(iAdmin, "Close admin failed ");
335+
if (LOG.isInfoEnabled()) LOG.info("Exiting createIndex.");
316336
}
317-
if (LOG.isInfoEnabled()) LOG.info("Exiting createIndex.");
337+
}
338+
339+
@Override
340+
public PutBuffer createPutBuffer(String tableName, int bufferSize, int duration) {
341+
Table table = getTable(tableName);
342+
return new PutBuffer(table, bufferSize, duration);
318343
}
319344

320345
@Override
@@ -347,8 +372,8 @@ public void put(@Nonnull String tableName, @Nonnull List<Put> putList) throws IO
347372
if (LOG.isInfoEnabled()) LOG.info("Put successfully.");
348373
} finally {
349374
close(table, "Close table failed ");
375+
if (LOG.isInfoEnabled()) LOG.info("Exiting Put.");
350376
}
351-
if (LOG.isInfoEnabled()) LOG.info("Exiting put.");
352377
}
353378

354379
@Override
@@ -365,7 +390,7 @@ public <T> List<T> scan(@Nonnull EveScan eveScan, @Nonnull Class<T> clazz) throw
365390
// 提交scan请求
366391
scanner = table.getScanner(eveScan.getScan());
367392
Result r;
368-
while ((r = scanner.next()) != null ) {
393+
while ((r = scanner.next()) != null) {
369394
results.add(r);
370395
}
371396

@@ -416,7 +441,7 @@ public <T> List<T> get(@Nonnull EveGet eveGet, @Nonnull Class<T> clazz) throws I
416441

417442
List<T> objects = HResultUtils.parseResults(results, clazz);
418443

419-
if (LOG.isInfoEnabled()) LOG.info("Exiting testGet.");
444+
if (LOG.isInfoEnabled()) LOG.info("Exiting Get.");
420445
return objects;
421446
}
422447

src/com/skey/evehbase/client/HBaseClient.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package com.skey.evehbase.client;
22

33
import com.skey.evehbase.request.*;
4+
import com.skey.evehbase.util.PutBuffer;
45
import org.apache.hadoop.hbase.TableName;
56
import org.apache.hadoop.hbase.client.Connection;
67
import org.apache.hadoop.hbase.client.Put;
8+
import org.apache.hadoop.hbase.client.Table;
79

810
import java.io.IOException;
911
import java.util.List;
@@ -42,29 +44,37 @@ public interface HBaseClient {
4244
*/
4345
void create(EveTable eveTable);
4446

47+
/**
48+
* HBase获取表
49+
*
50+
* @param tableName 表名
51+
* @return HBase 的 {@link Table}
52+
*/
53+
Table getTable(String tableName);
54+
4555
/**
4656
* 关闭表
47-
* @param eveTable {@link EveTable}
57+
* @param tableName 表名
4858
*/
49-
void disable(EveTable eveTable);
59+
void disable(String tableName);
5060

5161
/**
5262
* 启用表
53-
* @param eveTable {@link EveTable}
63+
* @param tableName 表名
5464
*/
55-
void enable(EveTable eveTable);
65+
void enable(String tableName);
5666

5767
/**
5868
* 删表
59-
* @param eveTable {@link EveTable}
69+
* @param tableName 表名
6070
*/
61-
void delete(EveTable eveTable);
71+
void delete(String tableName);
6272

6373
/**
6474
* 关闭表,并删除
65-
* @param eveTable {@link EveTable}
75+
* @param tableName 表名
6676
*/
67-
void disableAndDelete(EveTable eveTable);
77+
void disableAndDelete(String tableName);
6878

6979
/**
7080
* 表Region拆分
@@ -84,6 +94,15 @@ public interface HBaseClient {
8494
*/
8595
void createIndex(String tableName, String familyName, String qualifier, String indexName);
8696

97+
/**
98+
* Put缓冲器,辅助入表,用完请记得调用 {@link PutBuffer#flush}
99+
* @param tableName 表名
100+
* @param bufferSize 缓冲区大小
101+
* @param duration 每批入表最大时间间隔,毫秒
102+
* @return 缓冲器 {@link PutBuffer}
103+
*/
104+
PutBuffer createPutBuffer(String tableName, int bufferSize, int duration);
105+
87106
void put(String tableName, Put put) throws IOException;
88107

89108
/**

src/com/skey/evehbase/util/PutBuffer.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,33 +10,58 @@
1010
/**
1111
* Put缓冲器
1212
* <p>
13+
* 当bufferSize达到指定大小,或者距离上次入库时间达到duration毫秒,将会直接入库
14+
* </p>
15+
*
1316
* Date: 2018/11/16 11:41
1417
*
1518
* @author A Lion~
1619
*/
1720
public class PutBuffer {
1821

19-
private static final int BUFFER_SIZE = 2000;
22+
private final int bufferSize;
23+
24+
private final int duration;
25+
26+
private final Table table;
2027

21-
private Table table;
28+
private final List<Put> putList ;
2229

23-
private List<Put> putList = new ArrayList<>(BUFFER_SIZE);
30+
private long lastTime = 0;
2431

2532
public PutBuffer(Table table) {
33+
this(table, 1000, 5000);
34+
}
35+
36+
public PutBuffer(Table table, int bufferSize, int duration) {
2637
this.table = table;
38+
this.bufferSize = bufferSize;
39+
this.duration = duration;
40+
putList = new ArrayList<>(bufferSize);
2741
}
2842

43+
/**
44+
* 将Put放入缓冲区
45+
* @param put HBase的 {@link Put}
46+
* @throws IOException
47+
*/
2948
public void put(Put put) throws IOException {
30-
if (putList.size() < BUFFER_SIZE) {
49+
if (putList.size() < bufferSize && System.currentTimeMillis() - lastTime < duration) {
3150
putList.add(put);
3251
} else {
3352
flush();
3453
}
3554
}
3655

56+
/**
57+
* 将当前缓冲区的数据直接写入HBase
58+
* @throws IOException
59+
*/
3760
public void flush() throws IOException {
3861
table.put(putList);
3962
putList.clear();
63+
64+
lastTime = System.currentTimeMillis();
4065
}
4166

4267
}

test/com/skey/evehbase/Demo01Test.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@ public class Demo01Test {
2525
public static void main(String[] args) {
2626
// HBase配置
2727
Configuration hbaseConf = HBaseConfiguration.create();
28-
hbaseConf.addResource(new Path("./conf/core-site.xml"));
28+
hbaseConf.addResource(new Path("./conf/core-site.xml"));
2929
hbaseConf.addResource(new Path("./conf/hdfs-site.xml"));
3030
hbaseConf.addResource(new Path("./conf/hbase-site.xml"));
3131

3232
// 安全认证配置
3333
SecurityConf securityConf = new SecurityConf(
3434
"test",
3535
"./kerberos/user.keytab",
36-
"./kerberos/krb5.conf");
36+
"./kerberos/krb5.conf");
3737

3838
// 构建客户端
3939
HBaseClient client = new EveHBase.Builder()
@@ -42,7 +42,6 @@ public static void main(String[] args) {
4242
.build();
4343

4444

45-
4645
client.close();
4746
}
4847
}

0 commit comments

Comments
 (0)