Skip to content

Commit aad3bdb

Browse files
committed
Adapt to import workflow from hdfs in the development center.
1 parent 2991a39 commit aad3bdb

File tree

1 file changed

+39
-27
lines changed
  • dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/restful

1 file changed

+39
-27
lines changed

dss-framework/dss-framework-orchestrator-server/src/main/java/com/webank/wedatasphere/dss/orchestrator/server/restful/OrchestratorIERestful.java

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.webank.wedatasphere.dss.orchestrator.server.restful;
1818

1919
import com.webank.wedatasphere.dss.common.exception.DSSErrorException;
20+
import com.webank.wedatasphere.dss.common.exception.DSSRuntimeException;
2021
import com.webank.wedatasphere.dss.common.label.DSSLabel;
2122
import com.webank.wedatasphere.dss.common.label.EnvDSSLabel;
2223
import com.webank.wedatasphere.dss.common.label.LabelKeyConvertor;
@@ -31,8 +32,11 @@
3132
import com.webank.wedatasphere.dss.standard.app.sso.Workspace;
3233
import com.webank.wedatasphere.dss.standard.sso.utils.SSOHelper;
3334
import org.apache.commons.io.IOUtils;
35+
import org.apache.linkis.common.io.FsPath;
36+
import org.apache.linkis.filesystem.service.FsService;
3437
import org.apache.linkis.server.Message;
3538
import org.apache.linkis.server.security.SecurityFilter;
39+
import org.apache.linkis.storage.fs.FileSystem;
3640
import org.slf4j.Logger;
3741
import org.slf4j.LoggerFactory;
3842
import org.springframework.beans.factory.annotation.Autowired;
@@ -57,6 +61,8 @@ public class OrchestratorIERestful {
5761
@Autowired
5862
private BMLService bmlService;
5963
@Autowired
64+
private FsService fsService;
65+
@Autowired
6066
OrchestratorService orchestratorService;
6167
@Autowired
6268
private DSSOrchestratorContext orchestratorContext;
@@ -66,37 +72,43 @@ public Message importOrcFile(HttpServletRequest req,
6672
@RequestParam(required = false, name = "projectName") String projectName,
6773
@RequestParam(required = false, name = "projectID") Long projectID,
6874
@RequestParam(required = false, name = "labels") String labels,
69-
@RequestParam(required = false, name = "packageFile") List<MultipartFile> packageFile,
70-
@RequestParam(required = false, name = "packageUri") List<MultipartFile> packageUri) throws Exception {
71-
List<MultipartFile> files;
72-
if (packageFile != null) {
73-
files = packageFile;
74-
} else {
75-
files = packageUri;
76-
}
77-
if (null == files || files.size() == 0) {
75+
@RequestParam(required = false, name = "packageFile") MultipartFile packageFile,
76+
@RequestParam(required = false, name = "packageUri") String packageUri) throws Exception {
77+
78+
if (null == packageFile && packageUri == null) {
7879
throw new DSSErrorException(100788, "Import orchestrator failed for files is empty");
7980
}
80-
Long importOrcId = 0L;
81-
for (MultipartFile p : files) {
82-
InputStream inputStream = p.getInputStream();
83-
String fileName = new String(p.getOriginalFilename().getBytes("ISO8859-1"), "UTF-8");
84-
String userName = SecurityFilter.getLoginUsername(req);
85-
//调用工具类生产label
86-
List<DSSLabel> dssLabelList = getDSSLabelList(labels);
87-
Workspace workspace = SSOHelper.getWorkspace(req);
81+
String userName = SecurityFilter.getLoginUsername(req);
82+
//调用工具类生产label
83+
List<DSSLabel> dssLabelList = getDSSLabelList(labels);
84+
Workspace workspace = SSOHelper.getWorkspace(req);
85+
InputStream inputStream;
86+
String fileName;
87+
if (packageFile != null) {
88+
inputStream = packageFile.getInputStream();
89+
fileName = new String(packageFile.getOriginalFilename().getBytes("ISO8859-1"), "UTF-8");
8890
//3、打包新的zip包上传BML
89-
Map<String, Object> resultMap = bmlService.upload(userName, inputStream,
90-
fileName, projectName);
91-
try {
92-
RequestImportOrchestrator importRequest = new RequestImportOrchestrator(userName, projectName,
93-
projectID, resultMap.get("resourceId").toString(),
94-
resultMap.get("version").toString(), null, dssLabelList, workspace);
95-
importOrcId = orchestratorContext.getDSSOrchestratorPlugin(ImportDSSOrchestratorPlugin.class).importOrchestrator(importRequest);
96-
} catch (Exception e) {
97-
logger.error("Import orchestrator failed for ", e);
98-
throw new DSSErrorException(100789, "Import orchestrator failed for " + e.getMessage());
91+
logger.info("User {} begin to import orchestrator file", userName);
92+
} else {
93+
FsPath fsPath = new FsPath(packageUri);
94+
FileSystem fileSystem = fsService.getFileSystem(userName, fsPath);
95+
if ( !fileSystem.exists(fsPath) ) {
96+
throw new DSSRuntimeException("路径上不存在文件!");
9997
}
98+
inputStream = fileSystem.read(fsPath);
99+
fileName = packageUri.substring(packageUri.lastIndexOf('/') + 1);
100+
}
101+
102+
Map<String, Object> resultMap = bmlService.upload(userName, inputStream, fileName, projectName);
103+
Long importOrcId;
104+
try {
105+
RequestImportOrchestrator importRequest = new RequestImportOrchestrator(userName, projectName,
106+
projectID, resultMap.get("resourceId").toString(),
107+
resultMap.get("version").toString(), null, dssLabelList, workspace);
108+
importOrcId = orchestratorContext.getDSSOrchestratorPlugin(ImportDSSOrchestratorPlugin.class).importOrchestrator(importRequest);
109+
} catch (Exception e) {
110+
logger.error("Import orchestrator failed for ", e);
111+
throw new DSSErrorException(100789, "Import orchestrator failed for " + e.getMessage());
100112
}
101113
return Message.ok().data("importOrcId", importOrcId);
102114
}

0 commit comments

Comments
 (0)