Skip to content

Commit 5eadbba

Browse files
Merge remote-tracking branch 'origin/master'
2 parents f124e1f + 20b8fed commit 5eadbba

File tree

3 files changed

+23
-13
lines changed

3 files changed

+23
-13
lines changed

dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/dolphinscheduler/conversion/WorkflowToDolphinSchedulerRelConverter.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.webank.wedatasphere.dss.appconn.dolphinscheduler.entity.DolphinSchedulerWorkflow;
66
import com.webank.wedatasphere.dss.common.entity.node.DSSNode;
77
import com.webank.wedatasphere.dss.common.exception.DSSRuntimeException;
8+
import com.webank.wedatasphere.dss.standard.common.exception.operation.ExternalOperationFailedException;
89
import com.webank.wedatasphere.dss.workflow.conversion.entity.ConvertedRel;
910
import com.webank.wedatasphere.dss.workflow.conversion.entity.PreConversionRel;
1011
import com.webank.wedatasphere.dss.workflow.conversion.operation.WorkflowToRelConverter;
@@ -30,6 +31,12 @@ public class WorkflowToDolphinSchedulerRelConverter implements WorkflowToRelConv
3031
@Override
3132
public ConvertedRel convertToRel(PreConversionRel rel) {
3233
DolphinSchedulerConvertedRel dolphinSchedulerConvertedRel = (DolphinSchedulerConvertedRel)rel;
34+
List<WorkflowNode> workflowNodes = dolphinSchedulerConvertedRel.getWorkflow().getWorkflowNodes();
35+
for (WorkflowNode workflowNode : workflowNodes) {
36+
if ("workflow.subflow".equals(workflowNode.getNodeType())) {
37+
throw new ExternalOperationFailedException(90021, workflowNode.getName() + "当前不支持将subFlow节点发布到DolphinScheduler!");
38+
}
39+
}
3340
Workflow dolphinSchedulerWorkflow = convertWorkflow(dolphinSchedulerConvertedRel);
3441
dolphinSchedulerConvertedRel.setWorkflow(dolphinSchedulerWorkflow);
3542
return dolphinSchedulerConvertedRel;
@@ -62,13 +69,16 @@ private DolphinSchedulerWorkflow convertWorkflow(DolphinSchedulerConvertedRel do
6269
.collect(Collectors.toMap(WorkflowNode::getName, WorkflowNode::getId));
6370
for (WorkflowNode workflowNode : workflow.getWorkflowNodes()) {
6471
DSSNode node = workflowNode.getDSSNode();
72+
if (node.getLayout() == null) {
73+
continue;
74+
}
6575
DolphinSchedulerTask dolphinSchedulerTask = nodeConverter.convertNode(dolphinSchedulerConvertedRel, node);
6676
processDefinitionJson.addTask(dolphinSchedulerTask);
6777

6878
DolphinSchedulerWorkflow.LocationInfo locationInfo = new DolphinSchedulerWorkflow.LocationInfo();
6979
locationInfo.setName(node.getName());
70-
String targetarr = node.getDependencys().stream().map(nameToIdMap::get).collect(Collectors.joining(","));
71-
locationInfo.setTargetarr(targetarr);
80+
String targetArr = node.getDependencys().stream().map(nameToIdMap::get).collect(Collectors.joining(","));
81+
locationInfo.setTargetarr(targetArr);
7282
locationInfo.setX((int)node.getLayout().getX());
7383
locationInfo.setY((int)node.getLayout().getY());
7484
locations.put(node.getId(), locationInfo);

dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/dolphinscheduler/conversion/WorkflowToDolphinSchedulerSynchronizer.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void syncToRel(ConvertedRel convertedRel) {
3636
DolphinSchedulerConvertedRel dolphinSchedulerConvertedRel = (DolphinSchedulerConvertedRel)convertedRel;
3737
OrchestrationToRelConversionRequestRef requestRef = dolphinSchedulerConvertedRel.getDSSToRelConversionRequestRef();
3838
Workflow workflow = dolphinSchedulerConvertedRel.getWorkflow();
39-
checkSchedulerProject(workflow);
39+
// checkSchedulerProject(workflow);
4040
Long dolphinSchedulerWorkflowId = requestRef.getRefOrchestrationId();
4141
DolphinSchedulerAppConn appConn = (DolphinSchedulerAppConn) dssToRelConversionOperation.getConversionService().getAppStandard().getAppConn();
4242
OrchestrationUpdateOperation updateOperation = appConn.getOrCreateStructureStandard().getOrchestrationService(dssToRelConversionOperation.getConversionService().getAppInstance())
@@ -47,14 +47,14 @@ public void syncToRel(ConvertedRel convertedRel) {
4747
updateOperation.updateOrchestration(ref);
4848
}
4949

50-
private void checkSchedulerProject(Workflow flow) throws ExternalOperationFailedException {
51-
List<WorkflowNode> nodes = flow.getWorkflowNodes();
52-
for (WorkflowNode node : nodes) {
53-
DSSNode dssNode = node.getDSSNode();
54-
if (CollectionUtils.isEmpty(dssNode.getResources()) && dssNode.getJobContent().isEmpty()) {
55-
throw new ExternalOperationFailedException(90021, dssNode.getName() + "节点内容不能为空");
56-
}
57-
}
58-
}
50+
// private void checkSchedulerProject(Workflow flow) throws ExternalOperationFailedException {
51+
// List<WorkflowNode> nodes = flow.getWorkflowNodes();
52+
// for (WorkflowNode node : nodes) {
53+
// DSSNode dssNode = node.getDSSNode();
54+
// if (CollectionUtils.isEmpty(dssNode.getResources()) && dssNode.getJobContent().isEmpty()) {
55+
// throw new ExternalOperationFailedException(90021, dssNode.getName() + "节点内容不能为空");
56+
// }
57+
// }
58+
// }
5959

6060
}

dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/job/Builder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public Job build() throws Exception {
5555
LinkisJob job = null;
5656
String jobType = getJobType();
5757
String[] jobTypeSplit = jobType.split("\\.");
58-
if (jobTypeSplit.length < 3) {
58+
if (jobTypeSplit.length < 2) {
5959
throw new LinkisJobExecutionErrorException(90100, "This is not Linkis job type, this jobtype is " + jobType);
6060
}
6161
String engineType = jobTypeSplit[1];

0 commit comments

Comments
 (0)