Skip to content

Commit 797394d

Browse files
committed
Merge remote-tracking branch 'origin/dev-1.1.20' into dev-1.1.20
2 parents 7936b9b + 38a2a87 commit 797394d

File tree

4 files changed

+15
-16
lines changed

4 files changed

+15
-16
lines changed

dss-appconn/appconns/dss-eventchecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/eventchecker/entity/EventChecker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public class EventChecker implements Runnable{
4545
public final static String SAVE_KEY="msg.savekey";
4646
public final static String USER_TIME="msg.init.querytime";
4747
public final static String TODAY="only.receive.today";
48-
public final static String USE_RUN_DATE ="use.rundate";
48+
public final static String USE_RUN_DATE ="msg.receive.use.rundate";
4949
public final static String AFTERSEND="msg.after.send";
5050

5151
private Properties p;

dss-appconn/appconns/dss-eventchecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/eventchecker/service/AbstractEventCheckReceiver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ String getOffset(int jobId, Properties props, Logger log){
138138
* Consistent entrance to consumer message
139139
*/
140140
String[] getMsg(Properties props, Logger log,String ... params){
141-
boolean useRunDate=Boolean.getBoolean(params[3]);
141+
boolean useRunDate=Boolean.parseBoolean(params[3]);
142142
String sqlForReadTMsg;
143143
if(useRunDate){
144144
sqlForReadTMsg ="SELECT * FROM event_queue WHERE topic=? AND msg_name=? AND send_time >=? AND send_time <=? AND msg_id >? AND run_date =?ORDER BY msg_id ASC LIMIT 1";
@@ -159,6 +159,7 @@ String[] getMsg(Properties props, Logger log,String ... params){
159159
pstmt.setString(4, params[1]);
160160
pstmt.setString(5, params[2]);
161161
if(useRunDate){
162+
log.info("use run_date, run_date:{}", params[4]);
162163
pstmt.setString(6,params[4]);
163164
}
164165
log.info("param {} StartTime: " + params[0] + ", EndTime: " + params[1]

dss-appconn/appconns/dss-eventchecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/eventchecker/service/DefaultEventcheckReceiver.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public boolean reciveMsg(int jobId, Properties props, Logger log) {
5454
try{
5555
String lastMsgId = getOffset(jobId,props,log);
5656
String[] executeType = createExecuteType(jobId,props,log,lastMsgId);
57+
log.info("event receiver executeType[]:{},{},{},{},{}",executeType[0],executeType[1],executeType[2],executeType[3],executeType[4]);
5758
if(executeType!=null && executeType.length ==5){
5859
String[] consumedMsgInfo = getMsg(props, log,executeType);
5960
if(consumedMsgInfo!=null && consumedMsgInfo.length == 4){
@@ -77,17 +78,17 @@ private String[] createExecuteType(int jobId, Properties props, Logger log,Strin
7778
Boolean useRunDateFlag = receiveTodayFlag && (null != useRunDate && "true".equalsIgnoreCase(useRunDate.trim()));
7879
String[] executeType = null;
7980
try {
80-
if(receiveTodayFlag){
81-
if(afterSendFlag){
82-
executeType = new String[]{nowStartTime,todayEndTime,lastMsgId,useRunDateFlag.toString(),runDate};
83-
}else{
84-
executeType = new String[]{todayStartTime,todayEndTime,lastMsgId,useRunDateFlag.toString(),runDate};
81+
if (receiveTodayFlag && !useRunDateFlag) {
82+
if (afterSendFlag) {
83+
executeType = new String[]{nowStartTime, todayEndTime, lastMsgId, useRunDateFlag.toString(), runDate};
84+
} else {
85+
executeType = new String[]{todayStartTime, todayEndTime, lastMsgId, useRunDateFlag.toString(), runDate};
8586
}
86-
}else{
87-
if(afterSendFlag){
88-
executeType = new String[]{nowStartTime,allEndTime,lastMsgId,useRunDateFlag.toString(),runDate};
89-
}else{
90-
executeType = new String[]{allStartTime,allEndTime,lastMsgId,useRunDateFlag.toString(),runDate};
87+
} else {
88+
if (afterSendFlag) {
89+
executeType = new String[]{nowStartTime, allEndTime, lastMsgId, useRunDateFlag.toString(), runDate};
90+
} else {
91+
executeType = new String[]{allStartTime, allEndTime, lastMsgId, useRunDateFlag.toString(), runDate};
9192
}
9293
}
9394
}catch(Exception e){

dss-appconn/appconns/dss-eventchecker-appconn/src/main/scala/com/webank/wedatasphere/dss/appconn/eventchecker/execution/EventCheckerRefExecutionOperation.scala

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,7 @@ class EventCheckerRefExecutionOperation
7676
val InstanceConfig = this.service.getAppInstance.getConfig
7777
val scalaParams: scala.collection.mutable.Map[String, Object] = requestRef.getExecutionRequestRefContext.getRuntimeMap
7878
val properties = new Properties()
79-
val variableParams: scala.collection.mutable.Map[String, Object] = requestRef.getRefJobContent.get("variable").asInstanceOf[java.util.Map[String, Object]]
80-
if (variableParams.exists(x => x._1.equalsIgnoreCase(VariableUtils.RUN_DATE))) {
81-
properties.put(VariableUtils.RUN_DATE, variableParams.get(VariableUtils.RUN_DATE))
82-
}
79+
properties.put(VariableUtils.RUN_DATE,requestRef.getRunDate)
8380
InstanceConfig.foreach { record =>
8481
if(null == record._2) {
8582
properties.put(record._1, "")}

0 commit comments

Comments
 (0)