Skip to content

Commit 5775fcc

Browse files
committed
Fix the problem of killing workflow.
1 parent a6cf02e commit 5775fcc

File tree

2 files changed

+56
-3
lines changed
  • dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main

2 files changed

+56
-3
lines changed

dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/java/com/webank/wedatasphere/dss/flow/execution/entrance/restful/FlowEntranceRestfulApi.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.webank.wedatasphere.dss.flow.execution.entrance.restful;
1818

19+
import com.fasterxml.jackson.databind.JsonNode;
1920
import com.webank.wedatasphere.dss.common.entity.DSSWorkspace;
2021
import com.webank.wedatasphere.dss.common.utils.DSSCommonUtils;
2122
import com.webank.wedatasphere.dss.standard.sso.utils.SSOHelper;
@@ -29,15 +30,19 @@
2930
import org.apache.linkis.protocol.constants.TaskConstant;
3031
import org.apache.linkis.protocol.utils.ZuulEntranceUtils;
3132
import org.apache.linkis.rpc.Sender;
33+
import org.apache.linkis.scheduler.listener.LogListener;
3234
import org.apache.linkis.scheduler.queue.Job;
35+
import org.apache.linkis.scheduler.queue.SchedulerEventState;
3336
import org.apache.linkis.server.Message;
3437
import org.apache.linkis.server.security.SecurityFilter;
3538
import org.slf4j.Logger;
3639
import org.slf4j.LoggerFactory;
3740
import org.springframework.web.bind.annotation.*;
41+
import scala.Function0;
3842
import scala.Option;
3943

4044
import javax.servlet.http.HttpServletRequest;
45+
import java.util.ArrayList;
4146
import java.util.Map;
4247

4348

@@ -126,6 +131,54 @@ public Message status(@PathVariable("id") String id, @RequestParam(required = fa
126131
return message;
127132
}
128133

134+
@Override
135+
@RequestMapping(path = {"/{id}/kill"},method = {RequestMethod.GET})
136+
public Message kill(@PathVariable("id") String id, @RequestParam(value = "taskID",required = false) Long taskID) {
137+
String realId = ZuulEntranceUtils.parseExecID(id)[3];
138+
Option job = Option.apply((Object)null);
139+
try {
140+
job = this.entranceServer.getJob(realId);
141+
} catch (Exception var10) {
142+
logger.warn("can not find a job in entranceServer, will force to kill it", var10);
143+
JobHistoryHelper.forceKill(taskID);
144+
Message message = Message.ok("Forced Kill task (强制杀死任务)");
145+
message.setMethod("/api/entrance/" + id + "/kill");
146+
message.setStatus(0);
147+
return message;
148+
}
149+
Message message = null;
150+
if (job.isEmpty()) {
151+
logger.warn("can not find a job in entranceServer, will force to kill it");
152+
JobHistoryHelper.forceKill(taskID);
153+
message = Message.ok("Forced Kill task (强制杀死任务)");
154+
message.setMethod("/api/entrance/" + id + "/kill");
155+
message.setStatus(0);
156+
return message;
157+
} else {
158+
try {
159+
logger.info("begin to kill job {} ", ((Job)job.get()).getId());
160+
((Job)job.get()).kill();
161+
message = Message.ok("Successfully killed the job(成功kill了job)");
162+
message.setMethod("/api/entrance/" + id + "/kill");
163+
message.setStatus(0);
164+
message.data("execID", id);
165+
if (job.get() instanceof EntranceJob) {
166+
EntranceJob entranceJob = (EntranceJob)job.get();
167+
JobRequest jobReq = entranceJob.getJobRequest();
168+
entranceJob.updateJobRequestStatus(SchedulerEventState.Cancelled().toString());
169+
this.entranceServer.getEntranceContext().getOrCreatePersistenceManager().createPersistenceEngine().updateIfNeeded(jobReq);
170+
}
171+
logger.info("end to kill job {} ", ((Job)job.get()).getId());
172+
} catch (Throwable var9) {
173+
logger.error("kill job {} failed ", ((Job)job.get()).getId(), var9);
174+
message = Message.error("An exception occurred while killing the job, kill failed(kill job的时候出现了异常,kill失败)");
175+
message.setMethod("/api/entrance/" + id + "/kill");
176+
message.setStatus(1);
177+
}
178+
return message;
179+
}
180+
}
181+
129182
private void pushLog(String log, Job job) {
130183
entranceServer.getEntranceContext().getOrCreateLogManager().onLogUpdate(job, log);
131184
}

dss-orchestrator/orchestrators/dss-workflow/dss-flow-execution-server/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/job/FlowEntranceJob.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -137,15 +137,15 @@ class FlowEntranceJob(persistManager:PersistenceManager) extends EntranceExecuti
137137
if(! SchedulerEventState.isCompleted(this.getState)){
138138
super.kill()
139139
Utils.tryAndWarn(this.killNodes)
140-
transitionCompleted(ErrorExecuteResponse(s"execute job(${getId}) failed!", new FlowExecutionErrorException(90101, s"This Flow killed by user") ))
141-
}
140+
Utils.tryAndWarn(transitionCompleted(ErrorExecuteResponse(s"execute job(${getId}) failed!", new FlowExecutionErrorException(90101, s"This Flow killed by user"))))
141+
}
142142
}
143143

144144
override def cancel(): Unit = if (! SchedulerEventState.isCompleted(this.getState)) this synchronized {
145145
if(! SchedulerEventState.isCompleted(this.getState)){
146146
Utils.tryAndWarn(this.killNodes)
147147
super.cancel()
148-
transitionCompleted(ErrorExecuteResponse(s"cancel job(${getId}) execution!", new FlowExecutionErrorException(90101, s"This Flow killed by user") ))
148+
Utils.tryAndWarn(transitionCompleted(ErrorExecuteResponse(s"cancel job(${getId}) execution!", new FlowExecutionErrorException(90101, s"This Flow killed by user"))))
149149
}
150150
}
151151

0 commit comments

Comments
 (0)