Skip to content

Commit 159f2aa

Browse files
authored
fix: Fixed tasks using Batch or Subtask plugin getting stuck in the WAITING state (#561)
Signed-off-by: Ruben Tordjman <144785435+DrRebus@users.noreply.github.com>
1 parent 087b139 commit 159f2aa

File tree

5 files changed

+223
-43
lines changed

5 files changed

+223
-43
lines changed

engine/collector_retry.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ func getUpdateErrorResolution(dbp zesty.DBProvider) (*resolution.Resolution, err
5353
SELECT id
5454
FROM "resolution"
5555
WHERE ((instance_id = $1 AND state = $2) OR
56-
((state = $3 OR state = $4) AND next_retry < NOW()))
56+
((state = $3 OR state = $4) AND next_retry < NOW()) OR
57+
(state = $5 AND next_retry > last_start AND next_retry < NOW()))
5758
LIMIT 1
5859
FOR UPDATE SKIP LOCKED
5960
)
@@ -62,7 +63,16 @@ func getUpdateErrorResolution(dbp zesty.DBProvider) (*resolution.Resolution, err
6263
var r resolution.Resolution
6364

6465
instanceID := utask.InstanceID
65-
if err := dbp.DB().SelectOne(&r, sqlStmt, instanceID, resolution.StateRetry, resolution.StateError, resolution.StateToAutorunDelayed); err != nil {
66+
err := dbp.DB().SelectOne(
67+
&r,
68+
sqlStmt,
69+
instanceID,
70+
resolution.StateRetry,
71+
resolution.StateError,
72+
resolution.StateToAutorunDelayed,
73+
resolution.StateWaiting,
74+
)
75+
if err != nil {
6676
return nil, pgjuju.Interpret(err)
6777
}
6878

engine/engine.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -610,12 +610,13 @@ forLoop:
610610

611611
utask.ReleaseResource("template:" + t.TemplateName)
612612
utask.ReleaseExecutionSlot()
613-
if err := resumeParentTask(dbp, t, sm, debugLogger); err != nil {
613+
if err := wakeParentTask(dbp, t, debugLogger); err != nil {
614614
debugLogger.WithError(err).Debugf("Engine: resolver(): failed to resume parent task: %s", err)
615615
}
616616
}
617617

618-
func resumeParentTask(dbp zesty.DBProvider, currentTask *task.Task, sm *semaphore.Weighted, debugLogger *logrus.Entry) error {
618+
// wakeParentTask wakes up the current task's parent if needed by changing its next_retry to now.
619+
func wakeParentTask(dbp zesty.DBProvider, currentTask *task.Task, debugLogger *logrus.Entry) error {
619620
parentTask, err := taskutils.ShouldResumeParentTask(dbp, currentTask)
620621
if err != nil {
621622
return err
@@ -625,9 +626,17 @@ func resumeParentTask(dbp zesty.DBProvider, currentTask *task.Task, sm *semaphor
625626
return nil
626627
}
627628

628-
debugLogger.Debugf("resuming parent task %q resolution %q", parentTask.PublicID, *parentTask.Resolution)
629-
debugLogger.WithFields(logrus.Fields{"task_id": parentTask.PublicID, "resolution_id": *parentTask.Resolution}).Debugf("resuming resolution %q as child task %q state changed", *parentTask.Resolution, currentTask.PublicID)
630-
return GetEngine().Resolve(*parentTask.Resolution, sm)
629+
res, err := resolution.LoadFromPublicID(dbp, *parentTask.Resolution)
630+
if err != nil {
631+
return err
632+
}
633+
634+
if _, err := res.UpdateNextRetry(dbp, time.Now()); err != nil {
635+
return errors.Annotatef(err, "next_retry update failure for parent task '%s'", parentTask.PublicID)
636+
}
637+
638+
debugLogger.Debugf("updated parent task %q resolution %q next_retry", parentTask.PublicID, *parentTask.Resolution)
639+
return nil
631640
}
632641

633642
func commit(dbp zesty.DBProvider, res *resolution.Resolution, t *task.Task) error {

engine/engine_test.go

Lines changed: 86 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1258,6 +1258,11 @@ func TestResolveSubTask(t *testing.T) {
12581258
require.NotNil(t, res)
12591259
assert.Equal(t, resolution.StateWaiting, res.State)
12601260

1261+
nextRetryBeforeRun := time.Time{}
1262+
if res.NextRetry != nil {
1263+
nextRetryBeforeRun = *res.NextRetry
1264+
}
1265+
12611266
for _, subtaskName := range []string{"subtaskCreation", "jsonInputSubtask", "templatingJsonInputSubtask"} {
12621267
subtaskCreationOutput := res.Steps[subtaskName].Output.(map[string]interface{})
12631268
subtaskPublicID := subtaskCreationOutput["id"].(string)
@@ -1285,27 +1290,25 @@ func TestResolveSubTask(t *testing.T) {
12851290
assert.Equal(t, res.TaskID, parentTaskToResume.ID)
12861291
}
12871292

1288-
// checking if the parent task is picked up after that the subtask is resolved.
1289-
// need to sleep a bit because the parent task is resumed asynchronously
1290-
ti := time.Second
1291-
i := time.Duration(0)
1292-
for i < ti {
1293-
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
1294-
require.Nil(t, err)
1295-
if res.State != resolution.StateWaiting {
1296-
break
1297-
}
1293+
// checking whether the parent task will be picked up by the RetryCollector after the subtask is resolved.
1294+
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
1295+
require.Nil(t, err)
1296+
assert.NotNil(t, res.NextRetry)
1297+
assert.False(t, res.NextRetry.IsZero())
1298+
assert.True(t, res.NextRetry.After(nextRetryBeforeRun))
1299+
assert.True(t, res.NextRetry.Before(time.Now()))
12981300

1299-
time.Sleep(time.Millisecond * 10)
1300-
i += time.Millisecond * 10
1301-
}
1301+
// Starting the RetryCollector to resume the parent task
1302+
ctx, cancelFunc := context.WithCancel(context.Background())
1303+
defer cancelFunc()
1304+
engine.RetryCollector(ctx)
13021305

1303-
ti = time.Second
1304-
i = time.Duration(0)
1306+
ti := time.Second
1307+
i := time.Duration(0)
13051308
for i < ti {
13061309
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
13071310
require.Nil(t, err)
1308-
if res.State != resolution.StateRunning {
1311+
if res.State == resolution.StateDone {
13091312
break
13101313
}
13111314

@@ -1409,6 +1412,11 @@ func TestBatch(t *testing.T) {
14091412
require.NotNil(t, res)
14101413
assert.Equal(t, resolution.StateWaiting, res.State)
14111414

1415+
nextRetryBeforeRun := time.Time{}
1416+
if res.NextRetry != nil {
1417+
nextRetryBeforeRun = *res.NextRetry
1418+
}
1419+
14121420
for _, batchStepName := range []string{"batchJsonInputs", "batchYamlInputs"} {
14131421
batchStepMetadataRaw, ok := res.Steps[batchStepName].Metadata.(string)
14141422
assert.True(t, ok, "wrong type of metadata for step '%s'", batchStepName)
@@ -1463,32 +1471,79 @@ func TestBatch(t *testing.T) {
14631471
}
14641472
}
14651473

1466-
// checking if the parent task is picked up after the subtask is resolved.
1467-
// We need to sleep a bit because the parent task is resumed asynchronously
1474+
// checking whether the parent task will be picked up by the RetryCollector after the subtask is resolved.
1475+
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
1476+
require.Nil(t, err)
1477+
assert.NotNil(t, res.NextRetry)
1478+
assert.False(t, res.NextRetry.IsZero())
1479+
assert.True(t, res.NextRetry.After(nextRetryBeforeRun))
1480+
assert.True(t, res.NextRetry.Before(time.Now()))
1481+
1482+
// Starting the RetryCollector to resume the parent task
1483+
ctx, cancelFunc := context.WithCancel(context.Background())
1484+
defer cancelFunc()
1485+
engine.RetryCollector(ctx)
1486+
14681487
ti := time.Second
14691488
i := time.Duration(0)
14701489
for i < ti {
14711490
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
14721491
require.Nil(t, err)
1473-
if res.State != resolution.StateWaiting {
1492+
if res.State == resolution.StateDone {
14741493
break
14751494
}
14761495

14771496
time.Sleep(time.Millisecond * 10)
14781497
i += time.Millisecond * 10
14791498
}
1499+
assert.Equal(t, resolution.StateDone, res.State)
1500+
}
14801501

1481-
ti = time.Second
1482-
i = time.Duration(0)
1483-
for i < ti {
1484-
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
1485-
require.Nil(t, err)
1486-
if res.State != resolution.StateRunning {
1487-
break
1488-
}
1502+
func TestWakeParent(t *testing.T) {
1503+
dbp, err := zesty.NewDBProvider(utask.DBName)
1504+
require.Nil(t, err)
14891505

1490-
time.Sleep(time.Millisecond * 10)
1491-
i += time.Millisecond * 10
1492-
}
1493-
assert.Equal(t, resolution.StateDone, res.State)
1506+
// Create a task that spawns two simple subtasks
1507+
_, err = templateFromYAML(dbp, "no-output.yaml")
1508+
require.Nil(t, err)
1509+
1510+
res, err := createResolution("noOutputSubtask.yaml", map[string]interface{}{}, nil)
1511+
require.Nil(t, err, "failed to create resolution: %s", err)
1512+
1513+
res, err = runResolution(res)
1514+
require.Nil(t, err)
1515+
require.NotNil(t, res)
1516+
require.Equal(t, resolution.StateWaiting, res.State)
1517+
assert.True(t, res.NextRetry.IsZero())
1518+
1519+
// Force the parent task to the RUNNING state
1520+
res.SetState(resolution.StateRunning)
1521+
res.Update(dbp)
1522+
1523+
// Create and run one of the subtasks
1524+
subtaskCreationOutput := res.Steps["subtaskCreation"].Output.(map[string]interface{})
1525+
subtaskPublicID := subtaskCreationOutput["id"].(string)
1526+
1527+
subtask, err := task.LoadFromPublicID(dbp, subtaskPublicID)
1528+
require.Nil(t, err)
1529+
require.Equal(t, task.StateTODO, subtask.State)
1530+
1531+
subtaskResolution, err := resolution.Create(dbp, subtask, nil, "", false, nil)
1532+
require.Nil(t, err)
1533+
1534+
beforeRun := time.Now()
1535+
subtaskResolution, err = runResolution(subtaskResolution)
1536+
require.Nil(t, err)
1537+
require.Equal(t, task.StateDone, subtaskResolution.State)
1538+
afterRun := time.Now()
1539+
1540+
// Refreshing parent resolution to check its next_retry value
1541+
res, err = resolution.LoadFromPublicID(dbp, res.PublicID)
1542+
require.Nil(t, err)
1543+
assert.Equal(t, res.State, resolution.StateRunning) // Parent should still be RUNNING
1544+
1545+
// The parent's next_retry should have been updated so that the RetryCollector would pick it up
1546+
assert.NotNil(t, res.NextRetry)
1547+
assert.True(t, res.NextRetry.After(beforeRun))
1548+
assert.True(t, res.NextRetry.Before(afterRun))
14941549
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
name: noOutputSubtaskTemplate
2+
description: Template that spawns two simple subtasks
3+
title_format: "[test] simple subtasks template test"
4+
5+
steps:
6+
subtaskCreation:
7+
description: creating a subtask
8+
action:
9+
type: subtask
10+
configuration:
11+
template: no-output
12+
13+
otherSubtaskCreation:
14+
description: creating another subtask
15+
action:
16+
type: subtask
17+
configuration:
18+
template: no-output

models/resolution/resolution.go

Lines changed: 93 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ import (
66
"encoding/json"
77
"time"
88

9+
"github.com/Masterminds/squirrel"
10+
"github.com/gofrs/uuid"
11+
"github.com/juju/errors"
12+
"github.com/loopfz/gadgeto/zesty"
13+
914
"github.com/ovh/utask"
1015
"github.com/ovh/utask/db/pgjuju"
1116
"github.com/ovh/utask/db/sqlgenerator"
@@ -17,11 +22,6 @@ import (
1722
"github.com/ovh/utask/pkg/compress"
1823
"github.com/ovh/utask/pkg/now"
1924
"github.com/ovh/utask/pkg/utils"
20-
21-
"github.com/Masterminds/squirrel"
22-
"github.com/gofrs/uuid"
23-
"github.com/juju/errors"
24-
"github.com/loopfz/gadgeto/zesty"
2525
)
2626

2727
// all valid resolution states
@@ -410,6 +410,22 @@ func (r *Resolution) Update(dbp zesty.DBProvider) (err error) {
410410
// force empty to stop using old crypto code
411411
r.CryptKey = []byte{}
412412

413+
var nextRetry time.Time
414+
if r.NextRetry == nil || r.NextRetry.IsZero() {
415+
// No local next_retry value. It may have been set in DB since the last read, so we need to refresh it
416+
nextRetry, err = getNextRetry(dbp, r.ID)
417+
if err != nil {
418+
return err
419+
}
420+
} else {
421+
// Updating the next_retry individually to prevent overriding it with a nil value or a later date
422+
nextRetry, err = r.UpdateNextRetry(dbp, *r.NextRetry)
423+
if err != nil {
424+
return errors.Annotatef(err, "failed to update resolution's next_retry")
425+
}
426+
}
427+
r.NextRetry = &nextRetry
428+
413429
rows, err := dbp.DB().Update(&r.DBModel)
414430
if err != nil {
415431
return pgjuju.Interpret(err)
@@ -420,6 +436,63 @@ func (r *Resolution) Update(dbp zesty.DBProvider) (err error) {
420436
return nil
421437
}
422438

439+
// UpdateNextRetry updates the Resolution's next_retry field while respecting the current next_retry value in DB. It
440+
// can only shorten the time before the resolution will be retried next, not increase it.
441+
func (r *Resolution) UpdateNextRetry(dbp zesty.DBProvider, newNextRetry time.Time) (time.Time, error) {
442+
sp, err := dbp.TxSavepoint()
443+
if err != nil {
444+
return time.Time{}, err
445+
}
446+
defer dbp.RollbackTo(sp) //nolint:errcheck
447+
448+
// Using the Resolution's ID to soft lock the update and prevent concurrent updates
449+
if _, err := dbp.DB().Exec(`SELECT pg_advisory_xact_lock($1)`, r.ID); err != nil {
450+
return time.Time{}, err
451+
}
452+
453+
query, params, err := sqlgenerator.PGsql.
454+
Update("resolution").
455+
Where(squirrel.Eq{"public_id": r.PublicID}).
456+
// The new next_retry is only valid if it is later than last_start
457+
Where("? > last_start", newNextRetry).
458+
// And the current next_retry must be outdated (before last_start), later than the new one, or unset
459+
Where("(next_retry < last_start OR ? < next_retry OR next_retry IS NULL)", newNextRetry).
460+
Set("next_retry", newNextRetry).
461+
Suffix("RETURNING next_retry").
462+
ToSql()
463+
if err != nil {
464+
return time.Time{}, err
465+
}
466+
467+
res, err := dbp.DB().Query(query, params...)
468+
if err != nil {
469+
return time.Time{}, err
470+
}
471+
defer res.Close()
472+
473+
if !res.Next() {
474+
// Update returned no result
475+
if err := res.Err(); err != nil {
476+
// An error happened when reading the query's result
477+
return time.Time{}, err
478+
}
479+
480+
// The nextRetry wasn't updated, we need to fetch its current value
481+
nextRetry, err := getNextRetry(dbp, r.ID)
482+
if err != nil {
483+
return time.Time{}, err
484+
}
485+
return nextRetry, nil
486+
}
487+
488+
var nextRetry time.Time
489+
if err := res.Scan(&nextRetry); err != nil {
490+
return time.Time{}, err
491+
}
492+
493+
return nextRetry, dbp.Commit()
494+
}
495+
423496
// Delete removes the Resolution from DB
424497
func (r *Resolution) Delete(dbp zesty.DBProvider) (err error) {
425498
defer errors.DeferredAnnotatef(&err, "Failed to update resolution")
@@ -581,6 +654,21 @@ func (r *Resolution) SetInput(input map[string]interface{}) {
581654
r.ResolverInput = input
582655
}
583656

657+
// getNextRetry fetches from the database the current next_retry value of the resolution with given ID
658+
func getNextRetry(dbp zesty.DBProvider, resolutionID int64) (time.Time, error) {
659+
var tmpRes Resolution
660+
err := dbp.DB().SelectOne(&tmpRes, `SELECT next_retry FROM resolution WHERE id = $1`, resolutionID)
661+
if err != nil {
662+
return time.Time{}, err
663+
}
664+
665+
if tmpRes.NextRetry == nil {
666+
return time.Time{}, nil
667+
}
668+
669+
return *tmpRes.NextRetry, nil
670+
}
671+
584672
var rSelector = sqlgenerator.PGsql.Select(
585673
`"resolution".id, "resolution".public_id, "resolution".id_task, "resolution".resolver_username, "resolution".state, "resolution".instance_id, "resolution".created, "resolution".last_start, "resolution".last_stop, "resolution".next_retry, "resolution".run_count, "resolution".run_max, "resolution".crypt_key, "resolution".encrypted_steps, "resolution".steps_compression_alg, "resolution".encrypted_resolver_input, "resolution".base_configurations, "task".public_id as task_public_id, "task".title as task_title`,
586674
).From(

0 commit comments

Comments
 (0)