Commit 6ab87da

refactor(dags:airflow_log_cleanup): split the huge bash as separate tasks
1 parent 983979c commit 6ab87da

1 file changed: +171 −141 lines changed

dags/airflow_log_cleanup.py

Lines changed: 171 additions & 141 deletions
@@ -7,118 +7,28 @@
 """
 
 import logging
+import os
 from datetime import datetime, timedelta
-from string import Template
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
 
 from airflow.configuration import conf
-from airflow.sdk import Variable, dag, task
-
-log_cleanup_tpl = Template(
-    """
-echo "Getting Configurations..."
-BASE_LOG_FOLDER="$directory"
-WORKER_SLEEP_TIME="$sleep_time"
-
-sleep ${WORKER_SLEEP_TIME}s
-
-MAX_LOG_AGE_IN_DAYS="$max_log_age_in_days"
-ENABLE_DELETE="$enable_delete"
-echo "Finished Getting Configurations"
-
-echo "Configurations:"
-echo "BASE_LOG_FOLDER: '${BASE_LOG_FOLDER}'"
-echo "MAX_LOG_AGE_IN_DAYS: '${MAX_LOG_AGE_IN_DAYS}'"
-echo "ENABLE_DELETE: '${ENABLE_DELETE}'"
-
-cleanup() {
-    echo "Executing Find Statement: $1"
-    FILES_MARKED_FOR_DELETE=`eval $1`
-    echo "Process will be Deleting the following File(s)/Directory(s):"
-    echo "${FILES_MARKED_FOR_DELETE}"
-    echo "Process will be Deleting `echo "${FILES_MARKED_FOR_DELETE}" | \
-    grep -v '^$' | wc -l` File(s)/Directory(s)" \
-    # "grep -v '^$'" - removes empty lines.
-    # "wc -l" - Counts the number of lines
-    echo ""
-    if [ "${ENABLE_DELETE}" == "true" ];
-    then
-        if [ "${FILES_MARKED_FOR_DELETE}" != "" ];
-        then
-            echo "Executing Delete Statement: $2"
-            eval $2
-            DELETE_STMT_EXIT_CODE=$?
-            if [ "${DELETE_STMT_EXIT_CODE}" != "0" ]; then
-                echo "Delete process failed with exit code \
-                '${DELETE_STMT_EXIT_CODE}'"
-
-                echo "Removing lock file..."
-                rm -f $log_cleanup_process_lock_file
-                if [ "${REMOVE_LOCK_FILE_EXIT_CODE}" != "0" ]; then
-                    echo "Error removing the lock file. \
-                    Check file permissions.\
-                    To re-run the DAG, ensure that the lock file has been \
-                    deleted ($log_cleanup_process_lock_file)
-                    exit ${REMOVE_LOCK_FILE_EXIT_CODE}
-                fi
-                exit ${DELETE_STMT_EXIT_CODE}
-            fi
-        else
-            echo "WARN: No File(s)/Directory(s) to Delete"
-        fi
-    else
-        echo "WARN: You're opted to skip deleting the File(s)/Directory(s)!!!"
-    fi
-}
-
-if [ ! -f $log_cleanup_process_lock_file ]; then
-    echo "Lock file not found on this node! Creating it to prevent collisions..."
-    touch $log_cleanup_process_lock_file
-    CREATE_LOCK_FILE_EXIT_CODE=$?
-    if [ "${CREATE_LOCK_FILE_EXIT_CODE}" != "0" ]; then
-        echo "Error creating the lock file. \
-        Check if the airflow user can create files under tmp directory. \
-        Exiting..."
-        exit ${CREATE_LOCK_FILE_EXIT_CODE}
-    fi
-
-    echo "Running Cleanup Process..."
-
-    FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type f -mtime +${MAX_LOG_AGE_IN_DAYS}"
-    DELETE_STMT="${FIND_STATEMENT} -exec rm -f {} \;"
-    cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
-    CLEANUP_EXIT_CODE=$?
-
-    FIND_STATEMENT="find ${BASE_LOG_FOLDER}/*/* -type d -empty"
-    DELETE_STMT="${FIND_STATEMENT} -prune -exec rm -rf {} \;"
-    cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
-    CLEANUP_EXIT_CODE=$?
-
-    FIND_STATEMENT="find ${BASE_LOG_FOLDER}/* -type d -empty"
-    DELETE_STMT="${FIND_STATEMENT} -prune -exec rm -rf {} \;"
-    cleanup "${FIND_STATEMENT}" "${DELETE_STMT}"
-    CLEANUP_EXIT_CODE=$?
-
-    echo "Finished Running Cleanup Process"
-
-    echo "Deleting lock file..."
-    rm -f $log_cleanup_process_lock_file
-    REMOVE_LOCK_FILE_EXIT_CODE=$?
-    if [ "${REMOVE_LOCK_FILE_EXIT_CODE}" != "0" ]; then
-        echo "Error removing the lock file. Check file permissions. \
-        To re-run the DAG, ensure that the lock file has been deleted ($log_cleanup_process_lock_file)."
-        exit ${REMOVE_LOCK_FILE_EXIT_CODE}
-    fi
-
-else
-    echo "Another task is already deleting logs on this worker node. Skipping it!"
-    echo "If you believe you're receiving this message in error, \
-    kindly check if $log_cleanup_process_lock_file exists and delete it."
-    exit 0
-fi
-"""
+from airflow.exceptions import TaskDeferred
+from airflow.providers.standard.triggers.temporal import (
+    DateTimeTrigger,
 )
+from airflow.sdk import Param, Variable, chain, dag, task, task_group
+from airflow.sdk.types import RuntimeTaskInstanceProtocol
+from airflow.utils import timezone
+
+if TYPE_CHECKING:
+    pass
+
+# get the airflow.task logger
+task_logger = logging.getLogger("airflow.task")
+
+lock_filename: str = "/tmp/airflow_log_cleanup_worker.lock"
 
-WORKER_NUM = 1
 default_args = {
     "owner": "operations",
     "depends_on_past": False,
@@ -135,69 +45,189 @@
     default_args=default_args,
     schedule="@daily",
     catchup=False,
-    tags=["teamclairvoyant", "airflow-maintenance-dags"],
+    tags=["airflow-maintenance-dags"],
+    params={
+        "enable_delete_child_log": Param(True, type="boolean"),
+    },
 )
 def airflow_log_cleanup():
     @task
-    def setup_directoryies_to_delete() -> list[str]:
+    def find_directories_to_delete(params: dict[str, Any]) -> list[str]:
         base_log_folder = conf.get("logging", "base_log_folder").rstrip("/")
         if not base_log_folder or not base_log_folder.strip():
             raise ValueError(
-                "Cannot find logging.base_log_folder. Kindly provide an appropriate directory path."
+                "Cannot find logging.base_log_folder in airflow configuration. "
+                "Kindly provide an appropriate directory path."
             )
         directories_to_delete = [base_log_folder]
 
-        # The number of worker nodes you have in Airflow. Will attempt to run this
-        # process for however many workers there are so that each worker gets its
-        # logs cleared.
-        enable_delete_child_log = Variable.get(
-            "airflow_log_cleanup__enable_delete_child_log", "True"
-        )
-        logging.info(f"ENABLE_DELETE_CHILD_LOG: {enable_delete_child_log}")
-        if enable_delete_child_log.lower() == "true":
-            # TODO: update to airflow 3 var
+        enable_delete_child_log = params["enable_delete_child_log"]
+        task_logger.info(f"ENABLE_DELETE_CHILD_LOG: {enable_delete_child_log}")
+        if enable_delete_child_log is True:
             child_process_log_directory = conf.get(
-                "scheduler", "child_process_log_directory", fallback=""
+                "logging", "dag_processor_child_process_log_directory", fallback=""
             )
-            if child_process_log_directory.strip():
+            # Ensure the child process log directory is not part of the base_log_folder
+            # since the children logs will be deleted along with parents
+            if child_process_log_directory and (
+                Path(base_log_folder) not in Path(child_process_log_directory).parents
+            ):
                 directories_to_delete.append(child_process_log_directory)
-            else:
-                logging.warning(
-                    "Could not obtain CHILD_PROCESS_LOG_DIRECTORY from Airflow Configurations: "
-                )
         return directories_to_delete
 
-    @task.bash
+    @task_group
     def log_cleanup(
         directory: str,
-        sleep_time: int,
         max_log_age_in_days: int | None = None,
         enable_delete: bool = True,
-    ) -> str:
+    ) -> None:
+    # ) -> str:
         """
         :param max_log_age_in_days: Length to retain the log files if not already provided in the conf. If this
             is set to 30, the job will remove those files that are 30 days old or older
 
         :param enable_delete: Whether the job should delete the logs or not. Included if you want to
            temporarily avoid deleting the logs
         """
-        if max_log_age_in_days is None:
-            max_log_age_in_days = Variable.get(
-                "airflow_log_cleanup__max_log_age_in_days", 3
+        max_log_age_in_days = max_log_age_in_days or Variable.get(
+            "airflow_log_cleanup__max_log_age_in_days", 3
+        )
+
+        @task
+        def wait_for_n_seconds(ti: RuntimeTaskInstanceProtocol) -> None:
+            """Wait for 10 * map_index seconds."""
+            ti_index = ti.map_index or 0
+            wait_time = ti_index * 10
+            if wait_time:
+                task_logger.info(f"Sleep for {wait_time} seconds.")
+                target_dttm = timezone.utcnow() + timedelta(seconds=wait_time)
+                raise TaskDeferred(
+                    trigger=DateTimeTrigger(moment=target_dttm, end_from_trigger=True),
+                    method_name="",
+                )
+
+        @task.short_circuit
+        def check_process_lock_not_exists(filename: str) -> bool:
+            if not os.path.isfile(filename):
+                return True
+
+            task_logger.warning(
+                "Another task is already deleting logs on this worker node. Skipping it!"
+                "If you believe you're receiving this message in error, "
+                f"kindly check if {filename} exists and delete it."
+            )
+            return False
+
+        @task
+        def create_lock_file(filename: str) -> None:
+            task_logger.info(
+                "Lock file not found on this node! Creating it to prevent collisions"
+            )
+            try:
+                with open(filename, "a"):
+                    pass
+            except OSError:
+                task_logger.exception(
+                    "Error creating the lock file. "
+                    "Check if the airflow user can create files under tmp directory. "
+                    "Exiting..."
+                )
+                raise
+
+        @task_group
+        def find_log_files_and_delete(
+            directory: str,
+            find_stmt_tpl: str,
+            delete_stmt_tpl: str,
+        ) -> None:
+            @task.bash
+            def exec_find_statement(find_stmt_tpl: str, directory: str) -> str:
+                find_stmt = find_stmt_tpl % directory
+                task_logger.info(f"Executing Find Statement: {find_stmt}")
+                return find_stmt
+
+            @task.short_circuit
+            def check_whether_delete(enable_delete: bool, find_stmt_ret: str) -> bool:
+                if enable_delete is False:
+                    task_logger.warning(
+                        "You're opted to skip deleting the File(s)/Directory(s)!!!"
+                    )
+                    return False
+
+                if not find_stmt_ret:
+                    task_logger.warning("No File(s)/Directory(s) to Delete")
+                    return False
+
+                number_of_files = len(find_stmt_ret.split())
+                task_logger.info(
+                    "Process will be Deleting the following File(s)/Directory(s):\n"
+                    f"{find_stmt_ret}"
+                    f"Process will be Deleting {number_of_files} File(s)/Directory(s)"
+                )
+                return True
+
+            @task.bash
+            def remove_log_files(
+                directory: str, find_stmt_tpl: str, delete_stmt_tpl: str
+            ) -> str:
+                find_stmt = find_stmt_tpl % directory
+                delete_stmt = delete_stmt_tpl % find_stmt
+                task_logger.info(f"Executing Delete Statement: {delete_stmt}")
+                return delete_stmt
+
+            find_stmt_ret = exec_find_statement(
+                directory=directory, find_stmt_tpl=find_stmt_tpl
+            )
+            chain(
+                check_whether_delete(
+                    enable_delete=enable_delete,
+                    find_stmt_ret=find_stmt_ret,  # type: ignore[arg-type]
+                ),
+                remove_log_files(
+                    directory=directory,
+                    find_stmt_tpl=find_stmt_tpl,
+                    delete_stmt_tpl=delete_stmt_tpl,
+                ),
             )
 
-        return log_cleanup_tpl.substitute(
-            directory=directory,
-            sleep_time=sleep_time,
-            max_log_age_in_days=max_log_age_in_days,
-            enable_delete=str(enable_delete).lower(),
-            log_cleanup_process_lock_file="/tmp/airflow_log_cleanup_worker.lock",
+        @task
+        def remove_process_lock_file(lock_filename: str) -> None:
+            task_logger.info("Removing lock file...")
+            try:
+                os.remove(lock_filename)
+            except Exception:
+                task_logger.exception(
+                    "Error removing the lock file. Check file permissions. "
+                    "To re-run the dag, ensure that the lock file {lock_filename} has been deleted"
+                )
+                raise
+
+        chain(
+            wait_for_n_seconds(),  # type: ignore[call-arg, misc]
+            check_process_lock_not_exists(filename=lock_filename),
+            create_lock_file(filename=lock_filename).as_setup(),
+            find_log_files_and_delete.partial(directory=directory).expand_kwargs(
+                [
+                    {
+                        "find_stmt_tpl": f"find %s/*/* -type f -mtime +{max_log_age_in_days}",
+                        "delete_stmt_tpl": "%s -exec rm -f {} \;",
+                    },
+                    {
+                        "find_stmt_tpl": "find %s/*/* -type d -empty",
+                        "delete_stmt_tpl": "%s -prune -exec rm -rf {} \;",
+                    },
+                    {
+                        "find_stmt_tpl": "find %s/* -type d -empty",
+                        "delete_stmt_tpl": "%s -prune -exec rm -rf {} \;",
+                    },
+                ]
+            ),
+            remove_process_lock_file(lock_filename=lock_filename).as_teardown(),
         )
 
-    directory_to_delete = setup_directoryies_to_delete()
-    log_cleanup.expand(
-        directory=directory_to_delete,
-        sleep_time=list(range(3, (WORKER_NUM + 1) * 3, 3)),
+    directory_to_delete = find_directories_to_delete()
+    chain(
+        log_cleanup.expand(directory=directory_to_delete),
     )
 
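For reference, the heart of this refactor is replacing one large @task.bash script with chained Airflow tasks, where the worker lock file is handled as a setup/teardown pair around the actual cleanup work. The sketch below is illustrative only: the DAG and task names, the lock path, and the placeholder cleanup task are hypothetical (the commit itself uses a mapped find_log_files_and_delete task group there), and it assumes the Airflow 3 airflow.sdk decorators imported in the diff above.

import os

from airflow.sdk import chain, dag, task

LOCK_FILE = "/tmp/example_cleanup.lock"  # assumed path, for illustration only


@dag(schedule=None, catchup=False)
def lock_pattern_sketch():
    @task.short_circuit
    def lock_is_free(path: str) -> bool:
        # Skip everything downstream if another run already holds the lock.
        return not os.path.isfile(path)

    @task
    def create_lock(path: str) -> None:
        # Setup task: create the lock file before any cleanup work starts.
        open(path, "a").close()

    @task
    def do_cleanup() -> None:
        # Placeholder for the real work (the commit uses a mapped task group here).
        pass

    @task
    def remove_lock(path: str) -> None:
        # Teardown task: intended to run even if do_cleanup fails,
        # so the lock is not left behind on the worker.
        os.remove(path)

    chain(
        lock_is_free(path=LOCK_FILE),
        create_lock(path=LOCK_FILE).as_setup(),
        do_cleanup(),
        remove_lock(path=LOCK_FILE).as_teardown(),
    )


lock_pattern_sketch()

Because remove_lock is marked as the teardown paired with the create_lock setup, Airflow is meant to run it even when the cleanup task in between fails, which is what keeps a stale lock file from blocking the next run on that worker.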