Skip to content

PavelAgarkov/rate-envelope-queue

Repository files navigation

rate-envelope-queue

A lightweight, goroutine-safe wrapper around k8s.io/client-go/util/workqueue for managing tasks (envelopes) with a fixed worker pool, periodic scheduling, deadlines, hooks, and stamps (middleware). Adds a safe queue lifecycle (Start/Stop/Start), buffering before first start, and queue capacity limiting.

Under the hood it uses workqueue.TypedRateLimitingInterface. Deduplication happens by pointer to the element: repeated Add of the same pointer while it is in-flight is ignored.


Table of Contents


Features

Goroutine-safe in-memory queue for your service:

  • All public methods are safe to call from multiple goroutines. Send can be called concurrently. Multiple workers are supported via limit. Start/Stop are serialized internally.
  • This is not a distributed queue: there are no guarantees across processes/hosts. Ensure your hooks/invokers are thread-safe around shared state.

What you get:

  • A clear lifecycle FSM: init → running → stopping → stopped
  • Both one-off and periodic tasks
  • Middleware chain via Stamp
  • Hooks: before/after/failure/success
  • Capacity accounting (quota)
  • Graceful or fast stop (Drain / Stop) and restartable queues (Start() after Stop())

Error/Hook semantics:

  • ErrStopEnvelope — intentional stop of a specific envelope:
    • the envelope is forgotten, not rescheduled;
    • if raised in beforeHook/invoke, the afterHook still runs (within a time-bounded context); successHook does not run.
  • context.Canceled / context.DeadlineExceeded — not a success:
    • envelope is forgotten; periodic ones are rescheduled, one-off ones are not.
  • Any other error:
    • periodic → rescheduled (if queue is alive);
    • one-off → defer to failureHook decision (RetryNow / RetryAfter / Drop).
  • Each hook runs with its own timeout: a fraction frac=0.5 of the envelope's deadline, but at least hardHookLimit (800ms). Hook timeouts are derived from the task context tctx, so hooks never outlive the envelope deadline.

Concurrency controls (brief):

  • stateMu guards the FSM state (RLock read / Lock write)
  • lifecycleMu serializes Start/Stop/queue swap
  • queueMu guards the inner workqueue pointer
  • pendingMu guards the pre-start buffer
  • run is an atomic fast flag for “queue alive”
  • Capacity accounting is atomic via tryReserve/inc/dec/unreserve

Other highlights:

  • Worker pool via WithLimitOption(n)
  • Start/Stop/Start: tasks sent before first start are buffered and flushed on Start()
  • Periodic vs one-off: interval > 0 means periodic; interval == 0 means one-off
  • Deadlines: deadline > 0 bounds invoke time via context.WithTimeout in the worker
  • Stamps: both global (queue-level) and per-envelope (task-level), with predictable execution order
  • Panic safety: panics inside task are handled (Forget+Done) and logged with stack; worker keeps running
  • Prometheus metrics: use client-go workqueue metrics

Installation

go get github.com/PavelAgarkov/rate-envelope-queue

Recommended pins (compatible with this package):

go get k8s.io/client-go@v0.34.0
go get k8s.io/component-base@v0.34.0

Requires: Go 1.24+.


Quick Start

See full examples in examples/:

  • queue_with_simple_start_stop_dynamic_execute.go
  • simple_queue_with_simple_preset_envelopes.go
  • simple_queue_with_simple_schedule_envelopes.go
  • simple_queue_with_simple_dynamic_envelopes.go
  • simple_queue_with_simple_combine_envelopes.go

Capacity scenarios (accounting correctness):

Drain + waiting=true — wait for all workers; all dec() happen; no remainder.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(50),
)

Stop + waiting=true — after wg.Wait() we subtract the “tail” (cur - pend), the counter converges.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Stop),
    WithAllowedCapacityOption(50),
)

Unlimited capacityWithAllowedCapacityOption(0) removes admission limits; the currentCapacity metric still reflects actual occupancy.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(0),
)

