Skip to content

Commit 5293252

Browse files
committed
Fixes Compi hang when loop iterations of binded foreach abort
When a more than one loop iteration inside a foreach abort, the dependant binded iterations were not notified (only the dependants of the first one iteration), so they were waiting forever to start, leading Compi to hang. This commit fixes this bug.
1 parent c47ec24 commit 5293252

File tree

12 files changed

+162
-34
lines changed

12 files changed

+162
-34
lines changed

cli/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<parent>
1010
<groupId>org.sing_group</groupId>
1111
<artifactId>compi</artifactId>
12-
<version>1.4.0</version>
12+
<version>1.4.1</version>
1313
<!--
1414
WARNING: change version using (in the parent project):
1515
mvn versions:set -DnewVersion=[new_version]

core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<parent>
1111
<groupId>org.sing_group</groupId>
1212
<artifactId>compi</artifactId>
13-
<version>1.4.0</version>
13+
<version>1.4.1</version>
1414
<!--
1515
WARNING: change version using (in the parent project):
1616
mvn versions:set -DnewVersion=[new_version]

core/src/main/java/org/sing_group/compi/core/CompiApp.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ public void taskAborted(final Task task, final CompiTaskAbortedException e) {
553553
synchronized (syncMonitor) {
554554
notifyTaskAborted(task, e);
555555
taskManager.setAborted(task, e);
556-
abortDependencies(task, e);
556+
abortDependencies(task, e, false);
557557
syncMonitor.notify();
558558
}
559559
}
@@ -610,29 +610,33 @@ public void taskIterationAborted(ForeachIteration iteration, CompiTaskAbortedExc
610610
!iteration.getParentForeachTask().isAborted()
611611
&& !foreachAbortedNotificationsSent.contains(iteration.getParentForeachTask())
612612
) {
613-
this.notifyTaskAborted(iteration.getParentForeachTask(), e);
614-
taskManager.setAborted(iteration.getParentForeachTask(), e);
615-
616-
abortDependencies(iteration, e);
617-
foreachAbortedNotificationsSent.add(iteration.getParentForeachTask());
618-
syncMonitor.notify();
613+
abortParentForeach(e, iteration);
614+
abortDependencies(iteration, e, false);
615+
} else {
616+
abortDependencies(iteration, e, true);
619617
}
618+
syncMonitor.notify();
620619
}
621620
}
622621

