Skip to content

Commit 1ef1895

Browse files
support to execute bml2linkis node type.
1 parent 2ea86ff commit 1ef1895

File tree

2 files changed

+67
-135
lines changed

2 files changed

+67
-135
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.webank.wedatasphere.dss.linkis.node.execution.parser;
2+
3+
import com.google.gson.reflect.TypeToken;
4+
import com.webank.wedatasphere.dss.linkis.node.execution.entity.BMLResource;
5+
import com.webank.wedatasphere.dss.linkis.node.execution.exception.LinkisJobExecutionErrorException;
6+
import com.webank.wedatasphere.dss.linkis.node.execution.job.AppConnLinkisJob;
7+
import com.webank.wedatasphere.dss.linkis.node.execution.job.Job;
8+
import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisJobExecutionUtils;
9+
10+
import java.util.Map;
11+
12+
public class BML2LinkisJobParser extends CodeParser {
13+
14+
@Override
15+
public void parseJob(Job job) throws Exception {
16+
if(!(job instanceof AppConnLinkisJob)) {
17+
return;
18+
}
19+
AppConnLinkisJob appConnLinkisJob = (AppConnLinkisJob) job;
20+
String runType = appConnLinkisJob.getRunType();
21+
// 只处理包含 bml2linkis 的 AppConnLinkisJob,例如:linkis.appconn.<appconnName>.bml2linkis
22+
if(!runType.toLowerCase().contains("bml2linkis")) {
23+
return;
24+
}
25+
job.getLogObj().info(String.format("AppConn %s try to generate Linkis jobContent from code %s.", runType,
26+
job.getCode()));
27+
Map<String, Object> script = LinkisJobExecutionUtils.gson.fromJson(job.getCode(), new TypeToken<Map<String, Object>>() {}.getType());
28+
if(!script.containsKey("resourceId") || !script.containsKey("version") || !script.containsKey("fileName")) {
29+
job.getLogObj().error("the code error, resourceId, version or fileName is not exists.");
30+
throw new LinkisJobExecutionErrorException(90100, "cannot recognize fileName from jobContent.");
31+
}
32+
BMLResource bmlResource = new BMLResource();
33+
bmlResource.setResourceId((String) script.get("resourceId"));
34+
bmlResource.setVersion((String) script.get("version"));
35+
// fileName 的格式为 ${resourceId}.${engineType}.${runType}
36+
bmlResource.setFileName((String) script.get("fileName"));
37+
getAndSetCode(bmlResource, appConnLinkisJob);
38+
String[] fileNameArray = bmlResource.getFileName().split("\\.");
39+
if(fileNameArray.length < 3) {
40+
job.getLogObj().error(String.format("cannot recognize fileName %s, the fileName format must be ${resourceId}.${engineType}.${runType}", bmlResource.getFileName()));
41+
throw new LinkisJobExecutionErrorException(90100, "cannot recognize fileName from jobContent.");
42+
}
43+
String realEngineType = fileNameArray[fileNameArray.length - 2];
44+
String realRunType = fileNameArray[fileNameArray.length - 1];
45+
setEngineType(job, realEngineType, realRunType);
46+
}
47+
48+
protected void setEngineType(Job job, String realEngineType, String realRunType) {
49+
job.getLogObj().warn(String.format("switch job from engineType %s with runType %s to engineType %s with runType %s",
50+
job.getEngineType(), job.getRunType(), realEngineType, realRunType));
51+
job.setEngineType(realEngineType);
52+
job.setRunType(realRunType);
53+
}
54+
55+
}

dss-orchestrator/orchestrators/dss-workflow/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/CodeParser.java

Lines changed: 12 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,17 @@
2222
import com.webank.wedatasphere.dss.linkis.node.execution.exception.LinkisJobExecutionErrorException;
2323
import com.webank.wedatasphere.dss.linkis.node.execution.job.CommonLinkisJob;
2424
import com.webank.wedatasphere.dss.linkis.node.execution.job.Job;
25+
import com.webank.wedatasphere.dss.linkis.node.execution.job.LinkisJob;
2526
import com.webank.wedatasphere.dss.linkis.node.execution.service.LinkisURLService;
2627
import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisJobExecutionUtils;
2728
import org.apache.linkis.filesystem.WorkspaceClientFactory;
2829
import org.apache.linkis.filesystem.request.WorkspaceClient;
2930
import org.apache.linkis.filesystem.response.ScriptFromBMLResponse;
3031