Minimal API sketch:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

q := NewRateEnvelopeQueue(
    ctx,
    "emails",
    WithLimitOption(4),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(1000),
    WithStamps(LoggingStamp()),
)

emailOnce, _ := NewEnvelope(
    WithId(1),
    WithType("email"),
    WithScheduleModeInterval(0),          // one-off
    WithDeadline(3*time.Second),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }),
)

ticker, _ := NewEnvelope(
    WithId(2),
    WithType("metrics"),
    WithScheduleModeInterval(5*time.Second), // periodic
    WithDeadline(2*time.Second),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }),
)

q.Start()
_ = q.Send(emailOnce, ticker)
// ...
q.Stop()
q.Start() // restart if needed

Concepts & Contracts

Envelope

e, err := NewEnvelope(
    WithId(123), // optional, for logs
    WithType("my_task"), // optional, for logs
    WithScheduleModeInterval(time.Second), // 0 = one-off
    WithDeadline(500*time.Millisecond),    // 0 = no deadline
    WithBeforeHook(func(ctx context.Context, e *Envelope) error { return nil }),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }), // required
    WithAfterHook(func(ctx context.Context, e *Envelope) error { return nil }),
    WithFailureHook(func(ctx context.Context, e *Envelope, err error) Decision {
        return DefaultOnceDecision()               // Drop by default
        // return RetryOnceAfterDecision(5 * time.Second)
        // return RetryOnceNowDecision()
    }),
    WithSuccessHook(func(ctx context.Context, e *Envelope) {}),
    WithStampsPerEnvelope(LoggingStamp()),
    WithPayload(myPayload),
)

Validation:

  • invoke is required; interval >= 0; deadline >= 0
  • For periodic: deadline must not exceed intervalErrAdditionEnvelopeToQueueBadIntervals

Special error:

  • ErrStopEnvelope — gracefully stops this envelope only (no reschedule)

Queue

q := NewRateEnvelopeQueue(ctx, "queue-name",
    WithLimitOption(n),
    WithWaitingOption(true|false),
    WithStopModeOption(Drain|Stop),
    WithAllowedCapacityOption(cap),         // 0 = unlimited
    WithWorkqueueConfigOption(conf),
    WithLimiterOption(limiter),
    WithStamps(stamps...),
)
q.Start()
err := q.Send(e1, e2, e3)                   // ErrAllowedQueueCapacityExceeded on overflow
q.Stop()

Pre-start buffer. In init, Send() pushes envelopes into an internal buffer; on Start() they are flushed into the workqueue.

Stamps (middleware)

type (
    Invoker  func(ctx context.Context, envelope *Envelope) error
    Stamp    func(next Invoker) Invoker
)

Order: global stamps (outer) wrap per-envelope stamps (inner), then Envelope.invoke.

A sample LoggingStamp() is provided for demonstration.


Worker Behavior

Result / condition Queue action
invoke returns nil Forget; if interval > 0 and alive → AddAfter(interval)
context.Canceled / DeadlineExceeded Forget; if periodic and alive → AddAfter(interval)
ErrStopEnvelope Forget; no reschedule
Error on periodic Forget; if alive → AddAfter(interval)
Error on one-off + failureHook Use decision: RetryNow / RetryAfter(d) / Drop
Panic in task Forget + Done + stack log; worker continues

“Queue is alive” = run == true, state is running, base context not done, and workqueue not shutting down.


Stop Modes

Waiting \ StopMode Drain (graceful) Stop (fast)
true Wait for workers; ShutDownWithDrain() Wait for workers; ShutDown()
false No wait; ShutDownWithDrain() Immediate stop; ShutDown()

After Stop() you can call Start() again: a fresh inner workqueue will be created.


Capacity Limiting

