Skip to content

Commit c4d10fc

Browse files
SparkEtl AppConn optimization, support to query jumpUrl.
1 parent 1173afd commit c4d10fc

File tree

3 files changed

+48
-0
lines changed

3 files changed

+48
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.webank.wedatasphere.dss.appconn.sparketl.query;
2+
3+
import com.webank.wedatasphere.dss.standard.app.development.operation.AbstractDevelopmentOperation;
4+
import com.webank.wedatasphere.dss.standard.app.development.operation.RefQueryJumpUrlOperation;
5+
import com.webank.wedatasphere.dss.standard.app.development.ref.QueryJumpUrlResponseRef;
6+
import com.webank.wedatasphere.dss.standard.app.development.ref.impl.OnlyDevelopmentRequestRef;
7+
import com.webank.wedatasphere.dss.standard.app.development.utils.QueryJumpUrlConstant;
8+
import com.webank.wedatasphere.dss.standard.common.exception.operation.ExternalOperationFailedException;
9+
import org.apache.commons.lang3.StringUtils;
10+
11+
public class SparkEtlRefQueryJumpUrlOperation extends AbstractDevelopmentOperation<OnlyDevelopmentRequestRef.QueryJumpUrlRequestRefImpl, QueryJumpUrlResponseRef>
12+
implements RefQueryJumpUrlOperation<OnlyDevelopmentRequestRef.QueryJumpUrlRequestRefImpl, QueryJumpUrlResponseRef> {
13+
14+
@Override
15+
public QueryJumpUrlResponseRef query(OnlyDevelopmentRequestRef.QueryJumpUrlRequestRefImpl requestRef) throws ExternalOperationFailedException {
16+
String jumpUrl = mergeBaseUrl("#/sparketl?resourceId=%s&version=%s&%s=%s&%s=%s");
17+
String resourceId = (String) requestRef.getRefJobContent().get("resourceId");
18+
String version = (String) requestRef.getRefJobContent().get("version");
19+
if(StringUtils.isBlank(resourceId) || StringUtils.isBlank(version)) {
20+
logger.info("resourceId or version is empty, maybe user {} want to create a new node.", requestRef.getUserName());
21+
resourceId = "";
22+
version = "";
23+
}
24+
jumpUrl = String.format(jumpUrl, resourceId, version, QueryJumpUrlConstant.NODE_ID.getKey(),
25+
QueryJumpUrlConstant.NODE_ID.getValue(), QueryJumpUrlConstant.PROJECT_NAME.getKey(), QueryJumpUrlConstant.PROJECT_NAME.getValue());
26+
return QueryJumpUrlResponseRef.newBuilder().setJumpUrl(jumpUrl).build();
27+
}
28+
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.webank.wedatasphere.dss.appconn.sparketl.query;
2+
3+
import com.webank.wedatasphere.dss.standard.app.development.operation.RefQueryJumpUrlOperation;
4+
import com.webank.wedatasphere.dss.standard.app.development.service.AbstractRefQueryService;
5+
6+
public class SparkEtlRefQueryService extends AbstractRefQueryService {
7+
8+
@Override
9+
protected RefQueryJumpUrlOperation createRefQueryOperation() {
10+
return new SparkEtlRefQueryJumpUrlOperation();
11+
}
12+
}

dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/standard/SparkEtlDevelopmentStandard.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.webank.wedatasphere.dss.appconn.sparketl.standard;
22

33
import com.webank.wedatasphere.dss.appconn.sparketl.execution.SparkEtlExecutionService;
4+
import com.webank.wedatasphere.dss.appconn.sparketl.query.SparkEtlRefQueryService;
45
import com.webank.wedatasphere.dss.standard.app.development.service.RefExecutionService;
6+
import com.webank.wedatasphere.dss.standard.app.development.service.RefQueryService;
57
import com.webank.wedatasphere.dss.standard.app.development.standard.OnlyExecutionDevelopmentStandard;
68

79
public class SparkEtlDevelopmentStandard extends OnlyExecutionDevelopmentStandard {
@@ -11,6 +13,11 @@ protected RefExecutionService createRefExecutionService() {
1113
return new SparkEtlExecutionService();
1214
}
1315

16+
@Override
17+
protected RefQueryService createRefQueryService() {
18+
return new SparkEtlRefQueryService();
19+
}
20+
1421
@Override
1522
public void init() {
1623
}

0 commit comments

Comments
 (0)