Skip to content

Commit 64c5891

Browse files
committed
Adapt to import workflow from hdfs in the development center.
1 parent 2991a39 commit 64c5891

File tree

1 file changed

+44
-31
lines changed
  • dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/restful

1 file changed

+44
-31
lines changed

dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/restful/OrchestratorIERestful.java

Lines changed: 44 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
package com.webank.wedatasphere.dss.orchestrator.server.restful;
1818

19+
import com.webank.wedatasphere.dss.common.entity.BmlResource;
1920
import com.webank.wedatasphere.dss.common.exception.DSSErrorException;
21+
import com.webank.wedatasphere.dss.common.exception.DSSRuntimeException;
2022
import com.webank.wedatasphere.dss.common.label.DSSLabel;
2123
import com.webank.wedatasphere.dss.common.label.EnvDSSLabel;
2224
import com.webank.wedatasphere.dss.common.label.LabelKeyConvertor;
@@ -31,8 +33,11 @@
3133
import com.webank.wedatasphere.dss.standard.app.sso.Workspace;
3234
import com.webank.wedatasphere.dss.standard.sso.utils.SSOHelper;
3335
import org.apache.commons.io.IOUtils;
36+
import org.apache.linkis.common.io.FsPath;
37+
import org.apache.linkis.filesystem.service.FsService;
3438
import org.apache.linkis.server.Message;
3539
import org.apache.linkis.server.security.SecurityFilter;
40+
import org.apache.linkis.storage.fs.FileSystem;
3641
import org.slf4j.Logger;
3742
import org.slf4j.LoggerFactory;
3843
import org.springframework.beans.factory.annotation.Autowired;
@@ -57,6 +62,8 @@ public class OrchestratorIERestful {
5762
@Autowired
5863
private BMLService bmlService;
5964
@Autowired
65+
private FsService fsService;
66+
@Autowired
6067
OrchestratorService orchestratorService;
6168
@Autowired
6269
private DSSOrchestratorContext orchestratorContext;
@@ -66,37 +73,43 @@ public Message importOrcFile(HttpServletRequest req,
6673
@RequestParam(required = false, name = "projectName") String projectName,
6774
@RequestParam(required = false, name = "projectID") Long projectID,
6875
@RequestParam(required = false, name = "labels") String labels,
69-
@RequestParam(required = false, name = "packageFile") List<MultipartFile> packageFile,
70-
@RequestParam(required = false, name = "packageUri") List<MultipartFile> packageUri) throws Exception {
71-
List<MultipartFile> files;
72-
if (packageFile != null) {
73-
files = packageFile;
74-
} else {
75-
files = packageUri;
76-
}
77-
if (null == files || files.size() == 0) {
76+
@RequestParam(required = false, name = "packageFile") MultipartFile packageFile,
77+
@RequestParam(required = false, name = "packageUri") String packageUri) throws Exception {
78+
79+
if (null == packageFile && packageUri == null) {
7880
throw new DSSErrorException(100788, "Import orchestrator failed for files is empty");
7981
}
80-
Long importOrcId = 0L;
81-
for (MultipartFile p : files) {
82-
InputStream inputStream = p.getInputStream();
83-
String fileName = new String(p.getOriginalFilename().getBytes("ISO8859-1"), "UTF-8");
84-
String userName = SecurityFilter.getLoginUsername(req);
85-
//调用工具类生产label
86-
List<DSSLabel> dssLabelList = getDSSLabelList(labels);
87-
Workspace workspace = SSOHelper.getWorkspace(req);
82+
String userName = SecurityFilter.getLoginUsername(req);
83+
//调用工具类生产label
84+
List<DSSLabel> dssLabelList = getDSSLabelList(labels);
85+
Workspace workspace = SSOHelper.getWorkspace(req);
86+
InputStream inputStream;
87+
String fileName;
88+
if (packageFile != null) {
89+
inputStream = packageFile.getInputStream();
90+
fileName = new String(packageFile.getOriginalFilename().getBytes("ISO8859-1"), "UTF-8");
8891
//3、打包新的zip包上传BML
89-
Map<String, Object> resultMap = bmlService.upload(userName, inputStream,
90-
fileName, projectName);
91-
try {
92-
RequestImportOrchestrator importRequest = new RequestImportOrchestrator(userName, projectName,
93-
projectID, resultMap.get("resourceId").toString(),
94-
resultMap.get("version").toString(), null, dssLabelList, workspace);
95-
importOrcId = orchestratorContext.getDSSOrchestratorPlugin(ImportDSSOrchestratorPlugin.class).importOrchestrator(importRequest);
96-
} catch (Exception e) {
97-
logger.error("Import orchestrator failed for ", e);
98-
throw new DSSErrorException(100789, "Import orchestrator failed for " + e.getMessage());
92+
logger.info("User {} begin to import orchestrator file", userName);
93+
} else {
94+
FsPath fsPath = new FsPath(packageUri);
95+
FileSystem fileSystem = fsService.getFileSystem(userName, fsPath);
96+
if ( !fileSystem.exists(fsPath) ) {
97+
throw new DSSRuntimeException("路径上不存在文件!");
9998
}
99+
inputStream = fileSystem.read(fsPath);
100+
fileName = packageUri.substring(packageUri.lastIndexOf('/') + 1);
101+
}
102+
103+
BmlResource resultMap = bmlService.upload(userName, inputStream, fileName, projectName);
104+
Long importOrcId;
105+
try {
106+
RequestImportOrchestrator importRequest = new RequestImportOrchestrator(userName, projectName,
107+
projectID, resultMap.getResourceId(),
108+
resultMap.getVersion(), null, dssLabelList, workspace);
109+
importOrcId = orchestratorContext.getDSSOrchestratorPlugin(ImportDSSOrchestratorPlugin.class).importOrchestrator(importRequest);
110+
} catch (Exception e) {
111+
logger.error("Import orchestrator failed for ", e);
112+
throw new DSSErrorException(100789, "Import orchestrator failed for " + e.getMessage());
100113
}
101114
return Message.ok().data("importOrcId", importOrcId);
102115
}
@@ -118,7 +131,7 @@ public void exportOrcFile(HttpServletRequest req,
118131
Workspace workspace = SSOHelper.getWorkspace(req);
119132
String userName = SecurityFilter.getLoginUsername(req);
120133
List<DSSLabel> dssLabelList = getDSSLabelList(labels);
121-
Map<String, Object> res = null;
134+
BmlResource res;
122135
OrchestratorVo orchestratorVo;
123136
if (orcVersionId != null) {
124137
orchestratorVo = orchestratorService.getOrchestratorVoByIdAndOrcVersionId(orchestratorId, orcVersionId);
@@ -129,15 +142,15 @@ public void exportOrcFile(HttpServletRequest req,
129142
logger.info("export orchestrator orchestratorId " + orchestratorId + ",orcVersionId:" + orcVersionId);
130143
try {
131144
res = orchestratorContext.getDSSOrchestratorPlugin(ExportDSSOrchestratorPlugin.class).exportOrchestrator(userName,
132-
orchestratorId, orcVersionId, projectName, dssLabelList, addOrcVersion, workspace);
145+
orchestratorId, orcVersionId, projectName, dssLabelList, addOrcVersion, workspace).getBmlResource();
133146
} catch (Exception e) {
134147
logger.error("export orchestrator failed for ", e);
135148
throw new DSSErrorException(100789, "export orchestrator failed for " + e.getMessage());
136149
}
137150
if (null != res) {
138151
Map<String, Object> downRes = bmlService.download(userName,
139-
res.get("resourceId").toString(),
140-
res.get("version").toString());
152+
res.getResourceId(),
153+
res.getVersion());
141154

142155
InputStream inputStream = (InputStream) downRes.get("is");
143156
try {

0 commit comments

Comments
 (0)