Skip to content

Commit a1fb8b8

Browse files
committed
Revert "datachecker 支持 qualitis v2"
This reverts commit aa37fa0.
1 parent a5184b2 commit a1fb8b8

File tree

5 files changed

+45
-117
lines changed

5 files changed

+45
-117
lines changed

dss-appconn/appconns/dss-datachecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/datachecker/DataChecker.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,24 +28,18 @@
2828
public class DataChecker {
2929
public final static String SOURCE_TYPE = "source.type";
3030
public final static String DATA_OBJECT = "check.object";
31-
/**
32-
* 检查对象在节点中的序号
33-
*/
34-
public final static String DATA_OBJECT_NUM = "check.sn.object.num";
3531
public final static String WAIT_TIME = "max.check.hours";
3632
public final static String QUERY_FREQUENCY = "query.frequency";
3733
public final static String TIME_SCAPE = "time.scape";
3834
public final static String MASK_URL = "bdp.mask.url";
3935
public final static String MASK_APP_ID = "bdp.mask.app.id";
4036
public final static String MASK_APP_TOKEN = "bdp.mask.app.token";
41-
public final static String CONTEXTID_USER = "contextId.user";
42-
public final static String CONTEXTID_PROJECT_NAME = "contextId.projectName";
43-
public final static String CONTEXTID_FLOW_NAME = "contextId.flowName";
44-
public final static String NAME_NAME = "nodeName";
45-
37+
public static final String JOB_ID = "azkaban.job.id";
4638
public final static String QUALITIS_CHECK = "qualitis.check";
4739

48-
40+
public final static String DB_NAME = "dbName";
41+
public final static String TABLE_NAME = "tableName";
42+
public final static String PARTITION_NAME = "partitionName";
4943

5044
private Properties p;
5145
private static final Logger logger = LoggerFactory.getLogger(DataChecker.class);;

dss-appconn/appconns/dss-datachecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/datachecker/connector/DataCheckerDao.java

Lines changed: 19 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import okhttp3.RequestBody;
2929
import okhttp3.Response;
3030
import okhttp3.ResponseBody;
31-
import org.apache.commons.codec.digest.DigestUtils;
3231
import org.apache.commons.lang.StringUtils;
3332
import org.slf4j.Logger;
3433

@@ -59,12 +58,9 @@ public class DataCheckerDao {
5958
"AND (UNIX_TIMESTAMP() - UNIX_TIMESTAMP(STR_TO_DATE(modify_time, '%Y-%m-%d %H:%i:%s'))) <= ? AND status = '1';";
6059

6160
private static final String SQL_DOPS_CHECK_TABLE =
62-
"SELECT * FROM dops_clean_task_list WHERE db_name = ? AND tb_name = ? AND part_name is null AND task_state NOT IN (10,13) order by order_id desc limit 1";
61+
"SELECT * FROM dops_clean_task_list WHERE task_state = 10 AND db_name = ? AND tb_name = ? ";
6362
private static final String SQL_DOPS_CHECK_PARTITION =
64-
"SELECT * FROM dops_clean_task_list WHERE db_name = ? AND tb_name = ? AND part_name = ? AND task_state NOT IN (10,13) order by order_id desc limit 1";
65-
66-
private static final String SQL_DOPS_CHECK_ALL_PARTITION =
67-
"SELECT * FROM dops_clean_task_list WHERE db_name = ? AND tb_name = ? AND part_name is not null AND task_state != 13 order by order_id desc limit 1";
63+
"SELECT * FROM dops_clean_task_list WHERE task_state = 10 AND db_name = ? AND tb_name = ? AND part_name = ?";
6864
private static final String HIVE_SOURCE_TYPE = "hivedb";
6965
private static final String MASK_SOURCE_TYPE = "maskdb";
7066

@@ -181,7 +177,6 @@ private boolean getDataCheckResult(Map<String, String> proObjectMap,
181177
if (StringUtils.isNotBlank(dataObjectStr)) {
182178
dataObjectStr = dataObjectStr.replace(" ", "").trim();
183179
}
184-
String objectNum = proObjectMap.get(DataChecker.DATA_OBJECT_NUM);
185180
CheckDataObject dataObject;
186181
try {
187182
dataObject = parseDataObject(dataObjectStr);
@@ -218,7 +213,8 @@ private boolean getDataCheckResult(Map<String, String> proObjectMap,
218213
}
219214
}
220215
return normalCheck
221-
&& checkQualitisData( objectNum,dataObject, log, props, dopsConn,qualitisUtil);
216+
&& getDopsTotalCount(dataObject, dopsConn, log) > 0
217+
&& checkQualitisData(dataObject, log, props, qualitisUtil);
222218

223219
}
224220

@@ -253,15 +249,13 @@ private Map<String, String> key2Map(Object key, Properties p) {
253249
proMap.put(DataChecker.SOURCE_TYPE, String.valueOf(p.get(stKey)));
254250
}
255251
proMap.put(DataChecker.DATA_OBJECT, String.valueOf(p.get(doKey)));
256-
proMap.put(DataChecker.DATA_OBJECT_NUM, keyNum);
257252
} else {
258253
String stKey = DataChecker.SOURCE_TYPE;
259254
String doKey = DataChecker.DATA_OBJECT;
260255
if (null != p.get(stKey)) {
261256
proMap.put(DataChecker.SOURCE_TYPE, String.valueOf(p.get(stKey)));
262257
}
263258
proMap.put(DataChecker.DATA_OBJECT, String.valueOf(p.get(doKey)));
264-
proMap.put(DataChecker.DATA_OBJECT_NUM, "0");
265259
}
266260
}
267261