WithAllowedCapacityOption(cap) limits the total number of in-flight/queued/delayed items (including reschedules).
If the limit would be exceeded, Send() returns ErrAllowedQueueCapacityExceeded.
currentCapacity is updated on add, reschedule, and completion.

  • cap == 0unlimited admission; the currentCapacity metric still tracks actual occupancy.
  • Stop + waiting=false + StopMode=Stop — documented tail leakage in accounting. Use Drain or waiting=true for accurate capacity convergence.

Benchmarks

Command examples:

go test -bench=BenchmarkQueueFull -benchmem
go test -bench=BenchmarkQueueInterval -benchmem

Numbers provided by the author (your CPU/env will vary):

BenchmarkQueueFull-8         3212882               348.7 ns/op            40 B/op          1 allocs/op
BenchmarkQueueInterval-8      110313             12903 ns/op            1809 B/op         24 allocs/op

Metrics (Prometheus)

Workqueue metrics are enabled via blank import:

import (
    _ "k8s.io/component-base/metrics/prometheus/workqueue"
    "k8s.io/component-base/metrics/legacyregistry"
    "net/http"
)

func serveMetrics() {
    mux := http.NewServeMux()
    mux.Handle("/metrics", legacyregistry.Handler())
    go http.ListenAndServe(":8080", mux)
}

Your queue name (QueueConfig.Name) is included in workqueue metric labels (workqueue_*: adds, depth, work_duration, retries, etc.).


Examples

See the examples/ folder for runnable snippets covering one-off jobs, periodic schedules, combined modes, and dynamic dispatch.


License

MIT — see LICENSE.



rate-envelope-queue (Русская версия)

Лёгкая, потокобезопасная обёртка над k8s.io/client-go/util/workqueue для управления задачами (envelopes) с фиксированным пулом воркеров, периодическим планированием, дедлайнами, хуками и stamps (middleware). Добавляет безопасный жизненный цикл очереди (Start/Stop/Start), буферизацию задач до старта и ограничение ёмкости очереди.

В основе — workqueue.TypedRateLimitingInterface. Дедупликация происходит по указателю на элемент: повторный Add того же указателя пока он в обработке — игнорируется.


Содержание


Ключевые возможности

Потокобезопасная локальная очередь в памяти приложения:

  • Все публичные методы безопасны при вызовах из нескольких горутин. Send можно вызывать параллельно. Воркеров может быть несколько (limit). Вызовы Start/Stop сериализуются внутри.
  • Это не распределённая очередь: гарантий между разными процессами/узлами нет. Код хуков/инвокеров должен сам обеспечивать потокобезопасность при доступе к общим ресурсам.

Что внутри:

  • Прозрачный автомат состояний: init → running → stopping → stopped
  • Одноразовые и периодические задачи
  • Цепочка middleware через Stamp
  • Хуки: before/after/failure/success
  • Учёт ёмкости (quota)
  • Мягкий или быстрый останов (Drain / Stop) и повторный старт (Start() после Stop())

Семантика ошибок и хуков:

  • ErrStopEnvelope — намеренная остановка конкретного конверта:
    • конверт забывается, не перепланируется;
    • если ошибка возникла в beforeHook/invoke, afterHook всё равно вызовется (с ограниченным временем); successHook не вызывается.
  • context.Canceled / context.DeadlineExceeded — это не успех:
    • конверт забывается; периодический — перепланируется, одноразовый — нет.
  • Любая другая ошибка:
    • периодическая → перепланируется (если очередь «жива»);
    • одноразовая → решение через failureHook (RetryNow / RetryAfter / Drop).
  • Каждый хук выполняется с собственным таймаутом: доля frac=0.5 от deadline конверта, но не меньше hardHookLimit (800мс). Таймауты «висят» на tctx, т.е. хуки никогда не переживут дедлайн конверта.

Потокобезопасность (коротко):

  • stateMu — чтение/запись состояния
  • lifecycleMu — сериализация Start/Stop/смены очереди
  • queueMu — доступ к внутренней очереди
  • pendingMu — буфер задач до старта
  • run — атомарный флаг «жива ли очередь»
  • Учёт ёмкости — атомарные операции tryReserve/inc/dec/unreserve

