How do I clean up older data from the database? #12047
-
After using …
-
One option is to use the Python APIs against the DagsterInstance to query for older runs and delete them. This is a destructive operation that will remove the events, tags, and run record from the database. It will remove dagster's understanding that this run ever occurred, which can be particularly impactful to partitioned jobs and assets. Perform this operation with great care. An example script would look something like this.
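The script below is a sketch of that approach, not the original reply's exact code. It assumes DAGSTER_HOME points at the instance whose database you want to prune, and uses the DagsterInstance.get(), get_run_records(), and delete_run() APIs; the 90-day cutoff and batch size are placeholders to adapt.

import datetime

from dagster import DagsterInstance, RunsFilter

# Placeholder retention window: runs created more than 90 days ago.
cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=90)

instance = DagsterInstance.get()  # reads DAGSTER_HOME to locate the instance
old_run_records = instance.get_run_records(
    filters=RunsFilter(created_before=cutoff),
    limit=100,       # delete in small batches; re-run until nothing is left
    ascending=True,  # oldest runs first
)
for record in old_run_records:
    # Destructive: removes the run record, its event logs, and its tags.
    instance.delete_run(record.dagster_run.run_id)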
-
Will this trigger the relevant cleanup to take place as well (like …)?
-
If I want to keep ASSET_MATERIALIZATION and ASSET_OBSERVATION events for the UI, is there a way to do that?
-
Is there a clean way to also remove the associated folders on disk? I have a disk that's slowly filling up with the intermediate results of runs.
-
Just for inspiration: we are currently cleaning up the disk by blindly deleting storage older than 30 days.
No guarantee this is a good idea; I can only confirm it hasn't broken our production instance (yet).
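A minimal sketch of what that could look like, assuming the default local artifact storage layout where each run writes its outputs to a directory under $DAGSTER_HOME/storage; the path and 30-day cutoff are assumptions, and the deletion is irreversible.

import os
import shutil
import time
from pathlib import Path

# Assumed layout: one directory per run under $DAGSTER_HOME/storage.
storage_root = Path(os.environ["DAGSTER_HOME"]) / "storage"
cutoff_seconds = 30 * 24 * 60 * 60  # 30 days

now = time.time()
for run_dir in storage_root.iterdir():
    if run_dir.is_dir() and now - run_dir.stat().st_mtime > cutoff_seconds:
        shutil.rmtree(run_dir)  # irreversibly deletes the run's stored outputs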
-
In Dagster+, runs can be deleted via an authorized call to the GraphQL API using the deleteRun mutation:

import os

from gql import Client, gql
from gql.transport.requests import RequestsHTTPTransport

# Define the endpoint URL and token
org_name = "your-org-name"
base_url = f"https://{org_name}.dagster.cloud/"
deployment_name = "prod"  # the string name for full deployments; branch deployments use the string ID of their deploymentId
url = base_url + deployment_name + "/graphql"
token = os.getenv("DAGSTER_CLOUD_USER_TOKEN")  # a user token generated from the Organization Settings page in Dagster+. Note: use a user token, not an agent token

# Define the transport with the endpoint URL and any headers if needed
transport = RequestsHTTPTransport(
    url=url,
    headers={
        "Dagster-Cloud-Api-Token": token,
    },
    use_json=True,
    timeout=60,
)

# Instantiate the client
client = Client(transport=transport)

# Define the GraphQL mutation
delete_run_mutation = gql("""
mutation DeleteRun($runId: String!) {
  deleteRun(runId: $runId) {
    __typename
    ... on DeletePipelineRunSuccess {
      runId
    }
    ... on RunNotFoundError {
      runId
    }
    ... on PythonError {
      message
      stack
    }
  }
}
""")

# Define the query variables
query_variables = {
    "runId": "your-run-id"  # replace with the actual run ID you want to delete
}

# Execute the mutation
try:
    result = client.execute(delete_run_mutation, variable_values=query_variables)
    print(result)
except Exception as e:
    print(f"An error occurred: {e}")
-
Database clean-up job (Dagster OSS)
We are running this Dagster database clean-up job weekly.
import psycopg2
from textwrap import dedent

from dagster import ConfigurableResource, OpExecutionContext, job, op

# Assumed shape of the connection resource (its definition is not shown in the
# original post); it would be wired up under the "postgresql_dagster" resource key.
class DatabaseConfig(ConfigurableResource):
    host: str
    database: str
    user: str
    password: str

@op
def postgresql_dagster_cleanup_op(context: OpExecutionContext, postgresql_dagster: DatabaseConfig):
    # connect
    conn = psycopg2.connect(
        host=postgresql_dagster.host,
        database=postgresql_dagster.database,
        user=postgresql_dagster.user,
        password=postgresql_dagster.password,
    )
    context.log.debug("Successfully connected to PostgreSQL!")
    cursor = conn.cursor()

    # remove debug logs older than a week
    cursor.execute(
        dedent(
            """\
            delete from event_logs el
            where
                dagster_event_type is null
                and event::jsonb->>'level' = '10'
                and timestamp < CURRENT_DATE - INTERVAL '1 week';
            """
        ),
    )
    conn.commit()
    context.log.info(f"Removed {cursor.rowcount} debug logs older than a week!")

    # remove info logs older than 2 months
    cursor.execute(
        dedent(
            """\
            delete from event_logs el
            where
                dagster_event_type is null
                and event::jsonb->>'level' = '20'
                and timestamp < CURRENT_DATE - INTERVAL '2 months';
            """
        ),
    )
    conn.commit()
    context.log.info(f"Removed {cursor.rowcount} info logs older than 2 months!")

    # remove warning logs older than 2 months
    cursor.execute(
        dedent(
            """\
            delete from event_logs el
            where
                dagster_event_type is null
                and event::jsonb->>'level' = '30'
                and timestamp < CURRENT_DATE - INTERVAL '2 months';
            """
        ),
    )
    conn.commit()
    context.log.info(f"Removed {cursor.rowcount} warning logs older than 2 months!")

    # remove non-essential dagster events older than a month (https://second-foundation.atlassian.net/browse/DATP-1551)
    cursor.execute(
        dedent(
            """\
            DELETE
            FROM event_logs
            WHERE 1 = 1
                AND timestamp < CURRENT_DATE - INTERVAL '1 month'
                AND dagster_event_type IN (
                    -- Transition states. Used in business logic, but not important after the job finishes.
                    'ASSET_MATERIALIZATION_PLANNED',
                    -- System logs
                    'ENGINE_EVENT',
                    'HANDLED_OUTPUT',
                    'LOADED_INPUT',
                    -- Transition states. Used in business logic, but not important after the job finishes.
                    'PIPELINE_CANCELING',
                    'PIPELINE_ENQUEUED',
                    'PIPELINE_STARTING',
                    -- System logs
                    'RESOURCE_INIT_FAILURE',
                    'RESOURCE_INIT_STARTED',
                    'RESOURCE_INIT_SUCCESS',
                    'STEP_INPUT',
                    'STEP_OUTPUT',
                    'STEP_WORKER_STARTED',
                    'STEP_WORKER_STARTING'
                );
            """
        ),
    )
    conn.commit()
    context.log.info(f"Removed {cursor.rowcount} non-essential events older than 1 month!")
@job
def postgresql_dagster_cleanup_job():
    postgresql_dagster_cleanup_op()
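The original post does not show how the weekly cadence is wired up; a minimal sketch using a ScheduleDefinition, where the cron expression is an assumption (the post only says "weekly"):

from dagster import ScheduleDefinition

# Assumed cadence: run the clean-up job every Sunday at 03:00.
postgresql_dagster_cleanup_schedule = ScheduleDefinition(
    job=postgresql_dagster_cleanup_job,
    cron_schedule="0 3 * * 0",
)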
Additional indexes:
What can you expect when running clean-up?
The clean-up job finished... what now?
Details about VACUUM (RECOVERING DISK SPACE)

The standard form of VACUUM removes dead row versions in tables and indexes and marks the space available for future reuse. However, it will not return the space to the operating system, except in the special case where one or more pages at the end of a table become entirely free and an exclusive table lock can be easily obtained. In contrast, VACUUM FULL actively compacts tables by writing a complete new version of the table file with no dead space. This minimizes the size of the table, but can take a long time. It also requires extra disk space for the new copy of the table, until the operation completes. The usual goal of routine vacuuming is to do standard VACUUMs often enough to avoid needing VACUUM FULL. The autovacuum daemon attempts to work this way, and in fact will never issue VACUUM FULL. In this approach, the idea is not to keep tables at their minimum size, but to maintain steady-state usage of disk space: each table occupies space equivalent to its minimum size plus however much space gets used up between vacuum runs. Although VACUUM FULL can be used to shrink a table back to its minimum size and return the disk space to the operating system, there is not much point in this if the table will just grow again in the future. Thus, moderately-frequent standard VACUUM runs are a better approach than infrequent VACUUM FULL runs for maintaining heavily-updated tables.

Here are my utility queries for VACUUM:

-- Get dead tuples: deleted/updated rows that were not collected yet
SELECT relname, n_dead_tup FROM pg_stat_user_tables ORDER BY n_dead_tup DESC;
-- Get analyze stats
SELECT relname, last_vacuum, last_analyze, last_autovacuum, last_autoanalyze,
       vacuum_count, autovacuum_count, analyze_count, autoanalyze_count
FROM pg_stat_all_tables
WHERE relname = 'event_logs';
-- Table/Index size
SELECT relname,
       pg_size_pretty(pg_relation_size(oid)) AS table_size,
       pg_size_pretty(pg_total_relation_size(oid) - pg_relation_size(oid)) AS index_size
FROM pg_class
WHERE relname = 'event_logs';
-- Run vacuum + refresh indexes
VACUUM VERBOSE ANALYZE dagster.event_logs;
-- Vacuum progress
SELECT
n.nspname || '.' || c.relname AS table_name,
v.phase,
round(100.0 * v.heap_blks_scanned / NULLIF(v.heap_blks_total, 0), 2) AS pct_scanned,
round(100.0 * v.heap_blks_vacuumed / NULLIF(v.heap_blks_total, 0), 2) AS pct_vacuumed,
v.heap_blks_total,
v.heap_blks_scanned,
v.heap_blks_vacuumed
FROM
pg_stat_progress_vacuum v
JOIN
pg_class c ON v.relid = c.oid
JOIN
pg_namespace n ON c.relnamespace = n.oid;
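If you want to trigger the VACUUM from Python rather than psql, note that VACUUM cannot run inside a transaction block, so the psycopg2 connection must be switched to autocommit first. A minimal sketch (the connection parameters are placeholders):

import psycopg2

conn = psycopg2.connect(host="...", database="dagster", user="...", password="...")
conn.autocommit = True  # VACUUM is rejected inside a transaction block
with conn.cursor() as cursor:
    cursor.execute("VACUUM VERBOSE ANALYZE dagster.event_logs;")
conn.close()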
My notes:
🚀 Follow up: Shrinking the database with pg_repack
Since we are using an Azure PostgreSQL Flexible Server instance, some parts might differ, but I will share my path.
Feel free to reach out to me if you need to discuss it. :)