@@ -324,17 +318,6 @@ private PreparedStatement getDopsStatement(Connection conn, CheckDataObject data
324318
}
325319
}
326320

327-
/**
328-
* 构造查询dops库的查询,分区表全表校验场景
329-
*/
330-
private PreparedStatement getDopsStatementCheckAllPartition(Connection conn, CheckDataObject dataObject) throws SQLException {
331-
PreparedStatement pstmt = conn.prepareCall(SQL_DOPS_CHECK_ALL_PARTITION);
332-
pstmt.setString(1, dataObject.getDbName());
333-
pstmt.setString(2, dataObject.getTableName());
334-
return pstmt;
335-
336-
}
337-
338321

339322
/**
340323
* 反序列化检查对象
@@ -343,7 +326,7 @@ private PreparedStatement getDopsStatementCheckAllPartition(Connection conn, Che
343326
*/
344327
private CheckDataObject parseDataObject(String dataObjectStr)throws SQLException{
345328
CheckDataObject dataObject;
346-
if(!dataObjectStr.contains(".")){
329+
if(!dataObjectStr.contains("\\.")){
347330
throw new SQLException("Error for DataObject format!"+dataObjectStr);
348331
}
349332
String dbName = dataObjectStr.split("\\.")[0];
@@ -368,12 +351,10 @@ private CheckDataObject parseDataObject(String dataObjectStr)throws SQLException
368351
*/
369352
private long getJobTotalCount(CheckDataObject dataObject, Connection conn, Logger log) {
370353
log.info("-------------------------------------- search hive/spark/mr data ");
371-
log.info("-------------------------------------- dataObject: " + dataObject);
354+
log.info("-------------------------------------- : " + dataObject);
372355
try (PreparedStatement pstmt = getJobStatement(conn, dataObject)) {
373356
ResultSet rs = pstmt.executeQuery();
374-
long ret = rs.last() ? rs.getRow() : 0;
375-
log.info("-------------------------------------- hive/spark/mr data result:"+ret);
376-
return ret;
357+
return rs.last() ? rs.getRow() : 0;
377358
} catch (SQLException e) {
378359
log.error("fetch data from Hive MetaStore error", e);
379360
return 0;
@@ -386,84 +367,45 @@ private long getJobTotalCount(CheckDataObject dataObject, Connection conn, Logge
386367
private long getBdpTotalCount(CheckDataObject dataObject, Connection conn, Logger log, Properties props) {
387368
String timeScape = props.getOrDefault(DataChecker.TIME_SCAPE, "NULL").toString();
388369
log.info("-------------------------------------- search bdp data ");
389-
log.info("-------------------------------------- dataObject: " + dataObject.toString());
370+
log.info("-------------------------------------- : " + dataObject.toString());
390371
try (PreparedStatement pstmt = getBdpStatement(conn, dataObject, timeScape)) {
391372
ResultSet rs = pstmt.executeQuery();
392-
long ret=rs.last() ? rs.getRow() : 0;
393-
log.info("-------------------------------------- bdp data result:"+ret);
394-
return ret;
373+
return rs.last() ? rs.getRow() : 0;
395374
} catch (SQLException e) {
396375
log.error("fetch data from bdp error", e);
397376
return 0;
398377
}
399378
}
400379

401380
/**
402-
* - 返回0表示未找到任何记录 ;
403-
* - 返回1表示非分区表的全表校验场景找到了记录;
404-
* - 返回2表示分区表的分区校验场景找到了记录;
405-
* - 返回3表示分区表的全表校验场景找到了记录;
406-
* - 返回4表示查询出错了
381+
* 查询dops库
407382
*/
408-
private int checkDops(CheckDataObject dataObject, Connection conn, Logger log){
383+
private long getDopsTotalCount(CheckDataObject dataObject, Connection conn, Logger log) {
384+
409385
log.info("-------------------------------------- search dops data ");
410-
log.info("-------------------------------------- dataObject: " + dataObject.toString());
386+
log.info("-------------------------------------- : " + dataObject.toString());
411387
try (PreparedStatement pstmt = getDopsStatement(conn, dataObject)) {
412388
ResultSet rs = pstmt.executeQuery();
413-
long count = rs.last() ? rs.getRow() : 0;
414-
log.info("-------------------------------------- dops data check table or partition,count:"+count);
415-
if(count>0){
416-
return CheckDataObject.Type.PARTITION == dataObject.getType() ? 2 : 1;
417-
}
389+
return rs.last() ? rs.getRow() : 0;
418390
} catch (SQLException e) {
419-
log.error("fetch data from dops error while check table or partition", e);
391+
log.error("fetch data from dops error", e);
420392
//如果查询出错,还是认为dops处理过这个表/分区
421-
return 4;
393+
return 1;
422394
}
423-
424-
try(PreparedStatement pstmt = getDopsStatementCheckAllPartition(conn, dataObject)){
425-
ResultSet rs = pstmt.executeQuery();
426-
long count = rs.last() ? rs.getRow() : 0;
427-
log.info("-------------------------------------- dops data check all partition result count:"+count);
428-
if(count>0){
429-
return 3;
430-
}
431-
}catch (SQLException e) {
432-
log.error("fetch data from dops error while check all partition", e);
433-
//如果查询出错,还是认为dops处理过这个表/分区
434-
return 4;
435-
}
436-
return 0;
437395
}
438396

439397
/**
440398
* 从qualitis去check数据
441399
*/
442-
private boolean checkQualitisData(String objectNum,CheckDataObject dataObject, Logger log, Properties props,Connection conn, QualitisUtil qualitisUtil) {
400+
private boolean checkQualitisData(CheckDataObject dataObject, Logger log, Properties props, QualitisUtil qualitisUtil) {
443401
boolean systemCheck = Boolean.valueOf(props.getProperty("job.eventchecker.qualitis.switch"));
444402
boolean userCheck = Boolean.valueOf(props.getProperty(DataChecker.QUALITIS_CHECK, "true"));
445403
if (systemCheck && userCheck ) {
446-
447-
int dopsState=checkDops(dataObject,conn,log);
448-
if(dopsState==0){
449-
//没找到记录,直接通过校验
450-
return true;
451-
} else if (dopsState == 3 || dopsState == 4) {
452-
//找记录失败、或者是找到了分区表的全表校验记录,直接校验不通过。
453-
return false;
454-
}
455-
// 其他情况,继续走qualitis校验
456404
log.info(
457405
"=============================Data Check Qualitis Start==========================================");
458406
try {
459-
String projectName = props.getProperty(DataChecker.CONTEXTID_PROJECT_NAME);
460-
String user = props.getProperty(DataChecker.CONTEXTID_USER);
461-
String flowName = props.getProperty(DataChecker.CONTEXTID_FLOW_NAME);
462-
String nodeName=props.getProperty(DataChecker.NAME_NAME);
463-
464-
String ruleName = getMD5Str(projectName + flowName + nodeName + objectNum);
465407
String applicationId = qualitisUtil
466-
.createAndSubmitRule(dataObject,projectName,ruleName,user);
408+
.createAndSubmitRule(dataObject);
467409
if (StringUtils.isEmpty(applicationId)) {
468410
return false;
469411
}
@@ -478,7 +420,7 @@ private boolean checkQualitisData(String objectNum,CheckDataObject dataObject, L
478420
case 12:
479421
try {
480422
Thread
481-
.sleep(Double.valueOf(props.getProperty("qualitis.getStatus.interval")).longValue());
423+
.sleep(Integer.valueOf(props.getProperty("qualitis.getStatus.interval")));
482424
} catch (InterruptedException e) {
483425
log.error("get datachecker result from qualitis InterruptedException", e);
484426
}
@@ -502,9 +444,6 @@ private boolean checkQualitisData(String objectNum,CheckDataObject dataObject, L
502444
return true;
503445
}
504446
}
505-
public static String getMD5Str(String str){
506-
return DigestUtils.md5Hex(str);
507-
}
508447

509448
private Map<String, String> fetchMaskCode(CheckDataObject dataObject, Logger log, Properties props) {
510449
log.info("=============================调用BDP MASK接口查询数据状态==========================================");

dss-appconn/appconns/dss-datachecker-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/datachecker/utils/QualitisUtil.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import com.webank.wedatasphere.dss.appconn.datachecker.common.CheckDataObject;
66
import com.webank.wedatasphere.dss.common.utils.DSSCommonUtils;
77
import okhttp3.*;
8-
import org.apache.commons.codec.digest.DigestUtils;
98
import org.apache.commons.lang.RandomStringUtils;
109
import org.apache.commons.lang.StringUtils;
1110
import org.slf4j.Logger;
@@ -36,7 +35,7 @@ public class QualitisUtil {
3635
String baseUrl = "";
3736
String appId = "";
3837
String appToken = "";
39-
long getStatusTimeout;
38+
int getStatusTimeout;
4039
private Properties properties;
4140

4241

@@ -46,8 +45,8 @@ public QualitisUtil(Properties properties) {
4645
this.baseUrl = this.properties.getProperty("qualitis.baseUrl");
4746
this.appId = this.properties.getProperty("qualitis.appId");
4847
this.appToken = this.properties.getProperty("qualitis.appToken");
49-
this.getStatusTimeout =Double
50-
.valueOf(this.properties.getProperty("qualitis.getStatus.timeout", "60000")).longValue();
48+
this.getStatusTimeout = Integer
49+
.valueOf(this.properties.getProperty("qualitis.getStatus.timeout", "60000"));
5150
}
5251

5352
/**
@@ -112,7 +111,7 @@ public String submitTask(long groupId, String createUser, String executionUser)
112111
return applicationId;
113112
}
114113

115-
public String createAndSubmitRule(CheckDataObject dataObject,String projectName,String ruleName,String user) throws IOException {
114+
public String createAndSubmitRule(CheckDataObject dataObject) throws IOException {
116115
logger.info("");
117116
String applicationId = "";
118117
String url = "";
@@ -125,8 +124,9 @@ public String createAndSubmitRule(CheckDataObject dataObject,String projectName,
125124

126125
// JSON 请求参数
127126
Map<String, Object> param = new HashMap<>();
128-
param.put("create_user", user);
129-
param.put("execution_user",user);
127+
param.put("create_user", properties.getProperty("user.to.proxy"));
128+
param.put("execution_user", properties.getProperty("user.to.proxy"));
129+
// expectFileAmountCount(\"HDP-GZPC-BDAP-UAT.dqm_test.test_dqm_left:ds=${run_date}\", null, false).addRuleMetricWithCheck(\"BDP-DQM_custom-metric_filetest_Year\", false, false, false).fixValueNotEqual(0)
130130
// 集群名
131131
String clusterName = properties.getProperty("cluster.name");
132132
// 子系统名 WTSS-BDPWFM/WTSS-BDAPWFM
@@ -136,20 +136,21 @@ public String createAndSubmitRule(CheckDataObject dataObject,String projectName,
136136
.append(dataObject.getType()== CheckDataObject.Type.PARTITION? ":" + dataObject.getPartitionName() : "")
137137
.append("\", null, false).addRuleMetricWithCheck(\"")
138138
.append(String.format(properties.getProperty("qualitis.rule.metric"),
139-
ruleName))
139+
properties.getProperty(DataChecker.JOB_ID).replace("_", "-")))
140140
.append("\", false, false, false).fixValueNotEqual(0)");
141141
logger.info("template_function:{}",sb);
142142
param.put("template_function", sb.toString());
143-
param.put("project_name", projectName);
144-
param.put("rule_name", ruleName);
143+
param.put("project_name", properties.getProperty("azkaban.flow.projectname"));
144+
param.put("rule_name", properties.getProperty("azkaban.flow.projectname") + properties
145+
.getProperty(DataChecker.JOB_ID) + dataObject);
145146

146147
String json = DSSCommonUtils.COMMON_GSON.toJson(param);
147-
logger.info("start to call qualitis,url:{} ,Request Json:{} ", url, json );
148+
logger.info("Request Json: " + json);
148149
MediaType applicationJson = MediaType.parse("application/json;charset=utf-8");
149150
RequestBody requestBody = RequestBody.create(json, applicationJson);
150151

151152
OkHttpClient okHttpClient = new OkHttpClient.Builder()
152-
.callTimeout(Double.valueOf(properties.getProperty("qualitis.submitTask.timeout")).longValue(), TimeUnit.MILLISECONDS)
153+
.callTimeout(Integer.valueOf(properties.getProperty("qualitis.submitTask.timeout")), TimeUnit.MILLISECONDS)
153154
.connectTimeout(10, TimeUnit.SECONDS)
154155
.writeTimeout(20, TimeUnit.SECONDS)
155156
.readTimeout(20, TimeUnit.SECONDS)
@@ -163,7 +164,7 @@ public String createAndSubmitRule(CheckDataObject dataObject,String projectName,
163164
Call call = okHttpClient.newCall(request);
164165
Response response = call.execute();
165166
String resultJson = response.body().string();
166-
logger.info("call qualitis end,Response Json:{} " , resultJson);
167+
logger.info("Response Json: " + resultJson);
167168
if (StringUtils.isNotEmpty(resultJson)) {
168169
Map<String, Object> resultMap = DSSCommonUtils.COMMON_GSON.fromJson(resultJson,Map.class);
169170
String code = (String) resultMap.get("code");
@@ -224,6 +225,7 @@ public String getTaskStatus(String applicationId){
224225
logger.info("Response Json: " + resultJson);
225226
if (StringUtils.isNotEmpty(resultJson)) {
226227
Map<String, Object> resultMap = DSSCommonUtils.COMMON_GSON.fromJson(resultJson,Map.class);
228+
logger.info(String.valueOf(resultMap));
227229
String code = (String) resultMap.get("code");
228230
if ("200".equals(code)) {
229231
status = ((Map<String, Object>) (resultMap.get("data"))).get(
@@ -255,6 +257,7 @@ public static String getSignature(String appId, String appToken, String nonce, S
255257
return Sha256Utils.getSHA256L32(Sha256Utils.getSHA256L32(appId + nonce + timestamp) + appToken);
256258
}
257259

260+
258261
public static void main(String[] args) throws Exception {
259262

260263
/*Map<Long, Integer> ruleGroup = new HashMap<>();
@@ -295,15 +298,16 @@ public static void main(String[] args) throws Exception {
295298
properties.setProperty("subsystem.name", "WTSS-BDPWFM");
296299
properties.setProperty("azkaban.flow.flowid", "test");
297300
properties.setProperty("azkaban.flow.projectname", "test");
301+
properties.setProperty(DataChecker.JOB_ID,"4134314");
298302
properties.setProperty("qualitis.rule.metric","WTSS-BDPWFM_general-metric_%s_Daily");
299303

300304
String dBName = "dqm_test";
301305
String tableName = "test_dqm_left";
302306
String partitionName = "ds=2023-05-08";
303307

304308
QualitisUtil qualitisUtil = new QualitisUtil(properties);
305-
String applicationId = qualitisUtil.createAndSubmitRule(new CheckDataObject(dBName, tableName, partitionName), "test", "4134314","hadoop");
306-
String applicationId1 = qualitisUtil.createAndSubmitRule(new CheckDataObject(dBName, tableName, partitionName), "test", "4134314","hadoop");
309+
String applicationId = qualitisUtil.createAndSubmitRule(new CheckDataObject(dBName,tableName,partitionName));
310+
String applicationId1 = qualitisUtil.createAndSubmitRule(new CheckDataObject(dBName,tableName,partitionName));
307311
System.out.println(applicationId);
308312
System.out.println(applicationId1);
309313

dss-appconn/appconns/dss-datachecker-appconn/src/main/resources/appconn.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ bdp.datachecker.jdo.option.username=
2626
bdp.datachecker.jdo.option.password=
2727
bdp.datachecker.jdo.option.login.type=base64
2828

29-
dops.datachecker.jdo.option.name=dops
29+
dops.datachecker.jdo.option.name="dops"
3030
dops.datachecker.jdo.option.url=
3131
dops.datachecker.jdo.option.username=
3232
dops.datachecker.jdo.option.password=
@@ -54,7 +54,7 @@ qualitis.getStatus.timeout=60000
5454
# ???
5555
cluster.name=HDP-GZPC-BDAP-UAT
5656
# Qualitis ????
57-
qualitis.rule.metric=DSS-IDE_general-metric_%s_Daily
57+
qualitis.rule.metric=WTSS-BDPWFM_general-metric_%s_Daily
5858
# Qualitis ??????????
5959
qualitis.getStatus.all.timeout=60000
6060

0 commit comments

Comments
 (0)