From 233785ac3c03e9c8d1ad6be9adcee23bf352b970 Mon Sep 17 00:00:00 2001 From: peacewong Date: Mon, 20 Jan 2025 12:08:01 +0800 Subject: [PATCH] Optimize monitor module --- .../linkis/monitor/config/ListenerConfig.java | 4 +- .../dao/UserDepartmentInfoMapper.java | 45 +++++ .../department/entity/UserDepartmentInfo.java | 150 +++++++++++++++ .../monitor/entity/ClientSingleton.java | 107 +++++++++++ .../linkis/monitor/jobhistory/QueryUtils.java | 2 +- .../scheduled/EntranceTaskMonitor.java | 7 +- .../monitor/scheduled/JobHistoryMonitor.java | 67 ++++++- .../monitor/scheduled/ResourceMonitor.java | 2 +- .../scheduled/UserDepartmentInfoSync.java | 146 ++++++++++++++ .../monitor/scheduled/UserModeMonitor.java | 7 +- .../linkis/monitor/until/HttpsUntils.java | 181 +++++++++--------- .../linkis/monitor/until/JobMonitorUtils.java | 8 +- .../linkis/monitor/until/ThreadUtils.java | 9 + .../mapper/common/JobHistoryMapper.xml | 4 +- .../common/UserDepartmentInfoMapper.xml | 57 ++++++ ...kisJobHistoryScanSpringConfiguration.scala | 5 + .../monitor/client/MonitorHTTPClient.scala | 42 +++- .../client/MonitorHTTPClientClientImpl.scala | 4 +- .../client/MonitorResourceClient.scala | 4 +- .../client/MonitorResourceClientImpl.scala | 6 +- .../linkis/monitor/constants/Constants.scala | 15 ++ .../monitor/factory/MapperFactory.scala | 11 ++ .../jobhistory/JobHistoryDataFetcher.scala | 94 ++++----- .../JobHistoryAnalyzeAlertSender.scala | 34 ++++ .../analyze/JobHistoryAnalyzeHitEvent.scala | 22 +++ .../analyze/JobHistoryAnalyzeRule.scala | 60 ++++++ .../jobtime/JobTimeExceedRule.scala | 3 +- .../StarrocksTimeExceedAlertSender.scala | 84 ++++++++ .../jobtime/StarrocksTimeExceedHitEvent.scala | 22 +++ .../jobtime/StarrocksTimeExceedRule.scala | 128 +++++++++++++ .../StarrocksTimeKillAlertSender.scala | 30 +++ .../jobtime/StarrocksTimeKillRule.scala | 125 ++++++++++++ .../monitor/request/AnalyzeJobAction.scala | 58 ++++++ .../request/DataSourceParamsAction.scala | 92 +++++++++ .../monitor/request/EmsListAction.scala | 2 +- .../monitor/request/EntranceTaskAction.scala | 2 +- .../monitor/request/KeyvalueAction.scala | 80 ++++++++ .../monitor/request/KillJobAction.scala | 80 ++++++++ .../request/MonitorResourceAction.scala | 3 +- .../linkis/monitor/request/UserAction.scala | 10 +- .../response/AnalyzeJobResultAction.scala | 33 ++++ .../monitor/response/KeyvalueResult.scala | 36 ++++ .../response/KillJobResultAction.scala | 33 ++++ .../utils/alert/ims/MonitorAlertUtils.scala | 31 ++- .../monitor/utils/job/JohistoryUtils.scala | 43 +++++ 45 files changed, 1791 insertions(+), 197 deletions(-) create mode 100644 linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/dao/UserDepartmentInfoMapper.java create mode 100644 linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/entity/UserDepartmentInfo.java create mode 100644 linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/entity/ClientSingleton.java create mode 100644 linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserDepartmentInfoSync.java create mode 100644 linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/UserDepartmentInfoMapper.xml create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeHitEvent.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeRule.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedHitEvent.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/AnalyzeJobAction.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/DataSourceParamsAction.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KeyvalueAction.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KillJobAction.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/AnalyzeJobResultAction.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KeyvalueResult.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KillJobResultAction.scala create mode 100644 linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/job/JohistoryUtils.scala diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/config/ListenerConfig.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/config/ListenerConfig.java index eb5c11af87..98aec85f00 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/config/ListenerConfig.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/config/ListenerConfig.java @@ -17,7 +17,7 @@ package org.apache.linkis.monitor.config; -import org.apache.linkis.monitor.until.HttpsUntils; +import org.apache.linkis.monitor.entity.ClientSingleton; import org.apache.linkis.monitor.until.ThreadUtils; import org.apache.linkis.monitor.utils.log.LogUtils; @@ -38,7 +38,7 @@ public class ListenerConfig { private void shutdownEntrance(ContextClosedEvent event) { try { ThreadUtils.executors.shutdown(); - HttpsUntils.client.close(); + ClientSingleton.getInstance().close(); } catch (IOException e) { logger.error("ListenerConfig error msg {}", e.getMessage()); } diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/dao/UserDepartmentInfoMapper.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/dao/UserDepartmentInfoMapper.java new file mode 100644 index 0000000000..1b37d30cb5 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/dao/UserDepartmentInfoMapper.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.department.dao; + +import org.apache.linkis.monitor.department.entity.UserDepartmentInfo; + +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; + +@Mapper +public interface UserDepartmentInfoMapper { + + void insertUser(UserDepartmentInfo user); + + @Transactional(rollbackFor = Exception.class) + int batchInsertUsers(@Param("userDepartmentInfos") List userDepartmentInfos); + + void updateUser(UserDepartmentInfo user); + + UserDepartmentInfo selectUser(@Param("userName") String userName); + + @Transactional(rollbackFor = Exception.class) + void deleteUser(); + + List selectAllUsers(); +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/entity/UserDepartmentInfo.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/entity/UserDepartmentInfo.java new file mode 100644 index 0000000000..c5ea27e7ce --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/department/entity/UserDepartmentInfo.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.department.entity; + +import java.util.Date; + +public class UserDepartmentInfo { + + private String clusterCode; + + private String userType; + private String userName; + private String orgId; + private String orgName; + private String queueName; + private String dbName; + private String interfaceUser; + private String isUnionAnalyse; + private Date createTime; + private String userItsmNo; + + // 构造函数、getter和setter方法 + public UserDepartmentInfo( + String clusterCode, + String userType, + String userName, + String orgId, + String orgName, + String queueName, + String dbName, + String interfaceUser, + String isUnionAnalyse, + Date createTime, + String userItsmNo) { + this.clusterCode = clusterCode; + this.userType = userType; + this.userName = userName; + this.orgId = orgId; + this.orgName = orgName; + this.queueName = queueName; + this.dbName = dbName; + this.interfaceUser = interfaceUser; + this.isUnionAnalyse = isUnionAnalyse; + this.createTime = createTime; + this.userItsmNo = userItsmNo; + } + + public String getClusterCode() { + return clusterCode; + } + + public void setClusterCode(String clusterCode) { + this.clusterCode = clusterCode; + } + + public String getUserType() { + return userType; + } + + public void setUserType(String userType) { + this.userType = userType; + } + + public String getUserName() { + return userName; + } + + public void setUserName(String userName) { + this.userName = userName; + } + + public String getOrgId() { + return orgId; + } + + public void setOrgId(String orgId) { + this.orgId = orgId; + } + + public String getOrgName() { + return orgName; + } + + public void setOrgName(String orgName) { + this.orgName = orgName; + } + + public String getQueueName() { + return queueName; + } + + public void setQueueName(String queueName) { + this.queueName = queueName; + } + + public String getDbName() { + return dbName; + } + + public void setDbName(String dbName) { + this.dbName = dbName; + } + + public String getInterfaceUser() { + return interfaceUser; + } + + public void setInterfaceUser(String interfaceUser) { + this.interfaceUser = interfaceUser; + } + + public String getIsUnionAnalyse() { + return isUnionAnalyse; + } + + public void setIsUnionAnalyse(String isUnionAnalyse) { + this.isUnionAnalyse = isUnionAnalyse; + } + + public Date getCreateTime() { + return createTime; + } + + public void setCreateTime(Date createTime) { + this.createTime = createTime; + } + + public String getUserItsmNo() { + return userItsmNo; + } + + public void setUserItsmNo(String userItsmNo) { + this.userItsmNo = userItsmNo; + } +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/entity/ClientSingleton.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/entity/ClientSingleton.java new file mode 100644 index 0000000000..3c660426b3 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/entity/ClientSingleton.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.entity; + +import org.apache.linkis.bml.conf.BmlConfiguration; +import org.apache.linkis.common.conf.Configuration; +import org.apache.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy; +import org.apache.linkis.httpclient.dws.config.DWSClientConfig; +import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder; +import org.apache.linkis.monitor.client.MonitorHTTPClient; +import org.apache.linkis.monitor.client.MonitorHTTPClientClientImpl; + +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class ClientSingleton { + private static MonitorHTTPClient instance; + private static DWSClientConfig dwsClientConfig; + + private ClientSingleton() {} + + public static synchronized MonitorHTTPClient getInstance() { + if (instance == null) { + if (dwsClientConfig == null) { + dwsClientConfig = createClientConfig(null, null); // NOSONAR + } + instance = new MonitorHTTPClientClientImpl(dwsClientConfig); + } + return instance; + } + + public static DWSClientConfig createClientConfig(String url, Map properties) { + String realUrl = ""; + if (StringUtils.isBlank(url)) { + realUrl = Configuration.getGateWayURL(); + } else { + realUrl = url; + } + Map parms = new HashMap<>(); + if (MapUtils.isNotEmpty(properties)) { + parms = properties; + } + int maxConnection = + (int) + parms.getOrDefault( // NOSONAR + BmlConfiguration.CONNECTION_MAX_SIZE_SHORT_NAME(), + BmlConfiguration.CONNECTION_MAX_SIZE().getValue()); + int connectTimeout = + (int) + parms.getOrDefault( + BmlConfiguration.CONNECTION_TIMEOUT_SHORT_NAME(), + BmlConfiguration.CONNECTION_TIMEOUT().getValue()); + int readTimeout = + (int) + parms.getOrDefault( + BmlConfiguration.CONNECTION_READ_TIMEOUT_SHORT_NAME(), + BmlConfiguration.CONNECTION_READ_TIMEOUT().getValue()); + String tokenKey = + (String) + parms.getOrDefault( + BmlConfiguration.AUTH_TOKEN_KEY_SHORT_NAME(), + BmlConfiguration.AUTH_TOKEN_KEY().getValue()); + String tokenValue = + (String) + parms.getOrDefault( + BmlConfiguration.AUTH_TOKEN_VALUE_SHORT_NAME(), + BmlConfiguration.AUTH_TOKEN_VALUE().getValue()); + + DWSClientConfig clientConfig = + ((DWSClientConfigBuilder) + (DWSClientConfigBuilder.newBuilder() + .addServerUrl(realUrl) + .connectionTimeout(connectTimeout) + .discoveryEnabled(false) + .discoveryFrequency(1, TimeUnit.MINUTES) + .loadbalancerEnabled(false) + .maxConnectionSize(maxConnection) + .retryEnabled(false) + .readTimeout(readTimeout) + .setAuthenticationStrategy(new TokenAuthenticationStrategy()) + .setAuthTokenKey(tokenKey) + .setAuthTokenValue(tokenValue))) + .setDWSVersion("v1") + .build(); + + return clientConfig; + } +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java index aa73471c49..688fda5afb 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/jobhistory/QueryUtils.java @@ -23,7 +23,7 @@ public class QueryUtils { - private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + private static DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); // NOSONAR public static String dateToString(Date date) { return dateFormat.format(date); diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/EntranceTaskMonitor.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/EntranceTaskMonitor.java index edce194481..02ea5322e9 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/EntranceTaskMonitor.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/EntranceTaskMonitor.java @@ -121,7 +121,8 @@ public void entranceTask() throws IOException { }); Map likisData = null; try { - likisData = MapUtils.getMap(HttpsUntils.getEntranceTask(null, "hadoop", null), "data"); + likisData = + MapUtils.getMap(HttpsUntils.getEntranceTask(null, Constants.ADMIN_USER(), null), "data"); logger.info("TaskMonitor hadoop response {}:", likisData); } catch (IOException e) { logger.warn("failed to get EntranceTask data"); @@ -163,7 +164,9 @@ public static void resourceSendToIms() { try { // 通过serviceInstance 获取entrance中任务数量信息 Map entranceData = - MapUtils.getMap(HttpsUntils.getEntranceTask(null, "hadoop", entranceService), "data"); + MapUtils.getMap( + HttpsUntils.getEntranceTask(null, Constants.ADMIN_USER(), entranceService), + "data"); int runningTaskNumber = 0; int queuedTaskNumber = 0; int totalTaskNumber = 0; diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java index be7758f7a0..c1a7a16d06 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/JobHistoryMonitor.java @@ -23,12 +23,13 @@ import org.apache.linkis.monitor.core.scanner.AnomalyScanner; import org.apache.linkis.monitor.core.scanner.DefaultScanner; import org.apache.linkis.monitor.factory.MapperFactory; +import org.apache.linkis.monitor.jobhistory.analyze.JobHistoryAnalyzeAlertSender; +import org.apache.linkis.monitor.jobhistory.analyze.JobHistoryAnalyzeRule; import org.apache.linkis.monitor.jobhistory.errorcode.JobHistoryErrCodeRule; import org.apache.linkis.monitor.jobhistory.errorcode.JobHistoryErrorCodeAlertSender; import org.apache.linkis.monitor.jobhistory.index.JobIndexRule; import org.apache.linkis.monitor.jobhistory.index.JobIndexSender; -import org.apache.linkis.monitor.jobhistory.jobtime.JobTimeExceedAlertSender; -import org.apache.linkis.monitor.jobhistory.jobtime.JobTimeExceedRule; +import org.apache.linkis.monitor.jobhistory.jobtime.*; import org.apache.linkis.monitor.jobhistory.labels.JobHistoryLabelsAlertSender; import org.apache.linkis.monitor.jobhistory.labels.JobHistoryLabelsRule; import org.apache.linkis.monitor.jobhistory.runtime.CommonJobRunTimeRule; @@ -75,7 +76,7 @@ public class JobHistoryMonitor { @Scheduled(cron = "${linkis.monitor.jobHistory.finished.cron}") public void jobHistoryFinishedScan() { logger.info("Start scan jobHistoryFinishedScan"); - long intervalMs = 20 * 60 * 1000; + long intervalMs = 20 * 60 * 1000L; long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000; long endTime = System.currentTimeMillis(); long startTime = endTime - intervalMs; @@ -98,7 +99,7 @@ public void jobHistoryFinishedScan() { logger.info("Get JobHistoryId from cache ID:" + id); } List fetchers = - JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "updated_time"); + JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "finished_job"); if (fetchers.isEmpty()) { logger.warn("generated 0 dataFetchers, plz check input"); return; @@ -169,6 +170,14 @@ public void jobHistoryFinishedScan() { } catch (Exception e) { logger.warn("CommonJobRunTimeRule Scan Error msg: " + e.getMessage()); } + // 新增失败任务分析扫描 + try { + JobHistoryAnalyzeRule jobHistoryAnalyzeRule = + new JobHistoryAnalyzeRule(new JobHistoryAnalyzeAlertSender()); + scanner.addScanRule(jobHistoryAnalyzeRule); + } catch (Exception e) { + logger.warn("JobHistoryAnalyzeRule Scan Error msg: " + e.getMessage()); + } // 执行任务扫描 JobMonitorUtils.run(scanner, fetchers, true); @@ -176,7 +185,7 @@ public void jobHistoryFinishedScan() { JobIndexRule jobIndexRule = new JobIndexRule(new JobIndexSender()); scannerIndex.addScanRule(jobIndexRule); List createFetcher = - JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, "department"); + JobMonitorUtils.generateFetchersfortime(startTime, endTime, id, ""); JobMonitorUtils.run(scannerIndex, createFetcher, true); } @@ -193,7 +202,7 @@ public void jobHistoryUnfinishedScan() { AnomalyScanner scanner = new DefaultScanner(); boolean shouldStart = false; List fetchers = - JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, "created_time"); + JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, "unfinished_job"); if (fetchers.isEmpty()) { logger.warn("generated 0 dataFetchers, plz check input"); return; @@ -215,4 +224,50 @@ public void jobHistoryUnfinishedScan() { } JobMonitorUtils.run(scanner, fetchers, shouldStart); } + + /** * 每10分钟扫描一次,扫描两个小时之内的任务,告警要求:管理台配置告警相关参数 */ + @Scheduled(cron = "${linkis.monitor.jdbc.timeout.alert.cron:0 0/10 0 * * ?}") + public void jdbcUnfinishedAlertScan() { + long id = + Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedAlertScan")) + .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue()); + long intervalMs = 7200 * 1000L; + long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000; + long endTime = System.currentTimeMillis(); + long startTime = endTime - intervalMs; + AnomalyScanner scanner = new DefaultScanner(); + List fetchers = + JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, ""); + if (fetchers.isEmpty()) { + logger.warn("jdbcUnfinishedScan generated 0 dataFetchers, plz check input"); + return; + } + StarrocksTimeExceedRule starrocksTimeExceedRule = + new StarrocksTimeExceedRule(new StarrocksTimeExceedAlertSender()); + scanner.addScanRule(starrocksTimeExceedRule); + JobMonitorUtils.run(scanner, fetchers, true); + } + + /** * 每10分钟扫描一次,扫描两个小时之内的任务,满足要求触发kill kill要求:数据源配置kill参数 */ + @Scheduled(cron = "${linkis.monitor.jdbc.timeout.kill.cron:0 0/10 0 * * ?}") + public void jdbcUnfinishedKillScan() { + long id = + Optional.ofNullable(CacheUtils.cacheBuilder.getIfPresent("jdbcUnfinishedKillScan")) + .orElse(MonitorConfig.JOB_HISTORY_TIME_EXCEED.getValue()); + long intervalMs = 7200 * 1000L; + long maxIntervalMs = Constants.ERRORCODE_MAX_INTERVALS_SECONDS() * 1000; + long endTime = System.currentTimeMillis(); + long startTime = endTime - intervalMs; + AnomalyScanner scanner = new DefaultScanner(); + List fetchers = + JobMonitorUtils.generateFetchers(startTime, endTime, maxIntervalMs, id, ""); + if (fetchers.isEmpty()) { + logger.warn("jdbcUnfinishedScan generated 0 dataFetchers, plz check input"); + return; + } + StarrocksTimeKillRule starrocksTimeKillRule = + new StarrocksTimeKillRule(new StarrocksTimeKillAlertSender()); + scanner.addScanRule(starrocksTimeKillRule); + JobMonitorUtils.run(scanner, fetchers, true); + } } diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/ResourceMonitor.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/ResourceMonitor.java index e8658050e0..a9a1a30887 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/ResourceMonitor.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/ResourceMonitor.java @@ -61,7 +61,7 @@ public void ecmResourceTask() { // 获取emNode资源信息 List> emNodeVoList = new ArrayList<>(); try { - Map resultmap = HttpsUntils.sendHttp(null, null); + Map resultmap = HttpsUntils.getEmsResourceList(); // got interface data Map>> data = MapUtils.getMap(resultmap, "data"); emNodeVoList = data.getOrDefault("EMs", new ArrayList<>()); diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserDepartmentInfoSync.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserDepartmentInfoSync.java new file mode 100644 index 0000000000..338d220489 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserDepartmentInfoSync.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.scheduled; + +import org.apache.linkis.monitor.constants.Constants; +import org.apache.linkis.monitor.department.dao.UserDepartmentInfoMapper; +import org.apache.linkis.monitor.department.entity.UserDepartmentInfo; +import org.apache.linkis.monitor.factory.MapperFactory; +import org.apache.linkis.monitor.utils.alert.AlertDesc; +import org.apache.linkis.monitor.utils.alert.ims.MonitorAlertUtils; +import org.apache.linkis.monitor.utils.alert.ims.PooledImsAlertUtils; + +import org.apache.commons.lang3.StringUtils; + +import org.springframework.context.annotation.PropertySource; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import com.github.pagehelper.PageHelper; +import com.github.pagehelper.PageInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Component +@PropertySource(value = "classpath:linkis-et-monitor.properties", encoding = "UTF-8") +public class UserDepartmentInfoSync { + + private static final Logger logger = LoggerFactory.getLogger(ResourceMonitor.class); + private static final int pagesize = 5000; + + private static final UserDepartmentInfoMapper userDepartmentInfoMapper = + MapperFactory.getUserDepartmentInfoMapper(); + + @Scheduled(cron = "${linkis.monitor.org.user.sync.cron:0 0 11 1/7 * ?}") + public static void DepartmentInfoSync() { + // 获取linkis_org_user_sync信息 + // 收集异常用户 + List alterList = new ArrayList<>(); + int pageNum = 1; // 初始pageNum + while (true) { + List departSyncList = null; + PageHelper.startPage(pageNum, pagesize); + try { + departSyncList = userDepartmentInfoMapper.selectAllUsers(); + } finally { + PageHelper.clearPage(); + } + PageInfo pageInfo = new PageInfo<>(departSyncList); + // 处理 departSyncList 中的数据 + processDepartSyncList(pageInfo.getList(), alterList); + if (!pageInfo.isHasNextPage()) { + break; // 没有更多记录,退出循环 + } + pageNum++; + } + // 统计异常名称,然后发送告警 + String usernames = + alterList.stream() + .filter(s -> StringUtils.isNotBlank(s.getUserName())) + .map(UserDepartmentInfo::getUserName) + .limit(5) + .collect(Collectors.joining(",")); + if (StringUtils.isNotBlank(usernames)) { + HashMap replaceParm = new HashMap<>(); + replaceParm.put("$user", usernames); + replaceParm.put("$count", String.valueOf(alterList.size())); + Map ecmResourceAlerts = + MonitorAlertUtils.getAlerts(Constants.DEPARTMENT_USER_IM(), replaceParm); + PooledImsAlertUtils.addAlert(ecmResourceAlerts.get("12019")); + } + } + + private static void processDepartSyncList( + List departSyncList, List alterList) { + if (CollectionUtils.isEmpty(departSyncList)) { + logger.info("No user department info to sync"); + // 并且发送告警通知 + return; + } else { + logger.info("Start to sync user department info"); + // 收集异常用户 + List errorUserList = + departSyncList.stream() + .filter( + userDepartmentInfo -> + StringUtils.isNotBlank(userDepartmentInfo.getUserName()) + && (StringUtils.isBlank(userDepartmentInfo.getOrgId()) + || StringUtils.isBlank(userDepartmentInfo.getOrgName()))) + .collect(Collectors.toList()); + // 收集需要同步用户 + List syncList = + departSyncList.stream() + .filter( + userDepartmentInfo -> + StringUtils.isNotBlank(userDepartmentInfo.getUserName()) + && StringUtils.isNotBlank(userDepartmentInfo.getOrgId()) + && StringUtils.isNotBlank(userDepartmentInfo.getOrgName())) + .collect(Collectors.toList()); + if (!CollectionUtils.isEmpty(errorUserList)) { + alterList.addAll(errorUserList); + } + if (!CollectionUtils.isEmpty(syncList)) { + // 同步用户 + List insertList = new ArrayList<>(); + syncList.forEach( + departSyncInfo -> { + UserDepartmentInfo userDepartmentInfo = + userDepartmentInfoMapper.selectUser(departSyncInfo.getUserName()); + if (null == userDepartmentInfo) { + insertList.add(departSyncInfo); + } else { + if ((!departSyncInfo.getOrgId().equals(userDepartmentInfo.getOrgId())) + || (!departSyncInfo.getOrgName().equals(userDepartmentInfo.getOrgName()))) { + userDepartmentInfoMapper.updateUser(departSyncInfo); + } + } + }); + if (!CollectionUtils.isEmpty(insertList)) { + userDepartmentInfoMapper.batchInsertUsers(insertList); + } + } + } + } +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserModeMonitor.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserModeMonitor.java index ad6f861479..e55e01352d 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserModeMonitor.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/scheduled/UserModeMonitor.java @@ -24,7 +24,7 @@ import org.apache.linkis.manager.label.constant.LabelKeyConstant; import org.apache.linkis.monitor.config.MonitorConfig; import org.apache.linkis.monitor.constants.Constants; -import org.apache.linkis.monitor.until.HttpsUntils; +import org.apache.linkis.monitor.entity.ClientSingleton; import org.apache.linkis.monitor.utils.alert.AlertDesc; import org.apache.linkis.monitor.utils.alert.ims.MonitorAlertUtils; import org.apache.linkis.monitor.utils.alert.ims.PooledImsAlertUtils; @@ -57,7 +57,8 @@ public class UserModeMonitor { private static final Logger logger = LoggerFactory.getLogger(UserModeMonitor.class); - private static final DWSClientConfig clientConfig = HttpsUntils.dwsClientConfig; + private static final DWSClientConfig clientConfig = + ClientSingleton.createClientConfig(null, null); private static final UJESClient client = new UJESClientImpl(clientConfig); @@ -138,7 +139,7 @@ private static JobExecuteResult toSubmit(LinkedTreeMap engine) { public void dbJob() { Map properties = new HashMap<>(); properties.put("readTimeout", MonitorConfig.USER_MODE_INTERFACE_TIMEOUT.getValue()); - DWSClientConfig clientConfig = HttpsUntils.createClientConfig(null, properties); + DWSClientConfig clientConfig = ClientSingleton.createClientConfig(null, properties); UJESClientImpl ujesClient = new UJESClientImpl(clientConfig); GetTableStatisticInfoAction builder = GetTableStatisticInfoAction.builder() diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java index a504a9d41d..0476765594 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/HttpsUntils.java @@ -17,24 +17,26 @@ package org.apache.linkis.monitor.until; -import org.apache.linkis.bml.conf.BmlConfiguration; -import org.apache.linkis.common.conf.Configuration; import org.apache.linkis.common.utils.Utils; -import org.apache.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy; -import org.apache.linkis.httpclient.dws.config.DWSClientConfig; -import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder; +import org.apache.linkis.datasource.client.response.GetInfoPublishedByDataSourceNameResult; +import org.apache.linkis.governance.common.conf.GovernanceCommonConf; import org.apache.linkis.monitor.client.MonitorHTTPClient; -import org.apache.linkis.monitor.client.MonitorHTTPClientClientImpl; import org.apache.linkis.monitor.config.MonitorConfig; +import org.apache.linkis.monitor.constants.Constants; +import org.apache.linkis.monitor.entity.ClientSingleton; import org.apache.linkis.monitor.entity.IndexEntity; -import org.apache.linkis.monitor.request.EmsListAction; -import org.apache.linkis.monitor.request.EntranceTaskAction; +import org.apache.linkis.monitor.jobhistory.entity.JobHistory; +import org.apache.linkis.monitor.request.*; +import org.apache.linkis.monitor.response.AnalyzeJobResultAction; import org.apache.linkis.monitor.response.EntranceTaskResult; +import org.apache.linkis.monitor.response.KeyvalueResult; +import org.apache.linkis.monitor.response.KillJobResultAction; +import org.apache.linkis.protocol.utils.ZuulEntranceUtils; import org.apache.linkis.server.BDPJettyServerHelper; import org.apache.linkis.ujes.client.response.EmsListResult; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; @@ -47,10 +49,8 @@ import org.springframework.util.Assert; import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; +import java.nio.charset.StandardCharsets; +import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,89 +58,18 @@ public class HttpsUntils { private static final Logger logger = LoggerFactory.getLogger(HttpsUntils.class); - public static DWSClientConfig dwsClientConfig = createClientConfig(null, null); - // IOUtils.closeQuietly(client); - public static MonitorHTTPClient client = new MonitorHTTPClientClientImpl(dwsClientConfig); public static final String localHost = Utils.getLocalHostname(); - public static Map sendHttp(String url, Map properties) - throws IOException { - if (null == dwsClientConfig) { - dwsClientConfig = createClientConfig(url, properties); - } - if (null == client) { - client = new MonitorHTTPClientClientImpl(dwsClientConfig); - } - EmsListAction build = EmsListAction.newBuilder().setUser("hadoop").build(); + public static Map getEmsResourceList() throws IOException { + MonitorHTTPClient client = ClientSingleton.getInstance(); + EmsListAction build = EmsListAction.newBuilder().setUser(Constants.ADMIN_USER()).build(); EmsListResult result = client.list(build); return result.getResultMap(); } - public static DWSClientConfig createClientConfig(String url, Map properties) { - String realUrl = ""; - if (StringUtils.isBlank(url)) { - realUrl = Configuration.getGateWayURL(); - } else { - realUrl = url; - } - Map parms = new HashMap<>(); - if (MapUtils.isNotEmpty(properties)) { - parms = properties; - } - int maxConnection = - (int) - parms.getOrDefault( - BmlConfiguration.CONNECTION_MAX_SIZE_SHORT_NAME(), - BmlConfiguration.CONNECTION_MAX_SIZE().getValue()); - int connectTimeout = - (int) - parms.getOrDefault( - BmlConfiguration.CONNECTION_TIMEOUT_SHORT_NAME(), - BmlConfiguration.CONNECTION_TIMEOUT().getValue()); - int readTimeout = - (int) - parms.getOrDefault( - BmlConfiguration.CONNECTION_READ_TIMEOUT_SHORT_NAME(), - BmlConfiguration.CONNECTION_READ_TIMEOUT().getValue()); - String tokenKey = - (String) - parms.getOrDefault( - BmlConfiguration.AUTH_TOKEN_KEY_SHORT_NAME(), - BmlConfiguration.AUTH_TOKEN_KEY().getValue()); - String tokenValue = - (String) - parms.getOrDefault( - BmlConfiguration.AUTH_TOKEN_VALUE_SHORT_NAME(), - BmlConfiguration.AUTH_TOKEN_VALUE().getValue()); - - DWSClientConfig clientConfig = - ((DWSClientConfigBuilder) - (DWSClientConfigBuilder.newBuilder() - .addServerUrl(realUrl) - .connectionTimeout(connectTimeout) - .discoveryEnabled(false) - .discoveryFrequency(1, TimeUnit.MINUTES) - .loadbalancerEnabled(false) - .maxConnectionSize(maxConnection) - .retryEnabled(false) - .readTimeout(readTimeout) - .setAuthenticationStrategy(new TokenAuthenticationStrategy()) - .setAuthTokenKey(tokenKey) - .setAuthTokenValue(tokenValue))) - .setDWSVersion("v1") - .build(); - - return clientConfig; - } - public static Map getEntranceTask(String url, String user, String Instance) throws IOException { - if (null == dwsClientConfig) { - dwsClientConfig = createClientConfig(null, null); - } - if (null == client) { - client = new MonitorHTTPClientClientImpl(dwsClientConfig); - } + MonitorHTTPClient client = ClientSingleton.getInstance(); EntranceTaskAction build = EntranceTaskAction.newBuilder().setUser(user).setInstance(Instance).build(); EntranceTaskResult result = client.entranList(build); @@ -156,8 +85,10 @@ public static void sendIndex(List list) throws IOException { RequestConfig requestConfig = RequestConfig.DEFAULT; StringEntity entity = new StringEntity( - json, ContentType.create(ContentType.APPLICATION_JSON.getMimeType(), "UTF-8")); - entity.setContentEncoding("UTF-8"); + json, + ContentType.create( + ContentType.APPLICATION_JSON.getMimeType(), StandardCharsets.UTF_8.toString())); + entity.setContentEncoding(StandardCharsets.UTF_8.toString()); HttpPost httpPost = new HttpPost(MonitorConfig.ECM_TASK_IMURL.getValue()); httpPost.setConfig(requestConfig); @@ -165,9 +96,77 @@ public static void sendIndex(List list) throws IOException { CloseableHttpClient httpClient = HttpClients.createDefault(); CloseableHttpResponse execute = httpClient.execute(httpPost); - String responseStr = EntityUtils.toString(execute.getEntity(), "UTF-8"); + String responseStr = + EntityUtils.toString(execute.getEntity(), StandardCharsets.UTF_8.toString()); Map map = BDPJettyServerHelper.gson().fromJson(responseStr, Map.class); logger.info("send index response :{}", map); Assert.isTrue(!"0".equals(map.get("resultCode")), map.get("resultMsg")); } + + public static String getJDBCConf(String user, String conf) { + MonitorHTTPClient client = ClientSingleton.getInstance(); + KeyvalueAction build = + KeyvalueAction.newBuilder() + .setVersion("4") + .setEngineType(Constants.JDBC_ENGINE()) + .setCreator("IDE") + .setConfigKey(conf) + .setUser(user) + .build(); + KeyvalueResult result = client.getConfKeyValue(build); + Map data = MapUtils.getMap(result.getResultMap(), "data", new HashMap<>()); + ArrayList arrayList = (ArrayList) data.get("configValues"); + if (CollectionUtils.isNotEmpty(arrayList)) { + String json = BDPJettyServerHelper.gson().toJson(arrayList.get(0)); + Map map = BDPJettyServerHelper.gson().fromJson(json, Map.class); + return MapUtils.getString(map, "configValue", ""); + } else { + return ""; + } + } + + public static Map getDatasourceConf(String user, String datasourceName) { + MonitorHTTPClient client = ClientSingleton.getInstance(); + DataSourceParamsAction dataSourceParamsAction = + DataSourceParamsAction.builder() + .setSystem(Constants.ALERT_SUB_SYSTEM_ID()) + .setDataSourceName(datasourceName) + .setUser(user) + .build(); + GetInfoPublishedByDataSourceNameResult result = + client.getInfoByDataSourceInfo(dataSourceParamsAction); + Map data = MapUtils.getMap(result.getResultMap(), "data", new HashMap<>()); + Map datasourceInfoMap = MapUtils.getMap(data, "info", new HashMap<>()); + return datasourceInfoMap; + } + + public static void killJob(JobHistory jobHistory) { + MonitorHTTPClient client = ClientSingleton.getInstance(); + String[] split = jobHistory.getInstances().split(Constants.SPLIT_DELIMITER()); + String execID = + ZuulEntranceUtils.generateExecID( + jobHistory.getJobReqId(), + GovernanceCommonConf.ENTRANCE_SERVICE_NAME().getValue(), + split); + KillJobAction killJobAction = + KillJobAction.builder() + .setIdList(Collections.singletonList(execID)) + .setTaskIDList(Collections.singletonList(jobHistory.getId())) + .setExecID(execID) + .setUser(jobHistory.getSubmitUser()) + .build(); + KillJobResultAction killJobResultAction = client.killJob(killJobAction); + Map data = MapUtils.getMap(killJobResultAction.getResultMap(), "data", new HashMap<>()); + } + + public static void analyzeJob(JobHistory jobHistory) { + MonitorHTTPClient client = ClientSingleton.getInstance(); + + AnalyzeJobAction analyzeJobAction = + AnalyzeJobAction.newBuilder() + .setTaskID(String.valueOf(jobHistory.getId())) + .setUser(Constants.ADMIN_USER()) + .build(); + AnalyzeJobResultAction analyzeJobResultAction = client.analyzeJob(analyzeJobAction); + } } diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java index 3366014c89..66dd4a3b68 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/JobMonitorUtils.java @@ -44,14 +44,14 @@ public static void run(AnomalyScanner scanner, List fetchers, Boole } public static List generateFetchers( - long startTime, long endTime, long maxIntervalMs, long id, String timeType) { + long startTime, long endTime, long maxIntervalMs, long id, String jobStatus) { List ret = new ArrayList<>(); long pe = endTime; long ps; while (pe > startTime) { ps = Math.max(pe - maxIntervalMs, startTime); String[] fetcherArgs = - new String[] {String.valueOf(ps), String.valueOf(pe), String.valueOf(id), timeType}; + new String[] {String.valueOf(ps), String.valueOf(pe), String.valueOf(id), jobStatus}; ret.add(new JobHistoryDataFetcher(fetcherArgs, MapperFactory.getJobHistoryMapper())); logger.info( "Generated dataFetcher for startTime: " + new Date(ps) + ". EndTime: " + new Date(pe)); @@ -61,11 +61,11 @@ public static List generateFetchers( } public static List generateFetchersfortime( - long startTime, long endTime, long id, String timeType) { + long startTime, long endTime, long id, String jobStatus) { List fetchers = new ArrayList<>(); String[] fetcherArgs = new String[] { - String.valueOf(startTime), String.valueOf(endTime), String.valueOf(id), timeType + String.valueOf(startTime), String.valueOf(endTime), String.valueOf(id), jobStatus }; fetchers.add(new JobHistoryDataFetcher(fetcherArgs, MapperFactory.getJobHistoryMapper())); logger.info( diff --git a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/ThreadUtils.java b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/ThreadUtils.java index 15a2626379..5e4133aa90 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/ThreadUtils.java +++ b/linkis-extensions/linkis-et-monitor/src/main/java/org/apache/linkis/monitor/until/ThreadUtils.java @@ -20,6 +20,7 @@ import org.apache.linkis.common.utils.Utils; import org.apache.linkis.monitor.config.MonitorConfig; import org.apache.linkis.monitor.constants.Constants; +import org.apache.linkis.monitor.jobhistory.entity.JobHistory; import org.apache.linkis.monitor.utils.alert.AlertDesc; import org.apache.linkis.monitor.utils.alert.ims.MonitorAlertUtils; import org.apache.linkis.monitor.utils.alert.ims.PooledImsAlertUtils; @@ -42,6 +43,9 @@ public class ThreadUtils extends ApplicationContextEvent { public static ExecutionContextExecutorService executors = Utils.newCachedExecutionContext(5, "alert-pool-thread-", false); + public static ExecutionContextExecutorService executors_analyze = + Utils.newCachedExecutionContext(50, "analyze-pool-thread-", false); + public ThreadUtils(ApplicationContext source) { super(source); } @@ -64,4 +68,9 @@ public static String run(List cmdList, String shellName) { } return msg; } + + public static void analyzeRun(JobHistory jobHistory) { + FutureTask future = new FutureTask(() -> HttpsUntils.analyzeJob(jobHistory), -1); + executors_analyze.submit(future); + } } diff --git a/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml b/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml index 7be8f93154..dcab938da9 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml +++ b/linkis-extensions/linkis-et-monitor/src/main/resources/mapper/common/JobHistoryMapper.xml @@ -47,7 +47,7 @@ job.`id`,job.`job_req_id`,job.`submit_user`,job.`execute_user`,job.`labels`,job.`params`,job.`status`,job.`error_code`,job.`created_time`, - job.`updated_time`,job.`instances`,job.`observe_info`,org.`org_id`,org.`org_name` + job.`updated_time`,job.`instances`,job.`engine_type`,job.`observe_info`,org.`org_id`,org.`org_name` + SELECT * FROM linkis_org_user WHERE user_name = #{userName} + + + + + DELETE FROM linkis_org_user + + + + + + + + + diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/LinkisJobHistoryScanSpringConfiguration.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/LinkisJobHistoryScanSpringConfiguration.scala index e3652306c1..c6b2b04d59 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/LinkisJobHistoryScanSpringConfiguration.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/LinkisJobHistoryScanSpringConfiguration.scala @@ -17,6 +17,7 @@ package org.apache.linkis.monitor +import org.apache.linkis.monitor.department.dao.UserDepartmentInfoMapper import org.apache.linkis.monitor.factory.MapperFactory import org.apache.linkis.monitor.instance.dao.InstanceInfoDao import org.apache.linkis.monitor.jobhistory.dao.JobHistoryMapper @@ -39,10 +40,14 @@ class LinkisJobHistoryScanSpringConfiguration { @Autowired private var instanceInfoMapper: InstanceInfoDao = _ + @Autowired + private var userDepartmentInfoMapper: UserDepartmentInfoMapper = _ + @PostConstruct def init(): Unit = { MapperFactory.setJobHistoryMapper(jobHistoryMapper) MapperFactory.setInstanceInfoMapper(instanceInfoMapper) + MapperFactory.setUserDepartmentInfoMapper(userDepartmentInfoMapper) } } diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClient.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClient.scala index 4caccd73a3..ab22404d59 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClient.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClient.scala @@ -17,12 +17,30 @@ package org.apache.linkis.monitor.client +import org.apache.linkis.datasource.client.response.{ + GetConnectParamsByDataSourceNameResult, + GetInfoByDataSourceNameResult, + GetInfoPublishedByDataSourceNameResult +} import org.apache.linkis.httpclient.authentication.AuthenticationStrategy import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, DWSClientConfigBuilder} import org.apache.linkis.httpclient.response.Result -import org.apache.linkis.monitor.request.{EmsListAction, EntranceTaskAction, MonitorResourceAction} -import org.apache.linkis.monitor.response.EntranceTaskResult +import org.apache.linkis.monitor.request.{ + AnalyzeJobAction, + DataSourceParamsAction, + EmsListAction, + EntranceTaskAction, + KeyvalueAction, + KillJobAction, + MonitorAction +} +import org.apache.linkis.monitor.response.{ + AnalyzeJobResultAction, + EntranceTaskResult, + KeyvalueResult, + KillJobResultAction +} import org.apache.linkis.ujes.client.response.EmsListResult import java.io.Closeable @@ -30,7 +48,7 @@ import java.util.concurrent.TimeUnit abstract class MonitorHTTPClient extends Closeable { - protected[client] def executeJob(ujesJobAction: MonitorResourceAction): Result + protected[client] def executeJob(ujesJobAction: MonitorAction): Result def list(emsListAction: EmsListAction): EmsListResult = { executeJob(emsListAction).asInstanceOf[EmsListResult] @@ -40,6 +58,24 @@ abstract class MonitorHTTPClient extends Closeable { executeJob(entranceTaskAction).asInstanceOf[EntranceTaskResult] } + def getConfKeyValue(keyvalueAction: KeyvalueAction): KeyvalueResult = { + executeJob(keyvalueAction).asInstanceOf[KeyvalueResult] + } + + def getInfoByDataSourceInfo( + datasourceInfoAction: DataSourceParamsAction + ): GetInfoPublishedByDataSourceNameResult = { + executeJob(datasourceInfoAction).asInstanceOf[GetInfoPublishedByDataSourceNameResult] + } + + def killJob(killJobAction: KillJobAction): KillJobResultAction = { + executeJob(killJobAction).asInstanceOf[KillJobResultAction] + } + + def analyzeJob(analyzeJobAction: AnalyzeJobAction): AnalyzeJobResultAction = { + executeJob(analyzeJobAction).asInstanceOf[AnalyzeJobResultAction] + } + } object MonitorHTTPClient { diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala index 5554701571..8074aeeb62 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorHTTPClientClientImpl.scala @@ -21,14 +21,14 @@ import org.apache.linkis.httpclient.dws.DWSHttpClient import org.apache.linkis.httpclient.dws.config.DWSClientConfig import org.apache.linkis.httpclient.request.Action import org.apache.linkis.httpclient.response.Result -import org.apache.linkis.monitor.request.MonitorResourceAction +import org.apache.linkis.monitor.request.MonitorAction class MonitorHTTPClientClientImpl(clientConfig: DWSClientConfig) extends MonitorHTTPClient { private val dwsHttpClient = new DWSHttpClient(clientConfig, "Linkis-MonitorResource-Execution-Thread") - override protected[client] def executeJob(ujesJobAction: MonitorResourceAction): Result = + override protected[client] def executeJob(ujesJobAction: MonitorAction): Result = ujesJobAction match { case action: Action => dwsHttpClient.execute(action) diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClient.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClient.scala index d0660e1116..77a3226ba3 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClient.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClient.scala @@ -21,7 +21,7 @@ import org.apache.linkis.httpclient.authentication.AuthenticationStrategy import org.apache.linkis.httpclient.dws.authentication.StaticAuthenticationStrategy import org.apache.linkis.httpclient.dws.config.{DWSClientConfig, DWSClientConfigBuilder} import org.apache.linkis.httpclient.response.Result -import org.apache.linkis.monitor.request.{EmsListAction, MonitorResourceAction} +import org.apache.linkis.monitor.request.{EmsListAction, MonitorAction} import org.apache.linkis.ujes.client.response.EmsListResult import java.io.Closeable @@ -29,7 +29,7 @@ import java.util.concurrent.TimeUnit abstract class MonitorResourceClient extends Closeable { - protected[client] def executeJob(ujesJobAction: MonitorResourceAction): Result + protected[client] def executeJob(monitorAction: MonitorAction): Result def list(jobListAction: EmsListAction): EmsListResult = { executeJob(jobListAction).asInstanceOf[EmsListResult] diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClientImpl.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClientImpl.scala index 06cff3b46a..3112b2b63f 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClientImpl.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/client/MonitorResourceClientImpl.scala @@ -21,15 +21,15 @@ import org.apache.linkis.httpclient.dws.DWSHttpClient import org.apache.linkis.httpclient.dws.config.DWSClientConfig import org.apache.linkis.httpclient.request.Action import org.apache.linkis.httpclient.response.Result -import org.apache.linkis.monitor.request.MonitorResourceAction +import org.apache.linkis.monitor.request.MonitorAction class MonitorResourceClientImpl(clientConfig: DWSClientConfig) extends MonitorResourceClient { private val dwsHttpClient = new DWSHttpClient(clientConfig, "Linkis-MonitorResource-Execution-Thread") - override protected[client] def executeJob(ujesJobAction: MonitorResourceAction): Result = - ujesJobAction match { + override protected[client] def executeJob(monitorAction: MonitorAction): Result = + monitorAction match { case action: Action => dwsHttpClient.execute(action) diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/constants/Constants.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/constants/Constants.scala index 9da7e35ebd..affa0ccb83 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/constants/Constants.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/constants/Constants.scala @@ -79,6 +79,7 @@ object Constants { val BML_CLEAR_IM = "bml.clear.monitor.im." val THREAD_TIME_OUT_IM = "thread.monitor.timeout.im." val JOB_RESULT_IM = "jobhistory.result.monitor.im." + val DEPARTMENT_USER_IM = "department.user.sync.im." val BML_VERSION_MAX_NUM: CommonVars[Int] = CommonVars[Int]("linkis.monitor.bml.cleaner.version.max.num", 50) @@ -98,4 +99,18 @@ object Constants { val LINKIS_CLUSTER_NAME = CommonVars.properties.getProperty("linkis.cluster.name", "") + val ADMIN_USER = "hadoop" + + val SPLIT_DELIMITER = ";" + + val JDBC_ALERT_TIME = "linkis.jdbc.task.timeout.alert.time" + + val JDBC_ALERT_USER = "linkis.jdbc.task.timeout.alert.user" + + val JDBC_ALERT_LEVEL = "linkis.jdbc.task.timeout.alert.level" + + val JOB_DATASOURCE_CONF = "wds.linkis.engine.runtime.datasource" + + val JDBC_ENGINE = "jdbc" + } diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/factory/MapperFactory.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/factory/MapperFactory.scala index eb503c52aa..3f81c66514 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/factory/MapperFactory.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/factory/MapperFactory.scala @@ -17,6 +17,7 @@ package org.apache.linkis.monitor.factory +import org.apache.linkis.monitor.department.dao.UserDepartmentInfoMapper import org.apache.linkis.monitor.instance.dao.{ InsLabelRelationDao, InstanceInfoDao, @@ -34,6 +35,8 @@ object MapperFactory { private var instanceLabelRelationMapper: InsLabelRelationDao = _ + private var userDepartmentInfoMapper: UserDepartmentInfoMapper = _ + def getJobHistoryMapper(): JobHistoryMapper = jobHistoryMapper def setJobHistoryMapper(jobHistoryMapper: JobHistoryMapper): Unit = { @@ -58,4 +61,12 @@ object MapperFactory { MapperFactory.instanceLabelRelationMapper = instanceLabelRelationMapper } + // 获取userDepartmentInfoMapper的值 + def getUserDepartmentInfoMapper: UserDepartmentInfoMapper = userDepartmentInfoMapper + + // 设置userDepartmentInfoMapper的值 + def setUserDepartmentInfoMapper(mapper: UserDepartmentInfoMapper): Unit = { + userDepartmentInfoMapper = mapper + } + } diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala index 4f43d86d40..4f553847f6 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/JobHistoryDataFetcher.scala @@ -54,60 +54,46 @@ class JobHistoryDataFetcher(args: Array[Any], mapper: JobHistoryMapper) "Wrong input for JobHistoryDataFetcher. DataType: " + args.getClass.getCanonicalName ) } - if (args != null && args.length == 2) { - val start = Utils.tryCatch(args(0).asInstanceOf[String].toLong) { t => - { - logger.error("Failed to get data from DB: Illegal arguments.", t) - throw t - } - } - val end = Utils.tryCatch(args(1).asInstanceOf[String].toLong) { t => - { - logger.error("Failed to get data from DB: Illegal arguments.", t) - throw t - } - } - mapper - .search(null, null, null, new Date(start), new Date(end), null) - .asInstanceOf[util.List[scala.Any]] - } else if (args != null && args.length == 4) { - val start = Utils.tryCatch(args(0).asInstanceOf[String].toLong) { t => - { - logger.error("Failed to get data from DB: Illegal arguments.", t) - throw t - } - } - val end = Utils.tryCatch(args(1).asInstanceOf[String].toLong) { t => - { - logger.error("Failed to get data from DB: Illegal arguments.", t) - throw t - } - } - val id = Utils.tryCatch(args(2).asInstanceOf[String].toLong) { t => - { - logger.error("Failed to get data from DB: Illegal arguments.", t) - throw t - } - } - if ( - StringUtils.isNotBlank(args(3).asInstanceOf[String]) && args(3) - .asInstanceOf[String] - .equals("updated_time") - ) { - val list = new util.ArrayList[String]() - Constants.DATA_FINISHED_JOB_STATUS_ARRAY.foreach(list.add) - mapper - .searchByCacheAndUpdateTime(id, null, list, new Date(start), new Date(end), null) - .asInstanceOf[util.List[scala.Any]] - } else { - var list = new util.ArrayList[String]() - Constants.DATA_UNFINISHED_JOB_STATUS_ARRAY.foreach(list.add) - if (args(3).asInstanceOf[String].equals("department")) { - list = null; - } - mapper - .searchByCache(id, null, list, new Date(start), new Date(end), null) - .asInstanceOf[util.List[scala.Any]] + if (args != null) { + val start = args(0).asInstanceOf[String].toLong + val end = args(1).asInstanceOf[String].toLong + // 根据参数数量进行不同的处理 + args.length match { + // 参数数量为2,则数据库查询仅筛选开始和结束时间 + case 2 => + mapper + .search(null, null, null, new Date(start), new Date(end), null) + .asInstanceOf[util.List[scala.Any]] + // 参数数量为4,根据第四个参数进行不同的查询 + case 4 => + val id = args(2).asInstanceOf[String].toLong + val parm = args(3).asInstanceOf[String] + parm match { + // 筛选任务包含id,时间,已完成状态任务 + case "finished_job" => + val list = new util.ArrayList[String]() + Constants.DATA_FINISHED_JOB_STATUS_ARRAY.foreach(list.add) + mapper + .searchByCacheAndUpdateTime(id, null, list, new Date(start), new Date(end), null) + .asInstanceOf[util.List[scala.Any]] + // 筛选任务包含id,时间,未完成状态任务 + case "unfinished_job" => + var list = new util.ArrayList[String]() + Constants.DATA_UNFINISHED_JOB_STATUS_ARRAY.foreach(list.add) + mapper + .searchByCache(id, null, list, new Date(start), new Date(end), null) + .asInstanceOf[util.List[scala.Any]] + // 筛选任务包含id,时间 + case _ => + mapper + .searchByCache(id, null, null, new Date(start), new Date(end), null) + .asInstanceOf[util.List[scala.Any]] + } + case _ => + throw new AnomalyScannerException( + 21304, + "Wrong input for JobHistoryDataFetcher. Data: " + args + ) } } else { throw new AnomalyScannerException( diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala new file mode 100644 index 0000000000..2b17adb39b --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeAlertSender.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.jobhistory.analyze + +import org.apache.linkis.common.utils.Logging +import org.apache.linkis.monitor.constants.Constants +import org.apache.linkis.monitor.core.ob.{Event, Observer} +import org.apache.linkis.monitor.jobhistory.entity.JobHistory +import org.apache.linkis.monitor.jobhistory.exception.AnomalyScannerException +import org.apache.linkis.monitor.utils.alert.ims.{MonitorAlertUtils, PooledImsAlertUtils} + +import java.util + +import scala.collection.JavaConverters._ + +class JobHistoryAnalyzeAlertSender() extends Observer with Logging { + override def update(e: Event, jobHistroyList: scala.Any): Unit = {} + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeHitEvent.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeHitEvent.scala new file mode 100644 index 0000000000..675aacfc98 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeHitEvent.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.jobhistory.analyze + +import org.apache.linkis.monitor.core.ob.SingleObserverEvent + +class JobHistoryAnalyzeHitEvent extends SingleObserverEvent diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeRule.scala new file mode 100644 index 0000000000..aec128ffab --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/analyze/JobHistoryAnalyzeRule.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.jobhistory.analyze + +import org.apache.linkis.common.utils.{Logging, Utils} +import org.apache.linkis.monitor.constants.Constants +import org.apache.linkis.monitor.core.ob.Observer +import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData} +import org.apache.linkis.monitor.jobhistory.entity.JobHistory +import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils, ThreadUtils} +import org.apache.linkis.monitor.utils.job.JohistoryUtils + +import java.util + +class JobHistoryAnalyzeRule(hitObserver: Observer) + extends AbstractScanRule(event = new JobHistoryAnalyzeHitEvent, observer = hitObserver) + with Logging { + private val scanRuleList = CacheUtils.cacheBuilder + + /** + * if data match the pattern, return true and trigger observer should call isMatched() + * + * @param data + * @return + */ + override def triggerIfMatched(data: util.List[ScannedData]): Boolean = { + if (!getHitEvent.isRegistered) { + logger.error("ScanRule is not bind with an observer. Will not be triggered") + return false + } + for (scanedData <- JohistoryUtils.getJobhistorySanData(data)) { + scanedData match { + case jobHistory: JobHistory => + val jobStatus = jobHistory.getStatus.toUpperCase() + if (Constants.FINISHED_JOB_STATUS.contains(jobStatus) && jobStatus.equals("FAILED")) { + // 执行任务分析 + ThreadUtils.analyzeRun(jobHistory) + } + case _ => + } + } + true + } + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala index f788173e43..0367bfc05e 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/JobTimeExceedRule.scala @@ -18,6 +18,7 @@ package org.apache.linkis.monitor.jobhistory.jobtime import org.apache.linkis.common.utils.Logging +import org.apache.linkis.monitor.config.MonitorConfig import org.apache.linkis.monitor.constants.Constants import org.apache.linkis.monitor.core.ob.Observer import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData} @@ -26,7 +27,7 @@ import org.apache.linkis.monitor.jobhistory.exception.AnomalyScannerException import org.apache.linkis.monitor.until.CacheUtils import java.util -import java.util.Locale +import java.util.{Locale, Optional} import scala.collection.JavaConverters._ diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala new file mode 100644 index 0000000000..4e6e707335 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedAlertSender.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.jobhistory.jobtime + +import org.apache.linkis.common.utils.Logging +import org.apache.linkis.monitor.constants.Constants +import org.apache.linkis.monitor.core.ob.{Event, Observer} +import org.apache.linkis.monitor.jobhistory.entity.JobHistory +import org.apache.linkis.monitor.jobhistory.exception.AnomalyScannerException +import org.apache.linkis.monitor.until.HttpsUntils +import org.apache.linkis.monitor.utils.alert.ims.{MonitorAlertUtils, PooledImsAlertUtils} + +import org.apache.commons.collections.MapUtils +import org.apache.commons.lang3.StringUtils + +import java.util + +import scala.collection.JavaConverters.asScalaBufferConverter + +class StarrocksTimeExceedAlertSender extends Observer with Logging { + + /** + * Observer Pattern + */ + override def update(e: Event, jobHistroyList: scala.Any): Unit = { + if (!e.isInstanceOf[StarrocksTimeExceedHitEvent]) { + throw new AnomalyScannerException( + 21304, + "Wrong event that triggers JobHistoryErrorCodeAlertSender. Input DataType: " + e.getClass.getCanonicalName + ) + } + if (null == jobHistroyList || !jobHistroyList.isInstanceOf[util.List[_]]) { + throw new AnomalyScannerException( + 21304, + "Wrong input for JobHistoryErrorCodeAlertSender. Input DataType: " + jobHistroyList.getClass.getCanonicalName + ) + } + for (a <- jobHistroyList.asInstanceOf[util.List[_]].asScala) { + if (a == null) { + logger.warn("Ignore null input data") + } else if (!a.isInstanceOf[JobHistory]) { + logger.warn("Ignore wrong input data Type : " + a.getClass.getCanonicalName) + } else { + val jobHistory = a.asInstanceOf[JobHistory] + val timeValue = + HttpsUntils.getJDBCConf(jobHistory.getSubmitUser, Constants.JDBC_ALERT_TIME) + val userValue = + HttpsUntils.getJDBCConf(jobHistory.getSubmitUser, Constants.JDBC_ALERT_USER) + var levelValue = + HttpsUntils.getJDBCConf(jobHistory.getSubmitUser, Constants.JDBC_ALERT_LEVEL) + if (StringUtils.isNotBlank(timeValue) && StringUtils.isNotBlank(userValue)) { + val replaceParm: util.HashMap[String, String] = new util.HashMap[String, String] + replaceParm.put("$id", String.valueOf(jobHistory.getId)) + replaceParm.put("$timeoutTime", timeValue) + replaceParm.put("$alteruser", userValue) + replaceParm.put("$eccAlertUser", userValue) + replaceParm.put("$submitUser", jobHistory.getSubmitUser) + if (StringUtils.isBlank(levelValue)) { + levelValue = "3"; + } + replaceParm.put("$alterLevel", levelValue) + val alters = MonitorAlertUtils.getAlerts(Constants.USER_LABEL_MONITOR, replaceParm) + PooledImsAlertUtils.addAlert(alters.get("12020")) + } + } + } + } + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedHitEvent.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedHitEvent.scala new file mode 100644 index 0000000000..af5432cd9f --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedHitEvent.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.jobhistory.jobtime + +import org.apache.linkis.monitor.core.ob.SingleObserverEvent + +class StarrocksTimeExceedHitEvent extends SingleObserverEvent diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala new file mode 100644 index 0000000000..b616c5c02c --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeExceedRule.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.jobhistory.jobtime + +import org.apache.linkis.common.utils.Logging +import org.apache.linkis.monitor.constants.Constants +import org.apache.linkis.monitor.core.ob.Observer +import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData} +import org.apache.linkis.monitor.jobhistory.entity.JobHistory +import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils} +import org.apache.linkis.server.BDPJettyServerHelper + +import org.apache.commons.collections.MapUtils +import org.apache.commons.lang3.StringUtils + +import java.util +import java.util.{ArrayList, HashMap, List, Locale, Map} + +import scala.collection.JavaConverters._ + +class StarrocksTimeExceedRule(hitObserver: Observer) + extends AbstractScanRule(event = new StarrocksTimeExceedHitEvent, observer = hitObserver) + with Logging { + + private val scanRuleList = CacheUtils.cacheBuilder + + /** + * if data match the pattern, return true and trigger observer should call isMatched() + * + * @param data + * @return + */ + override def triggerIfMatched(data: util.List[ScannedData]): Boolean = { + + if (!getHitEvent().isRegistered || data == null) { + logger.error("ScanRule is not bind with an observer. Will not be triggered") + return false + } + val alertData: util.List[JobHistory] = new util.ArrayList[JobHistory]() + for (scannedData <- data.asScala) { + if (scannedData != null && scannedData.getData() != null) { + var taskMinID = 0L; + for (jobHistory <- scannedData.getData().asScala) { + jobHistory match { + case job: JobHistory => + val status = job.getStatus.toUpperCase(Locale.getDefault) + val engineType = job.getEngineType.toUpperCase(Locale.getDefault) + if ( + Constants.UNFINISHED_JOB_STATUS + .contains(status) && engineType.equals( + Constants.JDBC_ENGINE.toUpperCase(Locale.getDefault) + ) + ) { + // 获取job所使用的数据源类型 + val datasourceConfMap = getDatasourceConf(job) + logger.info("starock datasourceConfMap: {}", datasourceConfMap) + // 计算任务执行时间 + val elapse = System.currentTimeMillis() - job.getCreatedTime.getTime + // 获取告警配置 + val timeValue = + HttpsUntils.getJDBCConf(job.getSubmitUser, Constants.JDBC_ALERT_TIME) + logger.info("starock timeValue: {},elapse {}", timeValue, elapse) + if (StringUtils.isNotBlank(timeValue)) { + val timeoutInSeconds = timeValue.toDouble + val timeoutInMillis = (timeoutInSeconds * 60 * 1000).toLong + if (elapse > timeoutInMillis) { + // 发送告警 + alertData.add(job) + } + } + } + if (taskMinID == 0L || taskMinID > job.getId) { + taskMinID = job.getId + scanRuleList.put("jdbcUnfinishedAlertScan", taskMinID) + } + case _ => + logger.warn( + "Ignored wrong input data Type : " + jobHistory + ", " + jobHistory.getClass.getCanonicalName + ) + } + } + } else { + logger.warn("Ignored null scanned data") + } + + } + logger.info("hit " + alertData.size() + " data in one iteration") + if (alertData.size() > 0) { + getHitEvent.notifyObserver(getHitEvent, alertData) + true + } else { + false + } + } + + private def getDatasourceConf(job: JobHistory): util.Map[_, _] = { + // 获取任务参数中datasourcename + val parmMap = + BDPJettyServerHelper.gson.fromJson(job.getParams, classOf[java.util.Map[String, String]]) + val configurationMap = + MapUtils.getMap(parmMap, "configuration", new util.HashMap[String, String]()) + val runtimeMap = + MapUtils.getMap(configurationMap, "runtime", new util.HashMap[String, String]()) + val datasourceName = MapUtils.getString(runtimeMap, Constants.JOB_DATASOURCE_CONF, "") + // 获取datasource信息 + if (StringUtils.isNotBlank(datasourceName)) { + HttpsUntils.getDatasourceConf(job.getSubmitUser, datasourceName) + } else { + new util.HashMap[String, String]() + } + } + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala new file mode 100644 index 0000000000..18553f228e --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillAlertSender.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.jobhistory.jobtime + +import org.apache.linkis.common.utils.Logging +import org.apache.linkis.monitor.core.ob.{Event, Observer} + +class StarrocksTimeKillAlertSender extends Observer with Logging { + + /** + * Observer Pattern + */ + override def update(e: Event, jobHistroyList: scala.Any): Unit = {} + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala new file mode 100644 index 0000000000..e5df6f3ff7 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/jobhistory/jobtime/StarrocksTimeKillRule.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.jobhistory.jobtime + +import org.apache.linkis.common.utils.Logging +import org.apache.linkis.monitor.constants.Constants +import org.apache.linkis.monitor.core.ob.Observer +import org.apache.linkis.monitor.core.pac.{AbstractScanRule, ScannedData} +import org.apache.linkis.monitor.jobhistory.entity.JobHistory +import org.apache.linkis.monitor.until.{CacheUtils, HttpsUntils} +import org.apache.linkis.server.BDPJettyServerHelper + +import org.apache.commons.collections.MapUtils +import org.apache.commons.lang3.StringUtils + +import java.util +import java.util.Locale + +import scala.collection.JavaConverters._ + +class StarrocksTimeKillRule(hitObserver: Observer) + extends AbstractScanRule(event = new StarrocksTimeKillHitEvent, observer = hitObserver) + with Logging { + + private val scanRuleList = CacheUtils.cacheBuilder + + /** + * if data match the pattern, return true and trigger observer should call isMatched() + * + * @param data + * @return + */ + override def triggerIfMatched(data: util.List[ScannedData]): Boolean = { + + if (!getHitEvent().isRegistered || data == null) { + logger.error("ScanRule is not bind with an observer. Will not be triggered") + return false + } + for (scannedData <- data.asScala) { + if (scannedData != null && scannedData.getData() != null) { + var taskMinID = 0L; + for (jobHistory <- scannedData.getData().asScala) { + jobHistory match { + case job: JobHistory => + val status = job.getStatus.toUpperCase(Locale.getDefault) + val engineType = job.getEngineType.toUpperCase(Locale.getDefault) + if ( + Constants.UNFINISHED_JOB_STATUS + .contains(status) && engineType.equals( + Constants.JDBC_ENGINE.toUpperCase(Locale.getDefault) + ) + ) { + // 计算任务执行时间 + val elapse = System.currentTimeMillis() - job.getCreatedTime.getTime + // 获取超时kill配置信息 + if (StringUtils.isNotBlank(job.getParams)) { + val connectParamsMap = MapUtils.getMap( + getDatasourceConf(job), + "connectParams", + new util.HashMap[AnyRef, AnyRef] + ) + val killTime = MapUtils.getString(connectParamsMap, "kill_task_time", "") + logger.info("starock killTime: {}", killTime) + if (StringUtils.isNotBlank(killTime) && elapse > killTime.toLong * 60 * 1000) { + if (StringUtils.isNotBlank(killTime)) { + val timeoutInSeconds = killTime.toDouble + val timeoutInMillis = (timeoutInSeconds * 60 * 1000).toLong + if (elapse > timeoutInMillis) { + // 触发kill任务 + HttpsUntils.killJob(job) + } + } + } + } + } + if (taskMinID == 0L || taskMinID > job.getId) { + taskMinID = job.getId + scanRuleList.put("jdbcUnfinishedKillScan", taskMinID) + } + case _ => + logger.warn( + "Ignored wrong input data Type : " + jobHistory + ", " + jobHistory.getClass.getCanonicalName + ) + } + } + } else { + logger.warn("Ignored null scanned data") + } + } + true + } + + private def getDatasourceConf(job: JobHistory): util.Map[_, _] = { + // 获取任务参数中datasourcename + val parmMap = + BDPJettyServerHelper.gson.fromJson(job.getParams, classOf[java.util.Map[String, String]]) + val configurationMap = + MapUtils.getMap(parmMap, "configuration", new util.HashMap[String, String]()) + val runtimeMap = + MapUtils.getMap(configurationMap, "runtime", new util.HashMap[String, String]()) + val datasourceName = MapUtils.getString(runtimeMap, Constants.JOB_DATASOURCE_CONF, "") + // 获取datasource信息 + if (StringUtils.isNotBlank(datasourceName)) { + HttpsUntils.getDatasourceConf(job.getSubmitUser, datasourceName) + } else { + new util.HashMap[String, String]() + } + } + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/AnalyzeJobAction.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/AnalyzeJobAction.scala new file mode 100644 index 0000000000..333c7ff1c2 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/AnalyzeJobAction.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.request + +import org.apache.linkis.httpclient.request.GetAction + +import org.apache.commons.lang3.StringUtils + +class AnalyzeJobAction extends GetAction with MonitorAction { + + override def suffixURLs: Array[String] = Array("jobhistory", "diagnosis-query") + +} + +object AnalyzeJobAction { + + def newBuilder(): Builder = new Builder + + class Builder private[AnalyzeJobAction] () { + + private var taskID: String = _ + private var user: String = _ + + def setTaskID(taskID: String): Builder = { + this.taskID = taskID + this + } + + def setUser(user: String): Builder = { + this.user = user + this + } + + def build(): AnalyzeJobAction = { + val analyzeJobAction = new AnalyzeJobAction + if (StringUtils.isNotBlank(taskID)) analyzeJobAction.setParameter("taskID", taskID) + if (StringUtils.isNotBlank(user)) analyzeJobAction.setUser(user) + analyzeJobAction + } + + } + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/DataSourceParamsAction.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/DataSourceParamsAction.scala new file mode 100644 index 0000000000..11b0167aef --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/DataSourceParamsAction.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.request + +import org.apache.linkis.datasource.client.config.DatasourceClientConfig.DATA_SOURCE_SERVICE_MODULE +import org.apache.linkis.datasource.client.errorcode.DatasourceClientErrorCodeSummary +import org.apache.linkis.datasource.client.exception.DataSourceClientBuilderException +import org.apache.linkis.httpclient.request.GetAction +import org.apache.linkis.monitor.request.KeyvalueAction.Builder + +import org.apache.commons.lang3.StringUtils + +import java.net.URLEncoder +import java.nio.charset.StandardCharsets +import java.text.MessageFormat + +class DataSourceParamsAction extends GetAction with MonitorAction { + + private var dataSourceName: String = _ + + override def suffixURLs: Array[String] = + Array(DATA_SOURCE_SERVICE_MODULE.getValue, "publishedInfo", "name", dataSourceName) + + private var user: String = _ + + override def setUser(user: String): Unit = this.user = user + + override def getUser: String = this.user +} + +object DataSourceParamsAction { + def builder(): Builder = new Builder + + class Builder private[DataSourceParamsAction] () { + private var dataSourceName: String = _ + private var system: String = _ + private var user: String = _ + + def setUser(user: String): Builder = { + this.user = user + this + } + + def setDataSourceName(dataSourceName: String): Builder = { + this.dataSourceName = dataSourceName + this + } + + def setSystem(system: String): Builder = { + this.system = system + this + } + + def build(): DataSourceParamsAction = { + if (dataSourceName == null) { + throw new DataSourceClientBuilderException( + DatasourceClientErrorCodeSummary.DATASOURCENAME_NEEDED.getErrorDesc + ) + } + if (system == null) + throw new DataSourceClientBuilderException( + DatasourceClientErrorCodeSummary.SYSTEM_NEEDED.getErrorDesc + ) + if (user == null) + throw new DataSourceClientBuilderException( + DatasourceClientErrorCodeSummary.USER_NEEDED.getErrorDesc + ) + val dataSourceParamsAction = new DataSourceParamsAction + dataSourceParamsAction.dataSourceName = this.dataSourceName + dataSourceParamsAction.setParameter("system", system) + dataSourceParamsAction.setUser(user) + dataSourceParamsAction + } + + } + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala index 6f3158e869..3ac1719de5 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EmsListAction.scala @@ -23,7 +23,7 @@ import org.apache.commons.lang3.StringUtils import scala.collection.mutable.ArrayBuffer -class EmsListAction extends GetAction with MonitorResourceAction { +class EmsListAction extends GetAction with MonitorAction { override def suffixURLs: Array[String] = Array("linkisManager", "listAllEMs") diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EntranceTaskAction.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EntranceTaskAction.scala index f3175d802f..4e60b59a30 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EntranceTaskAction.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/EntranceTaskAction.scala @@ -21,7 +21,7 @@ import org.apache.linkis.httpclient.request.GetAction import org.apache.commons.lang3.StringUtils -class EntranceTaskAction extends GetAction with MonitorResourceAction { +class EntranceTaskAction extends GetAction with MonitorAction { override def suffixURLs: Array[String] = Array("entrance/operation/metrics", "taskinfo") } diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KeyvalueAction.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KeyvalueAction.scala new file mode 100644 index 0000000000..654de3ed28 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KeyvalueAction.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.request + +import org.apache.linkis.httpclient.request.GetAction +import org.apache.linkis.ujes.client.request.UJESJobAction + +import org.apache.commons.lang3.StringUtils + +class KeyvalueAction extends GetAction with MonitorAction { + + override def suffixURLs: Array[String] = Array("configuration", "keyvalue") + +} + +object KeyvalueAction { + + def newBuilder(): Builder = new Builder + + class Builder private[KeyvalueAction] () { + + private var engineType: String = _ + private var version: String = _ + private var creator: String = _ + private var configKey: String = _ + private var user: String = _ + + def setEngineType(engineType: String): Builder = { + this.engineType = engineType + this + } + + def setVersion(version: String): Builder = { + this.version = version + this + } + + def setCreator(creator: String): Builder = { + this.creator = creator + this + } + + def setConfigKey(configKey: String): Builder = { + this.configKey = configKey + this + } + + def setUser(user: String): Builder = { + this.user = user + this + } + + def build(): KeyvalueAction = { + val keyvalueAction = new KeyvalueAction + if (StringUtils.isNotBlank(engineType)) keyvalueAction.setParameter("engineType", engineType) + if (StringUtils.isNotBlank(version)) keyvalueAction.setParameter("version", version) + if (StringUtils.isNotBlank(creator)) keyvalueAction.setParameter("creator", creator) + if (StringUtils.isNotBlank(configKey)) keyvalueAction.setParameter("configKey", configKey) + if (StringUtils.isNotBlank(user)) keyvalueAction.setUser(user) + keyvalueAction + } + + } + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KillJobAction.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KillJobAction.scala new file mode 100644 index 0000000000..4b23626efb --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/KillJobAction.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.request + +import org.apache.linkis.httpclient.dws.DWSHttpClient +import org.apache.linkis.httpclient.request.POSTAction + +import java.util + +class KillJobAction extends POSTAction with MonitorAction { + + private var execID: String = _ + + override def suffixURLs: Array[String] = Array("entrance", execID, "killJobs") + + override def getRequestPayload: String = + DWSHttpClient.jacksonJson.writeValueAsString(getRequestPayloads) + +} + +object KillJobAction { + + def builder(): Builder = new Builder + + class Builder private[KillJobAction] () { + private var user: String = _ + + private var idList: util.List[String] = _ + + private var taskIDList: util.List[Long] = _ + + private var execID: String = _ + + def setIdList(idList: util.List[String]): Builder = { + this.idList = idList + this + } + + def setTaskIDList(taskIDList: util.List[Long]): Builder = { + this.taskIDList = taskIDList + this + } + + def setExecID(execID: String): Builder = { + this.execID = execID + this + } + + def setUser(user: String): Builder = { + this.user = user + this + } + + def build(): KillJobAction = { + val killJobAction = new KillJobAction + killJobAction.setUser(user) + killJobAction.addRequestPayload("idList", idList) + killJobAction.addRequestPayload("taskIDList", taskIDList) + killJobAction.execID = execID + killJobAction + } + + } + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala index 7ea2001481..8c203303a0 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/MonitorResourceAction.scala @@ -18,5 +18,6 @@ package org.apache.linkis.monitor.request import org.apache.linkis.httpclient.dws.request.DWSHttpAction +import org.apache.linkis.ujes.client.request.UserAction -trait MonitorResourceAction extends DWSHttpAction with UserAction +trait MonitorAction extends DWSHttpAction with UserAction diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala index 4733a1b45f..7cdabf33a8 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/request/UserAction.scala @@ -15,12 +15,8 @@ * limitations under the License. */ -package org.apache.linkis.monitor.request +package org.apache.linkis.monitor.jobhistory.jobtime -trait UserAction extends org.apache.linkis.httpclient.request.UserAction { - private var user: String = _ +import org.apache.linkis.monitor.core.ob.SingleObserverEvent - override def setUser(user: String): Unit = this.user = user - - override def getUser: String = user -} +class StarrocksTimeKillHitEvent extends SingleObserverEvent diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/AnalyzeJobResultAction.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/AnalyzeJobResultAction.scala new file mode 100644 index 0000000000..3c7b0c03c3 --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/AnalyzeJobResultAction.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.response + +import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult +import org.apache.linkis.httpclient.dws.response.DWSResult + +import java.util + +import scala.beans.BeanProperty + +@DWSHttpMessageResult("/api/rest_j/v\\d+/jobhistory/diagnosis-query") +class AnalyzeJobResultAction extends DWSResult { + + @BeanProperty + var messages: util.ArrayList[util.Map[String, AnyRef]] = _ + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KeyvalueResult.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KeyvalueResult.scala new file mode 100644 index 0000000000..34289b188e --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KeyvalueResult.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.response + +import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult +import org.apache.linkis.httpclient.dws.response.DWSResult + +import java.util + +import scala.beans.BeanProperty + +@DWSHttpMessageResult("/api/rest_j/v\\d+/configuration/keyvalue") +class KeyvalueResult extends DWSResult { + + @BeanProperty + var configValues: util.ArrayList[util.Map[String, AnyRef]] = _ + + @BeanProperty + var totalPage: Int = _ + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KillJobResultAction.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KillJobResultAction.scala new file mode 100644 index 0000000000..8d8392d94f --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/response/KillJobResultAction.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.response + +import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult +import org.apache.linkis.httpclient.dws.response.DWSResult + +import java.util + +import scala.beans.BeanProperty + +@DWSHttpMessageResult("/api/rest_j/v\\d+/entrance/(\\S+)/killJobs") +class KillJobResultAction extends DWSResult { + + @BeanProperty + var messages: util.ArrayList[util.Map[String, AnyRef]] = _ + +} diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/alert/ims/MonitorAlertUtils.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/alert/ims/MonitorAlertUtils.scala index 798ed62c95..a0e94abc04 100644 --- a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/alert/ims/MonitorAlertUtils.scala +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/alert/ims/MonitorAlertUtils.scala @@ -120,7 +120,10 @@ object MonitorAlertUtils extends Logging { val eccReceivers = { val set: util.Set[String] = new util.HashSet[String] - if (StringUtils.isNotBlank(data.eccReceivers)) { + if ( + StringUtils + .isNotBlank(data.eccReceivers) && (!data.eccReceivers.equals("$eccAlertUser")) + ) { data.eccReceivers.split(",").map(r => set.add(r)) } if (!repaceParams.containsKey("$eccAlertUser")) { @@ -130,10 +133,7 @@ object MonitorAlertUtils extends Logging { } }) } else { - set.add(repaceParams.get("$eccAlertUser")) - } - if (StringUtils.isNotBlank(repaceParams.get("eccReceiver"))) { - repaceParams.get("eccReceiver").split(",").map(r => set.add(r)) + repaceParams.get("$eccAlertUser").split(",").map(r => set.add(r)) } set } @@ -141,14 +141,25 @@ object MonitorAlertUtils extends Logging { val subSystemId = repaceParams.getOrDefault("subSystemId", Constants.ALERT_SUB_SYSTEM_ID) val alertTitle = "集群[" + Constants.LINKIS_CLUSTER_NAME + "]" + repaceParams .getOrDefault("title", data.alertTitle) - val alertLevel = - if (StringUtils.isNotBlank(data.alertLevel) && StringUtils.isNumeric(data.alertLevel)) { - ImsAlertLevel.withName(repaceParams.getOrDefault("monitorLevel", data.alertLevel)) - } else { + val alertLevel = { + if ( + repaceParams.containsKey("$alterLevel") && StringUtils.isNumeric( + repaceParams.get("$alterLevel") + ) + ) { ImsAlertLevel.withName( - repaceParams.getOrDefault("monitorLevel", ImsAlertLevel.WARN.toString) + repaceParams.getOrDefault("monitorLevel", repaceParams.get("$alterLevel")) ) + } else { + if (StringUtils.isNotBlank(data.alertLevel) && StringUtils.isNumeric(data.alertLevel)) { + ImsAlertLevel.withName(repaceParams.getOrDefault("monitorLevel", data.alertLevel)) + } else { + ImsAlertLevel.withName( + repaceParams.getOrDefault("monitorLevel", ImsAlertLevel.WARN.toString) + ) + } } + } val alertDesc = Utils.tryAndWarn( ImsAlertDesc( diff --git a/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/job/JohistoryUtils.scala b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/job/JohistoryUtils.scala new file mode 100644 index 0000000000..deedf5847b --- /dev/null +++ b/linkis-extensions/linkis-et-monitor/src/main/scala/org/apache/linkis/monitor/utils/job/JohistoryUtils.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.monitor.utils.job + +import org.apache.linkis.monitor.core.pac.ScannedData + +import java.util + +import scala.collection.JavaConverters._ + +object JohistoryUtils { + + def getJobhistorySanData(data: util.List[ScannedData]): List[Any] = { + if (data == null) { + return List.empty[ScannedData] + } + val scalaData = data.asScala + val result = scalaData.flatMap { dataList => + if (dataList != null && dataList.getData() != null) { + dataList.getData().asScala + } else { + List.empty[ScannedData] + } + }.toList + result + } + +}