Skip to content

Connect UI to "LiteLLM_DailyUserSpend" spend table - enables usage tab to work at 1m+ spend logs #9603

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ci_cd/baseline_db.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
import subprocess
from pathlib import Path
from datetime import datetime
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- AlterTable
ALTER TABLE "LiteLLM_DailyUserSpend" ADD COLUMN "api_requests" INTEGER NOT NULL DEFAULT 0;

1 change: 1 addition & 0 deletions litellm/proxy/_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2718,3 +2718,4 @@ class DailyUserSpendTransaction(TypedDict):
prompt_tokens: int
completion_tokens: int
spend: float
api_requests: int
94 changes: 83 additions & 11 deletions litellm/proxy/db/prisma_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import asyncio
import glob
import os
import random
import subprocess
Expand Down Expand Up @@ -178,6 +179,69 @@ def _create_baseline_migration(schema_path: str) -> bool:
verbose_proxy_logger.warning(f"Error creating baseline migration: {e}")
return False

@staticmethod
def _copy_spend_tracking_migrations(prisma_dir: str) -> bool:
import shutil
from pathlib import Path

"""
Check for and copy over spend tracking migrations if they exist in the deploy directory.
Returns True if migrations were found and copied, False otherwise.
"""
try:
# Get the current file's directory
current_dir = Path(__file__).parent

# Check for migrations in the deploy directory (../../deploy/migrations)
deploy_migrations_dir = (
current_dir.parent.parent.parent / "deploy" / "migrations"
)

# Local migrations directory
local_migrations_dir = Path(prisma_dir + "/migrations")

if deploy_migrations_dir.exists():
# Create local migrations directory if it doesn't exist
local_migrations_dir.mkdir(parents=True, exist_ok=True)

# Copy all migration files
# Copy entire migrations folder recursively
shutil.copytree(
deploy_migrations_dir, local_migrations_dir, dirs_exist_ok=True
)

return True
return False
except Exception:
return False

@staticmethod
def _get_migration_names(migrations_dir: str) -> list:
"""Get all migration directory names from the migrations folder"""
migration_paths = glob.glob(f"{migrations_dir}/*/migration.sql")
return [Path(p).parent.name for p in migration_paths]

@staticmethod
def _resolve_all_migrations(migrations_dir: str):
"""Mark all existing migrations as applied"""
migration_names = PrismaManager._get_migration_names(migrations_dir)
for migration_name in migration_names:
try:
verbose_proxy_logger.info(f"Resolving migration: {migration_name}")
subprocess.run(
["prisma", "migrate", "resolve", "--applied", migration_name],
timeout=60,
check=True,
capture_output=True,
text=True,
)
verbose_proxy_logger.debug(f"Resolved migration: {migration_name}")
except subprocess.CalledProcessError as e:
if "is already recorded as applied in the database." not in e.stderr:
verbose_proxy_logger.warning(
f"Failed to resolve migration {migration_name}: {e.stderr}"
)

@staticmethod
def setup_database(use_migrate: bool = False) -> bool:
"""
Expand All @@ -194,8 +258,10 @@ def setup_database(use_migrate: bool = False) -> bool:
os.chdir(prisma_dir)
try:
if use_migrate:
PrismaManager._copy_spend_tracking_migrations(
prisma_dir
) # place a migration in the migrations directory
verbose_proxy_logger.info("Running prisma migrate deploy")
# First try to run migrate deploy directly
try:
subprocess.run(
["prisma", "migrate", "deploy"],
Expand All @@ -205,25 +271,31 @@ def setup_database(use_migrate: bool = False) -> bool:
text=True,
)
verbose_proxy_logger.info("prisma migrate deploy completed")

# Resolve all migrations in the migrations directory
migrations_dir = os.path.join(prisma_dir, "migrations")
PrismaManager._resolve_all_migrations(migrations_dir)

return True
except subprocess.CalledProcessError as e:
# Check if this is the non-empty schema error
verbose_proxy_logger.warning(
f"prisma db error: {e.stderr}, e: {e.stdout}"
)
if (
"P3005" in e.stderr
and "database schema is not empty" in e.stderr
):
# Create baseline migration
verbose_proxy_logger.info("Creating baseline migration")
if PrismaManager._create_baseline_migration(schema_path):
# Try migrate deploy again after baseline
subprocess.run(
["prisma", "migrate", "deploy"],
timeout=60,
check=True,
verbose_proxy_logger.info(
"Resolving all migrations after baseline"
)

# Resolve all migrations after baseline
migrations_dir = os.path.join(prisma_dir, "migrations")
PrismaManager._resolve_all_migrations(migrations_dir)

return True
else:
# If it's a different error, raise it
raise e
else:
# Use prisma db push with increased timeout
subprocess.run(
Expand Down
Loading
Loading