Skip to content

[Feat] Make batch size for maximum retention in spend logs a controllable parameter #11459

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/my-website/docs/proxy/config_settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -650,4 +650,5 @@ router_settings:
| USE_AWS_KMS | Flag to enable AWS Key Management Service for encryption
| USE_PRISMA_MIGRATE | Flag to use prisma migrate instead of prisma db push. Recommended for production environments.
| WEBHOOK_URL | URL for receiving webhooks from external services
| SPEND_LOG_RUN_LOOPS | Constant for setting how many runs of 1000 batch deletes should spend_log_cleanup task run
| SPEND_LOG_RUN_LOOPS | Maximum number of batch-delete loops the spend_log_cleanup task runs per execution |
| SPEND_LOG_CLEANUP_BATCH_SIZE | Number of logs deleted per batch during cleanup. Default is 1000 |
8 changes: 5 additions & 3 deletions docs/my-website/docs/proxy/spend_logs_deletion.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,20 @@ If Redis is enabled, LiteLLM uses it to make sure only one instance runs the cle
Once cleanup starts:

- It calculates the cutoff date using the configured retention period
- Deletes logs older than the cutoff in **batches of 1000**
- Deletes logs older than the cutoff in batches (default size `1000`)
- Adds a short delay between batches to avoid overloading the database

### Default settings:
- **Batch size**: 1000 logs
- **Batch size**: 1000 logs (configurable via `SPEND_LOG_CLEANUP_BATCH_SIZE`)
- **Max batches per run**: 500
- **Max deletions per run**: 500,000 logs

You can change the number of batches using an environment variable:
You can change the cleanup parameters using environment variables:

```bash
SPEND_LOG_RUN_LOOPS=200
# optional: change batch size from the default 1000
SPEND_LOG_CLEANUP_BATCH_SIZE=2000
```

This would allow up to 200,000 logs (200 loops × the default batch size of 1000) to be deleted in one run.
Expand Down
4 changes: 3 additions & 1 deletion docs/my-website/docs/proxy/ui_logs.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ general_settings:

You can control how many logs are deleted per run using this environment variable:

`SPEND_LOG_RUN_LOOPS=200 # Deletes up to 200,000 logs in one run (batch size = 1000)`
`SPEND_LOG_RUN_LOOPS=200 # Deletes up to 200,000 logs in one run (with the default batch size of 1000)`

Set `SPEND_LOG_CLEANUP_BATCH_SIZE` to control how many logs are deleted per batch (default `1000`).

For detailed architecture and how it works, see [Spend Logs Deletion](../proxy/spend_logs_deletion).

Expand Down
1 change: 1 addition & 0 deletions litellm/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,7 @@
PROMETHEUS_EMIT_BUDGET_METRICS_JOB_NAME = "prometheus_emit_budget_metrics"
SPEND_LOG_CLEANUP_JOB_NAME = "spend_log_cleanup"
SPEND_LOG_RUN_LOOPS = int(os.getenv("SPEND_LOG_RUN_LOOPS", 500))
SPEND_LOG_CLEANUP_BATCH_SIZE = int(os.getenv("SPEND_LOG_CLEANUP_BATCH_SIZE", 1000))
DEFAULT_CRON_JOB_LOCK_TTL_SECONDS = int(
os.getenv("DEFAULT_CRON_JOB_LOCK_TTL_SECONDS", 60)
) # 1 minute
Expand Down
68 changes: 49 additions & 19 deletions litellm/proxy/db/db_transaction_queue/spend_log_cleanup.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Optional
from litellm.proxy.utils import PrismaClient
from litellm.litellm_core_utils.duration_parser import duration_in_seconds

from litellm._logging import verbose_proxy_logger
from litellm.caching import RedisCache
from litellm.constants import SPEND_LOG_CLEANUP_JOB_NAME, SPEND_LOG_RUN_LOOPS
import asyncio
from litellm.constants import (
SPEND_LOG_CLEANUP_BATCH_SIZE,
SPEND_LOG_CLEANUP_JOB_NAME,
SPEND_LOG_RUN_LOOPS,
)
from litellm.litellm_core_utils.duration_parser import duration_in_seconds
from litellm.proxy.utils import PrismaClient