Прочее:

  • Пул воркеров: WithLimitOption(n)
  • Start/Stop/Start: задачи, добавленные до первого Start(), буферизуются и переливаются в очередь при старте
  • Периодические / одноразовые: interval > 0 — периодическая; interval == 0 — одноразовая
  • Дедлайны: deadline > 0 ограничивает время invoke через context.WithTimeout
  • Stamps: глобальные и на уровне конверта, порядок выполнения предсказуем
  • Защита от паник: паника в задаче → Forget+Done и лог стека; воркер продолжает работу
  • Метрики Prometheus: из client-go workqueue

Установка

go get github.com/PavelAgarkov/rate-envelope-queue

Рекомендуемые версии:

go get k8s.io/client-go@v0.34.0
go get k8s.io/component-base@v0.34.0

Требования: Go 1.24+.


Быстрый старт

Смотрите каталог examples/:

  • queue_with_simple_start_stop_dynamic_execute.go
  • simple_queue_with_simple_preset_envelopes.go
  • simple_queue_with_simple_schedule_envelopes.go
  • simple_queue_with_simple_dynamic_envelopes.go
  • simple_queue_with_simple_combine_envelopes.go

Сценарии ёмкости (корректность учёта):

Drain + waiting=true — дожидаемся всех воркеров; все dec() прошли; остатка нет.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(50),
)

Stop + waiting=true — после wg.Wait() снимается «хвост» (cur - pend), счётчик сходится.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Stop),
    WithAllowedCapacityOption(50),
)

Безлимитная ёмкостьWithAllowedCapacityOption(0) убирает ограничение приёма; метрика currentCapacity отражает фактическую занятость.

envelopeQueue := NewRateEnvelopeQueue(
    parent,
    "test_queue",
    WithLimitOption(5),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(0),
)

Мини‑пример:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// тут будут другие примеры
q := NewRateEnvelopeQueue(
    ctx,
    "emails",
    WithLimitOption(4),
    WithWaitingOption(true),
    WithStopModeOption(Drain),
    WithAllowedCapacityOption(1000),
    WithStamps(LoggingStamp()),
)

emailOnce, _ := NewEnvelope(
    WithId(1),
    WithType("email"),
    WithScheduleModeInterval(0),          // одноразовая
    WithDeadline(3*time.Second),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }),
)

ticker, _ := NewEnvelope(
    WithId(2),
    WithType("metrics"),
    WithScheduleModeInterval(5*time.Second), // периодическая
    WithDeadline(2*time.Second),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }),
)

q.Start()
_ = q.Send(emailOnce, ticker)
// ...
q.Stop()
q.Start() // при необходимости можно снова стартовать

Концепции и контракты

Envelope

e, err := NewEnvelope(
    WithId(123),                   // опционально, для логов
    WithType("my_task"),           // опционально, для логов
    WithScheduleModeInterval(time.Second), // 0 = одноразовая
    WithDeadline(500*time.Millisecond),    // 0 = без дедлайна
    WithBeforeHook(func(ctx context.Context, e *Envelope) error { return nil }),
    WithInvoke(func(ctx context.Context, e *Envelope) error { return nil }), // обязательно
    WithAfterHook(func(ctx context.Context, e *Envelope) error { return nil }),
    WithFailureHook(func(ctx context.Context, e *Envelope, err error) Decision {
        return DefaultOnceDecision()               // по умолчанию Drop
        // return RetryOnceAfterDecision(5 * time.Second)
        // return RetryOnceNowDecision()
    }),
    WithSuccessHook(func(ctx context.Context, e *Envelope) {}),
    WithStampsPerEnvelope(LoggingStamp()),
    WithPayload(myPayload),
)

Валидация:

  • invoke обязателен; interval >= 0; deadline >= 0
  • Для периодических: deadline не должен превышать intervalErrAdditionEnvelopeToQueueBadIntervals

