Skip to content

Commit 04be632

Browse files
author
linchao.wc
committed
fix 修改全量同步只有一条数据不同步的问题,格式化代码
1 parent 8bc323c commit 04be632

15 files changed

+51
-43
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
<groupId>com.star.sync</groupId>
77
<artifactId>canal-mysql-elasticsearch-sync</artifactId>
8-
<version>1.0.4</version>
8+
<version>1.0.5</version>
99
<packaging>jar</packaging>
1010

1111
<name>canal-mysql-elasticsearch-sync</name>

src/main/java/com/star/sync/elasticsearch/event/CanalEvent.java renamed to src/main/java/com/star/sync/elasticsearch/event/AbstractCanalEvent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@
88
* @version 1.0
99
* @since 2017-08-26 22:22:00
1010
*/
11-
public abstract class CanalEvent extends ApplicationEvent {
11+
public abstract class AbstractCanalEvent extends ApplicationEvent {
1212

1313
/**
1414
* Create a new ApplicationEvent.
1515
*
1616
* @param source the object on which the event initially occurred (never {@code null})
1717
*/
18-
public CanalEvent(Entry source) {
18+
public AbstractCanalEvent(Entry source) {
1919
super(source);
2020
}
2121

src/main/java/com/star/sync/elasticsearch/event/DeleteCanalEvent.java renamed to src/main/java/com/star/sync/elasticsearch/event/DeleteAbstractCanalEvent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
* @version 1.0
88
* @since 2017-08-26 22:31:00
99
*/
10-
public class DeleteCanalEvent extends CanalEvent {
10+
public class DeleteAbstractCanalEvent extends AbstractCanalEvent {
1111
/**
1212
* Create a new ApplicationEvent.
1313
*
1414
* @param source the object on which the event initially occurred (never {@code null})
1515
*/
16-
public DeleteCanalEvent(Entry source) {
16+
public DeleteAbstractCanalEvent(Entry source) {
1717
super(source);
1818
}
1919
}

src/main/java/com/star/sync/elasticsearch/event/InsertCanalEvent.java renamed to src/main/java/com/star/sync/elasticsearch/event/InsertAbstractCanalEvent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
* @version 1.0
88
* @since 2017-08-26 22:30:00
99
*/
10-
public class InsertCanalEvent extends CanalEvent {
10+
public class InsertAbstractCanalEvent extends AbstractCanalEvent {
1111
/**
1212
* Create a new ApplicationEvent.
1313
*
1414
* @param source the object on which the event initially occurred (never {@code null})
1515
*/
16-
public InsertCanalEvent(Entry source) {
16+
public InsertAbstractCanalEvent(Entry source) {
1717
super(source);
1818
}
1919
}

src/main/java/com/star/sync/elasticsearch/event/UpdateCanalEvent.java renamed to src/main/java/com/star/sync/elasticsearch/event/UpdateAbstractCanalEvent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77
* @version 1.0
88
* @since 2017-08-26 22:30:00
99
*/
10-
public class UpdateCanalEvent extends CanalEvent {
10+
public class UpdateAbstractCanalEvent extends AbstractCanalEvent {
1111
/**
1212
* Create a new ApplicationEvent.
1313
*
1414
* @param source the object on which the event initially occurred (never {@code null})
1515
*/
16-
public UpdateCanalEvent(Entry source) {
16+
public UpdateAbstractCanalEvent(Entry source) {
1717
super(source);
1818
}
1919
}

src/main/java/com/star/sync/elasticsearch/listener/AbstractCanalListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
66
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
77
import com.google.protobuf.InvalidProtocolBufferException;
8-
import com.star.sync.elasticsearch.event.CanalEvent;
8+
import com.star.sync.elasticsearch.event.AbstractCanalEvent;
99
import com.star.sync.elasticsearch.model.DatabaseTableModel;
1010
import com.star.sync.elasticsearch.model.IndexTypeModel;
1111
import com.star.sync.elasticsearch.service.MappingService;
@@ -23,7 +23,7 @@
2323
* @version 1.0
2424
* @since 2017-08-28 14:40:00
2525
*/
26-
public abstract class AbstractCanalListener<EVENT extends CanalEvent> implements ApplicationListener<EVENT> {
26+
public abstract class AbstractCanalListener<EVENT extends AbstractCanalEvent> implements ApplicationListener<EVENT> {
2727
private static final Logger logger = LoggerFactory.getLogger(AbstractCanalListener.class);
2828

2929
@Resource

src/main/java/com/star/sync/elasticsearch/listener/DeleteCanalListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
44
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
5-
import com.star.sync.elasticsearch.event.DeleteCanalEvent;
5+
import com.star.sync.elasticsearch.event.DeleteAbstractCanalEvent;
66
import com.star.sync.elasticsearch.service.ElasticsearchService;
77
import com.star.sync.elasticsearch.service.MappingService;
88
import org.apache.commons.lang.StringUtils;
@@ -20,7 +20,7 @@
2020
* @since 2017-08-26 22:33:00
2121
*/
2222
@Component
23-
public class DeleteCanalListener extends AbstractCanalListener<DeleteCanalEvent> {
23+
public class DeleteCanalListener extends AbstractCanalListener<DeleteAbstractCanalEvent> {
2424
private static final Logger logger = LoggerFactory.getLogger(DeleteCanalListener.class);
2525

2626
@Resource

src/main/java/com/star/sync/elasticsearch/listener/InsertCanalListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
44
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
5-
import com.star.sync.elasticsearch.event.InsertCanalEvent;
5+
import com.star.sync.elasticsearch.event.InsertAbstractCanalEvent;
66
import com.star.sync.elasticsearch.service.ElasticsearchService;
77
import com.star.sync.elasticsearch.service.MappingService;
88
import com.star.sync.elasticsearch.util.JsonUtil;
@@ -22,7 +22,7 @@
2222
* @since 2017-08-26 22:32:00
2323
*/
2424
@Component
25-
public class InsertCanalListener extends AbstractCanalListener<InsertCanalEvent> {
25+
public class InsertCanalListener extends AbstractCanalListener<InsertAbstractCanalEvent> {
2626
private static final Logger logger = LoggerFactory.getLogger(InsertCanalListener.class);
2727

2828
@Resource

src/main/java/com/star/sync/elasticsearch/listener/UpdateCanalListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
44
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
5-
import com.star.sync.elasticsearch.event.UpdateCanalEvent;
5+
import com.star.sync.elasticsearch.event.UpdateAbstractCanalEvent;
66
import com.star.sync.elasticsearch.service.ElasticsearchService;
77
import com.star.sync.elasticsearch.service.MappingService;
88
import org.apache.commons.lang.StringUtils;
@@ -21,7 +21,7 @@
2121
* @since 2017-08-26 22:32:00
2222
*/
2323
@Component
24-
public class UpdateCanalListener extends AbstractCanalListener<UpdateCanalEvent> {
24+
public class UpdateCanalListener extends AbstractCanalListener<UpdateAbstractCanalEvent> {
2525
private static final Logger logger = LoggerFactory.getLogger(UpdateCanalListener.class);
2626

2727
@Resource

src/main/java/com/star/sync/elasticsearch/model/DatabaseTableModel.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,12 @@ public void setTable(String table) {
3737

3838
@Override
3939
public boolean equals(Object o) {
40-
if (this == o) return true;
41-
if (o == null || getClass() != o.getClass()) return false;
40+
if (this == o) {
41+
return true;
42+
}
43+
if (o == null || getClass() != o.getClass()) {
44+
return false;
45+
}
4246
DatabaseTableModel that = (DatabaseTableModel) o;
4347
return Objects.equal(database, that.database) &&
4448
Objects.equal(table, that.table);

src/main/java/com/star/sync/elasticsearch/model/IndexTypeModel.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,12 @@ public void setType(String type) {
3737

3838
@Override
3939
public boolean equals(Object o) {
40-
if (this == o) return true;
41-
if (o == null || getClass() != o.getClass()) return false;
40+
if (this == o) {
41+
return true;
42+
}
43+
if (o == null || getClass() != o.getClass()) {
44+
return false;
45+
}
4246
IndexTypeModel that = (IndexTypeModel) o;
4347
return Objects.equal(index, that.index) &&
4448
Objects.equal(type, that.type);

src/main/java/com/star/sync/elasticsearch/scheduling/CanalScheduling.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
66
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
77
import com.alibaba.otter.canal.protocol.Message;
8-
import com.star.sync.elasticsearch.event.DeleteCanalEvent;
9-
import com.star.sync.elasticsearch.event.InsertCanalEvent;
10-
import com.star.sync.elasticsearch.event.UpdateCanalEvent;
8+
import com.star.sync.elasticsearch.event.DeleteAbstractCanalEvent;
9+
import com.star.sync.elasticsearch.event.InsertAbstractCanalEvent;
10+
import com.star.sync.elasticsearch.event.UpdateAbstractCanalEvent;
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
1313
import org.springframework.beans.BeansException;
@@ -64,13 +64,13 @@ private void publishCanalEvent(Entry entry) {
6464
EventType eventType = entry.getHeader().getEventType();
6565
switch (eventType) {
6666
case INSERT:
67-
applicationContext.publishEvent(new InsertCanalEvent(entry));
67+
applicationContext.publishEvent(new InsertAbstractCanalEvent(entry));
6868
break;
6969
case UPDATE:
70-
applicationContext.publishEvent(new UpdateCanalEvent(entry));
70+
applicationContext.publishEvent(new UpdateAbstractCanalEvent(entry));
7171
break;
7272
case DELETE:
73-
applicationContext.publishEvent(new DeleteCanalEvent(entry));
73+
applicationContext.publishEvent(new DeleteAbstractCanalEvent(entry));
7474
break;
7575
default:
7676
break;

src/main/java/com/star/sync/elasticsearch/service/impl/MappingServiceImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
@PropertySource("classpath:mapping.properties")
2828
@ConfigurationProperties
2929
public class MappingServiceImpl implements MappingService, InitializingBean {
30-
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
30+
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
3131

3232
private Map<String, String> dbEsMapping;
3333
private BiMap<DatabaseTableModel, IndexTypeModel> dbEsBiMapping;
@@ -74,8 +74,8 @@ public void afterPropertiesSet() throws Exception {
7474
mysqlTypeElasticsearchTypeMapping.put("text", data -> data);
7575
mysqlTypeElasticsearchTypeMapping.put("blob", data -> data);
7676
mysqlTypeElasticsearchTypeMapping.put("int", Long::valueOf);
77-
mysqlTypeElasticsearchTypeMapping.put("date", data -> LocalDateTime.parse(data, formatter));
78-
mysqlTypeElasticsearchTypeMapping.put("time", data -> LocalDateTime.parse(data, formatter));
77+
mysqlTypeElasticsearchTypeMapping.put("date", data -> LocalDateTime.parse(data, FORMATTER));
78+
mysqlTypeElasticsearchTypeMapping.put("time", data -> LocalDateTime.parse(data, FORMATTER));
7979
mysqlTypeElasticsearchTypeMapping.put("float", Double::valueOf);
8080
mysqlTypeElasticsearchTypeMapping.put("double", Double::valueOf);
8181
mysqlTypeElasticsearchTypeMapping.put("decimal", Double::valueOf);

src/main/java/com/star/sync/elasticsearch/service/impl/SyncServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public boolean syncByTable(SyncByTableRequest request) {
5656
long maxPK = Optional.ofNullable(request.getTo()).orElse(baseDao.selectMaxPK(primaryKey, request.getDatabase(), request.getTable()));
5757
cachedThreadPool.submit(() -> {
5858
try {
59-
for (long i = minPK; i < maxPK; i += request.getStepSize()) {
59+
for (long i = minPK; i <= maxPK; i += request.getStepSize()) {
6060
transactionalService.batchInsertElasticsearch(request, primaryKey, i, i + request.getStepSize(), indexTypeModel);
6161
logger.info(String.format("当前同步pk=%s,总共total=%s,进度=%s%%", i, maxPK, new BigDecimal(i * 100).divide(new BigDecimal(maxPK), 3, BigDecimal.ROUND_HALF_UP)));
6262
}

src/main/java/com/star/sync/elasticsearch/util/JsonUtil.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,19 @@
2020
* @version 1.0
2121
* @since 2017-08-26 22:54:00
2222
*/
23-
public abstract class JsonUtil {
23+
public class JsonUtil {
2424
private static final Logger logger = LoggerFactory.getLogger(JsonUtil.class);
2525
private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
2626
/**
2727
* 对象映射
2828
*/
29-
private static final ObjectMapper objMapper = new ObjectMapper();
29+
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
3030

3131
static {
32-
objMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
33-
objMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
34-
objMapper.setDateFormat(new SimpleDateFormat(DATE_FORMAT));
35-
objMapper.setSerializationInclusion(Include.NON_NULL);
32+
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
33+
OBJECT_MAPPER.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
34+
OBJECT_MAPPER.setDateFormat(new SimpleDateFormat(DATE_FORMAT));
35+
OBJECT_MAPPER.setSerializationInclusion(Include.NON_NULL);
3636
}
3737

3838

@@ -48,7 +48,7 @@ public static String toJson(Object obj) {
4848
return (String) obj;
4949
}
5050
try {
51-
rst = objMapper.writeValueAsString(obj);
51+
rst = OBJECT_MAPPER.writeValueAsString(obj);
5252
} catch (Exception e) {
5353
logger.error("将Java对象转换成Json串出错!");
5454
throw new RuntimeException("将Java对象转换成Json串出错!", e);
@@ -66,7 +66,7 @@ public static String toJson(Object obj) {
6666
public static <T> T fromJson(String json, Class<T> type) {
6767
T rst;
6868
try {
69-
rst = objMapper.readValue(json, type);
69+
rst = OBJECT_MAPPER.readValue(json, type);
7070
} catch (Exception e) {
7171
logger.error("Json串转换成对象出错:{}", json);
7272
throw new RuntimeException("Json串转换成对象出错!", e);
@@ -87,7 +87,7 @@ public static <T> T fromJson(String json, Class<T> type) {
8787
public static <T> T fromJson(String json, TypeReference<T> typeRef) {
8888
T rst;
8989
try {
90-
rst = objMapper.readValue(json, typeRef);
90+
rst = OBJECT_MAPPER.readValue(json, typeRef);
9191
} catch (Exception e) {
9292
logger.error("Json串转换成对象出错:{}", json);
9393
throw new RuntimeException("Json串转换成对象出错!", e);
@@ -99,7 +99,7 @@ public static <T> T fromJson(String json, TypeReference<T> typeRef) {
9999
public static HashMap<String, Object> fromJsonToMap(String json) {
100100
HashMap<String, Object> map = new HashMap<String, Object>();
101101
try {
102-
map = objMapper.readValue(json, map.getClass());
102+
map = OBJECT_MAPPER.readValue(json, map.getClass());
103103
} catch (IOException e) {
104104
logger.error("Json串转换成对象出错:{}", json);
105105
}
@@ -110,7 +110,7 @@ public static HashMap<String, Object> fromJsonToMap(String json) {
110110
public static HashMap<String, Object> toMap(String json) {
111111
HashMap<String, Object> map;
112112
try {
113-
map = objMapper.readValue(json, HashMap.class);
113+
map = OBJECT_MAPPER.readValue(json, HashMap.class);
114114
} catch (Exception e) {
115115
map = null;
116116
logger.error("Json串转换成对象出错:{}", json);
@@ -122,7 +122,7 @@ public static HashMap<String, Object> toMap(String json) {
122122
public static List<HashMap<String, Object>> fromJsonToList(String json) {
123123
List<HashMap<String, Object>> list;
124124
try {
125-
list = objMapper.readValue(json, List.class);
125+
list = OBJECT_MAPPER.readValue(json, List.class);
126126
} catch (IOException e) {
127127
logger.error("Json串转换成对象出错:{}", json);
128128
throw new RuntimeException("Json串转换成List出错!", e);

0 commit comments

Comments
 (0)