class SpendLogCleanup:
Expand All @@ -16,21 +21,26 @@ class SpendLogCleanup:
"""

def __init__(self, general_settings=None, redis_cache: Optional[RedisCache] = None):
self.batch_size = 1000
self.batch_size = SPEND_LOG_CLEANUP_BATCH_SIZE
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is really the only change, the other deltas are my formatter.

self.retention_seconds: Optional[int] = None
from litellm.proxy.proxy_server import general_settings as default_settings

self.general_settings = general_settings or default_settings
from litellm.proxy.proxy_server import proxy_logging_obj

pod_lock_manager = proxy_logging_obj.db_spend_update_writer.pod_lock_manager
self.pod_lock_manager = pod_lock_manager
verbose_proxy_logger.info(f"SpendLogCleanup initialized with batch size: {self.batch_size}")
verbose_proxy_logger.info(
f"SpendLogCleanup initialized with batch size: {self.batch_size}"
)

def _should_delete_spend_logs(self) -> bool:
"""
Determines if logs should be deleted based on the max retention period in settings.
"""
retention_setting = self.general_settings.get("maximum_spend_logs_retention_period")
retention_setting = self.general_settings.get(
"maximum_spend_logs_retention_period"
)
verbose_proxy_logger.info(f"Checking retention setting: {retention_setting}")

if retention_setting is None:
Expand All @@ -41,15 +51,19 @@ def _should_delete_spend_logs(self) -> bool:
if isinstance(retention_setting, int):
retention_setting = str(retention_setting)
self.retention_seconds = duration_in_seconds(retention_setting)
verbose_proxy_logger.info(f"Retention period set to {self.retention_seconds} seconds")
verbose_proxy_logger.info(
f"Retention period set to {self.retention_seconds} seconds"
)
return True
except ValueError as e:
verbose_proxy_logger.error(
f"Invalid maximum_spend_logs_retention_period value: {retention_setting}, error: {str(e)}"
)
return False

async def _delete_old_logs(self, prisma_client: PrismaClient, cutoff_date: datetime) -> int:
async def _delete_old_logs(
self, prisma_client: PrismaClient, cutoff_date: datetime
) -> int:
"""
Helper method to delete old logs in batches.
Returns the total number of logs deleted.
Expand All @@ -58,7 +72,9 @@ async def _delete_old_logs(self, prisma_client: PrismaClient, cutoff_date: datet
run_count = 0
while True:
if run_count > SPEND_LOG_RUN_LOOPS:
verbose_proxy_logger.info("Max logs deleted - 1,00,000, rest of the logs will be deleted in next run")
verbose_proxy_logger.info(
"Max logs deleted - 1,00,000, rest of the logs will be deleted in next run"
)
break
# Step 1: Find logs to delete
logs_to_delete = await prisma_client.db.litellm_spendlogs.find_many(
Expand All @@ -68,7 +84,9 @@ async def _delete_old_logs(self, prisma_client: PrismaClient, cutoff_date: datet
verbose_proxy_logger.info(f"Found {len(logs_to_delete)} logs in this batch")

if not logs_to_delete:
verbose_proxy_logger.info(f"No more logs to delete. Total deleted: {total_deleted}")
verbose_proxy_logger.info(
f"No more logs to delete. Total deleted: {total_deleted}"
)
break

request_ids = [log.request_id for log in logs_to_delete]
Expand All @@ -80,7 +98,7 @@ async def _delete_old_logs(self, prisma_client: PrismaClient, cutoff_date: datet

total_deleted += len(logs_to_delete)
run_count += 1

# Add a small sleep to prevent overwhelming the database
await asyncio.sleep(0.1)

Expand All @@ -96,26 +114,36 @@ async def cleanup_old_spend_logs(self, prisma_client: PrismaClient) -> None:
verbose_proxy_logger.info(f"Cleanup job triggered at {datetime.now()}")

if not self._should_delete_spend_logs():
verbose_proxy_logger.info("Skipping cleanup — invalid or missing retention setting.")
verbose_proxy_logger.info(
"Skipping cleanup — invalid or missing retention setting."
)
return

if self.retention_seconds is None:
verbose_proxy_logger.error("Retention seconds is None, cannot proceed with cleanup")
verbose_proxy_logger.error(
"Retention seconds is None, cannot proceed with cleanup"
)
return

# If we have a pod lock manager, try to acquire the lock
if self.pod_lock_manager and self.pod_lock_manager.redis_cache:
lock_acquired = await self.pod_lock_manager.acquire_lock(
cronjob_id=SPEND_LOG_CLEANUP_JOB_NAME,
)
verbose_proxy_logger.info(f"Lock acquisition attempt: {'successful' if lock_acquired else 'failed'} at {datetime.now()}")

verbose_proxy_logger.info(
f"Lock acquisition attempt: {'successful' if lock_acquired else 'failed'} at {datetime.now()}"
)

if not lock_acquired:
verbose_proxy_logger.info("Another pod is already running cleanup")
return

cutoff_date = datetime.now(timezone.utc) - timedelta(seconds=float(self.retention_seconds))
verbose_proxy_logger.info(f"Deleting logs older than {cutoff_date.isoformat()}")
cutoff_date = datetime.now(timezone.utc) - timedelta(
seconds=float(self.retention_seconds)
)
verbose_proxy_logger.info(
f"Deleting logs older than {cutoff_date.isoformat()}"
)

# Perform the actual deletion
total_deleted = await self._delete_old_logs(prisma_client, cutoff_date)
Expand All @@ -127,5 +155,7 @@ async def cleanup_old_spend_logs(self, prisma_client: PrismaClient) -> None:
finally:
# Always release the lock if we have a pod lock manager
if self.pod_lock_manager and self.pod_lock_manager.redis_cache:
await self.pod_lock_manager.release_lock(cronjob_id=SPEND_LOG_CLEANUP_JOB_NAME)
await self.pod_lock_manager.release_lock(
cronjob_id=SPEND_LOG_CLEANUP_JOB_NAME
)
verbose_proxy_logger.info("Released cleanup lock")
21 changes: 21 additions & 0 deletions tests/test_litellm/proxy/test_spend_log_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,24 @@ async def test_cleanup_old_spend_logs_no_retention_period():

mock_prisma_client.db.litellm_spendlogs.find_many.assert_not_called()
mock_prisma_client.db.litellm_spendlogs.delete.assert_not_called()


def test_cleanup_batch_size_env_var(monkeypatch):
    """Ensure batch size is configurable via the SPEND_LOG_CLEANUP_BATCH_SIZE env var.

    SPEND_LOG_CLEANUP_BATCH_SIZE is read from the environment at import time in
    ``litellm.constants``, so both the constants module and the cleanup module
    must be reloaded for the monkeypatched value to take effect.
    """
    import importlib

    import litellm.constants as constants_module
    import litellm.proxy.db.db_transaction_queue.spend_log_cleanup as cleanup_module

    # Set env var and reload modules so the new value is re-read at import time.
    monkeypatch.setenv("SPEND_LOG_CLEANUP_BATCH_SIZE", "25")
    importlib.reload(constants_module)
    importlib.reload(cleanup_module)

    try:
        cleaner = cleanup_module.SpendLogCleanup(general_settings={})
        assert cleaner.batch_size == 25
    finally:
        # Restore defaults even if the assertion fails; otherwise the reloaded
        # modules (with batch size 25) would leak into every subsequent test.
        # monkeypatch only undoes the env var at teardown — it cannot undo the
        # module reloads, so we must reload again ourselves.
        monkeypatch.delenv("SPEND_LOG_CLEANUP_BATCH_SIZE", raising=False)
        importlib.reload(constants_module)
        importlib.reload(cleanup_module)
Loading