623-
private void abortDependencies(Task task, CompiTaskAbortedException e) {
622+
private void abortDependencies(Task task, CompiTaskAbortedException e, boolean onlyForeachIterations) {
624623
for (final Task taskToAbort : taskManager.getDependantTasks(task)) {
625624
if (taskManager.getTasksLeft().contains(taskToAbort)) {
626625
if (!taskToAbort.isSkipped()) {
627626
if (taskToAbort instanceof ForeachIteration) {
627+
ForeachIteration iteration = (ForeachIteration) taskToAbort;
628+
if (!foreachAbortedNotificationsSent.contains(iteration.getParentForeachTask())) {
629+
abortParentForeach(e, iteration);
630+
}
631+
628632
notifyTaskIterationAborted(
629633
(ForeachIteration) taskToAbort,
630634
new CompiTaskAbortedException(
631635
"Aborted because a dependency of this task has aborted (" + e.getTask().getId() + ")",
632636
e, taskToAbort, new LinkedList<>(), new LinkedList<>()
633637
)
634638
);
635-
} else {
639+
} else if (!onlyForeachIterations) {
636640
notifyTaskAborted(
637641
taskToAbort,
638642
new CompiTaskAbortedException(
@@ -647,6 +651,12 @@ private void abortDependencies(Task task, CompiTaskAbortedException e) {
647651
}
648652
}
649653

654+
private void abortParentForeach(CompiTaskAbortedException e, ForeachIteration iteration) {
655+
this.notifyTaskAborted(iteration.getParentForeachTask(), e);
656+
taskManager.setAborted(iteration.getParentForeachTask(), e);
657+
foreachAbortedNotificationsSent.add(iteration.getParentForeachTask());
658+
}
659+
650660
/**
651661
* Indicates that a {@link Task} is started to an external
652662
* {@link TaskExecutionHandler}

core/src/main/java/org/sing_group/compi/core/TaskManager.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,8 @@ public Set<Task> getDependantTasks(Task task) {
131131
* the running foreach
132132
*
133133
* @return The iterations of the given foreach
134-
* @throws IllegalStateException
135-
* if the foreach is not running yet. It should running before
136-
* calling this method (call {@link TaskManager#setRunning(Task)}
137-
* before
138134
*/
139135
public List<ForeachIteration> getForeachIterations(Foreach foreach) {
140-
if (!foreach.isRunning()) {
141-
throw new IllegalStateException("Foreach is not running yet");
142-
}
143136
return unmodifiableList(forEachTasks.get(foreach));
144137
}
145138

core/src/main/java/org/sing_group/compi/core/TaskRunnable.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -104,16 +104,6 @@ public void run() {
104104
try {
105105
if (!this.task.isSkipped()) {
106106
taskStarted(this.task);
107-
108-
if (
109-
this.task instanceof ForeachIteration &&
110-
((ForeachIteration) task).getParentForeachTask().isAborted()
111-
) {
112-
if (!this.task.isAborted()) {
113-
taskAborted(this.task, ((ForeachIteration) task).getParentForeachTask().getAbortionCause());
114-
}
115-
}
116-
117107
this.process = this.getProcess(this.task);
118108
openLogBuffers(this.process);
119109
waitForProcess(this.process);

core/src/test/java/org/sing_group/compi/tests/PipelineTest.java

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,48 @@ public void testPipelineIterationBindedWithAbortedIterations() throws Exception
451451

452452
}
453453

454+
@Test
455+
public void testPipelineIterationBindedWithAbortedIterations2() throws Exception {
456+
final String pipelineFile =
457+
ClassLoader.getSystemResource("testPipelineIterationBindedWithAbortedIterations2.xml").getFile();
458+
459+
final CompiApp compi =
460+
new CompiApp(
461+
forPipeline(fromFile(new File(pipelineFile)), new File(pipelineFile)).whichRunsAMaximumOf(1)
462+
.build()
463+
);
464+
465+
TestExecutionHandler handler = new TestExecutionHandler();
466+
compi.addTaskExecutionHandler(handler);
467+
468+
compi.run();
469+
assertTrue(handler.getFinishedTasksIncludingLoopChildren().size() == 0);
470+
471+
}
472+
473+
@Test
474+
public void testPipelineIterationBindedWithAbortedIterations3() throws Exception {
475+
final String pipelineFile =
476+
ClassLoader.getSystemResource("testPipelineIterationBindedWithAbortedIterations3.xml").getFile();
477+
478+
final CompiApp compi =
479+
new CompiApp(
480+
forPipeline(fromFile(new File(pipelineFile)), new File(pipelineFile)).whichRunsAMaximumOf(1)
481+
.build()
482+
);
483+
484+
TestExecutionHandler handler = new TestExecutionHandler();
485+
compi.addTaskExecutionHandler(handler);
486+
487+
compi.run();
488+
assertTrue(handler.getFinishedTasksIncludingLoopChildren().size() == 3);
489+
assertTrue(handler.getAbortedTasks().contains("ID-1"));
490+
assertTrue(handler.getAbortedTasks().contains("ID-2"));
491+
assertTrue(handler.getAbortedTasks().contains("ID-3"));
492+
assertTrue(handler.getAbortedTasks().contains("ID-4"));
493+
assertTrue(handler.getAbortedTasks().contains("ID-5"));
494+
}
495+
454496
@Test
455497
public void testTaskExecutionHandler() throws Exception {
456498
final String pipelineFile = ClassLoader.getSystemResource("testExecutionHandler.xml").getFile();
@@ -467,6 +509,7 @@ public void testTaskExecutionHandler() throws Exception {
467509
compi.getPipeline().getTasks().stream().collect(Collectors.toMap(Task::getId, Function.identity()));
468510

469511
// TestExecutionHandler handler = new TestExecutionHandler();
512+
compi.addTaskExecutionHandler(new TestExecutionHandler());
470513

471514
@SuppressWarnings("unchecked")
472515
final Capture<ForeachIteration>[] capturesForTaskID1 = new Capture[6];
@@ -538,11 +581,13 @@ public void testTaskExecutionHandler() throws Exception {
538581
handler.taskAborted(eq(tasksById.get("ID3")), anyObject());
539582
expectLastCall();
540583

541-
// ID3: iteration 2, which aborts because ID3 aborted
584+
// ID3: iteration 2, runs normally
542585
handler.taskIterationStarted(capture(capturesForTaskID3[2]));
543586
expectLastCall();
544-
handler.taskIterationAborted(capture(capturesForTaskID3[3]), anyObject());
545-
expectLastCall();
587+
/*
588+
* handler.taskIterationAborted(capture(capturesForTaskID3[3]),
589+
* anyObject()); expectLastCall();
590+
*/
546591

547592
// task ID4 aborts because it depends on ID3
548593
handler.taskAborted(eq(tasksById.get("ID4")), anyObject());

core/src/test/java/org/sing_group/compi/tests/TestExecutionHandler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,21 @@ public class TestExecutionHandler implements TaskExecutionHandler {
4141

4242
@Override
4343
synchronized public void taskStarted(Task task) {
44+
// System.out.println("S_" + task.getId());
4445
startedTasks.add(task.getId());
4546
}
4647

4748
@Override
4849
public void taskFinished(Task task) {
50+
// System.out.println("E_" + task.getId());
4951
finishedTasks.add(task.getId());
5052
if (!(task instanceof Foreach))
5153
finishedTasksIncludingLoopChildren.add(task.getId());
5254
}
5355

5456
@Override
5557
public void taskAborted(Task task, CompiTaskAbortedException e) {
58+
// System.out.println("A_" + task.getId());
5659
abortedTasks.add(task.getId());
5760
}
5861

@@ -78,18 +81,24 @@ public List<String> getFinishedTasksIncludingLoopChildren() {
7881

7982
@Override
8083
public void taskIterationStarted(ForeachIteration iteration) {
84+
// System.out.println("S_" + iteration.getParentForeachTask().getId() + "_"
85+
// + iteration.getIterationIndex());
8186
startedForeachs.add(iteration.getParentForeachTask().getId());
8287
loopIterations.add("S_" + iteration.getParentForeachTask().getId() + "_" + iteration.getIterationIndex());
8388
}
8489

8590
@Override
8691
public void taskIterationFinished(ForeachIteration iteration) {
92+
// System.out.println("E_" + iteration.getParentForeachTask().getId() + "_"
93+
// + iteration.getIterationIndex());
8794
finishedTasksIncludingLoopChildren.add(iteration.getParentForeachTask().getId());
8895
loopIterations.add("E_" + iteration.getParentForeachTask().getId() + "_" + iteration.getIterationIndex());
8996
}
9097

9198
@Override
9299
public void taskIterationAborted(ForeachIteration iteration, CompiTaskAbortedException e) {
100+
// System.out.println("A_" + iteration.getParentForeachTask().getId() + "_"
101+
// + iteration.getIterationIndex()+": "+e.getMessage());
93102
// finishedTasksIncludingLoopChildren.add(iteration.getParentForeachTask().getId());
94103
loopIterations.add("A_" + iteration.getParentForeachTask().getId() + "_" + iteration.getIterationIndex());
95104

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
#%L
4+
Compi Core
5+
%%
6+
Copyright (C) 2016 - 2018 Daniel Glez-Peña, Osvaldo Graña-Castro, Hugo
7+
López-Fernández, Jesús Álvarez Casanova
8+
%%
9+
Licensed under the Apache License, Version 2.0 (the "License");
10+
you may not use this file except in compliance with the License.
11+
You may obtain a copy of the License at
12+
13+
http://www.apache.org/licenses/LICENSE-2.0
14+
15+
Unless required by applicable law or agreed to in writing, software
16+
distributed under the License is distributed on an "AS IS" BASIS,
17+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
See the License for the specific language governing permissions and
19+
limitations under the License.
20+
#L%
21+
-->
22+
23+
<pipeline xmlns="http://www.sing-group.org/compi/pipeline-1.0"
24+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
25+
<version>1.0.0</version>
26+
<tasks>
27+
<foreach id="ID-1" of="list" in="-j,-j" as="param">
28+
ls ${param} /tmp
29+
</foreach>
30+
31+
<foreach id="ID-2" after="*ID-1" of="list" in="-l,-l" as="param">
32+
ls ${param} /tmp
33+
</foreach>
34+
</tasks>
35+
</pipeline>
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
#%L
4+
Compi Core
5+
%%
6+
Copyright (C) 2016 - 2018 Daniel Glez-Peña, Osvaldo Graña-Castro, Hugo
7+
López-Fernández, Jesús Álvarez Casanova
8+
%%
9+
Licensed under the Apache License, Version 2.0 (the "License");
10+
you may not use this file except in compliance with the License.
11+
You may obtain a copy of the License at
12+
13+
http://www.apache.org/licenses/LICENSE-2.0
14+
15+
Unless required by applicable law or agreed to in writing, software
16+
distributed under the License is distributed on an "AS IS" BASIS,
17+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
See the License for the specific language governing permissions and
19+
limitations under the License.
20+
#L%
21+
-->
22+
23+
<pipeline xmlns="http://www.sing-group.org/compi/pipeline-1.0"
24+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
25+
<version>1.0.0</version>
26+
<tasks>
27+
<foreach id="ID-1" of="list" in="-j,-l,-j" as="param">
28+
ls ${param} /tmp
29+
</foreach>
30+
31+
<foreach id="ID-2" after="*ID-1" of="list" in="-l,-l,-l" as="param">
32+
ls ${param} /tmp
33+
</foreach>
34+
<task id="ID-3" after="ID-2">
35+
#should not run
36+
echo "should not run"
37+
</task>
38+
<task id="ID-4" after="ID-1">
39+
#should not run
40+
echo "should not run"
41+
</task>
42+
<foreach id="ID-5" after="*ID-2" of="list" in="-l,-l,-l" as="param">
43+
ls ${param} /tmp
44+
</foreach>
45+
</tasks>
46+
</pipeline>

dk/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<parent>
1010
<groupId>org.sing_group</groupId>
1111
<artifactId>compi</artifactId>
12-
<version>1.4.0</version>
12+
<version>1.4.1</version>
1313
<!--
1414
WARNING: change version using (in the parent project):
1515
mvn versions:set -DnewVersion=[new_version]

e2e-tests/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<parent>
1010
<groupId>org.sing_group</groupId>
1111
<artifactId>compi</artifactId>
12-
<version>1.4.0</version>
12+
<version>1.4.1</version>
1313
<!--
1414
WARNING: change version using (in the parent project):
1515
mvn versions:set -DnewVersion=[new_version]

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<groupId>org.sing_group</groupId>
77
<artifactId>compi</artifactId>
88
<packaging>pom</packaging>
9-
<version>1.4.0</version>
9+
<version>1.4.1</version>
1010
<!-- WARNING: change version using (in the parent project): mvn versions:set -DnewVersion=[new_version] mvn versions:commit This will change the version
1111
in all modules at-once -->
1212

0 commit comments

Comments
 (0)