Skip to content

Commit 00ec12e

Browse files
committed
refactor: move function from reader to DBContext
1 parent 27fb54e commit 00ec12e

File tree

2 files changed

+25
-25
lines changed

2 files changed

+25
-25
lines changed

db_context.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,30 @@ func (c *DBContext) getCurrentTimestampInUTC() string {
110110
}
111111
}
112112

113+
func (c *DBContext) buildSelectMessagesQuery() string {
114+
limitPlaceholder := c.getSQLPlaceholder(1)
115+
116+
switch c.dialect {
117+
case SQLDialectOracle:
118+
return fmt.Sprintf(`SELECT id, payload, created_at, scheduled_at, metadata, times_attempted
119+
FROM outbox
120+
WHERE scheduled_at <= %s
121+
ORDER BY created_at ASC FETCH FIRST %s ROWS ONLY`, c.getCurrentTimestampInUTC(), limitPlaceholder)
122+
123+
case SQLDialectSQLServer:
124+
return fmt.Sprintf(`SELECT TOP (%s) id, payload, created_at, scheduled_at, metadata, times_attempted
125+
FROM outbox
126+
WHERE scheduled_at <= %s
127+
ORDER BY created_at ASC`, limitPlaceholder, c.getCurrentTimestampInUTC())
128+
129+
default:
130+
return fmt.Sprintf(`SELECT id, payload, created_at, scheduled_at, metadata, times_attempted
131+
FROM outbox
132+
WHERE scheduled_at <= %s
133+
ORDER BY created_at ASC LIMIT %s`, c.getCurrentTimestampInUTC(), limitPlaceholder)
134+
}
135+
}
136+
113137
// txAdapter is a wrapper around a sql.Tx that implements the Tx interface.
114138
type txAdapter struct {
115139
tx *sql.Tx

reader.go

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ func (r *Reader) readOutboxMessages() ([]*Message, error) {
538538
defer cancel()
539539

540540
// nolint:gosec
541-
query := r.buildSelectMessagesQuery()
541+
query := r.dbCtx.buildSelectMessagesQuery()
542542
rows, err := r.dbCtx.db.QueryContext(ctx, query, r.maxMessages)
543543
if err != nil {
544544
return nil, fmt.Errorf("failed to query outbox messages: %w", err)
@@ -560,27 +560,3 @@ func (r *Reader) readOutboxMessages() ([]*Message, error) {
560560
}
561561
return messages, nil
562562
}
563-
564-
func (r *Reader) buildSelectMessagesQuery() string {
565-
limitPlaceholder := r.dbCtx.getSQLPlaceholder(1)
566-
567-
switch r.dbCtx.dialect {
568-
case SQLDialectOracle:
569-
return fmt.Sprintf(`SELECT id, payload, created_at, scheduled_at, metadata, times_attempted
570-
FROM outbox
571-
WHERE scheduled_at <= %s
572-
ORDER BY created_at ASC FETCH FIRST %s ROWS ONLY`, r.dbCtx.getCurrentTimestampInUTC(), limitPlaceholder)
573-
574-
case SQLDialectSQLServer:
575-
return fmt.Sprintf(`SELECT TOP (%s) id, payload, created_at, scheduled_at, metadata, times_attempted
576-
FROM outbox
577-
WHERE scheduled_at <= %s
578-
ORDER BY created_at ASC`, limitPlaceholder, r.dbCtx.getCurrentTimestampInUTC())
579-
580-
default:
581-
return fmt.Sprintf(`SELECT id, payload, created_at, scheduled_at, metadata, times_attempted
582-
FROM outbox
583-
WHERE scheduled_at <= %s
584-
ORDER BY created_at ASC LIMIT %s`, r.dbCtx.getCurrentTimestampInUTC(), limitPlaceholder)
585-
}
586-
}

0 commit comments

Comments
 (0)