Skip to content

Commit 3e30600

Browse files
authored
Merge pull request #998 from WeDataSphere/master
Fix some bug abut intergrating DolphinScheduler.
2 parents d002a92 + 20b8fed commit 3e30600

File tree

5 files changed

+25
-132
lines changed

5 files changed

+25
-132
lines changed

dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/dolphinscheduler/conversion/WorkflowToDolphinSchedulerRelConverter.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,13 @@
55
import com.webank.wedatasphere.dss.appconn.dolphinscheduler.entity.DolphinSchedulerWorkflow;
66
import com.webank.wedatasphere.dss.common.entity.node.DSSNode;
77
import com.webank.wedatasphere.dss.common.exception.DSSRuntimeException;
8+
import com.webank.wedatasphere.dss.standard.common.exception.operation.ExternalOperationFailedException;
89
import com.webank.wedatasphere.dss.workflow.conversion.entity.ConvertedRel;
910
import com.webank.wedatasphere.dss.workflow.conversion.entity.PreConversionRel;
1011
import com.webank.wedatasphere.dss.workflow.conversion.operation.WorkflowToRelConverter;
1112
import com.webank.wedatasphere.dss.workflow.core.entity.Workflow;
1213
import com.webank.wedatasphere.dss.workflow.core.entity.WorkflowNode;
1314
import com.webank.wedatasphere.dss.workflow.core.entity.WorkflowNodeEdge;
14-
import org.apache.commons.lang3.StringUtils;
1515
import org.slf4j.Logger;
1616
import org.slf4j.LoggerFactory;
1717
import org.springframework.beans.BeanUtils;
@@ -31,6 +31,12 @@ public class WorkflowToDolphinSchedulerRelConverter implements WorkflowToRelConv
3131
@Override
3232
public ConvertedRel convertToRel(PreConversionRel rel) {
3333
DolphinSchedulerConvertedRel dolphinSchedulerConvertedRel = (DolphinSchedulerConvertedRel)rel;
34+
List<WorkflowNode> workflowNodes = dolphinSchedulerConvertedRel.getWorkflow().getWorkflowNodes();
35+
for (WorkflowNode workflowNode : workflowNodes) {
36+
if ("workflow.subflow".equals(workflowNode.getNodeType())) {
37+
throw new ExternalOperationFailedException(90021, workflowNode.getName() + "当前不支持将subFlow节点发布到DolphinScheduler!");
38+
}
39+
}
3440
Workflow dolphinSchedulerWorkflow = convertWorkflow(dolphinSchedulerConvertedRel);
3541
dolphinSchedulerConvertedRel.setWorkflow(dolphinSchedulerWorkflow);
3642
return dolphinSchedulerConvertedRel;
@@ -59,14 +65,20 @@ private DolphinSchedulerWorkflow convertWorkflow(DolphinSchedulerConvertedRel do
5965
put("direct", "IN");
6066
}});
6167
Map<String, DolphinSchedulerWorkflow.LocationInfo> locations = new HashMap<>();
68+
Map<String, String> nameToIdMap = workflow.getWorkflowNodes().stream()
69+
.collect(Collectors.toMap(WorkflowNode::getName, WorkflowNode::getId));
6270
for (WorkflowNode workflowNode : workflow.getWorkflowNodes()) {
6371
DSSNode node = workflowNode.getDSSNode();
72+
if (node.getLayout() == null) {
73+
continue;
74+
}
6475
DolphinSchedulerTask dolphinSchedulerTask = nodeConverter.convertNode(dolphinSchedulerConvertedRel, node);
6576
processDefinitionJson.addTask(dolphinSchedulerTask);
6677

6778
DolphinSchedulerWorkflow.LocationInfo locationInfo = new DolphinSchedulerWorkflow.LocationInfo();
6879
locationInfo.setName(node.getName());
69-
locationInfo.setTargetarr(StringUtils.join(node.getDependencys(), ","));
80+
String targetArr = node.getDependencys().stream().map(nameToIdMap::get).collect(Collectors.joining(","));
81+
locationInfo.setTargetarr(targetArr);
7082
locationInfo.setX((int)node.getLayout().getX());
7183
locationInfo.setY((int)node.getLayout().getY());
7284
locations.put(node.getId(), locationInfo);

dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/dolphinscheduler/conversion/WorkflowToDolphinSchedulerSynchronizer.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void syncToRel(ConvertedRel convertedRel) {
3636
DolphinSchedulerConvertedRel dolphinSchedulerConvertedRel = (DolphinSchedulerConvertedRel)convertedRel;
3737
OrchestrationToRelConversionRequestRef requestRef = dolphinSchedulerConvertedRel.getDSSToRelConversionRequestRef();
3838
Workflow workflow = dolphinSchedulerConvertedRel.getWorkflow();
39-
checkSchedulerProject(workflow);
39+
// checkSchedulerProject(workflow);
4040
Long dolphinSchedulerWorkflowId = requestRef.getRefOrchestrationId();
4141
DolphinSchedulerAppConn appConn = (DolphinSchedulerAppConn) dssToRelConversionOperation.getConversionService().getAppStandard().getAppConn();
4242
OrchestrationUpdateOperation updateOperation = appConn.getOrCreateStructureStandard().getOrchestrationService(dssToRelConversionOperation.getConversionService().getAppInstance())
@@ -47,14 +47,14 @@ public void syncToRel(ConvertedRel convertedRel) {
4747
updateOperation.updateOrchestration(ref);
4848
}
4949

50-
private void checkSchedulerProject(Workflow flow) throws ExternalOperationFailedException {
51-
List<WorkflowNode> nodes = flow.getWorkflowNodes();
52-
for (WorkflowNode node : nodes) {
53-
DSSNode dssNode = node.getDSSNode();
54-
if (CollectionUtils.isEmpty(dssNode.getResources()) && dssNode.getJobContent().isEmpty()) {
55-
throw new ExternalOperationFailedException(90021, dssNode.getName() + "节点内容不能为空");
56-
}
57-
}
58-
}
50+
// private void checkSchedulerProject(Workflow flow) throws ExternalOperationFailedException {
51+
// List<WorkflowNode> nodes = flow.getWorkflowNodes();
52+
// for (WorkflowNode node : nodes) {
53+
// DSSNode dssNode = node.getDSSNode();
54+
// if (CollectionUtils.isEmpty(dssNode.getResources()) && dssNode.getJobContent().isEmpty()) {
55+
// throw new ExternalOperationFailedException(90021, dssNode.getName() + "节点内容不能为空");
56+
// }
57+
// }
58+
// }
5959

6060
}

dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/scala/com/webank/wedatasphere/dss/appconn/schedulis/conf/SchedulisConf.scala

Lines changed: 0 additions & 32 deletions
This file was deleted.

dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/scala/com/webank/wedatasphere/dss/appconn/schedulis/http/SchedulisHttpAction.scala

Lines changed: 0 additions & 87 deletions
This file was deleted.

dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/job/Builder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public Job build() throws Exception {
5555
LinkisJob job = null;
5656
String jobType = getJobType();
5757
String[] jobTypeSplit = jobType.split("\\.");
58-
if (jobTypeSplit.length < 3) {
58+
if (jobTypeSplit.length < 2) {
5959
throw new LinkisJobExecutionErrorException(90100, "This is not Linkis job type, this jobtype is " + jobType);
6060
}
6161
String engineType = jobTypeSplit[1];

0 commit comments

Comments
 (0)