Skip to content

Commit 096dd3c

Browse files
task queue worker: enhance cron mode & sql persistent tools
1 parent 4587528 commit 096dd3c

File tree

9 files changed

+139
-73
lines changed

9 files changed

+139
-73
lines changed

codebase/app/task_queue_worker/graphql_schema.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ type JobResolver {
145145
status: String!
146146
created_at: String!
147147
finished_at: String!
148-
next_retry_at: String!
148+
next_running_at: String!
149+
is_cron_mode: Boolean!
149150
current_progress: Int!
150151
max_progress: Int!
151152
meta: JoDetailMetaResolver!

codebase/app/task_queue_worker/graphql_types.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"time"
77

88
"github.com/golangid/candi/candihelper"
9-
cronexpr "github.com/golangid/candi/candiutils/cronparser"
109
)
1110

1211
type (
@@ -109,9 +108,10 @@ type (
109108
ErrorStack string
110109
TraceID string
111110
RetryHistories []RetryHistory
112-
NextRetryAt string
113-
CurrentProgress int
114-
MaxProgress int
111+
NextRunningAt string
112+
IsCronMode bool
113+
CurrentProgress int64
114+
MaxProgress int64
115115
Meta struct {
116116
IsCloseSession bool
117117
Page int
@@ -280,20 +280,16 @@ func (j *JobResolver) ParseFromJob(job *Job, maxArgsLength int) {
280280
j.ErrorStack = job.ErrorStack
281281
j.TraceID = job.TraceID
282282
j.RetryHistories = job.RetryHistories
283-
j.NextRetryAt = job.NextRetryAt
284283
j.CurrentProgress = job.CurrentProgress
285284
j.MaxProgress = job.MaxProgress
286285
j.RetryHistories = job.RetryHistories
287286
if job.Status == string(StatusSuccess) {
288287
j.Error = ""
289288
}
289+
j.IsCronMode = job.IsCronMode()
290290

291291
if job.Status == string(StatusQueueing) {
292-
if delay, err := time.ParseDuration(job.Interval); err == nil {
293-
j.NextRetryAt = time.Now().Add(delay).In(candihelper.AsiaJakartaLocalTime).Format(time.RFC3339)
294-
} else if schedule, err := cronexpr.Parse(job.Interval); err == nil {
295-
j.NextRetryAt = schedule.Next(time.Now()).Format(time.RFC3339)
296-
}
292+
j.NextRunningAt = job.NextRunningAt.Format(time.RFC3339)
297293
}
298294
j.CreatedAt = job.CreatedAt.In(candihelper.AsiaJakartaLocalTime).Format(time.RFC3339)
299295
if !job.FinishedAt.IsZero() {

codebase/app/task_queue_worker/job_operation.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"math"
99
"net/http"
10+
"strconv"
1011
"strings"
1112
"time"
1213

@@ -26,6 +27,7 @@ type (
2627
MaxRetry int `json:"max_retry"`
2728
Args []byte `json:"args"`
2829
RetryInterval time.Duration `json:"retry_interval"`
30+
StartAt time.Time `json:"start_at"`
2931
CronExpression string `json:"cron_expression"`
3032

3133
direct bool `json:"-"`
@@ -95,6 +97,9 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) {
9597
if req.RetryInterval > 0 {
9698
newJob.Interval = req.RetryInterval.String()
9799
}
100+
if !req.StartAt.IsZero() {
101+
newJob.NextRunningAt = req.StartAt
102+
}
98103
if req.CronExpression != "" {
99104
if totalJob := engine.opt.persistent.CountAllJob(ctx, &Filter{
100105
TaskName: req.TaskName, MaxRetry: candihelper.WrapPtr(0),
@@ -103,7 +108,10 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) {
103108
return jobID, fmt.Errorf("there is running cron job in task '%s'", req.TaskName)
104109
}
105110
newJob.Interval = req.CronExpression
106-
newJob.NextRetryAt = req.schedule.Next(time.Now()).Format(time.RFC3339)
111+
if req.StartAt.IsZero() {
112+
req.StartAt = time.Now()
113+
}
114+
newJob.NextRunningAt = req.schedule.Next(req.StartAt)
107115
}
108116
newJob.Status = string(StatusQueueing)
109117
newJob.CreatedAt = time.Now()
@@ -243,8 +251,9 @@ func RetryJob(ctx context.Context, jobID string) error {
243251
if (job.Status == string(StatusFailure)) || (job.Retries >= job.MaxRetry) {
244252
job.Retries = 0
245253
}
254+
job.ParseNextRunningInterval()
246255
matched, affected, err := engine.opt.persistent.UpdateJob(ctx, &Filter{JobID: &job.ID}, map[string]any{
247-
"status": job.Status, "interval": job.Interval, "retries": job.Retries,
256+
"status": job.Status, "interval": job.Interval, "retries": job.Retries, "next_running_at": job.NextRunningAt,
248257
})
249258
if err != nil {
250259
logger.LogE(err.Error())
@@ -364,7 +373,7 @@ func RecalculateSummary(ctx context.Context) {
364373
}
365374

366375
// UpdateProgressJob api for update progress job
367-
func UpdateProgressJob(ctx context.Context, jobID string, numProcessed, maxProcess int) error {
376+
func UpdateProgressJob[T int | int64](ctx context.Context, jobID string, numProcessed, maxProcess T) error {
368377
if engine == nil {
369378
return errWorkerInactive
370379
}
@@ -396,3 +405,14 @@ func UpdateProgressJob(ctx context.Context, jobID string, numProcessed, maxProce
396405
}
397406
return nil
398407
}
408+
409+
// GetContextHeader get all header value from context
410+
func GetContextHeader(ctx *candishared.EventContext) (header ContextHeader) {
411+
ctxHeader := ctx.Header()
412+
header.Retries, _ = strconv.Atoi(ctxHeader[HeaderRetries])
413+
header.MaxRetries, _ = strconv.Atoi(ctxHeader[HeaderMaxRetries])
414+
header.Interval = ctxHeader[HeaderInterval]
415+
header.CurrentProgress, _ = strconv.ParseInt(ctxHeader[HeaderCurrentProgress], 10, 64)
416+
header.MaxProgress, _ = strconv.ParseInt(ctxHeader[HeaderMaxProgress], 10, 64)
417+
return
418+
}

codebase/app/task_queue_worker/persistent_model.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,18 +172,18 @@ type Job struct {
172172
CreatedAt time.Time `bson:"created_at" json:"created_at"`
173173
UpdatedAt time.Time `bson:"updated_at" json:"updated_at"`
174174
FinishedAt time.Time `bson:"finished_at" json:"finished_at"`
175+
NextRunningAt time.Time `bson:"next_running_at" json:"next_running_at"`
175176
Status string `bson:"status" json:"status"`
176177
Error string `bson:"error" json:"error"`
177178
ErrorStack string `bson:"-" json:"error_stack"`
178179
Result string `bson:"result" json:"result"`
179180
TraceID string `bson:"trace_id" json:"trace_id"`
180-
CurrentProgress int `bson:"current_progress" json:"current_progress"`
181-
MaxProgress int `bson:"max_progress" json:"max_progress"`
181+
CurrentProgress int64 `bson:"current_progress" json:"current_progress"`
182+
MaxProgress int64 `bson:"max_progress" json:"max_progress"`
182183
RetryHistories []RetryHistory `bson:"retry_histories" json:"retry_histories"`
183-
NextRetryAt string `bson:"-" json:"-"`
184184

185185
direct bool `bson:"-" json:"-"`
186-
schedule cronexpr.Schedule `json:"-"`
186+
schedule cronexpr.Schedule `bson:"-" json:"-"`
187187
}
188188

189189
// RetryHistory model
@@ -212,6 +212,28 @@ func (job *Job) toMap() map[string]any {
212212
}
213213
}
214214

215+
func (j *Job) IsCronMode() bool {
216+
_, err := time.ParseDuration(j.Interval)
217+
return err != nil
218+
}
219+
220+
func (j *Job) ParseNextRunningInterval() (interval time.Duration, err error) {
221+
if !j.NextRunningAt.IsZero() && j.NextRunningAt.After(time.Now()) {
222+
interval = j.NextRunningAt.Sub(time.Now())
223+
return
224+
}
225+
interval, err = time.ParseDuration(j.Interval)
226+
if err != nil || interval <= 0 {
227+
schedule, err := cronexpr.Parse(j.Interval)
228+
if err != nil {
229+
return interval, err
230+
}
231+
interval = schedule.NextInterval(time.Now())
232+
}
233+
j.NextRunningAt = time.Now().Add(interval)
234+
return interval, nil
235+
}
236+
215237
// Configuration model
216238
type Configuration struct {
217239
Key string `bson:"key" json:"key"`

codebase/app/task_queue_worker/persistent_sql.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (s *SQLPersistent) FindAllJob(ctx context.Context, filter *Filter) (jobs []
6464
sort = "DESC"
6565
}
6666
query := "SELECT " +
67-
s.formatColumnName("id", "task_name", "arguments", "retries", "max_retry", "interval", "created_at", "finished_at", "status", "error", "result", "trace_id", "current_progress", "max_progress") +
67+
s.formatColumnName("id", "task_name", "arguments", "retries", "max_retry", "interval", "created_at", "finished_at", "status", "error", "result", "trace_id", "current_progress", "max_progress", "next_running_at") +
6868
" FROM " + jobModelName + " " + where + " ORDER BY " + s.formatColumnName(strings.TrimPrefix(filter.Sort, "-")) + " " + sort
6969
if !filter.ShowAll {
7070
query += fmt.Sprintf(` LIMIT %d OFFSET %d `, filter.Limit, filter.CalculateOffset())
@@ -79,38 +79,42 @@ func (s *SQLPersistent) FindAllJob(ctx context.Context, filter *Filter) (jobs []
7979
for rows.Next() {
8080
var job Job
8181
var createdAt string
82-
var finishedAt, result sql.NullString
82+
var finishedAt, result, nextRunningAt sql.NullString
8383
if err := rows.Scan(
8484
&job.ID, &job.TaskName, &job.Arguments, &job.Retries, &job.MaxRetry, &job.Interval, &createdAt,
8585
&finishedAt, &job.Status, &job.Error, &result, &job.TraceID, &job.CurrentProgress, &job.MaxProgress,
86+
&nextRunningAt,
8687
); err != nil {
8788
logger.LogE(err.Error())
8889
return
8990
}
9091
job.CreatedAt = s.parseDateString(createdAt).Time
9192
job.FinishedAt = s.parseDateString(finishedAt.String).Time
93+
job.NextRunningAt = s.parseDateString(nextRunningAt.String).Time
9294
job.Result = result.String
9395
jobs = append(jobs, job)
9496
}
9597

9698
return
9799
}
98100
func (s *SQLPersistent) FindJobByID(ctx context.Context, id string, filterHistory *Filter) (job Job, err error) {
99-
var finishedAt, result sql.NullString
101+
var finishedAt, result, nextRunningAt sql.NullString
100102
var createdAt string
101103
err = s.db.QueryRowContext(ctx, `SELECT `+
102-
s.formatColumnName("id", "task_name", "arguments", "retries", "max_retry", "interval", "created_at", "finished_at", "status", "error", "result", "trace_id", "current_progress", "max_progress")+
104+
s.formatColumnName("id", "task_name", "arguments", "retries", "max_retry", "interval", "created_at", "finished_at", "status", "error", "result", "trace_id", "current_progress", "max_progress", "next_running_at")+
103105
` FROM `+jobModelName+` WHERE id='`+id+`'`).
104106
Scan(
105107
&job.ID, &job.TaskName, &job.Arguments, &job.Retries, &job.MaxRetry, &job.Interval, &createdAt,
106108
&finishedAt, &job.Status, &job.Error, &result, &job.TraceID, &job.CurrentProgress, &job.MaxProgress,
109+
&nextRunningAt,
107110
)
108111
if err != nil {
109112
logger.LogE(err.Error())
110113
return job, err
111114
}
112115
job.CreatedAt = s.parseDateString(createdAt).Time
113116
job.FinishedAt = s.parseDateString(finishedAt.String).Time
117+
job.NextRunningAt = s.parseDateString(nextRunningAt.String).Time
114118
job.Result = result.String
115119

116120
if filterHistory != nil {
@@ -194,10 +198,10 @@ func (s *SQLPersistent) SaveJob(ctx context.Context, job *Job, retryHistories ..
194198
job.CreatedAt = time.Now()
195199
args = []any{
196200
job.ID, job.TaskName, job.Arguments, job.Retries, job.MaxRetry, job.Interval, s.parseDate(job.CreatedAt), s.parseDate(time.Now()), s.parseDate(job.FinishedAt),
197-
job.Status, job.Error, job.Result, job.TraceID, job.CurrentProgress, job.MaxProgress,
201+
job.Status, job.Error, job.Result, job.TraceID, job.CurrentProgress, job.MaxProgress, job.NextRunningAt,
198202
}
199203
query = "INSERT INTO " + jobModelName + " (" +
200-
s.formatColumnName("id", "task_name", "arguments", "retries", "max_retry", "interval", "created_at", "updated_at", "finished_at", "status", "error", "result", "trace_id", "current_progress", "max_progress") +
204+
s.formatColumnName("id", "task_name", "arguments", "retries", "max_retry", "interval", "created_at", "updated_at", "finished_at", "status", "error", "result", "trace_id", "current_progress", "max_progress", "next_running_at") +
201205
") VALUES (" + s.parameterize(len(args)) + ")"
202206
} else {
203207
args = []any{
@@ -288,16 +292,19 @@ func (s *SQLPersistent) CleanJob(ctx context.Context, filter *Filter) (affectedR
288292
return
289293
}
290294
func (s *SQLPersistent) DeleteJob(ctx context.Context, id string) (job Job, err error) {
291-
var createdAt, finishedAt, result sql.NullString
295+
var createdAt, finishedAt, result, nextRunningAt sql.NullString
292296
err = s.db.QueryRowContext(ctx, `SELECT `+
293-
s.formatColumnName("id", "task_name", "arguments", "retries", "max_retry", "interval", "created_at", "finished_at", "status", "error", "result", "trace_id", "current_progress", "max_progress")+
297+
s.formatColumnName("id", "task_name", "arguments", "retries", "max_retry", "interval", "created_at", "finished_at",
298+
"status", "error", "result", "trace_id", "current_progress", "max_progress", "next_running_at")+
294299
` FROM `+jobModelName+` WHERE id=`+s.parameterize(1), id).
295300
Scan(
296301
&job.ID, &job.TaskName, &job.Arguments, &job.Retries, &job.MaxRetry, &job.Interval, &createdAt,
297302
&finishedAt, &job.Status, &job.Error, &result, &job.TraceID, &job.CurrentProgress, &job.MaxProgress,
303+
&nextRunningAt,
298304
)
299305
job.CreatedAt = s.parseDateString(createdAt.String).Time
300306
job.FinishedAt = s.parseDateString(finishedAt.String).Time
307+
job.NextRunningAt = s.parseDateString(nextRunningAt.String).Time
301308
job.Result = result.String
302309
logger.LogIfError(err)
303310
_, err = s.db.Exec(`DELETE FROM ` + jobModelName + ` WHERE id='` + id + `'`)

codebase/app/task_queue_worker/persistent_sql_tools.go

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,27 +10,31 @@ import (
1010
"github.com/golangid/candi/logger"
1111
)
1212

13-
func generateAddColumnQuery(driverName, tableName, newColumnName, columnType string) (q []string) {
13+
type sqlQueryMigration struct {
14+
conditionQuery string
15+
executionQuery string
16+
}
17+
18+
func generateAdditionalColumnQuery(driverName, tableName, newColumnName, dataType string) (q sqlQueryMigration) {
1419
switch driverName {
1520
case "postgres":
16-
q = []string{
17-
`SELECT column_name FROM information_schema.columns ` +
18-
`WHERE table_name='` + tableName + `' AND column_name='` + newColumnName + `' AND table_catalog=(SELECT current_database());`,
19-
`ALTER TABLE ` + tableName + ` ADD COLUMN IF NOT EXISTS "` + newColumnName + `" ` + columnType,
20-
}
21+
q.conditionQuery = `SELECT column_name FROM information_schema.columns ` +
22+
`WHERE table_name='` + tableName + `' AND column_name='` + newColumnName + `' AND table_catalog=(SELECT current_database());`
23+
q.executionQuery = `ALTER TABLE ` + tableName + ` ADD COLUMN IF NOT EXISTS "` + newColumnName + `" ` + dataType
2124

2225
case "sqlite3":
23-
q = []string{
24-
`SELECT name FROM pragma_table_info('` + tableName + `') WHERE name='` + newColumnName + `'`,
25-
`ALTER TABLE ` + tableName + ` ADD COLUMN "` + newColumnName + `" ` + columnType,
26-
}
26+
q.conditionQuery = `SELECT name FROM pragma_table_info('` + tableName + `') WHERE name='` + newColumnName + `'`
27+
q.executionQuery = `ALTER TABLE ` + tableName + ` ADD COLUMN "` + newColumnName + `" ` + dataType
2728

2829
case "mysql":
29-
q = []string{
30-
"SELECT `COLUMN_NAME` FROM `INFORMATION_SCHEMA`.`COLUMNS` " +
31-
"WHERE `TABLE_NAME` = '" + tableName + "' AND `COLUMN_NAME` = '" + newColumnName + "' AND `TABLE_SCHEMA` = (SELECT DATABASE());",
32-
`ALTER TABLE ` + tableName + " ADD COLUMN `" + newColumnName + "` " + columnType,
30+
if d, ok := map[string]string{
31+
"TIMESTAMPTZ": "DATETIME(3)",
32+
}[dataType]; ok {
33+
dataType = d
3334
}
35+
q.conditionQuery = "SELECT `COLUMN_NAME` FROM `INFORMATION_SCHEMA`.`COLUMNS` " +
36+
"WHERE `TABLE_NAME` = '" + tableName + "' AND `COLUMN_NAME` = '" + newColumnName + "' AND `TABLE_SCHEMA` = (SELECT DATABASE());"
37+
q.executionQuery = `ALTER TABLE ` + tableName + " ADD COLUMN `" + newColumnName + "` " + dataType
3438
}
3539

3640
return q
@@ -55,8 +59,8 @@ func (s *SQLPersistent) initTable(db *sql.DB) {
5559
status VARCHAR(255) NOT NULL DEFAULT '',
5660
error TEXT NOT NULL DEFAULT '',
5761
trace_id VARCHAR(255) NOT NULL DEFAULT '',
58-
current_progress INTEGER NOT NULL DEFAULT 0,
59-
max_progress INTEGER NOT NULL DEFAULT 0
62+
current_progress BIGINT NOT NULL DEFAULT 0,
63+
max_progress BIGINT NOT NULL DEFAULT 0
6064
);
6165
CREATE INDEX IF NOT EXISTS idx_created_at ON ` + jobModelName + ` (created_at);
6266
CREATE INDEX IF NOT EXISTS idx_args_err ON ` + jobModelName + ` (arguments, error);
@@ -83,8 +87,8 @@ func (s *SQLPersistent) initTable(db *sql.DB) {
8387
status VARCHAR(255) NOT NULL DEFAULT '',
8488
error TEXT NOT NULL DEFAULT '',
8589
trace_id VARCHAR(255) NOT NULL DEFAULT '',
86-
start_at TIMESTAMP,
87-
end_at TIMESTAMP
90+
start_at TIMESTAMPTZ,
91+
end_at TIMESTAMPTZ
8892
);
8993
CREATE INDEX IF NOT EXISTS idx_job_id_history ON ` + jobHistoryModel + ` (job_id);
9094
CREATE INDEX IF NOT EXISTS idx_start_at_history ON ` + jobHistoryModel + ` (start_at);`,
@@ -112,8 +116,8 @@ func (s *SQLPersistent) initTable(db *sql.DB) {
112116
"`status` VARCHAR(255) NOT NULL," +
113117
"`error` TEXT NOT NULL," +
114118
"`trace_id` VARCHAR(255) NOT NULL," +
115-
"`current_progress` INTEGER NOT NULL," +
116-
"`max_progress` INTEGER NOT NULL," +
119+
"`current_progress` BIGINT NOT NULL," +
120+
"`max_progress` BIGINT NOT NULL," +
117121
`INDEX (created_at),
118122
INDEX (arguments(255), error(255)),
119123
INDEX (task_name, status, created_at),
@@ -157,21 +161,21 @@ func (s *SQLPersistent) initTable(db *sql.DB) {
157161
}
158162
}
159163

160-
extraQueries := [][]string{
161-
generateAddColumnQuery(s.driverName, jobModelName, "result", "TEXT"),
162-
generateAddColumnQuery(s.driverName, jobHistoryModel, "result", "TEXT"),
163-
generateAddColumnQuery(s.driverName, jobSummaryModelName, "is_hold", "BOOLEAN"),
164-
generateAddColumnQuery(s.driverName, jobSummaryModelName, "hold", "INTEGER"),
164+
extraQueries := []sqlQueryMigration{
165+
generateAdditionalColumnQuery(s.driverName, jobModelName, "result", "TEXT"),
166+
generateAdditionalColumnQuery(s.driverName, jobHistoryModel, "result", "TEXT"),
167+
generateAdditionalColumnQuery(s.driverName, jobSummaryModelName, "is_hold", "BOOLEAN"),
168+
generateAdditionalColumnQuery(s.driverName, jobSummaryModelName, "hold", "INTEGER"),
169+
generateAdditionalColumnQuery(s.driverName, jobModelName, "next_running_at", "TIMESTAMPTZ"),
165170
}
166-
for _, queries := range extraQueries {
167-
if queries[0] != "" {
171+
for _, q := range extraQueries {
172+
if q.conditionQuery != "" {
168173
var columnName string
169-
err := db.QueryRow(queries[0]).Scan(&columnName)
170-
if err == nil {
174+
if err := db.QueryRow(q.conditionQuery).Scan(&columnName); err == nil {
171175
continue
172176
}
173177
}
174-
if _, err := db.Exec(queries[1]); err != nil {
178+
if _, err := db.Exec(q.executionQuery); err != nil {
175179
logger.LogE(err.Error())
176180
}
177181
}

0 commit comments

Comments
 (0)