31-
import java.util.ArrayList;
3232
import java.util.HashMap;
3333
import java.util.List;
3434
import java.util.Map;
35-
import java.util.Optional;
36-
import java.util.regex.Matcher;
3735
import java.util.regex.Pattern;
38-
import org.apache.commons.lang3.StringUtils;
3936

4037

4138
public class CodeParser implements JobParser {
@@ -44,9 +41,10 @@ public class CodeParser implements JobParser {
4441
private volatile WorkspaceClient client1X = null;
4542
private volatile WorkspaceClient client0X = null;
4643
private final Object clientLocker = new Object();
44+
4745
@Override
4846
public void parseJob(Job job) throws Exception{
49-
if (! ( job instanceof CommonLinkisJob) ) {
47+
if (! (job instanceof CommonLinkisJob) ) {
5048
return ;
5149
}
5250
CommonLinkisJob linkisAppConnJob = (CommonLinkisJob) job;
@@ -66,8 +64,11 @@ public void parseJob(Job job) throws Exception{
6664
if(null == scriptResource) {
6765
throw new LinkisJobExecutionErrorException(90102,"Failed to get script resource");
6866
}
67+
getAndSetCode(scriptResource, linkisAppConnJob);
68+
}
6969

70-
Map<String, Object> executionParams = getExecutionParams(scriptResource, linkisAppConnJob);
70+
protected void getAndSetCode(BMLResource bmlResource, LinkisJob linkisAppConnJob) {
71+
Map<String, Object> executionParams = getExecutionParams(bmlResource, linkisAppConnJob);
7172
if (executionParams.get("executionCode") != null) {
7273
String executionCode = (String) executionParams.get("executionCode");
7374
linkisAppConnJob.getLogObj().info("************************************SUBMIT CODE************************************");
@@ -81,9 +82,12 @@ public void parseJob(Job job) throws Exception{
8182
linkisAppConnJob.getParams().putAll( (Map<String, Object>)executionParams.get("params"));
8283
}
8384
}
85+
dealExecutionParams(linkisAppConnJob, executionParams);
8486
}
8587

86-
private Map<String, Object> getExecutionParams(BMLResource bmlResource, CommonLinkisJob linkisAppConnJob) {
88+
protected void dealExecutionParams(LinkisJob linkisAppConnJob, Map<String, Object> executionParams) {}
89+
90+
protected Map<String, Object> getExecutionParams(BMLResource bmlResource, LinkisJob linkisAppConnJob) {
8791
Map<String, Object> map = new HashMap<>();
8892
ScriptFromBMLResponse response = getOrCreateWorkSpaceClient(linkisAppConnJob).requestOpenScriptFromBML(bmlResource.getResourceId(), bmlResource.getVersion(), bmlResource.getFileName());
8993
linkisAppConnJob.getLogObj().info("Get execution code from workspace client,bml resource id "+bmlResource.getResourceId()+", version is "+bmlResource.getVersion());
@@ -92,7 +96,7 @@ private Map<String, Object> getExecutionParams(BMLResource bmlResource, CommonL
9296
return map;
9397
}
9498

95-
private WorkspaceClient getOrCreateWorkSpaceClient(CommonLinkisJob linkisAppConnJob) {
99+
private WorkspaceClient getOrCreateWorkSpaceClient(LinkisJob linkisAppConnJob) {
96100
Map<String, String> props = linkisAppConnJob.getJobProps();
97101
if(LinkisJobExecutionConfiguration.isLinkis1_X(props)) {
98102
if (null == client1X) {
@@ -119,131 +123,4 @@ private WorkspaceClient getOrCreateWorkSpaceClient(CommonLinkisJob linkisAppConn
119123
}
120124
}
121125

122-
private ArrayList<String> getResourceNames(String code){
123-
ArrayList<String> bmlResourceNames = new ArrayList<String>();
124-
Matcher mb = pb.matcher(code);
125-
while (mb.find()) {
126-
bmlResourceNames.add(mb.group().trim());
127-
}
128-
return bmlResourceNames;
129-
}
130-
131-
132-
/**
133-
* 1.Find the project file used in the script
134-
* 2.Find the node file used in the script
135-
* 3.Recursively find the flow file used in the script
136-
* 4.Replace file name with prefixed name
137-
* @param resourceNames
138-
* @param linkisAppConnJob
139-
* @return
140-
*/
141-
private ArrayList<BMLResource> getResourcesByNames(ArrayList<String> resourceNames, CommonLinkisJob linkisAppConnJob) {
142-
143-
ArrayList<BMLResource> bmlResourceArrayList = new ArrayList<>();
144-
145-
String jobName = linkisAppConnJob.getJobName();
146-
String flowName = linkisAppConnJob.getSource().get("flowName");
147-
String projectName = linkisAppConnJob.getSource().get("projectName");
148-
149-
150-
List<BMLResource> projectResourceList = linkisAppConnJob.getProjectResourceList();
151-
152-
153-
List<BMLResource> jobResourceList = linkisAppConnJob.getJobResourceList();
154-
for (String resourceName : resourceNames) {
155-
String[] resourceNameSplit = resourceName.split("://");
156-
String prefix = resourceNameSplit[0].toLowerCase();
157-
String fileName = resourceNameSplit[1];
158-
BMLResource resource = null;
159-
String afterFileName = fileName;
160-
switch (prefix) {
161-
case "project":
162-
resource = findResource(projectResourceList, fileName);
163-
afterFileName = LinkisJobExecutionConfiguration.PROJECT_PREFIX + "_" + projectName + "_" + fileName;
164-
break;
165-
case "flow":
166-
resource = findFlowResource(linkisAppConnJob, fileName, flowName);
167-
break;
168-
case "node":
169-
resource = findResource(jobResourceList, fileName);
170-
afterFileName = LinkisJobExecutionConfiguration.JOB_PREFIX + "_" + jobName + "_" + fileName;
171-
break;
172-
default:
173-
}
174-
if (null == resource) {
175-
linkisAppConnJob.getLogObj().error("Failed to find the " + prefix + " resource file of " + fileName);
176-
throw new RuntimeException("Failed to find the " + prefix + " resource file of " + fileName);
177-
}
178-
if (!afterFileName.equals(fileName)) {
179-
resource.setFileName(afterFileName);
180-
}
181-
bmlResourceArrayList.add(resource);
182-
}
183-
return bmlResourceArrayList;
184-
}
185-
186-
187-
/**
188-
* Recursively find the flow file used in the script
189-
* Recursive exit condition is top-level flow
190-
*
191-
*/
192-
private BMLResource findFlowResource(CommonLinkisJob linkisAppConnJob, String fileName, String flowName) {
193-
194-
String fullFlowName = "";
195-
Map<String, List<BMLResource>> fLowNameAndResources = linkisAppConnJob.getFlowNameAndResources();
196-
if (fLowNameAndResources == null){
197-
return null;
198-
}
199-
Optional<Map.Entry<String, List<BMLResource>>> first = fLowNameAndResources.entrySet().stream().filter(fLowNameAndResource -> fLowNameAndResource.getKey().endsWith(flowName + LinkisJobExecutionConfiguration.RESOURCES_NAME)).findFirst();
200-
201-
if(first.isPresent()){
202-
fullFlowName = first.get().getKey();
203-
BMLResource resource = findResource(first.get().getValue(), fileName);
204-
if (resource != null) {
205-
resource.setFileName(flowName + "_" + fileName);
206-
return resource;
207-
}
208-
}
209-
210-
String firstFlow = "flow." + flowName + LinkisJobExecutionConfiguration.RESOURCES_NAME;
211-
if (firstFlow.equals(fullFlowName)) {
212-
return null;
213-
}
214-
//getParentFlowName:flow.flows1.test.resources return:flows1
215-
String parentFlowName = StringUtils.substringAfterLast(StringUtils.substringBefore(fullFlowName, "." + flowName
216-
+ LinkisJobExecutionConfiguration.RESOURCES_NAME), ".");
217-
if (StringUtils.isEmpty(parentFlowName)) {
218-
return null;
219-
}
220-
221-
return findFlowResource(linkisAppConnJob, fileName, parentFlowName);
222-
}
223-
224-
225-
private String replaceCodeResourceNames(String code, ArrayList<String> resourceNameList, ArrayList<BMLResource> resourceList){
226-
if(resourceList.size() != resourceNameList.size()){
227-
throw new RuntimeException("Failed to parsed resource file");
228-
}
229-
230-
String[] names = resourceNameList.toArray(new String[]{});
231-
232-
String[] afterNames = new String[resourceList.size()];
233-
for (int i=0 ; i < afterNames.length ; i++){
234-
afterNames[i] = resourceList.get(i).getFileName();
235-
}
236-
return StringUtils.replaceEach(code, names, afterNames);
237-
}
238-
239-
private BMLResource findResource(List<BMLResource> resourceArrayList, String fileName){
240-
if(resourceArrayList != null && !resourceArrayList.isEmpty()) {
241-
for(BMLResource resource : resourceArrayList){
242-
if(resource.getFileName().equals(fileName)){
243-
return resource;
244-
}
245-
}
246-
}
247-
return null;
248-
}
249126
}

0 commit comments

Comments
 (0)