Skip to content

Commit 00645d5

Browse files
committed
fixbug:解决复制flow后发布到调度平台失败的问题
1 parent a780e98 commit 00645d5

File tree

4 files changed

+124
-6
lines changed

4 files changed

+124
-6
lines changed

dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/job/OrchestratorCopyJob.java

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
package com.webank.wedatasphere.dss.orchestrator.server.job;
22

33
import com.google.common.collect.Lists;
4+
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.OrchestrationCreationOperation;
5+
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.OrchestrationService;
6+
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.ref.DSSOrchestrationContentRequestRef;
7+
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.ref.OrchestrationResponseRef;
48
import com.webank.wedatasphere.dss.common.exception.DSSErrorException;
59
import com.webank.wedatasphere.dss.common.label.DSSLabel;
610
import com.webank.wedatasphere.dss.common.utils.MapUtils;
7-
import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorCopyInfo;
8-
import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorInfo;
9-
import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorVersion;
11+
import com.webank.wedatasphere.dss.orchestrator.common.entity.*;
1012
import com.webank.wedatasphere.dss.orchestrator.common.ref.OrchestratorRefConstant;
1113
import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestrator;
1214
import com.webank.wedatasphere.dss.orchestrator.core.utils.OrchestratorUtils;
15+
import com.webank.wedatasphere.dss.orchestrator.db.dao.OrchestratorMapper;
1316
import com.webank.wedatasphere.dss.orchestrator.publish.utils.OrchestrationDevelopmentOperationUtils;
1417
import com.webank.wedatasphere.dss.orchestrator.server.entity.vo.OrchestratorCopyVo;
18+
import com.webank.wedatasphere.dss.orchestrator.server.service.OrchestratorService;
19+
import com.webank.wedatasphere.dss.orchestrator.server.service.impl.OrchestratorFrameworkServiceImpl;
1520
import com.webank.wedatasphere.dss.standard.app.development.operation.RefCopyOperation;
1621
import com.webank.wedatasphere.dss.standard.app.development.ref.CopyRequestRef;
1722
import com.webank.wedatasphere.dss.standard.app.development.ref.RefJobContentResponseRef;
@@ -33,6 +38,12 @@ public class OrchestratorCopyJob implements Runnable {
3338

3439
protected OrchestratorCopyEnv orchestratorCopyEnv;
3540

41+
private OrchestratorFrameworkServiceImpl orchestratorFrameworkServiceImpl;
42+
43+
private OrchestratorService orchestratorService;
44+
45+
private OrchestratorMapper orchestratorMapper;
46+
3647
private DSSOrchestratorCopyInfo orchestratorCopyInfo = new DSSOrchestratorCopyInfo(UUID.randomUUID().toString());
3748

3849

@@ -66,9 +77,10 @@ private void copyOrchestrator() {
6677
newOrchestrator.setDesc("copy from " + sourceOrchestrator.getName());
6778
newOrchestrator.setUpdateTime(null);
6879
newOrchestrator.setUpdateUser(null);
80+
DSSOrchestratorVersion dssOrchestratorVersion = null;
6981

7082
try {
71-
doOrchestratorCopy(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), newOrchestrator,
83+
dssOrchestratorVersion = doOrchestratorCopy(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), newOrchestrator,
7284
orchestratorCopyVo.getTargetProjectName(), Lists.newArrayList(orchestratorCopyVo.getDssLabel()), appId);
7385
} catch (Exception e) {
7486
//保存错误信息
@@ -91,9 +103,37 @@ private void copyOrchestrator() {
91103
orchestratorCopyInfo.setSuccessNode(Lists.newArrayList("All"));
92104
orchestratorCopyInfo.setStatus(1);
93105
orchestratorCopyEnv.getOrchestratorCopyJobMapper().updateCopyStatus(orchestratorCopyInfo);
106+
107+
List<DSSLabel> dssLabels = new ArrayList<>();
108+
dssLabels.add(orchestratorCopyVo.getDssLabel());
109+
110+
//2.如果调度系统要求同步创建工作流,向调度系统发送创建工作流的请求
111+
OrchestrationResponseRef orchestrationResponseRef = orchestratorFrameworkServiceImpl.tryOrchestrationOperation(dssLabels, true, orchestratorCopyVo.getUsername(),
112+
orchestratorCopyVo.getTargetProjectName(), orchestratorCopyVo.getWorkspace(), newOrchestrator,
113+
OrchestrationService::getOrchestrationCreationOperation,
114+
(structureOperation, structureRequestRef) -> ((OrchestrationCreationOperation) structureOperation)
115+
.createOrchestration((DSSOrchestrationContentRequestRef) structureRequestRef), "create");
116+
117+
try {
118+
orchestratorService.copyOrchestrator(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), orchestratorCopyVo.getTargetProjectName(),
119+
orchestratorCopyVo.getTargetProjectId(), newOrchestrator.getDesc(), newOrchestrator, dssLabels);
120+
} catch (Exception e) {
121+
throw new RuntimeException("error happened when copying orc.", e);
122+
}
123+
124+
Long orchestratorId = newOrchestrator.getId();
125+
Long orchestratorVersionId = dssOrchestratorVersion.getId();
126+
//4.将工程和orchestrator的关系存储到的数据库中
127+
if (orchestrationResponseRef != null) {
128+
Long refProjectId = (Long) orchestrationResponseRef.toMap().get("refProjectId");
129+
orchestratorMapper.addOrchestratorRefOrchestration(new DSSOrchestratorRefOrchestration(orchestratorId, refProjectId, orchestrationResponseRef.getRefOrchestrationId()));
130+
} else {
131+
LOGGER.info("copy orchestration {} with orchestratorId is {}, and versionId is {}, and orchestrationResponseRef is null.", newOrchestrator.getName(), orchestratorId, orchestratorVersionId);
132+
133+
}
94134
}
95135

96-
private void doOrchestratorCopy(String userName,
136+
private DSSOrchestratorVersion doOrchestratorCopy(String userName,
97137
Workspace workspace,
98138
DSSOrchestratorInfo dssOrchestratorInfo,
99139
String projectName,
@@ -132,6 +172,7 @@ private void doOrchestratorCopy(String userName,
132172
orchestratorCopyEnv.getOrchestratorMapper().addOrchestrator(dssOrchestratorInfo);
133173
dssOrchestratorVersion.setOrchestratorId(dssOrchestratorInfo.getId());
134174
orchestratorCopyEnv.getOrchestratorMapper().addOrchestratorVersion(dssOrchestratorVersion);
175+
return dssOrchestratorVersion;
135176
}
136177

137178

@@ -158,4 +199,28 @@ public DSSOrchestratorCopyInfo getOrchestratorCopyInfo() {
158199
public void setOrchestratorCopyInfo(DSSOrchestratorCopyInfo orchestratorCopyInfo) {
159200
this.orchestratorCopyInfo = orchestratorCopyInfo;
160201
}
202+
203+
public OrchestratorFrameworkServiceImpl getOrchestratorFrameworkServiceImpl() {
204+
return orchestratorFrameworkServiceImpl;
205+
}
206+
207+
public void setOrchestratorFrameworkServiceImpl(OrchestratorFrameworkServiceImpl orchestratorFrameworkServiceImpl) {
208+
this.orchestratorFrameworkServiceImpl = orchestratorFrameworkServiceImpl;
209+
}
210+
211+
public OrchestratorService getOrchestratorService() {
212+
return orchestratorService;
213+
}
214+
215+
public void setOrchestratorService(OrchestratorService orchestratorService) {
216+
this.orchestratorService = orchestratorService;
217+
}
218+
219+
public OrchestratorMapper getOrchestratorMapper() {
220+
return orchestratorMapper;
221+
}
222+
223+
public void setOrchestratorMapper(OrchestratorMapper orchestratorMapper) {
224+
this.orchestratorMapper = orchestratorMapper;
225+
}
161226
}

dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/OrchestratorService.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,18 @@ void deleteOrchestrator(String userName,
7070
String projectName,
7171
Long orchestratorInfoId,
7272
List<DSSLabel> dssLabels) throws Exception;
73+
/**
74+
* 复制编排
75+
*
76+
*/
77+
OrchestratorVo copyOrchestrator(String userName,
78+
Workspace workspace,
79+
String projectName,
80+
Long projectId,
81+
String description,
82+
DSSOrchestratorInfo dssOrchestratorInfo,
83+
List<DSSLabel> dssLabels) throws Exception;
84+
7385

7486
/**
7587
* 解锁编排对应的工作流

dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorFrameworkServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public CommonOrchestratorVo createOrchestrator(String username, OrchestratorCrea
177177
return commonOrchestratorVo;
178178
}
179179

180-
private <K extends StructureRequestRef, V extends ResponseRef> V tryOrchestrationOperation(List<DSSLabel> dssLabels, Boolean askProjectSender, String userName, String projectName,
180+
public <K extends StructureRequestRef, V extends ResponseRef> V tryOrchestrationOperation(List<DSSLabel> dssLabels, Boolean askProjectSender, String userName, String projectName,
181181
Workspace workspace, DSSOrchestratorInfo dssOrchestrator,
182182
Function<OrchestrationService, StructureOperation> getOrchestrationOperation,
183183
BiFunction<StructureOperation, K, V> responseRefConsumer, String operationName) {

dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorServiceImpl.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,47 @@ public void deleteOrchestrator(String userName,
238238
orchestratorMapper.deleteOrchestrator(orchestratorInfoId);
239239
}
240240

241+
@Override
242+
@Transactional(rollbackFor = Exception.class)
243+
public OrchestratorVo copyOrchestrator(String userName,
244+
Workspace workspace,
245+
String projectName,
246+
Long projectId,
247+
String description,
248+
DSSOrchestratorInfo dssOrchestratorInfo,
249+
List<DSSLabel> dssLabels) throws Exception {
250+
OrchestratorVo orchestratorVo = new OrchestratorVo();
251+
//todo 增加校验
252+
String uuid = UUID.randomUUID().toString();
253+
254+
//作为Orchestrator的唯一标识,包括跨环境导入导出也不发生变化。
255+
dssOrchestratorInfo.setUUID(uuid);
256+
257+
String version = OrchestratorUtils.generateNewVersion();
258+
String contextId = contextService.createContextID(workspace.getWorkspaceName(), projectName, dssOrchestratorInfo.getName(), version, userName);
259+
LOGGER.info("Create a new ContextId: {} for new orchestrator {}.", contextId, dssOrchestratorInfo.getName());
260+
//1. 访问DSS工作流微模块创建工作流
261+
RefJobContentResponseRef appRef = tryRefOperation(dssOrchestratorInfo, userName, workspace, dssLabels, null,
262+
developmentService -> ((RefCRUDService) developmentService).getRefCreationOperation(),
263+
dssContextRequestRef -> dssContextRequestRef.setContextId(contextId),
264+
projectRefRequestRef -> projectRefRequestRef.setProjectName(projectName).setRefProjectId(projectId),
265+
(developmentOperation, developmentRequestRef) -> {
266+
DSSOrchestrator dssOrchestrator = orchestratorManager.getOrCreateOrchestrator(userName,
267+
workspace.getWorkspaceName(), dssOrchestratorInfo.getType(), dssLabels);
268+
Map<String, Object> dssJobContent = MapUtils.newCommonMapBuilder()
269+
.put(OrchestratorRefConstant.DSS_ORCHESTRATOR_INFO_KEY, dssOrchestratorInfo)
270+
.put(OrchestratorRefConstant.ORCHESTRATOR_VERSION_KEY, version)
271+
.put(OrchestratorRefConstant.ORCHESTRATION_SCHEDULER_APP_CONN, Optional.ofNullable(dssOrchestrator)
272+
.map(DSSOrchestrator::getSchedulerAppConn).map(AppConn::getAppDesc).map(AppDesc::getAppName)
273+
.map(Object::toString).orElse("NULL")).build();
274+
DSSJobContentRequestRef requestRef = (DSSJobContentRequestRef) developmentRequestRef;
275+
requestRef.setDSSJobContent(dssJobContent);
276+
return ((RefCreationOperation) developmentOperation).createRef(requestRef);
277+
}, "create");
278+
279+
return orchestratorVo;
280+
}
281+
241282
@Override
242283
public OrchestratorUnlockVo unlockOrchestrator(String userName,
243284
Workspace workspace,

0 commit comments

Comments
 (0)