### Bug summary
The stream trimming function `prefect_redis.messaging._trim_stream_to_lowest_delivered_id` trims the events stream to the earliest message id delivered to the consumer groups registered with the stream. When a consumer group becomes inactive - for example, when the process is suddenly killed and the finalizer in `prefect_redis.messaging.ephemeral_subscription` never runs - stream trimming becomes "stuck" on that group's last delivered message, preventing any further trimming and eventually exhausting memory as the stream grows.
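
From reading the code, the trimming amounts to roughly the following sketch (my paraphrase in redis-py terms, not the actual prefect_redis implementation):

```python
import redis


def _id_tuple(entry_id: str | bytes) -> tuple[int, int]:
    # Stream ids look like "1753801606636-0": ms timestamp, then sequence.
    if isinstance(entry_id, bytes):
        entry_id = entry_id.decode()
    ms, _, seq = entry_id.partition("-")
    return (int(ms), int(seq or 0))


def trim_to_lowest_delivered_id(client: redis.Redis, stream: str = "events") -> None:
    # Sketch only: trim the stream up to the smallest "last-delivered-id"
    # across *all* consumer groups registered with it.
    groups = client.xinfo_groups(stream)
    if not groups:
        return
    lowest = min((g["last-delivered-id"] for g in groups), key=_id_tuple)
    # XTRIM MINID evicts entries with ids below `lowest`, so a single
    # group stuck on an old id pins the entire stream in memory.
    client.xtrim(stream, minid=lowest)
```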
Reproducing this is a bit involved but fairly easy:

- Start the stack (docker-compose file below) and run the deployment a few times:

```
prefect deployment run test-flow/test-deployment
prefect deployment run test-flow/test-deployment
prefect deployment run test-flow/test-deployment
```
- Connect to redis and verify all consumer groups have lag close to 0:

```
127.0.0.1:6379> xinfo groups events
1) 1) "name"
2) "ephemeral-031b648d3ed1-12cb42420e20485aa22bfd4e601137ce"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1753801606636-0"
9) "entries-read"
10) (integer) 36
11) "lag"
12) (integer) 0
2) 1) "name"
2) "ephemeral-6dd294b0c059-bd998f55f4b74db8b1b38e072880cbf4"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1753801606636-0"
9) "entries-read"
10) (integer) 36
11) "lag"
12) (integer) 0
3) 1) "name"
2) "ephemeral-f5a5e6a40792-6f1ff2aaf3284c1bbb6a13765529dff3"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1753801606636-0"
9) "entries-read"
10) (integer) 36
11) "lag"
12) (integer) 0
4) 1) "name"
2) "event-persister"
3) "consumers"
4) (integer) 4
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1753801606636-0"
9) "entries-read"
10) (integer) 36
11) "lag"
12) (integer) 0
5) 1) "name"
2) "reactive-triggers"
3) "consumers"
4) (integer) 4
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1753801606636-0"
9) "entries-read"
10) (integer) 36
11) "lag"
12) (integer) 0
6) 1) "name"
2) "task-run-recorder"
3) "consumers"
4) (integer) 4
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1753801606636-0"
9) "entries-read"
10) (integer) 36
11) "lag"
12) (integer) 0
```
- Kill one of the api containers:

```
docker kill --signal=9 prefect-ha-prefect-api-2
```
- Run the deployment a couple of times again:

```
prefect deployment run test-flow/test-deployment
prefect deployment run test-flow/test-deployment
prefect deployment run test-flow/test-deployment
```
- Verify that one of the consumer groups is lagging:
```
127.0.0.1:6379> xinfo groups events
1) 1) "name"
2) "ephemeral-031b648d3ed1-12cb42420e20485aa22bfd4e601137ce"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1753801734537-0"
9) "entries-read"
10) (integer) 106
11) "lag"
12) (integer) 0
2) 1) "name"
2) "ephemeral-6dd294b0c059-40eb93da14da4aa1b85701730fb225f8"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1753801734537-0"
9) "entries-read"
10) (integer) 106
11) "lag"
12) (integer) 0
3) 1) "name"
2) "ephemeral-6dd294b0c059-bd998f55f4b74db8b1b38e072880cbf4"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1753801606636-0"
9) "entries-read"
10) (integer) 36
11) "lag"
12) (integer) 70
4) 1) "name"
2) "ephemeral-f5a5e6a40792-6f1ff2aaf3284c1bbb6a13765529dff3"
3) "consumers"
4) (integer) 1
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1753801734537-0"
9) "entries-read"
10) (integer) 106
11) "lag"
12) (integer) 0
5) 1) "name"
2) "event-persister"
3) "consumers"
4) (integer) 5
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1753801734537-0"
9) "entries-read"
10) (integer) 106
11) "lag"
12) (integer) 0
6) 1) "name"
2) "reactive-triggers"
3) "consumers"
4) (integer) 5
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1753801734537-0"
9) "entries-read"
10) (integer) 106
11) "lag"
12) (integer) 0
7) 1) "name"
2) "task-run-recorder"
3) "consumers"
4) (integer) 5
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1753801734537-0"
9) "entries-read"
10) (integer) 106
11) "lag"
12) (integer) 0
```
The lagging group `ephemeral-6dd294b0c059-bd998f55f4b74db8b1b38e072880cbf4` seems to prevent any further trimming. For a deployment with a large volume of events this is very visible: deleting the lagging groups results in an immediate drop in memory usage.
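
A minimal sketch of that manual cleanup using redis-py (the `ephemeral-` prefix filter and the idle threshold are my assumptions for spotting orphaned groups, not anything prefect-redis provides):

```python
import redis

# Assumption: if every consumer in an ephemeral group has been idle this
# long, the owning process is dead and the group will never catch up.
STALE_MS = 10 * 60 * 1000


def destroy_stale_ephemeral_groups(client: redis.Redis, stream: str = "events") -> None:
    for group in client.xinfo_groups(stream):
        name = group["name"]
        if isinstance(name, bytes):
            name = name.decode()
        if not name.startswith("ephemeral-"):
            continue  # leave the long-lived service groups alone
        # A SIGKILLed process leaves its consumer registered but idle
        # forever, so the group's last-delivered-id never advances.
        consumers = client.xinfo_consumers(stream, name)
        if consumers and all(c["idle"] > STALE_MS for c in consumers):
            client.xgroup_destroy(stream, name)
```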

The docker-compose file used for the reproduction:

```yaml
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: prefect
      POSTGRES_PASSWORD: prefect
      POSTGRES_DB: prefect
    volumes:
      - postgres_data:/var/lib/postgresql/data
  redis:
    image: redis:7
    ports:
      - 6379:6379
  migrate:
    image: prefecthq/prefect:3.4.10-python3.12
    depends_on: [postgres]
    command: prefect server database upgrade -y
    environment:
      PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://prefect:prefect@postgres:5432/prefect
  prefect-api:
    image: prefecthq/prefect:3.4.10-python3.12
    depends_on: [migrate, postgres, redis]
    deploy:
      replicas: 2
    command: prefect server start --host 0.0.0.0 --no-services
    environment:
      PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://prefect:prefect@postgres:5432/prefect
      PREFECT_API_DATABASE_MIGRATE_ON_START: "false"
      PREFECT_MESSAGING_BROKER: prefect_redis.messaging
      PREFECT_MESSAGING_CACHE: prefect_redis.messaging
      PREFECT_REDIS_MESSAGING_HOST: redis
      PREFECT_REDIS_MESSAGING_PORT: "6379"
    ports:
      - "4200-4202:4200" # Maps to different ports for each replica
  prefect-background:
    image: prefecthq/prefect:3.4.10-python3.12
    depends_on: [migrate, postgres, redis]
    command: prefect server services start
    environment:
      PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://prefect:prefect@postgres:5432/prefect
      PREFECT_API_DATABASE_MIGRATE_ON_START: "false"
      PREFECT_MESSAGING_BROKER: prefect_redis.messaging
      PREFECT_MESSAGING_CACHE: prefect_redis.messaging
      PREFECT_REDIS_MESSAGING_HOST: redis
      PREFECT_REDIS_MESSAGING_PORT: "6379"

volumes:
  postgres_data:
```
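
For the `docker kill` step above to line up, the stack should come up under the compose project name `prefect-ha` (inferred from the container name `prefect-ha-prefect-api-2`):

```
docker compose -p prefect-ha up -d
```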
The flow used for the test deployments:

```python
from prefect import flow, task, serve


@task
def generate_data() -> list[int]:
    return list(range(100))


@flow
def test_flow():
    results = generate_data()
    return results


if __name__ == "__main__":
    test_deployment = test_flow.to_deployment(name="test-deployment")
    serve(test_deployment)
```
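
The deployment is registered by running this script against one of the API replicas; the file name and URL here are illustrative:

```
PREFECT_API_URL=http://localhost:4200/api python test_flow.py
```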
### Version info

```
Version:             3.4.10
API version:         0.8.4
Python version:      3.12.11
Git commit:          fb979941
Built:               Tue, Jul 22, 2025 07:25 PM
OS/Arch:             linux/aarch64
Profile:             ephemeral
Server type:         ephemeral
Pydantic version:    2.11.7
Server:
  Database:          postgresql
Integrations:
  prefect-redis:     0.2.2
```
### Additional context

I've also tested this with a newer prefect-redis, pulled directly from git, with the same result.
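
For reference, something like the following should install it from the monorepo (the subdirectory path is my assumption about the current layout):

```
pip install "git+https://github.com/PrefectHQ/prefect.git#subdirectory=src/integrations/prefect-redis"
```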