Skip to content

Commit 274697a

Browse files
authored
Merge branch 'master' into master
2 parents 70a8052 + a691b7c commit 274697a

File tree

5 files changed

+125
-7
lines changed

5 files changed

+125
-7
lines changed

dss-appconn/appconns/dss-dolphinscheduler-appconn/src/main/resources/init.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ VALUES(@dolphinscheduler_appconnId, @dolphinscheduler_menuId,'dolphinscheduler',
2020

2121
delete from dss_workspace_dictionary where dic_key = "pom_work_flow_ds";
2222
delete from dss_workspace_dictionary where dic_key = "pom_work_flow_ds_DAG";
23+
delete from dss_workspace_dictionary where dic_key = "pdp_scheduler_center";
2324
insert into `dss_workspace_dictionary`(`workspace_id`,`parent_key`,`dic_name`,`dic_name_en`,`dic_key`,`dic_value`,`dic_value_en`,`title`,`title_en`,`url`,`url_type`,`icon`,`order_num`,`remark`,`create_user`,`create_time`,`update_user`,`update_time`) values (0,'p_orchestrator_mode','DS工作流','Workflow_DS','pom_work_flow_ds','radio',NULL,NULL,NULL,NULL,0,'gongzuoliu-icon',1,'工程编排模式-DS工作流','SYSTEM','2022-03-21 14:25:35',NULL,'2022-03-21 14:25:35');
2425
insert into `dss_workspace_dictionary`(`workspace_id`,`parent_key`,`dic_name`,`dic_name_en`,`dic_key`,`dic_value`,`dic_value_en`,`title`,`title_en`,`url`,`url_type`,`icon`,`order_num`,`remark`,`create_user`,`create_time`,`update_user`,`update_time`) values (0,'pom_work_flow_ds','DAG','DAG','pom_work_flow_ds_DAG',NULL,NULL,NULL,NULL,NULL,0,NULL,1,'工程编排模式-DS工作流-DAG','SYSTEM','2022-03-21 14:25:35',NULL,'2022-03-21 14:25:35');
2526

2627
INSERT INTO dss_workspace_dictionary
2728
(workspace_id, parent_key, dic_name, dic_name_en, dic_key, dic_value, dic_value_en, title, title_en, url, url_type, icon, order_num, remark, create_user, create_time, update_user, update_time)
28-
VALUES(0, 'p_develop_process', '调度中心', 'Scheduler Center', 'pdp_scheduler_center', 'scheduler', NULL, NULL, NULL, NULL, 0, 'kaifa-icon', 1, '工程开发流程-调度中心', 'SYSTEM', '2020-12-28 17:32:35.0', NULL, '2021-02-22 17:49:02.0');
29+
VALUES(0, 'p_develop_process', '调度中心', 'Scheduler Center', 'pdp_scheduler_center', 'scheduler', NULL, NULL, NULL, NULL, 0, 'kaifa-icon', 1, '工程开发流程-调度中心', 'SYSTEM', '2020-12-28 17:32:35.0', NULL, '2021-02-22 17:49:02.0');

dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/job/OrchestratorCopyJob.java

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,22 @@
11
package com.webank.wedatasphere.dss.orchestrator.server.job;
22

33
import com.google.common.collect.Lists;
4+
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.OrchestrationCreationOperation;
5+
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.OrchestrationService;
6+
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.ref.DSSOrchestrationContentRequestRef;
7+
import com.webank.wedatasphere.dss.appconn.scheduler.structure.orchestration.ref.OrchestrationResponseRef;
48
import com.webank.wedatasphere.dss.common.exception.DSSErrorException;
59
import com.webank.wedatasphere.dss.common.label.DSSLabel;
610
import com.webank.wedatasphere.dss.common.utils.MapUtils;
7-
import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorCopyInfo;
8-
import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorInfo;
9-
import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorVersion;
11+
import com.webank.wedatasphere.dss.orchestrator.common.entity.*;
1012
import com.webank.wedatasphere.dss.orchestrator.common.ref.OrchestratorRefConstant;
1113
import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestrator;
1214
import com.webank.wedatasphere.dss.orchestrator.core.utils.OrchestratorUtils;
15+
import com.webank.wedatasphere.dss.orchestrator.db.dao.OrchestratorMapper;
1316
import com.webank.wedatasphere.dss.orchestrator.publish.utils.OrchestrationDevelopmentOperationUtils;
1417
import com.webank.wedatasphere.dss.orchestrator.server.entity.vo.OrchestratorCopyVo;
18+
import com.webank.wedatasphere.dss.orchestrator.server.service.OrchestratorService;
19+
import com.webank.wedatasphere.dss.orchestrator.server.service.impl.OrchestratorFrameworkServiceImpl;
1520
import com.webank.wedatasphere.dss.standard.app.development.operation.RefCopyOperation;
1621
import com.webank.wedatasphere.dss.standard.app.development.ref.CopyRequestRef;
1722
import com.webank.wedatasphere.dss.standard.app.development.ref.RefJobContentResponseRef;
@@ -33,6 +38,12 @@ public class OrchestratorCopyJob implements Runnable {
3338

3439
protected OrchestratorCopyEnv orchestratorCopyEnv;
3540

41+
private OrchestratorFrameworkServiceImpl orchestratorFrameworkServiceImpl;
42+
43+
private OrchestratorService orchestratorService;
44+
45+
private OrchestratorMapper orchestratorMapper;
46+
3647
private DSSOrchestratorCopyInfo orchestratorCopyInfo = new DSSOrchestratorCopyInfo(UUID.randomUUID().toString());
3748

3849

@@ -66,9 +77,10 @@ private void copyOrchestrator() {
6677
newOrchestrator.setDesc("copy from " + sourceOrchestrator.getName());
6778
newOrchestrator.setUpdateTime(null);
6879
newOrchestrator.setUpdateUser(null);
80+
DSSOrchestratorVersion dssOrchestratorVersion = null;
6981

7082
try {
71-
doOrchestratorCopy(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), newOrchestrator,
83+
dssOrchestratorVersion = doOrchestratorCopy(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), newOrchestrator,
7284
orchestratorCopyVo.getTargetProjectName(), Lists.newArrayList(orchestratorCopyVo.getDssLabel()), appId);
7385
} catch (Exception e) {
7486
//保存错误信息
@@ -91,9 +103,37 @@ private void copyOrchestrator() {
91103
orchestratorCopyInfo.setSuccessNode(Lists.newArrayList("All"));
92104
orchestratorCopyInfo.setStatus(1);
93105
orchestratorCopyEnv.getOrchestratorCopyJobMapper().updateCopyStatus(orchestratorCopyInfo);
106+
107+
List<DSSLabel> dssLabels = new ArrayList<>();
108+
dssLabels.add(orchestratorCopyVo.getDssLabel());
109+
110+
//2.如果调度系统要求同步创建工作流,向调度系统发送创建工作流的请求
111+
OrchestrationResponseRef orchestrationResponseRef = orchestratorFrameworkServiceImpl.tryOrchestrationOperation(dssLabels, true, orchestratorCopyVo.getUsername(),
112+
orchestratorCopyVo.getTargetProjectName(), orchestratorCopyVo.getWorkspace(), newOrchestrator,
113+
OrchestrationService::getOrchestrationCreationOperation,
114+
(structureOperation, structureRequestRef) -> ((OrchestrationCreationOperation) structureOperation)
115+
.createOrchestration((DSSOrchestrationContentRequestRef) structureRequestRef), "create");
116+
117+
try {
118+
orchestratorService.copyOrchestrator(orchestratorCopyVo.getUsername(), orchestratorCopyVo.getWorkspace(), orchestratorCopyVo.getTargetProjectName(),
119+
orchestratorCopyVo.getTargetProjectId(), newOrchestrator.getDesc(), newOrchestrator, dssLabels);
120+
} catch (Exception e) {
121+
throw new RuntimeException("error happened when copying orc.", e);
122+
}
123+
124+
Long orchestratorId = newOrchestrator.getId();
125+
Long orchestratorVersionId = dssOrchestratorVersion.getId();
126+
//4.将工程和orchestrator的关系存储到的数据库中
127+
if (orchestrationResponseRef != null) {
128+
Long refProjectId = (Long) orchestrationResponseRef.toMap().get("refProjectId");
129+
orchestratorMapper.addOrchestratorRefOrchestration(new DSSOrchestratorRefOrchestration(orchestratorId, refProjectId, orchestrationResponseRef.getRefOrchestrationId()));
130+
} else {
131+
LOGGER.info("copy orchestration {} with orchestratorId is {}, and versionId is {}, and orchestrationResponseRef is null.", newOrchestrator.getName(), orchestratorId, orchestratorVersionId);
132+
133+
}
94134
}
95135

96-
private void doOrchestratorCopy(String userName,
136+
private DSSOrchestratorVersion doOrchestratorCopy(String userName,
97137
Workspace workspace,
98138
DSSOrchestratorInfo dssOrchestratorInfo,
99139
String projectName,
@@ -159,4 +199,28 @@ public DSSOrchestratorCopyInfo getOrchestratorCopyInfo() {
159199
public void setOrchestratorCopyInfo(DSSOrchestratorCopyInfo orchestratorCopyInfo) {
160200
this.orchestratorCopyInfo = orchestratorCopyInfo;
161201
}
202+
203+
public OrchestratorFrameworkServiceImpl getOrchestratorFrameworkServiceImpl() {
204+
return orchestratorFrameworkServiceImpl;
205+
}
206+
207+
public void setOrchestratorFrameworkServiceImpl(OrchestratorFrameworkServiceImpl orchestratorFrameworkServiceImpl) {
208+
this.orchestratorFrameworkServiceImpl = orchestratorFrameworkServiceImpl;
209+
}
210+
211+
public OrchestratorService getOrchestratorService() {
212+
return orchestratorService;
213+
}
214+
215+
public void setOrchestratorService(OrchestratorService orchestratorService) {
216+
this.orchestratorService = orchestratorService;
217+
}
218+
219+
public OrchestratorMapper getOrchestratorMapper() {
220+
return orchestratorMapper;
221+
}
222+
223+
public void setOrchestratorMapper(OrchestratorMapper orchestratorMapper) {
224+
this.orchestratorMapper = orchestratorMapper;
225+
}
162226
}

dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/OrchestratorService.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,18 @@ void deleteOrchestrator(String userName,
7070
String projectName,
7171
Long orchestratorInfoId,
7272
List<DSSLabel> dssLabels) throws Exception;
73+
/**
74+
* 复制编排
75+
*
76+
*/
77+
OrchestratorVo copyOrchestrator(String userName,
78+
Workspace workspace,
79+
String projectName,
80+
Long projectId,
81+
String description,
82+
DSSOrchestratorInfo dssOrchestratorInfo,
83+
List<DSSLabel> dssLabels) throws Exception;
84+
7385

7486
/**
7587
* 解锁编排对应的工作流

dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorFrameworkServiceImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public CommonOrchestratorVo createOrchestrator(String username, OrchestratorCrea
180180
return commonOrchestratorVo;
181181
}
182182

183-
private <K extends StructureRequestRef, V extends ResponseRef> V tryOrchestrationOperation(List<DSSLabel> dssLabels, Boolean askProjectSender, String userName, String projectName,
183+
public <K extends StructureRequestRef, V extends ResponseRef> V tryOrchestrationOperation(List<DSSLabel> dssLabels, Boolean askProjectSender, String userName, String projectName,
184184
Workspace workspace, DSSOrchestratorInfo dssOrchestrator,
185185
Function<OrchestrationService, StructureOperation> getOrchestrationOperation,
186186
BiFunction<StructureOperation, K, V> responseRefConsumer, String operationName) {

dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/service/impl/OrchestratorServiceImpl.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,47 @@ public void deleteOrchestrator(String userName,
240240
orchestratorMapper.deleteOrchestrator(orchestratorInfoId);
241241
}
242242

243+
@Override
244+
@Transactional(rollbackFor = Exception.class)
245+
public OrchestratorVo copyOrchestrator(String userName,
246+
Workspace workspace,
247+
String projectName,
248+
Long projectId,
249+
String description,
250+
DSSOrchestratorInfo dssOrchestratorInfo,
251+
List<DSSLabel> dssLabels) throws Exception {
252+
OrchestratorVo orchestratorVo = new OrchestratorVo();
253+
//todo 增加校验
254+
String uuid = UUID.randomUUID().toString();
255+
256+
//作为Orchestrator的唯一标识,包括跨环境导入导出也不发生变化。
257+
dssOrchestratorInfo.setUUID(uuid);
258+
259+
String version = OrchestratorUtils.generateNewVersion();
260+
String contextId = contextService.createContextID(workspace.getWorkspaceName(), projectName, dssOrchestratorInfo.getName(), version, userName);
261+
LOGGER.info("Create a new ContextId: {} for new orchestrator {}.", contextId, dssOrchestratorInfo.getName());
262+
//1. 访问DSS工作流微模块创建工作流
263+
RefJobContentResponseRef appRef = tryRefOperation(dssOrchestratorInfo, userName, workspace, dssLabels, null,
264+
developmentService -> ((RefCRUDService) developmentService).getRefCreationOperation(),
265+
dssContextRequestRef -> dssContextRequestRef.setContextId(contextId),
266+
projectRefRequestRef -> projectRefRequestRef.setProjectName(projectName).setRefProjectId(projectId),
267+
(developmentOperation, developmentRequestRef) -> {
268+
DSSOrchestrator dssOrchestrator = orchestratorManager.getOrCreateOrchestrator(userName,
269+
workspace.getWorkspaceName(), dssOrchestratorInfo.getType(), dssLabels);
270+
Map<String, Object> dssJobContent = MapUtils.newCommonMapBuilder()
271+
.put(OrchestratorRefConstant.DSS_ORCHESTRATOR_INFO_KEY, dssOrchestratorInfo)
272+
.put(OrchestratorRefConstant.ORCHESTRATOR_VERSION_KEY, version)
273+
.put(OrchestratorRefConstant.ORCHESTRATION_SCHEDULER_APP_CONN, Optional.ofNullable(dssOrchestrator)
274+
.map(DSSOrchestrator::getSchedulerAppConn).map(AppConn::getAppDesc).map(AppDesc::getAppName)
275+
.map(Object::toString).orElse("NULL")).build();
276+
DSSJobContentRequestRef requestRef = (DSSJobContentRequestRef) developmentRequestRef;
277+
requestRef.setDSSJobContent(dssJobContent);
278+
return ((RefCreationOperation) developmentOperation).createRef(requestRef);
279+
}, "create");
280+
281+
return orchestratorVo;
282+
}
283+
243284
@Override
244285
public OrchestratorUnlockVo unlockOrchestrator(String userName,
245286
Workspace workspace,

0 commit comments

Comments
 (0)