Skip to content

Commit f6399f4

Browse files
Fix the bug that the lower DolphinScheduler versions will block the thread until the yarn application is completed.
1 parent f63a15b commit f6399f4

File tree

3 files changed

+34
-15
lines changed
  • dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/dolphinscheduler/conversion
  • dss-commons/dss-common/src/main/java/com/webank/wedatasphere/dss/common/conf
  • plugins/dolphinscheduler/dss-dolphinscheduler-client/src/main/scala/com/webank/wedatasphere/dss/plugins/dolphinscheduler/linkis/client

3 files changed

+34
-15
lines changed

dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/dolphinscheduler/conversion/NodeConverter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public DolphinSchedulerTask convertNode(DolphinSchedulerConvertedRel dolphinSche
7373
addObjectLine.accept("JOB_SOURCE", sourceMap);
7474
addLine.accept("CONTEXT_ID", workflow.getContextID());
7575
addLine.accept("LINKIS_GATEWAY_URL", Configuration.getGateWayURL());
76+
addLine.accept("DS_VERSION", DolphinSchedulerConf.DS_VERSION.getValue());
7677
//todo
7778
addLine.accept("RUN_DATE", "${global_run_date}");
7879
addObjectLine.accept("JOB_LABELS", new EnvDSSLabel(SchedulerConf.JOB_LABEL.getValue()).getValue());

dss-commons/dss-common/src/main/java/com/webank/wedatasphere/dss/common/conf/DSSCommonConf.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ public class DSSCommonConf {
2424

2525
public static final CommonVars<String> DSS_EXPORT_URL = CommonVars.apply("wds.dss.server.export.url", "/appcom/tmp/dss");
2626

27-
public static final CommonVars<String> DSS_ORCHESTRATOR_FRAMEWORK_APP_CONN_NAME = CommonVars.apply("wds.dss.orchestrator.framework.appconn", "orchestrator-framework");
28-
2927
public static final CommonVars<Integer> DSS_DOMAIN_LEVEL = CommonVars.apply("wds.linkis.gateway.domain.level", 3);
3028

31-
// public static final CommonVars<String> DSS_TOKEN_TICKET_KEY = CommonVars.apply("wds.dss.user.ticket.key", "bdp-user-ticket-id");
29+
/**
30+
* The old value is bdp-user-ticket-id
31+
*/
3232
public static final CommonVars<String> DSS_TOKEN_TICKET_KEY = CommonVars.apply("wds.dss.user.ticket.key", "linkis_user_session_ticket_id_v1");
3333

3434
}

plugins/dolphinscheduler/dss-dolphinscheduler-client/src/main/scala/com/webank/wedatasphere/dss/plugins/dolphinscheduler/linkis/client/DSSDolphinSchedulerClient.scala

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,39 @@ import org.apache.linkis.common.conf.CommonVars
1414
import org.apache.linkis.common.utils.{Logging, Utils}
1515

1616
object DSSDolphinSchedulerClient extends Logging {
17-
val DEFAULT_PROPERTY_FILE_NAME = "linkis.properties"
18-
val DEFAULT_CONFIG_DIR = "conf"
19-
val CHARSET_NAME = "utf-8"
20-
21-
val logObj: LinkisJobExecutionLog = new LinkisJobExecutionLog {
22-
override def info(message: Object, t: Throwable): Unit =
23-
if(message != null) logger.info(message.toString, t) else if(t != null) logger.info(null, t)
24-
override def warn(message: Object, t: Throwable): Unit =
25-
if(message != null) logger.warn(message.toString, t) else if(t != null) logger.warn(null, t)
26-
override def error(message: Object, t: Throwable): Unit =
27-
if(message != null) logger.error(message.toString, t) else if(t != null) logger.error(null, t)
28-
}
2917

3018
def main(args: Array[String]): Unit = {
3119

20+
val dsVersion = CommonVars("DS_VERSION", "").getValue
21+
info(s"The DolphinScheduler Version => $dsVersion.")
22+
23+
val logObj: LinkisJobExecutionLog = if(StringUtils.isNotBlank(dsVersion) && dsVersion.startsWith("2.")) {
24+
new LinkisJobExecutionLog {
25+
override def info(message: Object, t: Throwable): Unit =
26+
if(message != null) logger.info(message.toString, t) else if(t != null) logger.info(null, t)
27+
override def warn(message: Object, t: Throwable): Unit =
28+
if(message != null) logger.warn(message.toString, t) else if(t != null) logger.warn(null, t)
29+
override def error(message: Object, t: Throwable): Unit =
30+
if(message != null) logger.error(message.toString, t) else if(t != null) logger.error(null, t)
31+
}
32+
} else {
33+
warn("For the lower DolphinScheduler versions(< 2.0), we will parse the yarn applicationId to application__\\d+_\\d+.")
34+
warn("Why? Because the lower DolphinScheduler versions will block the thread until the yarn application is completed.")
35+
// For more information, please visit the class AbstractCommandExecutor in DolphinScheduler (line 222 ~ 231).
36+
new LinkisJobExecutionLog {
37+
private implicit def parseYarnApplication(message: Object): String = message match {
38+
case str: String if str.contains(" application_") =>
39+
str.replaceAll(" application_", " application__")
40+
case str: String => str
41+
case obj if obj != null => obj.toString
42+
case _ => ""
43+
}
44+
override def info(message: Object, t: Throwable): Unit = logger.info(message, t)
45+
override def warn(message: Object, t: Throwable): Unit = logger.warn(message, t)
46+
override def error(message: Object, t: Throwable): Unit = logger.error(message, t)
47+
}
48+
}
49+
3250
val jobProps = new util.HashMap[String, String]
3351
val getAndSet = (fromKey: String, toKey: String) => {
3452
val value = CommonVars(fromKey, "").getValue

0 commit comments

Comments
 (0)