Skip to content

Commit f08cbfa

Browse files
committed
Adapt to create workflow when have not scheduler.
1 parent 9363042 commit f08cbfa

File tree

3 files changed

+32
-17
lines changed

3 files changed

+32
-17
lines changed

dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorFrameworkServiceImpl.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.webank.wedatasphere.dss.orchestrator.server.service.impl;
1818

19+
import com.webank.wedatasphere.dss.appconn.core.AppConn;
1920
import com.webank.wedatasphere.dss.appconn.scheduler.SchedulerAppConn;
2021
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.OrchestrationCreationOperation;
2122
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.OrchestrationDeletionOperation;
@@ -41,6 +42,7 @@
4142
import com.webank.wedatasphere.dss.orchestrator.core.type.DSSOrchestratorRelationManager;
4243
import com.webank.wedatasphere.dss.orchestrator.core.utils.OrchestratorUtils;
4344
import com.webank.wedatasphere.dss.orchestrator.db.dao.OrchestratorMapper;
45+
import com.webank.wedatasphere.dss.orchestrator.loader.LinkedAppConnResolver;
4446
import com.webank.wedatasphere.dss.orchestrator.loader.OrchestratorManager;
4547
import com.webank.wedatasphere.dss.orchestrator.server.entity.request.OrchestratorCreateRequest;
4648
import com.webank.wedatasphere.dss.orchestrator.server.entity.request.OrchestratorDeleteRequest;
@@ -64,6 +66,7 @@
6466
import java.util.Collections;
6567
import java.util.Date;
6668
import java.util.List;
69+
import java.util.Optional;
6770
import java.util.function.BiFunction;
6871
import java.util.function.Function;
6972
import java.util.stream.Collectors;
@@ -79,7 +82,8 @@ public class OrchestratorFrameworkServiceImpl implements OrchestratorFrameworkSe
7982
private OrchestratorService newOrchestratorService;
8083
@Autowired
8184
private OrchestratorManager orchestratorManager;
82-
85+
@Autowired
86+
private LinkedAppConnResolver linkedAppConnResolver;
8387
/**
8488
* 1.拿到的dss orchestrator的appconn
8589
* 2.然后创建
@@ -145,7 +149,7 @@ private <K extends StructureRequestRef, V extends ResponseRef> V tryOrchestratio
145149
Long refProjectId, refOrchestrationId;
146150
if (askProjectSender) {
147151
ProjectRefIdResponse projectRefIdResponse = (ProjectRefIdResponse) DSSSenderServiceFactory.getOrCreateServiceInstance().getProjectServerSender()
148-
.ask(new ProjectRefIdRequest(orchestrationPair.getValue().getId(), dssOrchestrator.getProjectId()));
152+
.ask(new ProjectRefIdRequest(Optional.ofNullable(orchestrationPair).map(ImmutablePair::getValue).map(AppInstance::getId).orElse(null), dssOrchestrator.getProjectId()));
149153
refProjectId = projectRefIdResponse.getRefProjectId();
150154
refOrchestrationId = null;
151155
} else {
@@ -244,11 +248,16 @@ protected ImmutablePair<OrchestrationService, AppInstance> getOrchestrationServi
244248
dssOrchestratorInfo.setLinkedAppConnNames(dssOrchestrator.getLinkedAppConn().stream().map(appConn -> appConn.getAppDesc().getAppName()).collect(Collectors.toList()));
245249
}
246250
SchedulerAppConn appConn = dssOrchestrator.getSchedulerAppConn();
247-
if (appConn == null) {
248-
throw new ExternalOperationWarnException(50322, "DSSOrchestrator " + dssOrchestrator.getName() + " has no SchedulerAppConn.");
251+
// if (appConn == null) {
252+
// throw new ExternalOperationWarnException(50322, "DSSOrchestrator " + dssOrchestrator.getName() + " has no SchedulerAppConn.");
253+
// }
254+
if (appConn != null) {
255+
AppInstance appInstance = appConn.getAppDesc().getAppInstances().get(0);
256+
return new ImmutablePair<>(appConn.getOrCreateStructureStandard().getOrchestrationService(appInstance), appInstance);
257+
} else {
258+
return new ImmutablePair<>(null, null);
249259
}
250-
AppInstance appInstance = appConn.getAppDesc().getAppInstances().get(0);
251-
return new ImmutablePair<>(appConn.getOrCreateStructureStandard().getOrchestrationService(appInstance), appInstance);
260+
252261
}
253262

254263
/**

dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorServiceImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.webank.wedatasphere.dss.orchestrator.server.service.impl;
1818

19+
import com.webank.wedatasphere.dss.appconn.core.AppConn;
1920
import com.webank.wedatasphere.dss.common.constant.project.ProjectUserPrivEnum;
2021
import com.webank.wedatasphere.dss.common.exception.DSSErrorException;
2122
import com.webank.wedatasphere.dss.common.label.DSSLabel;
@@ -51,6 +52,7 @@
5152
import com.webank.wedatasphere.dss.standard.app.development.service.RefQueryService;
5253
import com.webank.wedatasphere.dss.standard.app.development.standard.DevelopmentIntegrationStandard;
5354
import com.webank.wedatasphere.dss.standard.app.sso.Workspace;
55+
import com.webank.wedatasphere.dss.standard.common.desc.AppDesc;
5456
import com.webank.wedatasphere.dss.standard.common.desc.AppInstance;
5557
import com.webank.wedatasphere.dss.standard.common.entity.ref.ResponseRef;
5658
import com.webank.wedatasphere.dss.standard.common.exception.operation.ExternalOperationWarnException;
@@ -110,7 +112,9 @@ public OrchestratorVo createOrchestrator(String userName,
110112
Map<String, Object> dssJobContent = MapUtils.newCommonMapBuilder()
111113
.put(OrchestratorRefConstant.DSS_ORCHESTRATOR_INFO_KEY, dssOrchestratorInfo)
112114
.put(OrchestratorRefConstant.ORCHESTRATOR_VERSION_KEY, version)
113-
.put(OrchestratorRefConstant.ORCHESTRATION_SCHEDULER_APP_CONN, dssOrchestrator.getSchedulerAppConn().getAppDesc().getAppName()).build();
115+
.put(OrchestratorRefConstant.ORCHESTRATION_SCHEDULER_APP_CONN, Optional.ofNullable(dssOrchestrator)
116+
.map(DSSOrchestrator::getSchedulerAppConn).map(AppConn::getAppDesc).map(AppDesc::getAppName)
117+
.map(Object::toString).orElse("NULL")).build();
114118
DSSJobContentRequestRef requestRef = (DSSJobContentRequestRef) developmentRequestRef;
115119
requestRef.setDSSJobContent(dssJobContent);
116120
return ((RefCreationOperation) developmentOperation).createRef(requestRef);

dss-orchestrator/dss-orchestrator-loader/src/main/java/com/webank/wedatasphere/dss/orchestrator/loader/DefaultOrchestratorLoader.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,17 +63,19 @@ protected DSSOrchestratorContext createOrchestratorContext() {
6363
appConnList.stream().filter(relation.isLinkedAppConn()).forEach(dssOrchestrator::addLinkedAppConn);
6464
AppConn appConn = AppConnManager.getAppConnManager().getAppConn(relation.getBindingAppConnName());
6565
dssOrchestrator.setAppConn(appConn);
66-
List<SchedulerAppConn> schedulerAppConns = AppConnManager.getAppConnManager().listAppConns(SchedulerAppConn.class);
67-
SchedulerAppConn schedulerAppConn;
68-
if(StringUtils.isBlank(relation.getBindingSchedulerAppConnName())) {
69-
schedulerAppConn = schedulerAppConns.get(0);
70-
} else {
71-
schedulerAppConn = schedulerAppConns.stream().filter(appConn1 -> appConn1.getAppDesc().getAppName().equals(relation.getBindingSchedulerAppConnName()))
72-
.findAny().orElseThrow(() -> new ExternalOperationFailedException(50032, "cannot find the matched SchedulerAppConn with name " + relation.getBindingSchedulerAppConnName()));
66+
if (appConnList.stream().anyMatch(t -> t instanceof SchedulerAppConn)) {
67+
List<SchedulerAppConn> schedulerAppConns = AppConnManager.getAppConnManager().listAppConns(SchedulerAppConn.class);
68+
SchedulerAppConn schedulerAppConn;
69+
if(StringUtils.isBlank(relation.getBindingSchedulerAppConnName())) {
70+
schedulerAppConn = schedulerAppConns.get(0);
71+
} else {
72+
schedulerAppConn = schedulerAppConns.stream().filter(appConn1 -> appConn1.getAppDesc().getAppName().equals(relation.getBindingSchedulerAppConnName()))
73+
.findAny().orElseThrow(() -> new ExternalOperationFailedException(50032, "cannot find the matched SchedulerAppConn with name " + relation.getBindingSchedulerAppConnName()));
74+
}
75+
dssOrchestrator.setSchedulerAppConn(schedulerAppConn);
76+
LOGGER.info("Load dss orchestrator: {}, with the binding AppConn: {} and binding SchedulerAppConn {}.",
77+
typeName, relation.getBindingAppConnName(), schedulerAppConn.getAppDesc().getAppName());
7378
}
74-
dssOrchestrator.setSchedulerAppConn(schedulerAppConn);
75-
LOGGER.info("Load dss orchestrator: {}, with the binding AppConn: {} and binding SchedulerAppConn {}.",
76-
typeName, relation.getBindingAppConnName(), schedulerAppConn.getAppDesc().getAppName());
7779
dssLabels.forEach(dssOrchestrator::addLinkedDssLabels);
7880
return dssOrchestrator;
7981
}

0 commit comments

Comments
 (0)