Inactive consumer groups prevent events stream trimming #18606

@kzvezdarov

Description

Bug summary

The stream trimming function prefect_redis.messaging._trim_stream_to_lowest_delivered_id trims the events stream up to the lowest last-delivered-id among the consumer groups registered with the stream, so that no group loses messages it has not yet received.
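
For reference, that strategy boils down to something like the following sketch, written against redis-py's asyncio client; it illustrates the idea only, it is not the actual prefect_redis implementation, and the helper names are mine:

from redis.asyncio import Redis


def _id_key(stream_id: str) -> tuple[int, int]:
    # Stream ids look like "1753801606636-0"; compare them numerically, not as strings.
    ms, _, seq = stream_id.partition("-")
    return int(ms), int(seq or 0)


async def trim_to_lowest_delivered_id(redis: Redis, stream: str = "events") -> None:
    # Assumes a client created with decode_responses=True.
    groups = await redis.xinfo_groups(stream)
    if not groups:
        return
    # The stream can only be trimmed up to the smallest last-delivered-id,
    # otherwise a slower consumer group would lose messages it has not yet seen.
    lowest = min((group["last-delivered-id"] for group in groups), key=_id_key)
    await redis.xtrim(stream, minid=lowest, approximate=False)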

It seems that when a consumer group becomes inactive - for example when its process is suddenly killed and the finalizer in prefect_redis.messaging.ephemeral_subscription never runs - stream trimming gets "stuck" at that group's last delivered message id, so the stream is never trimmed past it and eventually exhausts memory as it grows.
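
The cleanup path that gets skipped looks roughly like this; it is a simplified stand-in for prefect_redis.messaging.ephemeral_subscription written against redis-py's asyncio client, so treat the details as assumptions:

from contextlib import asynccontextmanager
from uuid import uuid4

from redis.asyncio import Redis


@asynccontextmanager
async def ephemeral_group(redis: Redis, stream: str = "events"):
    # Create a throwaway consumer group for this subscriber...
    group = f"ephemeral-{uuid4().hex}"
    await redis.xgroup_create(stream, group, id="$", mkstream=True)
    try:
        yield group
    finally:
        # ...and remove it on exit. A SIGKILL ends the process before this
        # finally block runs, leaving the group (and its last-delivered-id)
        # behind on the stream.
        await redis.xgroup_destroy(stream, group)

Killing the container with SIGKILL (step 4 below) is exactly the case where the finalizer never runs and an orphaned ephemeral-* group is left behind.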

Reproducing this takes a few steps but is fairly easy:

  1. Start the docker compose stack (footnote 1) and create the deployment (footnote 2).
  2. Run the deployment a couple of times:
prefect deployment run test-flow/test-deployment
prefect deployment run test-flow/test-deployment
prefect deployment run test-flow/test-deployment
  3. Connect to redis and verify that all stream consumer groups have lag close to 0:
127.0.0.1:6379> xinfo groups events
1)  1) "name"
    2) "ephemeral-031b648d3ed1-12cb42420e20485aa22bfd4e601137ce"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1753801606636-0"
    9) "entries-read"
   10) (integer) 36
   11) "lag"
   12) (integer) 0
2)  1) "name"
    2) "ephemeral-6dd294b0c059-bd998f55f4b74db8b1b38e072880cbf4"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1753801606636-0"
    9) "entries-read"
   10) (integer) 36
   11) "lag"
   12) (integer) 0
3)  1) "name"
    2) "ephemeral-f5a5e6a40792-6f1ff2aaf3284c1bbb6a13765529dff3"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1753801606636-0"
    9) "entries-read"
   10) (integer) 36
   11) "lag"
   12) (integer) 0
4)  1) "name"
    2) "event-persister"
    3) "consumers"
    4) (integer) 4
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1753801606636-0"
    9) "entries-read"
   10) (integer) 36
   11) "lag"
   12) (integer) 0
5)  1) "name"
    2) "reactive-triggers"
    3) "consumers"
    4) (integer) 4
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1753801606636-0"
    9) "entries-read"
   10) (integer) 36
   11) "lag"
   12) (integer) 0
6)  1) "name"
    2) "task-run-recorder"
    3) "consumers"
    4) (integer) 4
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1753801606636-0"
    9) "entries-read"
   10) (integer) 36
   11) "lag"
   12) (integer) 0
  4. Kill one of the api containers:
docker kill --signal=9 prefect-ha-prefect-api-2
  5. Run the deployment a couple of times again:
prefect deployment run test-flow/test-deployment
prefect deployment run test-flow/test-deployment
prefect deployment run test-flow/test-deployment
  6. Verify that one of the consumer groups is now lagging:
127.0.0.1:6379> xinfo groups events
1)  1) "name"
    2) "ephemeral-031b648d3ed1-12cb42420e20485aa22bfd4e601137ce"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1753801734537-0"
    9) "entries-read"
   10) (integer) 106
   11) "lag"
   12) (integer) 0
2)  1) "name"
    2) "ephemeral-6dd294b0c059-40eb93da14da4aa1b85701730fb225f8"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1753801734537-0"
    9) "entries-read"
   10) (integer) 106
   11) "lag"
   12) (integer) 0
3)  1) "name"
    2) "ephemeral-6dd294b0c059-bd998f55f4b74db8b1b38e072880cbf4"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1753801606636-0"
    9) "entries-read"
   10) (integer) 36
   11) "lag"
   12) (integer) 70
4)  1) "name"
    2) "ephemeral-f5a5e6a40792-6f1ff2aaf3284c1bbb6a13765529dff3"
    3) "consumers"
    4) (integer) 1
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1753801734537-0"
    9) "entries-read"
   10) (integer) 106
   11) "lag"
   12) (integer) 0
5)  1) "name"
    2) "event-persister"
    3) "consumers"
    4) (integer) 5
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1753801734537-0"
    9) "entries-read"
   10) (integer) 106
   11) "lag"
   12) (integer) 0
6)  1) "name"
    2) "reactive-triggers"
    3) "consumers"
    4) (integer) 5
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1753801734537-0"
    9) "entries-read"
   10) (integer) 106
   11) "lag"
   12) (integer) 0
7)  1) "name"
    2) "task-run-recorder"
    3) "consumers"
    4) (integer) 5
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1753801734537-0"
    9) "entries-read"
   10) (integer) 106
   11) "lag"
   12) (integer) 0

The lagging ephemeral-6dd294b0c059-bd998f55f4b74db8b1b38e072880cbf4 group appears to prevent any further trimming. For a deployment with a large volume of events this is very visible - deleting the lagging groups results in an immediate drop in memory usage:

[Screenshot: Redis memory usage dropping immediately after the lagging consumer groups are deleted]

Docker compose stack (footnote 1):
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: prefect
      POSTGRES_PASSWORD: prefect
      POSTGRES_DB: prefect
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7
    ports:
      - 6379:6379

  migrate:
    image: prefecthq/prefect:3.4.10-python3.12
    depends_on: [postgres]
    command: prefect server database upgrade -y
    environment:
      PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://prefect:prefect@postgres:5432/prefect

  prefect-api:
    image: prefecthq/prefect:3.4.10-python3.12
    depends_on: [migrate, postgres, redis]
    deploy:
      replicas: 2
    command: prefect server start --host 0.0.0.0 --no-services
    environment:
      PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://prefect:prefect@postgres:5432/prefect
      PREFECT_API_DATABASE_MIGRATE_ON_START: "false"
      PREFECT_MESSAGING_BROKER: prefect_redis.messaging
      PREFECT_MESSAGING_CACHE: prefect_redis.messaging
      PREFECT_REDIS_MESSAGING_HOST: redis
      PREFECT_REDIS_MESSAGING_PORT: "6379"
    ports:
      - "4200-4202:4200"  # Maps to different ports for each replica

  prefect-background:
    image: prefecthq/prefect:3.4.10-python3.12
    depends_on: [migrate, postgres, redis]
    command: prefect server services start
    environment:
      PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://prefect:prefect@postgres:5432/prefect
      PREFECT_API_DATABASE_MIGRATE_ON_START: "false"
      PREFECT_MESSAGING_BROKER: prefect_redis.messaging
      PREFECT_MESSAGING_CACHE: prefect_redis.messaging
      PREFECT_REDIS_MESSAGING_HOST: redis
      PREFECT_REDIS_MESSAGING_PORT: "6379"

volumes:
  postgres_data:
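
With this compose file the two API replicas and the background services container come up with a plain docker compose up -d (step 1 above).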

Deployment script (footnote 2):

from prefect import flow, task, serve


@task
def generate_data() -> list[int]:
    return list(range(100))


@flow
def test_flow():
    results = generate_data()
    return results


if __name__ == "__main__":
    test_deployment = test_flow.to_deployment(name="test-deployment")

    serve(test_deployment)
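
Running this script registers test-flow/test-deployment and serves it, which is what the prefect deployment run commands in steps 2 and 5 trigger.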

Version info

Version:             3.4.10
API version:         0.8.4
Python version:      3.12.11
Git commit:          fb979941
Built:               Tue, Jul 22, 2025 07:25 PM
OS/Arch:             linux/aarch64
Profile:             ephemeral
Server type:         ephemeral
Pydantic version:    2.11.7
Server:
  Database:          postgresql
Integrations:
  prefect-redis:     0.2.2

Additional context

I've also tested this with a newer prefect-redis, installed directly from git, with the same result.
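
As a stop-gap, the stale ephemeral groups can be dropped manually (XGROUP DESTROY events <group-name>) or with a small script along these lines; this is only a sketch of a workaround, not a proposed fix, and both the ephemeral- prefix and the idle threshold are assumptions based on the xinfo output above:

from redis.asyncio import Redis

# Assumption: a group whose consumers have all been idle this long is abandoned.
STALE_AFTER_MS = 15 * 60 * 1000


async def destroy_stale_ephemeral_groups(redis: Redis, stream: str = "events") -> None:
    # Assumes decode_responses=True; "ephemeral-" matches the group names in the output above.
    for group in await redis.xinfo_groups(stream):
        name = group["name"]
        if not name.startswith("ephemeral-"):
            continue
        consumers = await redis.xinfo_consumers(stream, name)
        if not consumers or all(c["idle"] > STALE_AFTER_MS for c in consumers):
            # Removing the group lets trimming advance past its stale last-delivered-id.
            await redis.xgroup_destroy(stream, name)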

Footnotes

  1. Docker compose stack in HA mode.

  2. Deployment script.