Спец‑ошибка:

  • ErrStopEnvelope — корректно прекращает только этот конверт (без перепланирования)

Очередь

q := NewRateEnvelopeQueue(ctx, "queue-name",
    WithLimitOption(n),
    WithWaitingOption(true|false),
    WithStopModeOption(Drain|Stop),
    WithAllowedCapacityOption(cap),         // 0 = без лимита
    WithWorkqueueConfigOption(conf),
    WithLimiterOption(limiter),
    WithStamps(stamps...),
)
q.Start()
err := q.Send(e1, e2, e3)                   // ErrAllowedQueueCapacityExceeded при переполнении
q.Stop()

Буфер до старта. В состоянии init Send() складывает задачи во внутренний буфер; при Start() — они переливаются в workqueue.

Stamps (middleware)

type (
    Invoker  func(ctx context.Context, envelope *Envelope) error
    Stamp    func(next Invoker) Invoker
)

Порядок: сначала глобальные stamps (внешние), затем per‑envelope (внутренние), после — Envelope.invoke.

LoggingStamp() — пример для иллюстрации (не «серебряная пуля» для продакшена).


Поведение воркера

Событие / результат Действие очереди
invoke вернул nil Forget; если interval > 0 и очередь «жива» → AddAfter(interval)
context.Canceled / DeadlineExceeded Forget; если периодическая и очередь «жива» → AddAfter(interval)
ErrStopEnvelope Forget; не перепланируем
Ошибка у периодической Forget; если очередь «жива» → AddAfter(interval)
Ошибка у одноразовой + failureHook Решение пользователя: RetryNow / RetryAfter(d) / Drop
Паника в задаче Forget + Done + лог стека; воркер продолжает работу

«Очередь жива» = run == true, state == running, базовый контекст не завершён и workqueue не в shutdown.


Режимы остановки

Waiting \ StopMode Drain (мягкая) Stop (жёсткая)
true Ждать воркеров; ShutDownWithDrain() Ждать воркеров; ShutDown()
false Без ожидания воркеров; ShutDownWithDrain() Мгновенный останов; ShutDown()

После Stop() можно вызывать Start() повторно: создаётся новый внутренний workqueue.


Ограничение ёмкости

WithAllowedCapacityOption(cap) ограничивает суммарное число элементов в системе (включая перепланированные).
При попытке превышения лимита Send() возвращает ErrAllowedQueueCapacityExceeded.
currentCapacity обновляется при добавлении, перепланировании и завершении обработки.

  • cap == 0безлимит по приёму; метрика currentCapacity отражает фактическую занятость.
  • Stop + waiting=false + StopMode=Stop — документированная утечка «хвоста» в учёте. Для точной сходимости используйте Drain или waiting=true.

Бенчмарки

Как запускать:

go test -bench=BenchmarkQueueFull -benchmem
go test -bench=BenchmarkQueueInterval -benchmem

Цифры автора (зависят от CPU/окружения):

BenchmarkQueueFull-8         3212882               348.7 ns/op            40 B/op          1 allocs/op
BenchmarkQueueInterval-8      110313             12903 ns/op            1809 B/op         24 allocs/op

Метрики (Prometheus)

Метрики workqueue активируются бланк‑импортом:

import (
    _ "k8s.io/component-base/metrics/prometheus/workqueue"
    "k8s.io/component-base/metrics/legacyregistry"
    "net/http"
)

func serveMetrics() {
    mux := http.NewServeMux()
    mux.Handle("/metrics", legacyregistry.Handler())
    go http.ListenAndServe(":8080", mux)
}

Имя очереди (QueueConfig.Name) попадает в лейблы метрик (workqueue_*: adds, depth, work_duration, retries и т.д.).


Примеры

Смотрите каталог examples/ — там есть готовые варианты для одноразовых задач, периодических расписаний, комбинированных сценариев и динамического диспатча.


Лицензия

MIT — см. LICENSE.