Skip to content

Commit 7dfcd1c

Browse files
author
xlinliu
committed
add use.rundate to eventchecker
1 parent 7260764 commit 7dfcd1c

File tree

5 files changed

+26
-7
lines changed

5 files changed

+26
-7
lines changed

dss-appconn/appconns/dss-eventchecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/eventchecker/entity/EventChecker.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class EventChecker implements Runnable{
4545
public final static String SAVE_KEY="msg.savekey";
4646
public final static String USER_TIME="msg.init.querytime";
4747
public final static String TODAY="only.receive.today";
48+
public final static String USE_RUN_DATE ="use.rundate";
4849
public final static String AFTERSEND="msg.after.send";
4950

5051
private Properties p;

dss-appconn/appconns/dss-eventchecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/eventchecker/service/AbstractEventCheck.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public abstract class AbstractEventCheck implements EventCheckAdapter {
4242
String receiver;
4343
String sender;
4444
String receiveToday;
45+
String useRunDate;
46+
String runDate;
4547
String userTime;
4648
String waitTime;
4749
String query_frequency;
@@ -64,6 +66,8 @@ void initECParams(Properties props){
6466
sender = props.getProperty(EventChecker.SENDER);
6567
msg = props.getProperty(EventChecker.MSG);
6668
receiveToday = props.getProperty(EventChecker.TODAY);
69+
useRunDate = props.getProperty(EventChecker.USE_RUN_DATE);
70+
runDate = props.getProperty("run_date");
6771
userTime = props.getProperty(EventChecker.USER_TIME);
6872
waitTime = props.getProperty(EventChecker.WAIT_TIME, "1");
6973
query_frequency = props.getProperty(EventChecker.QUERY_FREQUENCY, "30000");

dss-appconn/appconns/dss-eventchecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/eventchecker/service/AbstractEventCheckReceiver.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,14 @@ String getOffset(int jobId, Properties props, Logger log){
138138
* Consistent entrance to consumer message
139139
*/
140140
String[] getMsg(Properties props, Logger log,String ... params){
141-
String sqlForReadTMsg = "SELECT * FROM event_queue WHERE topic=? AND msg_name=? AND send_time >=? AND send_time <=? AND msg_id >? ORDER BY msg_id ASC LIMIT 1";
141+
boolean useRunDate=Boolean.getBoolean(params[3]);
142+
String sqlForReadTMsg;
143+
if(useRunDate){
144+
sqlForReadTMsg ="SELECT * FROM event_queue WHERE topic=? AND msg_name=? AND send_time >=? AND send_time <=? AND msg_id >? AND run_date =?ORDER BY msg_id ASC LIMIT 1";
145+
} else{
146+
sqlForReadTMsg="SELECT * FROM event_queue WHERE topic=? AND msg_name=? AND send_time >=? AND send_time <=? AND msg_id >? ORDER BY msg_id ASC LIMIT 1";
147+
}
148+
142149
PreparedStatement pstmt = null;
143150
Connection msgConn = null;
144151
ResultSet rs = null;
@@ -151,6 +158,9 @@ String[] getMsg(Properties props, Logger log,String ... params){
151158
pstmt.setString(3, params[0]);
152159
pstmt.setString(4, params[1]);
153160
pstmt.setString(5, params[2]);
161+
if(useRunDate){
162+
pstmt.setString(6,params[4]);
163+
}
154164
log.info("param {} StartTime: " + params[0] + ", EndTime: " + params[1]
155165
+ ", Topic: " + topic + ", MessageName: " + msgName + ", LastMessageID: " + params[2]);
156166
rs = pstmt.executeQuery();

dss-appconn/appconns/dss-eventchecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/eventchecker/service/DefaultEventcheckReceiver.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public boolean reciveMsg(int jobId, Properties props, Logger log) {
5454
try{
5555
String lastMsgId = getOffset(jobId,props,log);
5656
String[] executeType = createExecuteType(jobId,props,log,lastMsgId);
57-
if(executeType!=null && executeType.length ==3){
57+
if(executeType!=null && executeType.length ==5){
5858
String[] consumedMsgInfo = getMsg(props, log,executeType);
5959
if(consumedMsgInfo!=null && consumedMsgInfo.length == 4){
6060
result = updateMsgOffset(jobId,props,log,consumedMsgInfo,lastMsgId);
@@ -73,19 +73,21 @@ public boolean reciveMsg(int jobId, Properties props, Logger log) {
7373
private String[] createExecuteType(int jobId, Properties props, Logger log,String lastMsgId){
7474
boolean receiveTodayFlag = (null != receiveToday && "true".equals(receiveToday.trim().toLowerCase()));
7575
boolean afterSendFlag = (null != afterSend && "true".equals(afterSend.trim().toLowerCase()));
76+
//只有receiveTodayFlag为true时,useRunDateFlag才有意义。
77+
Boolean useRunDateFlag = receiveTodayFlag && (null != useRunDate && "true".equalsIgnoreCase(useRunDate.trim()));
7678
String[] executeType = null;
7779
try {
7880
if(receiveTodayFlag){
7981
if(afterSendFlag){
80-
executeType = new String[]{nowStartTime,todayEndTime,lastMsgId};
82+
executeType = new String[]{nowStartTime,todayEndTime,lastMsgId,useRunDateFlag.toString(),runDate};
8183
}else{
82-
executeType = new String[]{todayStartTime,todayEndTime,lastMsgId};
84+
executeType = new String[]{todayStartTime,todayEndTime,lastMsgId,useRunDateFlag.toString(),runDate};
8385
}
8486
}else{
8587
if(afterSendFlag){
86-
executeType = new String[]{nowStartTime,allEndTime,lastMsgId};
88+
executeType = new String[]{nowStartTime,allEndTime,lastMsgId,useRunDateFlag.toString(),runDate};
8789
}else{
88-
executeType = new String[]{allStartTime,allEndTime,lastMsgId};
90+
executeType = new String[]{allStartTime,allEndTime,lastMsgId,useRunDateFlag.toString(),runDate};
8991
}
9092
}
9193
}catch(Exception e){

dss-appconn/appconns/dss-eventchecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/eventchecker/service/EventCheckSender.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ public boolean sendMsg(int jobId, Properties props, Logger log) {
3737
PreparedStatement pstmt = null;
3838
Connection msgConn = null;
3939
String sendTime = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");
40-
String sqlForSendMsg = "INSERT INTO event_queue (sender,send_time,topic,msg_name,msg,send_ip) VALUES(?,?,?,?,?,?)";
40+
String runDate = props.getProperty("run_date");
41+
String sqlForSendMsg = "INSERT INTO event_queue (sender,send_time,topic,msg_name,msg,send_ip,run_date) VALUES(?,?,?,?,?,?,?)";
4142
try {
4243
String vIP = getLinuxLocalIp(log);
4344
msgConn = getEventCheckerConnection(props,log);
@@ -49,6 +50,7 @@ public boolean sendMsg(int jobId, Properties props, Logger log) {
4950
pstmt.setString(4, msgName);
5051
pstmt.setString(5, msg);
5152
pstmt.setString(6, vIP);
53+
pstmt.setString(7, runDate);
5254
int rs = pstmt.executeUpdate();
5355
if (rs == 1) {
5456
result = true;

0 commit comments

Comments
 (0)